Skip to content

Commit bded97c

Browse files
authored
Merge pull request #1453 from rabbitmq/rabbitmq-dotnet-client-1043
Ensure delivery tag is decremented for client-side exception
2 parents 8a3fc58 + 98beae2 commit bded97c

File tree

6 files changed

+155
-13
lines changed

6 files changed

+155
-13
lines changed

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

+2
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,9 @@ RabbitMQ.Client.IChannel.TxRollback() -> void
567567
RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.ValueTask
568568
RabbitMQ.Client.IChannel.TxSelect() -> void
569569
RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.ValueTask
570+
RabbitMQ.Client.IChannel.WaitForConfirms() -> bool
570571
RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
572+
RabbitMQ.Client.IChannel.WaitForConfirmsOrDie() -> void
571573
RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
572574
RabbitMQ.Client.IChannelExtensions
573575
RabbitMQ.Client.IConnection

projects/RabbitMQ.Client/client/api/IChannel.cs

+24
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,19 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
598598
/// Wait until all published messages on this channel have been confirmed.
599599
/// </summary>
600600
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
601+
/// <remarks>
602+
/// Waits until all messages published on this channel since the last call have
603+
/// been either ack'd or nack'd by the server. Returns whether
604+
/// all the messages were ack'd (and none were nack'd).
605+
/// Throws an exception when called on a channel
606+
/// that does not have publisher confirms enabled.
607+
/// </remarks>
608+
bool WaitForConfirms();
609+
610+
/// <summary>
611+
/// Asynchronously wait until all published messages on this channel have been confirmed.
612+
/// </summary>
613+
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
601614
/// <param name="token">The cancellation token.</param>
602615
/// <remarks>
603616
/// Waits until all messages published on this channel since the last call have
@@ -608,6 +621,17 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
608621
/// </remarks>
609622
Task<bool> WaitForConfirmsAsync(CancellationToken token = default);
610623

624+
/// <summary>
625+
/// Wait until all published messages on this channel have been confirmed.
626+
/// </summary>
627+
/// <remarks>
628+
/// Waits until all messages published on this channel since the last call have
629+
/// been ack'd by the server. If a nack is received or the timeout
630+
/// elapses, throws an IOException exception immediately and closes
631+
/// the channel.
632+
/// </remarks>
633+
void WaitForConfirmsOrDie();
634+
611635
/// <summary>
612636
/// Wait until all published messages on this channel have been confirmed.
613637
/// </summary>

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

+6
Original file line numberDiff line numberDiff line change
@@ -602,9 +602,15 @@ public ValueTask TxSelectAsync()
602602
return InnerChannel.TxSelectAsync();
603603
}
604604

605+
public bool WaitForConfirms()
606+
=> InnerChannel.WaitForConfirms();
607+
605608
public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
606609
=> InnerChannel.WaitForConfirmsAsync(token);
607610

611+
public void WaitForConfirmsOrDie()
612+
=> InnerChannel.WaitForConfirmsOrDie();
613+
608614
public Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
609615
=> InnerChannel.WaitForConfirmsOrDieAsync(token);
610616

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

+83-8
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public IBasicConsumer DefaultConsumer
166166

167167
public bool IsOpen => CloseReason is null;
168168

169+
// TODO add private bool for Confirm mode
169170
public ulong NextPublishSeqNo { get; private set; }
170171

171172
public string CurrentQueue { get; private set; }
@@ -1239,8 +1240,24 @@ public void BasicPublish<TProperties>(string exchange, string routingKey, in TPr
12391240
}
12401241
}
12411242

1242-
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
1243-
ChannelSend(in cmd, in basicProperties, body);
1243+
try
1244+
{
1245+
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
1246+
ChannelSend(in cmd, in basicProperties, body);
1247+
}
1248+
catch
1249+
{
1250+
if (NextPublishSeqNo > 0)
1251+
{
1252+
lock (_confirmLock)
1253+
{
1254+
NextPublishSeqNo--;
1255+
_pendingDeliveryTags.RemoveLast();
1256+
}
1257+
}
1258+
1259+
throw;
1260+
}
12441261
}
12451262

12461263
public void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
@@ -1254,8 +1271,24 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
12541271
}
12551272
}
12561273

1257-
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1258-
ChannelSend(in cmd, in basicProperties, body);
1274+
try
1275+
{
1276+
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1277+
ChannelSend(in cmd, in basicProperties, body);
1278+
}
1279+
catch
1280+
{
1281+
if (NextPublishSeqNo > 0)
1282+
{
1283+
lock (_confirmLock)
1284+
{
1285+
NextPublishSeqNo--;
1286+
_pendingDeliveryTags.RemoveLast();
1287+
}
1288+
}
1289+
1290+
throw;
1291+
}
12591292
}
12601293

12611294
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
@@ -1269,8 +1302,24 @@ public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingK
12691302
}
12701303
}
12711304

1272-
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
1273-
return ModelSendAsync(in cmd, in basicProperties, body);
1305+
try
1306+
{
1307+
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
1308+
return ModelSendAsync(in cmd, in basicProperties, body);
1309+
}
1310+
catch
1311+
{
1312+
if (NextPublishSeqNo > 0)
1313+
{
1314+
lock (_confirmLock)
1315+
{
1316+
NextPublishSeqNo--;
1317+
_pendingDeliveryTags.RemoveLast();
1318+
}
1319+
}
1320+
1321+
throw;
1322+
}
12741323
}
12751324

12761325
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
@@ -1284,8 +1333,24 @@ public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedStr
12841333
}
12851334
}
12861335

1287-
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1288-
return ModelSendAsync(in cmd, in basicProperties, body);
1336+
try
1337+
{
1338+
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1339+
return ModelSendAsync(in cmd, in basicProperties, body);
1340+
}
1341+
catch
1342+
{
1343+
if (NextPublishSeqNo > 0)
1344+
{
1345+
lock (_confirmLock)
1346+
{
1347+
NextPublishSeqNo--;
1348+
_pendingDeliveryTags.RemoveLast();
1349+
}
1350+
}
1351+
1352+
throw;
1353+
}
12891354
}
12901355

12911356
public void UpdateSecret(string newSecret, string reason)
@@ -1755,6 +1820,11 @@ await ModelSendAsync(method)
17551820

17561821
private List<TaskCompletionSource<bool>> _confirmsTaskCompletionSources;
17571822

1823+
public bool WaitForConfirms()
1824+
{
1825+
return WaitForConfirmsAsync().EnsureCompleted();
1826+
}
1827+
17581828
public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
17591829
{
17601830
if (NextPublishSeqNo == 0UL)
@@ -1812,6 +1882,11 @@ await tokenRegistration.DisposeAsync()
18121882
}
18131883
}
18141884

1885+
public void WaitForConfirmsOrDie()
1886+
{
1887+
WaitForConfirmsOrDieAsync().EnsureCompleted();
1888+
}
1889+
18151890
public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
18161891
{
18171892
try

projects/Test/AsyncIntegration/TestConnectionFactory.cs

-3
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@
2929
// Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32-
#if !NET6_0_OR_GREATER
33-
using System;
34-
#endif
3532
using System;
3633
using System.Collections.Generic;
3734
using System.Threading;

projects/Test/Integration/TestConfirmSelect.cs

+40-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ public TestConfirmSelect(ITestOutputHelper output) : base(output)
4444
[Fact]
4545
public void TestConfirmSelectIdempotency()
4646
{
47+
void Publish()
48+
{
49+
_channel.BasicPublish("", "amq.fanout", _encoding.GetBytes("message"));
50+
}
51+
4752
_channel.ConfirmSelect();
4853
Assert.Equal(1ul, _channel.NextPublishSeqNo);
4954
Publish();
@@ -60,9 +65,42 @@ public void TestConfirmSelectIdempotency()
6065
Assert.Equal(6ul, _channel.NextPublishSeqNo);
6166
}
6267

63-
protected void Publish()
68+
[Theory]
69+
[InlineData(255)]
70+
[InlineData(256)]
71+
public void TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)
6472
{
65-
_channel.BasicPublish("", "amq.fanout", _encoding.GetBytes("message"));
73+
byte[] body = GetRandomBody(16);
74+
75+
_channel.ExchangeDeclare("sample", "fanout", autoDelete: true);
76+
// _channel.BasicAcks += (s, e) => _output.WriteLine("Acked {0}", e.DeliveryTag);
77+
_channel.ConfirmSelect();
78+
79+
var properties = new BasicProperties();
80+
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
81+
_channel.BasicPublish(exchange: "sample", routingKey: string.Empty, in properties, body);
82+
_channel.WaitForConfirmsOrDie();
83+
84+
try
85+
{
86+
properties = new BasicProperties
87+
{
88+
CorrelationId = new string('o', correlationIdLength)
89+
};
90+
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
91+
_channel.BasicPublish("sample", string.Empty, in properties, body);
92+
_channel.WaitForConfirmsOrDie();
93+
}
94+
catch
95+
{
96+
// _output.WriteLine("Error when trying to publish with long string: {0}", e.Message);
97+
}
98+
99+
properties = new BasicProperties();
100+
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
101+
_channel.BasicPublish("sample", string.Empty, in properties, body);
102+
_channel.WaitForConfirmsOrDie();
103+
// _output.WriteLine("I'm done...");
66104
}
67105
}
68106
}

0 commit comments

Comments
 (0)