Skip to content

Commit a864a54

Browse files
author
Michael Klishin
committed
Use condition variable to be notified of RPC responses
Earlier version suffered from a race condition under load that caused Bunny continuation to never unblock.
1 parent c2c2e9f commit a864a54

File tree

1 file changed

+19
-11
lines changed

1 file changed

+19
-11
lines changed

ruby/rpc_client.rb

+19-11
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,48 @@
22
# encoding: utf-8
33

44
require "bunny"
5+
require "thread"
56

67
conn = Bunny.new(:automatically_recover => false)
78
conn.start
89

910
ch = conn.create_channel
1011

12+
1113
class FibonacciClient
1214
attr_reader :reply_queue
15+
attr_accessor :response, :call_id
16+
attr_reader :lock, :condition
1317

1418
def initialize(ch, server_queue)
1519
@ch = ch
1620
@x = ch.default_exchange
1721

1822
@server_queue = server_queue
1923
@reply_queue = ch.queue("", :exclusive => true)
24+
25+
26+
@lock = Mutex.new
27+
@condition = ConditionVariable.new
28+
that = self
29+
30+
@reply_queue.subscribe do |delivery_info, properties, payload|
31+
if properties[:correlation_id] == that.call_id
32+
that.response = payload.to_i
33+
that.lock.synchronize{that.condition.signal}
34+
end
35+
end
2036
end
2137

2238
def call(n)
23-
correlation_id = self.generate_uuid
39+
self.call_id = self.generate_uuid
2440

2541
@x.publish(n.to_s,
2642
:routing_key => @server_queue,
27-
:correlation_id => correlation_id,
43+
:correlation_id => call_id,
2844
:reply_to => @reply_queue.name)
2945

30-
response = nil
31-
@reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
32-
if properties[:correlation_id] == correlation_id
33-
response = payload.to_i
34-
35-
delivery_info.consumer.cancel
36-
end
37-
end
38-
46+
lock.synchronize{condition.wait(lock)}
3947
response
4048
end
4149

0 commit comments

Comments
 (0)