Skip to content

Commit a1fb1ee

Browse files
authored
push bulk with dynamic queues (sidekiq#6552)
Allow payloads with mixed queues.
1 parent 7239276 commit a1fb1ee

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

lib/sidekiq/client.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -269,14 +269,16 @@ def atomic_push(conn, payloads)
269269
[at, Sidekiq.dump_json(hash)]
270270
})
271271
else
272-
queue = payloads.first["queue"]
273272
now = Time.now.to_f
274-
to_push = payloads.map { |entry|
275-
entry["enqueued_at"] = now
276-
Sidekiq.dump_json(entry)
277-
}
278-
conn.sadd("queues", [queue])
279-
conn.lpush("queue:#{queue}", to_push)
273+
grouped_queues = payloads.group_by { |job| job["queue"] }
274+
conn.sadd("queues", grouped_queues.keys)
275+
grouped_queues.each do |queue, grouped_payloads|
276+
to_push = grouped_payloads.map { |entry|
277+
entry["enqueued_at"] = now
278+
Sidekiq.dump_json(entry)
279+
}
280+
conn.lpush("queue:#{queue}", to_push)
281+
end
280282
end
281283
end
282284
end

test/client_test.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ def call(worker_class, job, queue, redis)
5858
end
5959
end
6060

61+
class MiddlewareDynamicQueue
62+
def call(worker_class, job, queue, redis)
63+
job["queue"] = job["args"].first.odd? ? "odd_queue" : "even_queue"
64+
yield
65+
end
66+
end
67+
6168
class DJob < BaseJob
6269
end
6370

@@ -479,6 +486,18 @@ def call(worker_klass, msg, q, r)
479486
refute result[0]
480487
assert_match(/[0-9a-f]{12}/, result[1])
481488
end
489+
490+
it "push bulk sends worker in correct queue" do
491+
@client.middleware do |chain|
492+
chain.add MiddlewareDynamicQueue
493+
end
494+
495+
@client.push_bulk("class" => MyJob, "args" => 3.times.map { [_1] })
496+
even_queue = Sidekiq::Queue.new("even_queue")
497+
odd_queue = Sidekiq::Queue.new("odd_queue")
498+
assert_equal 2, even_queue.size
499+
assert_equal 1, odd_queue.size
500+
end
482501
end
483502

484503
describe "inheritance" do

0 commit comments

Comments
 (0)