Skip to content

Commit 8cb791f

Browse files
authored
fix: ensure recoverability for pubsub (#383)
1 parent b0dc2e3 commit 8cb791f

File tree

4 files changed

+108
-17
lines changed

4 files changed

+108
-17
lines changed

bin/pubsub

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
require 'bundler/setup'
5+
require 'redis_cluster_client'
6+
7+
# Debugging helpers that exercise cluster pub/sub with one publisher thread
# and one subscriber thread, logging every step so that recovery behaviour
# can be observed while nodes are taken down and brought back.
module PubSubDebug
  module_function

  # Starts a thread that publishes an incrementing, zero-padded counter to
  # +chan+ via SPUBLISH, pausing one second after every attempt.
  #
  # @param cli [#call] client used to send the SPUBLISH command
  # @param chan [String] channel name
  # @return [Thread] the publisher thread; it terminates by re-raising any
  #   error that {handle_errors} does not swallow
  def spawn_publisher(cli, chan)
    Thread.new(cli, chan) do |r, c|
      role = ' Publisher' # padded to the same width as 'Subscriber'
      i = 0

      loop do
        handle_errors(role) do
          msg = format('%05d', i)
          r.call('spublish', c, msg)
          log "#{role}: sent: #{msg}"
          # Only advance after a successful send, so a failed message is
          # retried with the same sequence number.
          i += 1
        end
      ensure
        sleep 1.0
      end
    rescue StandardError => e
      log "#{role}: dead: #{e.class}: #{e.message}"
      raise
    end
  end

  # Starts a thread that subscribes to +chan+ via SSUBSCRIBE (retrying every
  # second until the subscription succeeds) and then polls for events once a
  # second, logging each result.
  #
  # The pubsub handle is closed when the thread ends for any reason,
  # including being killed with Thread#exit.
  #
  # @param cli [#pubsub] client whose +pubsub+ returns an object responding
  #   to +call+, +next_event+ and +close+
  # @param chan [String] channel name
  # @return [Thread] the subscriber thread; it terminates by re-raising any
  #   error that {handle_errors} does not swallow
  def spawn_subscriber(cli, chan) # rubocop:disable Metrics/AbcSize
    Thread.new(cli, chan) do |r, c|
      role = 'Subscriber'
      ps = nil

      loop do
        ps = r.pubsub
        ps.call('ssubscribe', c)
        log "#{role}: done: subscription started to #{c}"
        break
      rescue StandardError => e
        log "#{role}: init: #{e.class}: #{e.message}"
        ps&.close
        # Drop the closed handle so that a failure inside the next
        # `r.pubsub` call does not close the same object a second time.
        ps = nil
      ensure
        sleep 1.0
      end

      loop do
        handle_errors(role) do
          e = ps.next_event(0.01)
          log "#{role}: recv: #{e.nil? ? 'nil' : e}"
        end
      ensure
        sleep 1.0
      end
    rescue StandardError => e
      log "#{role}: dead: #{e.class}: #{e.message}"
      raise
    ensure
      # Release the subscription's connections however the thread ends.
      ps&.close
    end
  end

  # Runs the given block, logging every StandardError it raises and
  # swallowing only the ones treated here as transient cluster failures.
  #
  # Swallowed: connection errors, InitialSetupError, NodeMightBeDown, and
  # command errors whose message starts with
  # 'CLUSTERDOWN Hash slot not served'. Everything else is re-raised.
  #
  # @param role [String] label prefixed to each log line
  # @return [Object, nil] the block's value, or nil when an error was swallowed
  # @raise [StandardError] any error not listed above
  def handle_errors(role)
    yield
  rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
    log "#{role}: recv: #{e.class}"
  rescue RedisClient::CommandError => e
    log "#{role}: recv: #{e.class}: #{e.message}"
    raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
  rescue StandardError => e
    log "#{role}: recv: #{e.class}: #{e.message}"
    raise
  end

  # Writes +msg+ followed by a newline to standard output.
  #
  # NOTE(review): message and newline are sent in a single `print` call,
  # presumably so lines from the two threads do not interleave - confirm.
  #
  # @param msg [String]
  # @return [nil]
  def log(msg)
    print "#{msg}\n"
  end
end
78+
79+
# Driver: one cluster client per role, so the subscriber and the publisher
# never share a client object.
clients = 2.times.map { RedisClient.cluster(connect_with_original_config: true).new_client }
subscriber_client, publisher_client = clients
channel = 'chan1'
threads = []

# Ctrl-C: stop both worker threads, close the clients, then exit cleanly.
Signal.trap('INT') do
  threads.each(&:exit)
  clients.each(&:close)
  PubSubDebug.log("\nBye bye")
  exit 0
end

# The subscriber is started before the publisher.
threads << PubSubDebug.spawn_subscriber(subscriber_client, channel)
threads << PubSubDebug.spawn_publisher(publisher_client, channel)

# Block the main thread until the workers finish (normally only via the trap).
threads.each(&:join)

lib/redis_client/cluster/pub_sub.rb

+11-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'redis_client'
4+
require 'redis_client/cluster/errors'
45
require 'redis_client/cluster/normalized_cmd_name'
56

67
class RedisClient
@@ -91,10 +92,8 @@ def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/Cycloma
9192
when ::RedisClient::CommandError
9293
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
9394

94-
@router.renew_cluster_state
9595
break start_over
9696
when ::RedisClient::ConnectionError
97-
@router.renew_cluster_state
9897
break start_over
9998
when StandardError then raise event
10099
when Array then break event
@@ -151,25 +150,23 @@ def calc_max_duration(timeout)
151150
def handle_connection_error(node_key, ignore: false)
152151
yield
153152
rescue ::RedisClient::ConnectionError
154-
@state_dict[node_key].close
153+
@state_dict[node_key]&.close
155154
@state_dict.delete(node_key)
156155
@router.renew_cluster_state
157156
raise unless ignore
158157
end
159158

160159
def start_over
161-
@state_dict.each_value(&:close)
162-
@state_dict.clear
163-
@commands.each do |command|
164-
loop do
165-
_call(command)
166-
break
167-
rescue ::RedisClient::ConnectionError
168-
sleep 1.0
169-
end
160+
loop do
161+
@router.renew_cluster_state
162+
@state_dict.each_value(&:close)
163+
@state_dict.clear
164+
@queue.clear
165+
@commands.each { |command| _call(command) }
166+
break
167+
rescue ::RedisClient::ConnectionError, ::RedisClient::Cluster::NodeMightBeDown
168+
sleep 1.0
170169
end
171-
172-
nil
173170
end
174171
end
175172
end

lib/redis_client/cluster/router.rb

+3-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
2929
@pool = pool
3030
@client_kwargs = kwargs
3131
@node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs)
32-
renew_cluster_state
32+
@node.reload!
3333
@command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
3434
@command_builder = @config.command_builder
3535
end
@@ -241,6 +241,8 @@ def node_keys
241241

242242
def renew_cluster_state
243243
@node.reload!
244+
rescue ::RedisClient::Cluster::InitialSetupError
245+
# ignore
244246
end
245247

246248
def close

test/cluster_controller.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ def flush_all_data(clients)
270270
print_debug("#{c.config.host}:#{c.config.port} ... FLUSHALL")
271271
rescue ::RedisClient::CommandError, ::RedisClient::ReadOnlyError
272272
# READONLY You can't write against a read only replica.
273-
rescue ::RedisClient::CannotConnectError => e
273+
rescue ::RedisClient::ConnectionError => e
274274
print_debug("#{c.config.host}:#{c.config.port} ... FLUSHALL: #{e.class}: #{e.message}")
275275
end
276276
end
@@ -279,7 +279,7 @@ def reset_cluster(clients)
279279
clients.each do |c|
280280
c.call('CLUSTER', 'RESET', 'HARD')
281281
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER RESET HARD")
282-
rescue ::RedisClient::CannotConnectError => e
282+
rescue ::RedisClient::ConnectionError => e
283283
print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER RESET HARD: #{e.class}: #{e.message}")
284284
end
285285
end

0 commit comments

Comments
 (0)