Skip to content

Commit d29c4f1

Browse files
authored
fix: the watch command should be executed in advance out of the pipeline of the transaction (#325)
1 parent 26c0592 commit d29c4f1

File tree

7 files changed

+119
-39
lines changed

7 files changed

+119
-39
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ then it is guaranted to hash to the same slot (and thus always live on the same
189189

190190
So, whilst it's not possible in Redis cluster to perform a transction on the keys `foo` and `bar`,
191191
it _is_ possible to perform a transaction on the keys `{tag}foo` and `{tag}bar`.
192-
To perform such transactions on this gem, use `hashtag:
192+
To perform such transactions on this gem, use the hashtag:
193193

194194
```ruby
195195
cli.multi do |tx|

lib/redis_client/cluster.rb

+15-5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require 'redis_client/cluster/router'
77
require 'redis_client/cluster/transaction'
88
require 'redis_client/cluster/pinning_node'
9+
require 'redis_client/cluster/optimistic_locking'
910

1011
class RedisClient
1112
class Cluster
@@ -91,9 +92,18 @@ def pipelined
9192
end
9293

9394
def multi(watch: nil)
94-
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, watch)
95-
yield transaction
96-
transaction.execute
95+
if watch.nil? || watch.empty?
96+
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
97+
yield transaction
98+
transaction.execute
99+
else
100+
locking = ::RedisClient::Cluster::OptimisticLocking.new(watch, @router)
101+
locking.watch do |c|
102+
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, c)
103+
yield transaction
104+
transaction.execute
105+
end
106+
end
97107
end
98108

99109
def pubsub
@@ -102,11 +112,11 @@ def pubsub
102112

103113
# TODO: This isn't an official public interface yet. Don't use in your production environment.
104114
# @see https://github.com/redis-rb/redis-cluster-client/issues/299
105-
def with(key: nil, hashtag: nil, write: true, _retry_count: 0, &_)
115+
def with(key: nil, hashtag: nil, write: true)
106116
key = process_with_arguments(key, hashtag)
107117
node_key = @router.find_node_key_by_key(key, primary: write)
108118
node = @router.find_node(node_key)
109-
yield ::RedisClient::Cluster::PinningNode.new(node)
119+
node.with { |c| yield ::RedisClient::Cluster::PinningNode.new(c) }
110120
end
111121

112122
def close

lib/redis_client/cluster/node_key.rb

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ def build_from_uri(uri)
3131
def build_from_host_port(host, port)
3232
"#{host}#{DELIMITER}#{port}"
3333
end
34+
35+
def build_from_client(client)
36+
"#{client.config.host}#{DELIMITER}#{client.config.port}"
37+
end
3438
end
3539
end
3640
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# frozen_string_literal: true
2+
3+
require 'redis_client'
4+
require 'redis_client/cluster/key_slot_converter'
5+
require 'redis_client/cluster/transaction'
6+
7+
class RedisClient
8+
class Cluster
9+
class OptimisticLocking
10+
def initialize(keys, router)
11+
@node = find_node!(keys, router)
12+
@keys = keys
13+
end
14+
15+
def watch
16+
@node.with do |c|
17+
c.call('WATCH', *@keys)
18+
reply = yield(c)
19+
c.call('UNWATCH')
20+
reply
21+
end
22+
end
23+
24+
private
25+
26+
def find_node!(keys, router)
27+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" unless safe?(keys)
28+
29+
node_key = router.find_primary_node_key(['WATCH', *keys])
30+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node" if node_key.nil?
31+
32+
router.find_node(node_key)
33+
end
34+
35+
def safe?(keys)
36+
return false if keys.empty?
37+
38+
slots = keys.map do |k|
39+
return false if k.nil? || k.empty?
40+
41+
::RedisClient::Cluster::KeySlotConverter.convert(k)
42+
end
43+
44+
slots.uniq.size == 1
45+
end
46+
end
47+
end
48+
end

lib/redis_client/cluster/transaction.rb

+16-33
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@
22

33
require 'redis_client'
44
require 'redis_client/cluster/pipeline'
5-
require 'redis_client/cluster/key_slot_converter'
5+
require 'redis_client/cluster/node_key'
66

77
class RedisClient
88
class Cluster
99
class Transaction
1010
ConsistencyError = Class.new(::RedisClient::Error)
1111

12-
def initialize(router, command_builder, watch)
12+
def initialize(router, command_builder, node = nil)
1313
@router = router
1414
@command_builder = command_builder
15-
@watch = watch
1615
@retryable = true
1716
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
1817
@pending_commands = []
19-
@node = nil
18+
@node = node
19+
prepare_tx unless @node.nil?
2020
end
2121

2222
def call(*command, **kwargs, &block)
@@ -62,7 +62,6 @@ def execute
6262

6363
raise ArgumentError, 'empty transaction' if @pipeline._empty?
6464
raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil?
65-
raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch?
6665

6766
settle
6867
end
@@ -74,42 +73,25 @@ def defer(&block)
7473
nil
7574
end
7675

77-
def watch?
78-
!@watch.nil? && !@watch.empty?
79-
end
80-
81-
def safe_watch?
82-
return true unless watch?
83-
return false if @node.nil?
84-
85-
slots = @watch.map do |k|
86-
return false if k.nil? || k.empty?
87-
88-
::RedisClient::Cluster::KeySlotConverter.convert(k)
89-
end
90-
91-
return false if slots.uniq.size != 1
92-
93-
@router.find_primary_node_by_slot(slots.first) == @node
94-
end
95-
9676
def prepare(command)
9777
return true unless @node.nil?
9878

9979
node_key = @router.find_primary_node_key(command)
10080
return false if node_key.nil?
10181

10282
@node = @router.find_node(node_key)
103-
@pipeline.call('WATCH', *@watch) if watch?
83+
prepare_tx
84+
true
85+
end
86+
87+
def prepare_tx
10488
@pipeline.call('MULTI')
10589
@pending_commands.each(&:call)
10690
@pending_commands.clear
107-
true
10891
end
10992

11093
def settle
11194
@pipeline.call('EXEC')
112-
@pipeline.call('UNWATCH') if watch?
11395
send_transaction(@node, redirect: true)
11496
end
11597

@@ -133,11 +115,12 @@ def send_pipeline(client, redirect:)
133115
end
134116
end
135117

136-
offset = watch? ? 2 : 1
137-
coerce_results!(replies[-offset], offset)
118+
return if replies.last.nil?
119+
120+
coerce_results!(replies.last)
138121
end
139122

140-
def coerce_results!(results, offset)
123+
def coerce_results!(results, offset: 1)
141124
results.each_with_index do |result, index|
142125
if result.is_a?(::RedisClient::CommandError)
143126
result._set_command(@pipeline._commands[index + offset])
@@ -167,12 +150,12 @@ def handle_command_error!(commands, err)
167150
end
168151

169152
def ensure_the_same_node!(commands)
153+
expected_node_key = NodeKey.build_from_client(@node)
154+
170155
commands.each do |command|
171156
node_key = @router.find_primary_node_key(command)
172157
next if node_key.nil?
173-
174-
node = @router.find_node(node_key)
175-
next if @node == node
158+
next if node_key == expected_node_key
176159

177160
raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}"
178161
end

test/redis_client/cluster/test_node_key.rb

+14
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,20 @@ def test_build_from_host_port
5151
assert_equal(c[:want], got, "Case: #{idx}")
5252
end
5353
end
54+
55+
def test_build_from_client
56+
dummy_client = Struct.new(:config, keyword_init: true)
57+
dummy_config = Struct.new(:host, :port, keyword_init: true)
58+
dummy = dummy_client.new(config: dummy_config.new(host: '127.0.0.1', port: '6379'))
59+
60+
[
61+
{ client: dummy, want: '127.0.0.1:6379' },
62+
{ client: ::RedisClient.new(host: '127.0.0.1', port: '6379'), want: '127.0.0.1:6379' }
63+
].each_with_index do |c, idx|
64+
got = ::RedisClient::Cluster::NodeKey.build_from_client(c[:client])
65+
assert_equal(c[:want], got, "Case: #{idx}")
66+
end
67+
end
5468
end
5569
end
5670
end

test/redis_client/test_cluster.rb

+21
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,27 @@ def test_transaction_with_block
341341
assert_equal(%w[a11 b22 c33], got)
342342
end
343343

344+
def test_transaction_in_race_condition
345+
@client.call('MSET', '{key}1', '1', '{key}2', '2')
346+
347+
another = Fiber.new do
348+
cli = new_test_client
349+
cli.call('MSET', '{key}1', '3', '{key}2', '4')
350+
cli.close
351+
Fiber.yield
352+
end
353+
354+
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
355+
another.resume
356+
v1 = @client.call('GET', '{key}1')
357+
v2 = @client.call('GET', '{key}1')
358+
tx.call('SET', '{key}1', v2)
359+
tx.call('SET', '{key}2', v1)
360+
end
361+
362+
assert_equal(%w[3 4], @client.call('MGET', '{key}1', '{key}2'))
363+
end
364+
344365
def test_pubsub_without_subscription
345366
pubsub = @client.pubsub
346367
assert_nil(pubsub.next_event(0.01))

0 commit comments

Comments
 (0)