Skip to content

Commit a28441a

Browse files
committed
Implement async for simple AMQP methods
Related to: * #1345 * #1308 * #970 * #843 Implement QueueDeleteAsync, ExchangeDeclareAsync and ExchangeDeleteAsync. Refactoring to come. Fix public API Move rpc continuations to their own files Add continuation timeouts to new AsyncRpcContinuations classes. Add ExchangeBindAsync to interface Fixup compilation after rebase Add QueueDeleteOk to API Add ExchangeBindAsync Add `[Collection("IntegrationFixture")]` to test class that needs it, move test to an integration fixture class dotnet format fix Add exchange-exchange binding to test Add QueueBindAsync and add to tests. Add ExchangeUnbindAsync method Add BasicRejectAsync Add BasicCancelAsync Add BasicConsumeAsync Added BasicAckAsync and BasicQosAsync
1 parent a0321c6 commit a28441a

17 files changed

+1200
-242
lines changed

projects/OAuth2Test/TestOAuth2.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ private async Task Publish(IChannel publisher)
148148
private async ValueTask<IChannel> declareConsumer()
149149
{
150150
IChannel subscriber = _connection.CreateChannel();
151-
await subscriber.QueueDeclareAsync("testqueue", true, false, false, arguments: null);
151+
await subscriber.QueueDeclareAsync(queue: "testqueue", passive: false, true, false, false, arguments: null);
152152
subscriber.QueueBind("testqueue", Exchange, "hello");
153153
return subscriber;
154154
}

projects/RabbitMQ.Client/client/api/IChannel.cs

+123-19
Original file line numberDiff line numberDiff line change
@@ -151,37 +151,61 @@ public interface IChannel : IDisposable
151151
/// </remarks>
152152
event EventHandler<ShutdownEventArgs> ChannelShutdown;
153153

154-
/// <summary>
155-
/// Acknowledge one or more delivered message(s).
156-
/// </summary>
154+
/// <summary>Acknknowledges one or more messages.</summary>
155+
/// <param name="deliveryTag">The delivery tag.</param>
156+
/// <param name="multiple">Ack all messages up to the delivery tag if set to <c>true</c>.</param>
157157
void BasicAck(ulong deliveryTag, bool multiple);
158158

159-
/// <summary>
160-
/// Delete a Basic content-class consumer.
161-
/// </summary>
159+
/// <summary>Asynchronously acknknowledges one or more messages.</summary>
160+
/// <param name="deliveryTag">The delivery tag.</param>
161+
/// <param name="multiple">Ack all messages up to the delivery tag if set to <c>true</c>.</param>
162+
ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);
163+
164+
/// <summary>Cancel a Basic content-class consumer.</summary>
165+
/// <param name="consumerTag">The consumer tag.</param>
162166
void BasicCancel(string consumerTag);
163167

168+
/// <summary>Asynchronously cancel a Basic content-class consumer.</summary>
169+
/// <param name="consumerTag">The consumer tag.</param>
170+
ValueTask BasicCancelAsync(string consumerTag);
171+
164172
/// <summary>
165173
/// Same as BasicCancel but sets nowait to true and returns void (as there
166174
/// will be no response from the server).
167175
/// </summary>
176+
/// <param name="consumerTag">The consumer tag.</param>
168177
void BasicCancelNoWait(string consumerTag);
169178

170179
/// <summary>Start a Basic content-class consumer.</summary>
171-
string BasicConsume(
172-
string queue,
173-
bool autoAck,
174-
string consumerTag,
175-
bool noLocal,
176-
bool exclusive,
177-
IDictionary<string, object> arguments,
178-
IBasicConsumer consumer);
180+
/// <param name="queue">The queue.</param>
181+
/// <param name="autoAck">If set to <c>true</c>, automatically ack messages.</param>
182+
/// <param name="consumerTag">The consumer tag.</param>
183+
/// <param name="noLocal">If set to <c>true</c>, this consumer will not receive messages published by the same connection.</param>
184+
/// <param name="exclusive">If set to <c>true</c>, the consumer is exclusive.</param>
185+
/// <param name="arguments">Consumer arguments.</param>
186+
/// <param name="consumer">The consumer, an instance of <see cref="IBasicConsumer"/></param>
187+
/// <returns></returns>
188+
string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);
189+
190+
/// <summary>Asynchronously start a Basic content-class consumer.</summary>
191+
/// <param name="queue">The queue.</param>
192+
/// <param name="autoAck">If set to <c>true</c>, automatically ack messages.</param>
193+
/// <param name="consumerTag">The consumer tag.</param>
194+
/// <param name="noLocal">If set to <c>true</c>, this consumer will not receive messages published by the same connection.</param>
195+
/// <param name="exclusive">If set to <c>true</c>, the consumer is exclusive.</param>
196+
/// <param name="arguments">Consumer arguments.</param>
197+
/// <param name="consumer">The consumer, an instance of <see cref="IBasicConsumer"/></param>
198+
/// <returns></returns>
199+
ValueTask<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);
179200

180201
/// <summary>
181202
/// Retrieve an individual message, if
182203
/// one is available; returns null if the server answers that
183-
/// no messages are currently available. See also <see cref="IChannel.BasicAck"/>.
204+
/// no messages are currently available. See also <see cref="IChannel.BasicAck" />.
184205
/// </summary>
206+
/// <param name="queue">The queue.</param>
207+
/// <param name="autoAck">If set to <c>true</c>, automatically ack the message.</param>
208+
/// <returns><see cref="BasicGetResult"/></returns>
185209
BasicGetResult BasicGet(string queue, bool autoAck);
186210

187211
/// <summary>Reject one or more delivered message(s).</summary>
@@ -238,23 +262,41 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
238262
/// <summary>
239263
/// Configures QoS parameters of the Basic content-class.
240264
/// </summary>
265+
/// <param name="prefetchSize">Size of the prefetch in bytes.</param>
266+
/// <param name="prefetchCount">The prefetch count.</param>
267+
/// <param name="global">If set to <c>true</c>, use global prefetch.
268+
/// See the <seealso href="https://www.rabbitmq.com/consumer-prefetch.html#overview">Consumer Prefetch documentation</seealso>.</param>
241269
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
242270

271+
/// <summary>
272+
/// Configures QoS parameters of the Basic content-class.
273+
/// </summary>
274+
/// <param name="prefetchSize">Size of the prefetch in bytes.</param>
275+
/// <param name="prefetchCount">The prefetch count.</param>
276+
/// <param name="global">If set to <c>true</c>, use global prefetch.
277+
/// See the <seealso href="https://www.rabbitmq.com/consumer-prefetch.html#overview">Consumer Prefetch documentation</seealso>.</param>
278+
ValueTask BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global);
279+
243280
/// <summary>
244281
/// Indicates that a consumer has recovered.
245282
/// Deprecated. Should not be used.
246283
/// </summary>
284+
[Obsolete]
247285
void BasicRecover(bool requeue);
248286

249287
/// <summary>
250288
/// Indicates that a consumer has recovered.
251289
/// Deprecated. Should not be used.
252290
/// </summary>
291+
[Obsolete]
253292
void BasicRecoverAsync(bool requeue);
254293

255294
/// <summary> Reject a delivered message.</summary>
256295
void BasicReject(ulong deliveryTag, bool requeue);
257296

297+
/// <summary> Reject a delivered message.</summary>
298+
ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue);
299+
258300
/// <summary>Close this session.</summary>
259301
/// <param name="replyCode">The reply code to send for closing (See under "Reply Codes" in the AMQP specification).</param>
260302
/// <param name="replyText">The reply text to send for closing.</param>
@@ -276,6 +318,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
276318
/// </remarks>
277319
void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
278320

321+
/// <summary>
322+
/// Asynchronously binds an exchange to an exchange.
323+
/// </summary>
324+
/// <remarks>
325+
/// <para>
326+
/// Routing key must be shorter than 255 bytes.
327+
/// </para>
328+
/// </remarks>
329+
ValueTask ExchangeBindAsync(string destination, string source, string routingKey, IDictionary<string, object> arguments);
330+
279331
/// <summary>
280332
/// Like ExchangeBind but sets nowait to true.
281333
/// </summary>
@@ -289,10 +341,17 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
289341
/// <summary>Declare an exchange.</summary>
290342
/// <remarks>
291343
/// The exchange is declared non-passive and non-internal.
292-
/// The "nowait" option is not exercised.
344+
/// The "nowait" option is not used.
293345
/// </remarks>
294346
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
295347

348+
/// <summary>Asynchronously declare an exchange.</summary>
349+
/// <remarks>
350+
/// The exchange is declared non-internal.
351+
/// The "nowait" option is not used.
352+
/// </remarks>
353+
ValueTask ExchangeDeclareAsync(string exchange, string type, bool passive, bool durable, bool autoDelete, IDictionary<string, object> arguments);
354+
296355
/// <summary>
297356
/// Same as ExchangeDeclare but sets nowait to true and returns void (as there
298357
/// will be no response from the server).
@@ -315,6 +374,19 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
315374
/// </summary>
316375
void ExchangeDelete(string exchange, bool ifUnused);
317376

377+
/*
378+
* TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
379+
/// <summary>
380+
/// Asynchronously delete an exchange.
381+
/// </summary>
382+
ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused);
383+
*/
384+
385+
/// <summary>
386+
/// Asynchronously delete an exchange.
387+
/// </summary>
388+
ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused);
389+
318390
/// <summary>
319391
/// Like ExchangeDelete but sets nowait to true.
320392
/// </summary>
@@ -328,6 +400,14 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
328400
/// </remarks>
329401
void ExchangeUnbind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
330402

403+
/// <summary>
404+
/// Asynchronously unbind an exchange from an exchange.
405+
/// </summary>
406+
/// <remarks>
407+
/// Routing key must be shorter than 255 bytes.
408+
/// </remarks>
409+
ValueTask ExchangeUnbindAsync(string destination, string source, string routingKey, IDictionary<string, object> arguments);
410+
331411
/// <summary>
332412
/// Like ExchangeUnbind but sets nowait to true.
333413
/// </summary>
@@ -348,6 +428,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
348428
/// </remarks>
349429
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
350430

431+
/// <summary>
432+
/// Asynchronously bind a queue to an exchange.
433+
/// </summary>
434+
/// <remarks>
435+
/// <para>
436+
/// Routing key must be shorter than 255 bytes.
437+
/// </para>
438+
/// </remarks>
439+
ValueTask QueueBindAsync(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
440+
351441
/// <summary>Same as QueueBind but sets nowait parameter to true.</summary>
352442
/// <remarks>
353443
/// <para>
@@ -370,11 +460,12 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
370460
/// Asynchronously declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
371461
/// </summary>
372462
/// <param name="queue">The name of the queue. Pass an empty string to make the server generate a name.</param>
463+
/// <param name="passive">Set to <code>true</code> to passively declare the queue (i.e. check for its existence)</param>
373464
/// <param name="durable">Should this queue will survive a broker restart?</param>
374465
/// <param name="exclusive">Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.</param>
375466
/// <param name="autoDelete">Should this queue be auto-deleted when its last consumer (if any) unsubscribes?</param>
376467
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
377-
ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
468+
ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
378469

379470
/// <summary>
380471
/// Declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
@@ -411,17 +502,30 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
411502
uint ConsumerCount(string queue);
412503

413504
/// <summary>
414-
/// Delete a queue.
505+
/// Deletes a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
506+
/// </summary>
507+
/// <param name="queue">The name of the queue.</param>
508+
/// <param name="ifUnused">Only delete the queue if it is unused.</param>
509+
/// <param name="ifEmpty">Only delete the queue if it is empty.</param>
510+
/// <returns>Returns the number of messages purged during deletion.</returns>
511+
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
512+
513+
/// <summary>
514+
/// Asynchronously deletes a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
415515
/// </summary>
416516
/// <remarks>
417517
///Returns the number of messages purged during queue deletion.
418518
/// </remarks>
419-
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
519+
ValueTask<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty);
420520

421521
/// <summary>
422522
///Same as QueueDelete but sets nowait parameter to true
423523
///and returns void (as there will be no response from the server)
424524
/// </summary>
525+
/// <param name="queue">The name of the queue.</param>
526+
/// <param name="ifUnused">Only delete the queue if it is unused.</param>
527+
/// <param name="ifEmpty">Only delete the queue if it is empty.</param>
528+
/// <returns>Returns the number of messages purged during deletion.</returns>
425529
void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);
426530

427531
/// <summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

+14-4
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,12 @@ public override void BasicAck(ulong deliveryTag, bool multiple)
234234
ChannelSend(new BasicAck(deliveryTag, multiple));
235235
}
236236

237+
public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple)
238+
{
239+
var method = new BasicAck(deliveryTag, multiple);
240+
return ModelSendAsync(method);
241+
}
242+
237243
public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
238244
{
239245
ChannelSend(new BasicNack(deliveryTag, multiple, requeue));
@@ -254,6 +260,12 @@ public override void BasicReject(ulong deliveryTag, bool requeue)
254260
ChannelSend(new BasicReject(deliveryTag, requeue));
255261
}
256262

263+
public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue)
264+
{
265+
var method = new BasicReject(deliveryTag, requeue);
266+
return ModelSendAsync(method);
267+
}
268+
257269
public override void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
258270
{
259271
ChannelRpc(new QueueUnbind(queue, exchange, routingKey, arguments), ProtocolCommandId.QueueUnbindOk);
@@ -295,13 +307,11 @@ protected override bool DispatchAsynchronous(in IncomingCommand cmd)
295307
}
296308
case ProtocolCommandId.BasicCancelOk:
297309
{
298-
HandleBasicCancelOk(in cmd);
299-
return true;
310+
return HandleBasicCancelOk(in cmd);
300311
}
301312
case ProtocolCommandId.BasicConsumeOk:
302313
{
303-
HandleBasicConsumeOk(in cmd);
304-
return true;
314+
return HandleBasicConsumeOk(in cmd);
305315
}
306316
case ProtocolCommandId.BasicGetEmpty:
307317
{

0 commit comments

Comments
 (0)