Skip to content

Commit 1bb16f3

Browse files
committed
Implement async for simple AMQP methods
Related to: * #1345 * #1308 * #970 * #843 Implement QueueDeleteAsync, ExchangeDeclareAsync and ExchangeDeleteAsync. Refactoring to come. Fix public API Move rpc continuations to their own files Add continuation timeouts to new AsyncRpcContinuations classes. Add ExchangeBindAsync to interface
1 parent eb8e031 commit 1bb16f3

9 files changed

+518
-101
lines changed

projects/RabbitMQ.Client/client/api/IChannel.cs

+46-3
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
276276
/// </remarks>
277277
void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
278278

279+
/// <summary>
280+
/// Asynchronously binds an exchange to an exchange.
281+
/// </summary>
282+
/// <remarks>
283+
/// <para>
284+
/// Routing key must be shorter than 255 bytes.
285+
/// </para>
286+
/// </remarks>
287+
ValueTask ExchangeBindAsync(string destination, string source, string routingKey, IDictionary<string, object> arguments);
288+
279289
/// <summary>
280290
/// Like ExchangeBind but sets nowait to true.
281291
/// </summary>
@@ -289,10 +299,17 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
289299
/// <summary>Declare an exchange.</summary>
290300
/// <remarks>
291301
/// The exchange is declared non-passive and non-internal.
292-
/// The "nowait" option is not exercised.
302+
/// The "nowait" option is not used.
293303
/// </remarks>
294304
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
295305

306+
/// <summary>Asynchronously declare an exchange.</summary>
307+
/// <remarks>
308+
/// The exchange is declared non-passive and non-internal.
309+
/// The "nowait" option is not exercised.
310+
/// </remarks>
311+
ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
312+
296313
/// <summary>
297314
/// Same as ExchangeDeclare but sets nowait to true and returns void (as there
298315
/// will be no response from the server).
@@ -315,6 +332,19 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
315332
/// </summary>
316333
void ExchangeDelete(string exchange, bool ifUnused);
317334

335+
/*
336+
* TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
337+
/// <summary>
338+
/// Asynchronously delete an exchange.
339+
/// </summary>
340+
ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused);
341+
*/
342+
343+
/// <summary>
344+
/// Asynchronously delete an exchange.
345+
/// </summary>
346+
ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused);
347+
318348
/// <summary>
319349
/// Like ExchangeDelete but sets nowait to true.
320350
/// </summary>
@@ -411,17 +441,30 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
411441
uint ConsumerCount(string queue);
412442

413443
/// <summary>
414-
/// Delete a queue.
444+
/// Deletes a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
445+
/// </summary>
446+
/// <param name="queue">The name of the queue.</param>
447+
/// <param name="ifUnused">Only delete the queue if it is unused.</param>
448+
/// <param name="ifEmpty">Only delete the queue if it is empty.</param>
449+
/// <returns>Returns the number of messages purged during deletion.</returns>
450+
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
451+
452+
/// <summary>
453+
/// Asynchronously deletes a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
415454
/// </summary>
416455
/// <remarks>
417456
///Returns the number of messages purged during queue deletion.
418457
/// </remarks>
419-
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
458+
ValueTask<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty);
420459

421460
/// <summary>
422461
///Same as QueueDelete but sets nowait parameter to true
423462
///and returns void (as there will be no response from the server)
424463
/// </summary>
464+
/// <param name="queue">The name of the queue.</param>
465+
/// <param name="ifUnused">Only delete the queue if it is unused.</param>
466+
/// <param name="ifEmpty">Only delete the queue if it is empty.</param>
467+
/// <returns>Returns the number of messages purged during deletion.</returns>
425468
void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);
426469

427470
/// <summary>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
namespace RabbitMQ.Client
33+
{
34+
/// <summary>
35+
/// Represents Queue deletion information.
36+
/// </summary>
37+
public class QueueDeleteOk
38+
{
39+
private readonly uint _messageCount;
40+
41+
/// <summary>
42+
/// Creates a new instance of <see cref="QueueDeleteOk"/>.
43+
/// </summary>
44+
/// <param name="messageCount">Message count.</param>
45+
public QueueDeleteOk(uint messageCount)
46+
{
47+
_messageCount = messageCount;
48+
}
49+
50+
/// <summary>
51+
/// Count of messages purged when queue was deleted.
52+
/// </summary>
53+
public uint MessageCount => _messageCount;
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Runtime.CompilerServices;
34+
using System.Threading;
35+
using System.Threading.Tasks;
36+
using RabbitMQ.Client.client.framing;
37+
using RabbitMQ.Client.Exceptions;
38+
using RabbitMQ.Client.Framing.Impl;
39+
40+
namespace RabbitMQ.Client.Impl
41+
{
42+
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation, IDisposable
43+
{
44+
private readonly CancellationTokenSource _ct;
45+
46+
protected readonly TaskCompletionSource<T> _tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
47+
48+
private bool _disposedValue;
49+
50+
public AsyncRpcContinuation(TimeSpan continuationTimeout)
51+
{
52+
_ct = new CancellationTokenSource(continuationTimeout);
53+
_ct.Token.Register(() =>
54+
{
55+
if (_tcs.TrySetCanceled())
56+
{
57+
// TODO LRB #1347
58+
// Cancellation was successful, does this mean we should set a TimeoutException
59+
// in the same manner as BlockingCell?
60+
}
61+
}, useSynchronizationContext: false);
62+
}
63+
64+
public TaskAwaiter<T> GetAwaiter() => _tcs.Task.GetAwaiter();
65+
66+
// TODO LRB #1347
67+
// What to do if setting a result fails?
68+
public abstract void HandleCommand(in IncomingCommand cmd);
69+
70+
public void HandleChannelShutdown(ShutdownEventArgs reason) => _tcs.SetException(new OperationInterruptedException(reason));
71+
72+
protected virtual void Dispose(bool disposing)
73+
{
74+
if (!_disposedValue)
75+
{
76+
if (disposing)
77+
{
78+
_ct.Dispose();
79+
}
80+
81+
_disposedValue = true;
82+
}
83+
}
84+
85+
public void Dispose()
86+
{
87+
Dispose(disposing: true);
88+
GC.SuppressFinalize(this);
89+
}
90+
}
91+
92+
internal class ConnectionSecureOrTuneContinuation : AsyncRpcContinuation<ConnectionSecureOrTune>
93+
{
94+
public ConnectionSecureOrTuneContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
95+
{
96+
}
97+
98+
public override void HandleCommand(in IncomingCommand cmd)
99+
{
100+
try
101+
{
102+
if (cmd.CommandId == ProtocolCommandId.ConnectionSecure)
103+
{
104+
var secure = new ConnectionSecure(cmd.MethodBytes.Span);
105+
_tcs.TrySetResult(new ConnectionSecureOrTune { m_challenge = secure._challenge });
106+
}
107+
else if (cmd.CommandId == ProtocolCommandId.ConnectionTune)
108+
{
109+
var tune = new ConnectionTune(cmd.MethodBytes.Span);
110+
// TODO LRB #1347
111+
// What to do if setting a result fails?
112+
_tcs.TrySetResult(new ConnectionSecureOrTune
113+
{
114+
m_tuneDetails = new() { m_channelMax = tune._channelMax, m_frameMax = tune._frameMax, m_heartbeatInSeconds = tune._heartbeat }
115+
});
116+
}
117+
else
118+
{
119+
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
120+
}
121+
}
122+
finally
123+
{
124+
cmd.ReturnMethodBuffer();
125+
}
126+
}
127+
}
128+
129+
internal class SimpleAsyncRpcContinuation : AsyncRpcContinuation<bool>
130+
{
131+
private readonly ProtocolCommandId _expectedCommandId;
132+
133+
public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan continuationTimeout) : base(continuationTimeout)
134+
{
135+
_expectedCommandId = expectedCommandId;
136+
}
137+
138+
public override void HandleCommand(in IncomingCommand cmd)
139+
{
140+
try
141+
{
142+
if (cmd.CommandId == _expectedCommandId)
143+
{
144+
_tcs.TrySetResult(true);
145+
}
146+
else
147+
{
148+
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
149+
}
150+
}
151+
finally
152+
{
153+
cmd.ReturnMethodBuffer();
154+
}
155+
}
156+
}
157+
158+
internal class ExchangeDeclareAsyncRpcContinuation : SimpleAsyncRpcContinuation
159+
{
160+
public ExchangeDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeDeclareOk, continuationTimeout)
161+
{
162+
}
163+
}
164+
165+
internal class ExchangeDeleteAsyncRpcContinuation : SimpleAsyncRpcContinuation
166+
{
167+
public ExchangeDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeDeleteOk, continuationTimeout)
168+
{
169+
}
170+
}
171+
172+
internal class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation<QueueDeclareOk>
173+
{
174+
public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
175+
{
176+
}
177+
178+
public override void HandleCommand(in IncomingCommand cmd)
179+
{
180+
try
181+
{
182+
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
183+
var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
184+
if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk)
185+
{
186+
_tcs.TrySetResult(result);
187+
}
188+
else
189+
{
190+
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
191+
}
192+
}
193+
finally
194+
{
195+
cmd.ReturnMethodBuffer();
196+
}
197+
}
198+
}
199+
200+
internal class QueueDeleteAsyncRpcContinuation : AsyncRpcContinuation<QueueDeleteOk>
201+
{
202+
public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
203+
{
204+
}
205+
206+
public override void HandleCommand(in IncomingCommand cmd)
207+
{
208+
try
209+
{
210+
var result = new Client.Framing.Impl.QueueDeleteOk(cmd.MethodBytes.Span);
211+
if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk)
212+
{
213+
_tcs.TrySetResult(result);
214+
}
215+
else
216+
{
217+
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
218+
}
219+
}
220+
finally
221+
{
222+
cmd.ReturnMethodBuffer();
223+
}
224+
}
225+
}
226+
}

0 commit comments

Comments
 (0)