Skip to content

Commit e8ae39a

Browse files
committed
remove CancellationTokenSource from DispatcherChannelBase
1 parent 3adee90 commit e8ae39a

File tree

3 files changed

+31
-71
lines changed

3 files changed

+31
-71
lines changed

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Threading;
32
using System.Threading.Tasks;
43
using RabbitMQ.Client.Events;
54
using RabbitMQ.Client.Impl;
@@ -14,11 +13,11 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
1413
{
1514
}
1615

17-
protected override async Task ProcessChannelAsync(CancellationToken token)
16+
protected override async Task ProcessChannelAsync()
1817
{
1918
try
2019
{
21-
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
20+
while (await _reader.WaitToReadAsync().ConfigureAwait(false))
2221
{
2322
while (_reader.TryRead(out WorkStruct work))
2423
{
@@ -54,7 +53,7 @@ protected override async Task ProcessChannelAsync(CancellationToken token)
5453
}
5554
catch (OperationCanceledException)
5655
{
57-
if (false == token.IsCancellationRequested)
56+
if (!_reader.Completion.IsCompleted)
5857
{
5958
throw;
6059
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Threading;
32
using System.Threading.Tasks;
43
using RabbitMQ.Client.Events;
54
using RabbitMQ.Client.Impl;
@@ -14,11 +13,11 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency)
1413
{
1514
}
1615

17-
protected override async Task ProcessChannelAsync(CancellationToken token)
16+
protected override async Task ProcessChannelAsync()
1817
{
1918
try
2019
{
21-
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
20+
while (await _reader.WaitToReadAsync().ConfigureAwait(false))
2221
{
2322
while (_reader.TryRead(out WorkStruct work))
2423
{
@@ -60,7 +59,7 @@ await consumer.HandleBasicDeliverAsync(
6059
}
6160
catch (OperationCanceledException)
6261
{
63-
if (false == token.IsCancellationRequested)
62+
if (!_reader.Completion.IsCompleted)
6463
{
6564
throw;
6665
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

+25-63
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@ namespace RabbitMQ.Client.ConsumerDispatching
1111
#nullable enable
1212
internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, IConsumerDispatcher
1313
{
14-
protected readonly CancellationTokenSource _consumerDispatcherCts = new CancellationTokenSource();
15-
protected readonly CancellationToken _consumerDispatcherToken;
16-
1714
protected readonly ChannelBase _channel;
1815
protected readonly ChannelReader<WorkStruct> _reader;
1916
private readonly ChannelWriter<WorkStruct> _writer;
@@ -23,7 +20,6 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
2320

2421
internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency)
2522
{
26-
_consumerDispatcherToken = _consumerDispatcherCts.Token;
2723
_channel = channel;
2824
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
2925
{
@@ -34,18 +30,17 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency)
3430
_reader = workChannel.Reader;
3531
_writer = workChannel.Writer;
3632

37-
Func<Task> loopStart =
38-
() => ProcessChannelAsync(_consumerDispatcherToken);
33+
Func<Task> loopStart = ProcessChannelAsync;
3934
if (concurrency == 1)
4035
{
41-
_worker = Task.Run(loopStart, _consumerDispatcherToken);
36+
_worker = Task.Run(loopStart);
4237
}
4338
else
4439
{
4540
var tasks = new Task[concurrency];
4641
for (int i = 0; i < concurrency; i++)
4742
{
48-
tasks[i] = Task.Run(loopStart, _consumerDispatcherToken);
43+
tasks[i] = Task.Run(loopStart);
4944
}
5045
_worker = Task.WhenAll(tasks);
5146
}
@@ -122,21 +117,6 @@ public void Quiesce()
122117
_quiesce = true;
123118
}
124119

125-
private bool IsCancellationRequested
126-
{
127-
get
128-
{
129-
try
130-
{
131-
return _consumerDispatcherCts.IsCancellationRequested;
132-
}
133-
catch (ObjectDisposedException)
134-
{
135-
return true;
136-
}
137-
}
138-
}
139-
140120
public void WaitForShutdown()
141121
{
142122
if (_disposed)
@@ -146,40 +126,37 @@ public void WaitForShutdown()
146126

147127
if (_quiesce)
148128
{
149-
if (IsCancellationRequested)
129+
try
150130
{
151-
try
131+
if (false == _reader.Completion.Wait(TimeSpan.FromSeconds(2)))
152132
{
153-
if (false == _reader.Completion.Wait(TimeSpan.FromSeconds(2)))
154-
{
155-
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
156-
}
157-
if (false == _worker.Wait(TimeSpan.FromSeconds(2)))
158-
{
159-
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
160-
}
133+
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
161134
}
162-
catch (AggregateException aex)
135+
if (false == _worker.Wait(TimeSpan.FromSeconds(2)))
163136
{
164-
AggregateException aexf = aex.Flatten();
165-
bool foundUnexpectedException = false;
166-
foreach (Exception innerAexf in aexf.InnerExceptions)
167-
{
168-
if (false == (innerAexf is OperationCanceledException))
169-
{
170-
foundUnexpectedException = true;
171-
break;
172-
}
173-
}
174-
if (foundUnexpectedException)
137+
ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)");
138+
}
139+
}
140+
catch (AggregateException aex)
141+
{
142+
AggregateException aexf = aex.Flatten();
143+
bool foundUnexpectedException = false;
144+
foreach (Exception innerAexf in aexf.InnerExceptions)
145+
{
146+
if (false == (innerAexf is OperationCanceledException))
175147
{
176-
ESLog.Warn("consumer dispatcher task had unexpected exceptions");
148+
foundUnexpectedException = true;
149+
break;
177150
}
178151
}
179-
catch (OperationCanceledException)
152+
if (foundUnexpectedException)
180153
{
154+
ESLog.Warn("consumer dispatcher task had unexpected exceptions");
181155
}
182156
}
157+
catch (OperationCanceledException)
158+
{
159+
}
183160
}
184161
else
185162
{
@@ -238,17 +215,15 @@ protected sealed override void ShutdownConsumer(IBasicConsumer consumer, Shutdow
238215
protected override void InternalShutdown()
239216
{
240217
_writer.Complete();
241-
CancelConsumerDispatcherCts();
242218
}
243219

244220
protected override Task InternalShutdownAsync()
245221
{
246222
_writer.Complete();
247-
CancelConsumerDispatcherCts();
248223
return _worker;
249224
}
250225

251-
protected abstract Task ProcessChannelAsync(CancellationToken token);
226+
protected abstract Task ProcessChannelAsync();
252227

253228
protected readonly struct WorkStruct : IDisposable
254229
{
@@ -334,17 +309,6 @@ protected enum WorkType : byte
334309
ConsumeOk
335310
}
336311

337-
protected void CancelConsumerDispatcherCts()
338-
{
339-
try
340-
{
341-
_consumerDispatcherCts.Cancel();
342-
}
343-
catch (ObjectDisposedException)
344-
{
345-
}
346-
}
347-
348312
protected virtual void Dispose(bool disposing)
349313
{
350314
if (!_disposed)
@@ -354,8 +318,6 @@ protected virtual void Dispose(bool disposing)
354318
if (disposing)
355319
{
356320
Quiesce();
357-
CancelConsumerDispatcherCts();
358-
_consumerDispatcherCts.Dispose();
359321
}
360322
}
361323
catch

0 commit comments

Comments
 (0)