Skip to content

Commit ce2b00a

Browse files
authored
fix: Pub/Sub command bugs (#284)
1 parent d7b508a commit ce2b00a

File tree

3 files changed

+51
-2
lines changed

3 files changed

+51
-2
lines changed

lib/redis_client/cluster/pub_sub.rb

+23
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@ def initialize(router, command_builder)
5252

5353
def call(*args, **kwargs)
5454
_call(@command_builder.generate(args, kwargs))
55+
nil
5556
end
5657

5758
def call_v(command)
5859
_call(@command_builder.generate(command))
60+
nil
5961
end
6062

6163
def close
@@ -86,10 +88,31 @@ def next_event(timeout = nil)
8688
private
8789

8890
def _call(command)
91+
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
92+
when 'subscribe', 'psubscribe', 'ssubscribe' then call_to_single_state(command)
93+
when 'unsubscribe', 'punsubscribe' then call_to_all_states(command)
94+
when 'sunsubscribe' then call_for_sharded_states(command)
95+
else call_to_single_state(command)
96+
end
97+
end
98+
99+
def call_to_single_state(command)
89100
node_key = @router.find_node_key(command)
90101
try_call(node_key, command)
91102
end
92103

104+
def call_to_all_states(command)
105+
@state_dict.each_value { |s| s.call_v(command) }
106+
end
107+
108+
def call_for_sharded_states(command)
109+
if command.size == 1
110+
call_to_all_states(command)
111+
else
112+
call_to_single_state(command)
113+
end
114+
end
115+
93116
def try_call(node_key, command, retry_count: 1)
94117
add_state(node_key).call(command)
95118
rescue ::RedisClient::CommandError => e

lib/redis_client/cluster/router.rb

+9-2
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,18 @@ def send_script_command(method, command, args, &block) # rubocop:disable Metrics
288288

289289
def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
290290
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
291-
when 'channels' then @node.call_all(method, command, args).flatten.uniq.sort_by(&:to_s).then(&TSF.call(block))
291+
when 'channels'
292+
@node.call_all(method, command, args).flatten.uniq.sort_by(&:to_s).then(&TSF.call(block))
293+
when 'shardchannels'
294+
@node.call_replicas(method, command, args).flatten.uniq.sort_by(&:to_s).then(&TSF.call(block))
295+
when 'numpat'
296+
@node.call_all(method, command, args).select { |e| e.is_a?(Integer) }.sum.then(&TSF.call(block))
292297
when 'numsub'
293298
@node.call_all(method, command, args).reject(&:empty?).map { |e| Hash[*e] }
294299
.reduce({}) { |a, e| a.merge(e) { |_, v1, v2| v1 + v2 } }.then(&TSF.call(block))
295-
when 'numpat' then @node.call_all(method, command, args).select { |e| e.is_a?(Integer) }.sum.then(&TSF.call(block))
300+
when 'shardnumsub'
301+
@node.call_replicas(method, command, args).reject(&:empty?).map { |e| Hash[*e] }
302+
.reduce({}) { |a, e| a.merge(e) { |_, v1, v2| v1 + v2 } }.then(&TSF.call(block))
296303
else assign_node(command).public_send(method, *args, command, &block)
297304
end
298305
end

test/redis_client/test_cluster.rb

+19
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ def test_global_pubsub
273273
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
274274
Fiber.yield(channel)
275275
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
276+
pubsub.call('UNSUBSCRIBE')
276277
pubsub.close
277278
end
278279

@@ -289,6 +290,7 @@ def test_global_pubsub_without_timeout
289290
assert_equal(want, got)
290291
Fiber.yield('my-global-published-channel')
291292
Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first)
293+
pubsub.call('UNSUBSCRIBE')
292294
pubsub.close
293295
end
294296

@@ -304,6 +306,7 @@ def test_global_pubsub_with_multiple_channels
304306
10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) }
305307
Fiber.yield
306308
Fiber.yield(collect_messages(pubsub, size: 10))
309+
pubsub.call('UNSUBSCRIBE')
307310
pubsub.close
308311
end
309312

@@ -325,6 +328,7 @@ def test_sharded_pubsub
325328
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
326329
Fiber.yield(channel)
327330
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
331+
pubsub.call('SUNSUBSCRIBE')
328332
pubsub.close
329333
end
330334

@@ -347,6 +351,7 @@ def test_sharded_pubsub_without_timeout
347351
assert_equal(want, got)
348352
Fiber.yield('my-sharded-published-channel')
349353
Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first)
354+
pubsub.call('SUNSUBSCRIBE')
350355
pubsub.close
351356
end
352357

@@ -367,6 +372,7 @@ def test_sharded_pubsub_with_multiple_channels
367372
10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) }
368373
Fiber.yield
369374
Fiber.yield(collect_messages(pubsub, size: 10))
375+
pubsub.call('SUNSUBSCRIBE')
370376
pubsub.close
371377
end
372378

@@ -376,6 +382,19 @@ def test_sharded_pubsub_with_multiple_channels
376382
10.times { |i| assert_equal(['smessage', "s-chan#{i}", i.to_s], got[i]) }
377383
end
378384

385+
def test_other_pubsub_commands
386+
assert_instance_of(Array, @client.call('pubsub', 'channels'))
387+
assert_instance_of(Integer, @client.call('pubsub', 'numpat'))
388+
assert_instance_of(Hash, @client.call('pubsub', 'numsub'))
389+
assert_instance_of(Array, @client.call('pubsub', 'shardchannels')) if TEST_REDIS_MAJOR_VERSION >= 7
390+
assert_instance_of(Hash, @client.call('pubsub', 'shardnumsub')) if TEST_REDIS_MAJOR_VERSION >= 7
391+
ps = @client.pubsub
392+
assert_nil(ps.call('unsubscribe'))
393+
assert_nil(ps.call('punsubscribe'))
394+
assert_nil(ps.call('sunsubscribe')) if TEST_REDIS_MAJOR_VERSION >= 7
395+
ps.close
396+
end
397+
379398
def test_dedicated_commands # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
380399
10.times { |i| @client.call('SET', "key#{i}", i) }
381400
wait_for_replication

0 commit comments

Comments
 (0)