Skip to content

Commit 965256a

Browse files
committed
Fix sync Channel Close and add test using CloseAsync()
1 parent fccd710 commit 965256a

File tree

4 files changed

+75
-7
lines changed

4 files changed

+75
-7
lines changed

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

+20-3
Original file line numberDiff line numberDiff line change
@@ -250,15 +250,32 @@ public static void Close(this IChannel channel)
250250
channel.Close(Constants.ReplySuccess, "Goodbye", false);
251251
}
252252

253-
/// <summary>Close this session.</summary>
253+
/// <summary>Asynchronously close this session.</summary>
254+
/// <remarks>
255+
/// If the session is already closed (or closing), then this
256+
/// method does nothing but wait for the in-progress close
257+
/// operation to complete. This method will not return to the
258+
/// caller until the shutdown is complete.
259+
/// </remarks>
260+
public static ValueTask CloseAsync(this IChannel channel)
261+
{
262+
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, "Goodbye");
263+
return channel.CloseAsync(reason, false);
264+
}
265+
266+
/// <summary>
267+
/// Close this channel.
268+
/// </summary>
269+
/// <param name="channel">The channel.</param>
270+
/// <param name="replyCode">The reply code.</param>
271+
/// <param name="replyText">The reply text.</param>
254272
/// <remarks>
255273
/// The method behaves in the same way as Close(), with the only
256274
/// difference that the channel is closed with the given channel
257275
/// close code and message.
258276
/// <para>
259277
/// The close code (See under "Reply Codes" in the AMQP specification)
260-
/// </para>
261-
/// <para>
278+
/// </para><para>
262279
/// A message indicating the reason for closing the channel
263280
/// </para>
264281
/// </remarks>

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

+41-3
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,47 @@ protected void TakeOver(ChannelBase other)
194194

195195
public void Close(ushort replyCode, string replyText, bool abort)
196196
{
197-
_ = CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText), abort);
197+
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
198+
var k = new ShutdownContinuation();
199+
ChannelShutdown += k.OnConnectionShutdown;
200+
201+
try
202+
{
203+
ConsumerDispatcher.Quiesce();
204+
205+
if (SetCloseReason(reason))
206+
{
207+
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0);
208+
}
209+
210+
k.Wait(TimeSpan.FromMilliseconds(10000));
211+
ConsumerDispatcher.WaitForShutdownAsync().ConfigureAwait(false);
212+
}
213+
catch (AlreadyClosedException)
214+
{
215+
if (!abort)
216+
{
217+
throw;
218+
}
219+
}
220+
catch (IOException)
221+
{
222+
if (!abort)
223+
{
224+
throw;
225+
}
226+
}
227+
catch (Exception)
228+
{
229+
if (!abort)
230+
{
231+
throw;
232+
}
233+
}
234+
finally
235+
{
236+
ChannelShutdown -= k.OnConnectionShutdown;
237+
}
198238
}
199239

200240
public async ValueTask CloseAsync(ShutdownEventArgs reason, bool abort)
@@ -216,8 +256,6 @@ public async ValueTask CloseAsync(ShutdownEventArgs reason, bool abort)
216256
bool result = await k;
217257
Debug.Assert(result);
218258

219-
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
220-
// k.Wait(TimeSpan.FromMilliseconds(10000));
221259
await ConsumerDispatcher.WaitForShutdownAsync().ConfigureAwait(false);
222260
}
223261
catch (AlreadyClosedException)

projects/Unit/APIApproval.Approve.verified.txt

+1
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,7 @@ namespace RabbitMQ.Client
489489
public static System.Threading.Tasks.ValueTask BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default, bool mandatory = false) { }
490490
public static void Close(this RabbitMQ.Client.IChannel channel) { }
491491
public static void Close(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) { }
492+
public static System.Threading.Tasks.ValueTask CloseAsync(this RabbitMQ.Client.IChannel channel) { }
492493
public static void ExchangeBind(this RabbitMQ.Client.IChannel channel, string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
493494
public static void ExchangeBindNoWait(this RabbitMQ.Client.IChannel channel, string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
494495
public static void ExchangeDeclare(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null) { }

projects/Unit/TestChannelAllocation.cs

+13-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading.Tasks;
3435
using RabbitMQ.Client.Impl;
3536
using Xunit;
3637

@@ -50,7 +51,8 @@ public int ChannelNumber(IChannel channel)
5051

5152
public TestChannelAllocation()
5253
{
53-
_c = new ConnectionFactory().CreateConnection();
54+
var cf = new ConnectionFactory();
55+
_c = cf.CreateConnection();
5456
}
5557

5658
public void Dispose() => _c.Close();
@@ -72,6 +74,16 @@ public void AllocateAfterFreeingLast()
7274
Assert.Equal(1, ChannelNumber(ch));
7375
}
7476

77+
[Fact]
78+
public async Task AllocateAfterFreeingLastAsync()
79+
{
80+
IChannel ch = _c.CreateChannel();
81+
Assert.Equal(1, ChannelNumber(ch));
82+
await ch.CloseAsync();
83+
ch = _c.CreateChannel();
84+
Assert.Equal(1, ChannelNumber(ch));
85+
}
86+
7587
public int CompareChannels(IChannel x, IChannel y)
7688
{
7789
int i = ChannelNumber(x);

0 commit comments

Comments
 (0)