Skip to content

Commit fccd710

Browse files
committed
Add QueuePurgeAsync
1 parent 076bddd commit fccd710

File tree

6 files changed

+88
-6
lines changed

6 files changed

+88
-6
lines changed

projects/RabbitMQ.Client/client/api/IChannel.cs

+8-6
Original file line numberDiff line numberDiff line change
@@ -558,14 +558,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
558558
/// <returns>Returns the number of messages purged during deletion.</returns>
559559
void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);
560560

561-
/// <summary>
562-
/// Purge a queue of messages.
563-
/// </summary>
564-
/// <remarks>
565-
/// Returns the number of messages purged.
566-
/// </remarks>
561+
/// <summary>Asynchronously purge a queue of messages.</summary>
562+
/// <param name="queue">The queue.</param>
563+
/// <returns>Returns the number of messages purged.</returns>
567564
uint QueuePurge(string queue);
568565

566+
/// <summary>Asynchronously purge a queue of messages.</summary>
567+
/// <param name="queue">The queue.</param>
568+
/// <returns>Returns the number of messages purged.</returns>
569+
ValueTask<uint> QueuePurgeAsync(string queue);
570+
569571
/// <summary>
570572
/// Unbind a queue from an exchange.
571573
/// </summary>

projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs

+27
Original file line numberDiff line numberDiff line change
@@ -374,4 +374,31 @@ public override void HandleCommand(in IncomingCommand cmd)
374374
}
375375
}
376376
}
377+
378+
internal class QueuePurgeAsyncRpcContinuation : AsyncRpcContinuation<uint>
379+
{
380+
public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
381+
{
382+
}
383+
384+
public override void HandleCommand(in IncomingCommand cmd)
385+
{
386+
try
387+
{
388+
if (cmd.CommandId == ProtocolCommandId.QueuePurgeOk)
389+
{
390+
var method = new Client.Framing.Impl.QueuePurgeOk(cmd.MethodBytes.Span);
391+
_tcs.TrySetResult(method._messageCount);
392+
}
393+
else
394+
{
395+
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
396+
}
397+
}
398+
finally
399+
{
400+
cmd.ReturnMethodBuffer();
401+
}
402+
}
403+
}
377404
}

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

+3
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,9 @@ public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty)
574574
public uint QueuePurge(string queue)
575575
=> InnerChannel.QueuePurge(queue);
576576

577+
public ValueTask<uint> QueuePurgeAsync(string queue)
578+
=> InnerChannel.QueuePurgeAsync(queue);
579+
577580
public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
578581
{
579582
ThrowIfDisposed();

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

+19
Original file line numberDiff line numberDiff line change
@@ -1466,6 +1466,25 @@ public uint QueuePurge(string queue)
14661466
return _Private_QueuePurge(queue, false);
14671467
}
14681468

1469+
public async ValueTask<uint> QueuePurgeAsync(string queue)
1470+
{
1471+
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
1472+
try
1473+
{
1474+
var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout);
1475+
Enqueue(k);
1476+
1477+
var method = new QueuePurge(queue, false);
1478+
await ModelSendAsync(method).ConfigureAwait(false);
1479+
1480+
return await k;
1481+
}
1482+
finally
1483+
{
1484+
_rpcSemaphore.Release();
1485+
}
1486+
}
1487+
14691488
public abstract void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
14701489

14711490
public abstract void TxCommit();

projects/Unit/APIApproval.Approve.verified.txt

+1
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,7 @@ namespace RabbitMQ.Client
465465
System.Threading.Tasks.ValueTask<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty);
466466
void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);
467467
uint QueuePurge(string queue);
468+
System.Threading.Tasks.ValueTask<uint> QueuePurgeAsync(string queue);
468469
void QueueUnbind(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments);
469470
void TxCommit();
470471
void TxRollback();

projects/Unit/TestBasicPublish.cs

+30
Original file line numberDiff line numberDiff line change
@@ -292,5 +292,35 @@ public void TestPropertiesRountrip_Headers()
292292
Assert.Equal("World", response);
293293
}
294294
}
295+
296+
[Fact]
297+
public async Task TestQueuePurgeAsync()
298+
{
299+
const int messageCount = 1024;
300+
var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
301+
var cf = new ConnectionFactory { DispatchConsumersAsync = true };
302+
using IConnection connection = cf.CreateConnection();
303+
using IChannel channel = connection.CreateChannel();
304+
305+
await channel.ConfirmSelectAsync();
306+
307+
QueueDeclareOk q = await channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
308+
string queueName = q.QueueName;
309+
310+
var publishTask = Task.Run(async () =>
311+
{
312+
for (int i = 0; i < messageCount; i++)
313+
{
314+
byte[] body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
315+
await channel.BasicPublishAsync(string.Empty, queueName, body);
316+
}
317+
publishSyncSource.SetResult(true);
318+
});
319+
320+
await channel.WaitForConfirmsOrDieAsync();
321+
Assert.True(await publishSyncSource.Task);
322+
323+
Assert.Equal((uint)messageCount, await channel.QueuePurgeAsync(queueName));
324+
}
295325
}
296326
}

0 commit comments

Comments
 (0)