Skip to content

Commit 61c3703

Browse files
committed
Use a queue to wait for Puback packets rather than polling
This commit changes the `publish` method to use a queue to wait for Puback packets rather than polling a hash. Every time the read loop gets data or a timeout from `IO.select`, it will send a message to everyone waiting for a `Puback` packet. If the we're within the deadline, then the loop executes again, if we got a packet, we'll return the packet, and if we're outside the deadline, a `-1` is returned. This upside is that this patch speeds up the publish method by over 100x. Here is the benchmark: ```ruby require "securerandom" require "mqtt" require "benchmark/ips" require "stackprof" client = MQTT::Client.new(username: 'testuser', password: 'testpasswd', client_id: "client_#{SecureRandom.hex(10)}", host: '127.0.0.1') client.connect Benchmark.ips do |x| x.report("send message") { i = rand(1..10) topic = "to/timebox#{i}/cameras" msg = "message #{Time.now} for timebox#{i}" client.publish(topic, msg, true, 1) } end ``` Before this patch: ``` $ ruby -I lib thing.rb Warming up -------------------------------------- send message 8.000 i/100ms Calculating ------------------------------------- send message 85.042 (± 4.7%) i/s - 432.000 in 5.089261s ``` After this patch: ``` $ ruby -I lib thing.rb Warming up -------------------------------------- send message 915.000 i/100ms Calculating ------------------------------------- send message 9.453k (± 4.5%) i/s - 47.580k in 5.043716s ``` The downside is that the timeout isn't exact. Since `IO.select` times out every `0.5` seconds (according to the `SELECT_TIMEOUT` constant), the deadline in the `publish` method could be missed by that amount of time. Refs njh#115
1 parent 878639e commit 61c3703

File tree

2 files changed

+61
-9
lines changed

2 files changed

+61
-9
lines changed

lib/mqtt/client.rb

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ def disconnect(send_msg = true)
298298
send_packet(packet)
299299
end
300300
@socket.close unless @socket.nil?
301+
handle_close
301302
@socket = nil
302303
end
303304

@@ -324,18 +325,30 @@ def publish(topic, payload = '', retain = false, qos = 0)
324325

325326
return if qos.zero?
326327

327-
Timeout.timeout(@ack_timeout) do
328-
while connected?
328+
queue = Queue.new
329+
330+
wait_for_puback packet.id, queue
331+
332+
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @ack_timeout
333+
334+
loop do
335+
response = queue.pop
336+
case response
337+
when :read_timeout
338+
if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
339+
return -1
340+
end
341+
when :close
342+
return -1
343+
else
329344
@pubacks_semaphore.synchronize do
330-
return res if @pubacks.delete(packet.id)
345+
@pubacks.delete packet.id
331346
end
332-
# FIXME: make threads communicate with each other, instead of polling
333-
# (using a pipe and select ?)
334-
sleep 0.01
347+
break
335348
end
336349
end
337350

338-
-1
351+
res
339352
end
340353

341354
# Send a subscribe message for one or more topics on the MQTT server.
@@ -459,10 +472,17 @@ def receive_packet
459472
unless @socket.nil?
460473
@socket.close
461474
@socket = nil
475+
handle_close
462476
end
463477
Thread.current[:parent].raise(exp)
464478
end
465479

480+
def wait_for_puback(id, queue)
481+
@pubacks_semaphore.synchronize do
482+
@pubacks[id] = queue
483+
end
484+
end
485+
466486
def handle_packet(packet)
467487
if packet.class == MQTT::Packet::Publish
468488
# Add to queue
@@ -471,13 +491,25 @@ def handle_packet(packet)
471491
@last_ping_response = Time.now
472492
elsif packet.class == MQTT::Packet::Puback
473493
@pubacks_semaphore.synchronize do
474-
@pubacks[packet.id] = packet
494+
@pubacks[packet.id] << packet
475495
end
476496
end
477497
# Ignore all other packets
478498
# FIXME: implement responses for QoS 2
479499
end
480500

501+
def handle_timeouts
502+
@pubacks_semaphore.synchronize do
503+
@pubacks.each_value { |q| q << :read_timeout }
504+
end
505+
end
506+
507+
def handle_close
508+
@pubacks_semaphore.synchronize do
509+
@pubacks.each_value { |q| q << :close }
510+
end
511+
end
512+
481513
def keep_alive!
482514
return unless @keep_alive > 0 && connected?
483515

spec/mqtt_client_spec.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,26 @@
571571
end
572572

573573
describe "when calling the 'publish' method" do
574+
class ClientWithPubackInjection < MQTT::Client
575+
def initialize
576+
super(:host => 'localhost')
577+
@injected_pubacks = {}
578+
end
579+
580+
def inject_puback(packet)
581+
@injected_pubacks[packet.id] = packet
582+
end
583+
584+
def wait_for_puback(id, queue)
585+
packet = @injected_pubacks.fetch(id) {
586+
return super
587+
}
588+
queue << packet
589+
end
590+
end
591+
592+
let(:client) { ClientWithPubackInjection.new }
593+
574594
before(:each) do
575595
client.instance_variable_set('@socket', socket)
576596
end
@@ -973,7 +993,7 @@ def inject_packet(opts={})
973993

974994
def inject_puback(packet_id)
975995
packet = MQTT::Packet::Puback.new(:id => packet_id)
976-
client.instance_variable_get('@pubacks')[packet_id] = packet
996+
client.inject_puback packet
977997
end
978998

979999
end

0 commit comments

Comments
 (0)