Skip to content

Commit 1ea8c30

Browse files
committed
implement channel
1 parent 34aac15 commit 1ea8c30

File tree

116 files changed

+3922
-6221
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+3922
-6221
lines changed

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

+4-1
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,15 @@
5757
</AssemblyAttribute>
5858
</ItemGroup>
5959

60+
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
61+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
62+
<PackageReference Include="System.Memory" Version="4.5.4" />
63+
</ItemGroup>
6064
<ItemGroup>
6165
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="3.3.1" PrivateAssets="All" />
6266
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
6367
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
6468
<PackageReference Include="MinVer" Version="2.3.0" PrivateAssets="All" />
65-
<PackageReference Include="System.Memory" Version="4.5.4" />
6669
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
6770
</ItemGroup>
6871

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Runtime.CompilerServices;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#if NETCOREAPP3_1 || NETSTANDARD
6+
internal static class Interlocked
7+
{
8+
public static ulong CompareExchange(ref ulong location1, ulong value, ulong comparand)
9+
{
10+
return (ulong)System.Threading.Interlocked.CompareExchange(ref Unsafe.As<ulong, long>(ref location1), (long)value, (long)comparand);
11+
}
12+
13+
public static ulong Increment(ref ulong location1)
14+
{
15+
return (ulong)System.Threading.Interlocked.Add(ref Unsafe.As<ulong, long>(ref location1), 1L);
16+
}
17+
}
18+
#endif
19+
}

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

+13-13
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Threading.Tasks;
5-
5+
using RabbitMQ.Client.client.impl.Channel;
66
using RabbitMQ.Client.Events;
77

88
namespace RabbitMQ.Client
@@ -12,29 +12,29 @@ public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer
1212
private readonly HashSet<string> _consumerTags = new HashSet<string>();
1313

1414
/// <summary>
15-
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.
15+
/// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer"/>.
1616
/// </summary>
1717
public AsyncDefaultBasicConsumer()
1818
{
1919
ShutdownReason = null;
20-
Model = null;
20+
Channel = null;
2121
IsRunning = false;
2222
}
2323

2424
/// <summary>
25-
/// Constructor which sets the Model property to the given value.
25+
/// Constructor which sets the <see cref="Channel"/> property to the given value.
2626
/// </summary>
27-
/// <param name="model">Common AMQP model.</param>
28-
public AsyncDefaultBasicConsumer(IModel model)
27+
/// <param name="channel">The channel.</param>
28+
public AsyncDefaultBasicConsumer(IChannel channel)
2929
{
3030
ShutdownReason = null;
3131
IsRunning = false;
32-
Model = model;
32+
Channel = channel;
3333
}
3434

3535
/// <summary>
3636
/// Retrieve the consumer tags this consumer is registered as; to be used when discussing this consumer
37-
/// with the server, for instance with <see cref="IModel.BasicCancel"/>.
37+
/// with the server, for instance with <see cref="IChannel.CancelConsumerAsync"/>.
3838
/// </summary>
3939
public string[] ConsumerTags
4040
{
@@ -50,7 +50,7 @@ public string[] ConsumerTags
5050
public bool IsRunning { get; protected set; }
5151

5252
/// <summary>
53-
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
53+
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
5454
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
5555
/// </summary>
5656
public ShutdownEventArgs ShutdownReason { get; protected set; }
@@ -61,10 +61,10 @@ public string[] ConsumerTags
6161
public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled;
6262

6363
/// <summary>
64-
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
64+
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
6565
/// for use in acknowledging received messages, for instance.
6666
/// </summary>
67-
public IModel Model { get; set; }
67+
public IChannel Channel { get; set; }
6868

6969
/// <summary>
7070
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
@@ -101,7 +101,7 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
101101
/// Called each time a message is delivered for this consumer.
102102
/// </summary>
103103
/// <remarks>
104-
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IModel.BasicAck"/>
104+
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IChannel.AckMessageAsync"/>
105105
/// if consuming in automatic acknowledgement mode.
106106
/// Subclasses must copy or fully use delivery body before returning.
107107
/// Accessing the body at a later point is unsafe as its memory can
@@ -120,7 +120,7 @@ public virtual Task HandleBasicDeliver(string consumerTag,
120120
}
121121

122122
/// <summary>
123-
/// Called when the model (channel) this consumer was registered on terminates.
123+
/// Called when the channel this consumer was registered on terminates.
124124
/// </summary>
125125
/// <param name="model">A channel this consumer was registered on.</param>
126126
/// <param name="reason">Shutdown context.</param>

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ namespace RabbitMQ.Client
5959
/// //
6060
/// IConnection conn = factory.CreateConnection();
6161
/// //
62-
/// IModel ch = conn.CreateModel();
62+
/// IChannel ch = await conn.CreateChannelAsync().ConfigureAwait(false);
6363
/// //
64-
/// // ... use ch's IModel methods ...
64+
/// // ... use ch's IChannel methods ...
6565
/// //
66-
/// ch.Close(Constants.ReplySuccess, "Closing the channel");
66+
/// await ch.CloseAsync().ConfigureAwait(false);
6767
/// conn.Close(Constants.ReplySuccess, "Closing the connection");
6868
/// </code></example>
6969
/// <para>
@@ -492,7 +492,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
492492
else
493493
{
494494
var protocol = new RabbitMQ.Client.Framing.Protocol();
495-
conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
495+
conn = protocol.CreateConnection(this, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
496496
}
497497
}
498498
catch (Exception e)

projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs

+12-12
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
using System;
3333
using System.Collections.Generic;
3434
using System.Linq;
35-
35+
using RabbitMQ.Client.client.impl.Channel;
3636
using RabbitMQ.Client.Events;
3737

3838
namespace RabbitMQ.Client
@@ -56,24 +56,24 @@ public class DefaultBasicConsumer : IBasicConsumer
5656
public DefaultBasicConsumer()
5757
{
5858
ShutdownReason = null;
59-
Model = null;
59+
Channel = null;
6060
IsRunning = false;
6161
}
6262

6363
/// <summary>
64-
/// Constructor which sets the Model property to the given value.
64+
/// Constructor which sets the Channel property to the given value.
6565
/// </summary>
66-
/// <param name="model">Common AMQP model.</param>
67-
public DefaultBasicConsumer(IModel model)
66+
/// <param name="channel">The channel.</param>
67+
public DefaultBasicConsumer(IChannel channel)
6868
{
6969
ShutdownReason = null;
7070
IsRunning = false;
71-
Model = model;
71+
Channel = channel;
7272
}
7373

7474
/// <summary>
7575
/// Retrieve the consumer tags this consumer is registered as; to be used to identify
76-
/// this consumer, for example, when cancelling it with <see cref="IModel.BasicCancel"/>.
76+
/// this consumer, for example, when cancelling it with <see cref="IChannel.CancelConsumerAsync"/>.
7777
/// This value is an array because a single consumer instance can be reused to consume on
7878
/// multiple channels.
7979
/// </summary>
@@ -91,7 +91,7 @@ public string[] ConsumerTags
9191
public bool IsRunning { get; protected set; }
9292

9393
/// <summary>
94-
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
94+
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
9595
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
9696
/// </summary>
9797
public ShutdownEventArgs ShutdownReason { get; protected set; }
@@ -102,10 +102,10 @@ public string[] ConsumerTags
102102
public event EventHandler<ConsumerEventArgs> ConsumerCancelled;
103103

104104
/// <summary>
105-
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
105+
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
106106
/// for use in acknowledging received messages, for instance.
107107
/// </summary>
108-
public IModel Model { get; set; }
108+
public IChannel Channel { get; set; }
109109

110110
/// <summary>
111111
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
@@ -141,7 +141,7 @@ public virtual void HandleBasicConsumeOk(string consumerTag)
141141
/// Called each time a message is delivered for this consumer.
142142
/// </summary>
143143
/// <remarks>
144-
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IModel.BasicAck"/>
144+
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IChannel.AckMessageAsync"/>
145145
/// if consuming in automatic acknowledgement mode.
146146
/// Subclasses must copy or fully use delivery body before returning.
147147
/// Accessing the body at a later point is unsafe as its memory can
@@ -159,7 +159,7 @@ public virtual void HandleBasicDeliver(string consumerTag,
159159
}
160160

161161
/// <summary>
162-
/// Called when the model (channel) this consumer was registered on terminates.
162+
/// Called when the channel this consumer was registered on terminates.
163163
/// </summary>
164164
/// <param name="model">A channel this consumer was registered on.</param>
165165
/// <param name="reason">Shutdown context.</param>

projects/RabbitMQ.Client/client/api/ExchangeType.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace RabbitMQ.Client
3838
/// </summary>
3939
/// <remarks>
4040
/// Use the static members of this class as values for the
41-
/// "exchangeType" arguments for IModel methods such as
41+
/// "exchangeType" arguments for IChannel methods such as
4242
/// ExchangeDeclare. The broker may be extended with additional
4343
/// exchange types that do not appear in this class.
4444
/// </remarks>

projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
using System;
22
using System.Threading.Tasks;
3-
3+
using RabbitMQ.Client.client.impl.Channel;
44
using RabbitMQ.Client.Events;
55

66
namespace RabbitMQ.Client
77
{
88
public interface IAsyncBasicConsumer
99
{
1010
/// <summary>
11-
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
11+
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
1212
/// for use in acknowledging received messages, for instance.
1313
/// </summary>
14-
IModel Model { get; }
14+
IChannel Channel { get; }
1515

1616
/// <summary>
1717
/// Signalled when the consumer gets cancelled.
@@ -43,7 +43,7 @@ public interface IAsyncBasicConsumer
4343
/// </summary>
4444
/// <remarks>
4545
/// Does nothing with the passed in information.
46-
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
46+
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IChannel.AckMessageAsync"/>.
4747
/// The implementation of this method in this class does NOT acknowledge such messages.
4848
/// </remarks>
4949
Task HandleBasicDeliver(string consumerTag,
@@ -55,10 +55,10 @@ Task HandleBasicDeliver(string consumerTag,
5555
ReadOnlyMemory<byte> body);
5656

5757
/// <summary>
58-
/// Called when the model shuts down.
58+
/// Called when the channel shuts down.
5959
/// </summary>
60-
/// <param name="model"> Common AMQP model.</param>
61-
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
60+
/// <param name="model">The channel.</param>
61+
/// <param name="reason"> Information about the reason why a particular channel, session, or connection was destroyed.</param>
6262
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
6363
}
6464
}

projects/RabbitMQ.Client/client/api/IBasicConsumer.cs

+7-10
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
33+
using RabbitMQ.Client.client.impl.Channel;
3434
using RabbitMQ.Client.Events;
3535

3636
namespace RabbitMQ.Client
@@ -39,9 +39,6 @@ namespace RabbitMQ.Client
3939
///receive messages from a queue by subscription.</summary>
4040
/// <remarks>
4141
/// <para>
42-
/// See IModel.BasicConsume, IModel.BasicCancel.
43-
/// </para>
44-
/// <para>
4542
/// Note that the "Handle*" methods run in the connection's
4643
/// thread! Consider using <see cref="EventingBasicConsumer"/>, which uses a
4744
/// SharedQueue instance to safely pass received messages across
@@ -51,10 +48,10 @@ namespace RabbitMQ.Client
5148
public interface IBasicConsumer
5249
{
5350
/// <summary>
54-
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
51+
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
5552
/// for use in acknowledging received messages, for instance.
5653
/// </summary>
57-
IModel Model { get; }
54+
IChannel Channel { get; }
5855

5956
/// <summary>
6057
/// Signalled when the consumer gets cancelled.
@@ -86,7 +83,7 @@ public interface IBasicConsumer
8683
/// </summary>
8784
/// <remarks>
8885
/// Does nothing with the passed in information.
89-
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
86+
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IChannel.AckMessageAsync"/>.
9087
/// The implementation of this method in this class does NOT acknowledge such messages.
9188
/// </remarks>
9289
void HandleBasicDeliver(string consumerTag,
@@ -98,10 +95,10 @@ void HandleBasicDeliver(string consumerTag,
9895
ReadOnlyMemory<byte> body);
9996

10097
/// <summary>
101-
/// Called when the model shuts down.
98+
/// Called when the channel shuts down.
10299
/// </summary>
103-
/// <param name="model"> Common AMQP model.</param>
104-
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
100+
/// <param name="model">The channel.</param>
101+
/// <param name="reason"> Information about the reason why a particular channel, session, or connection was destroyed.</param>
105102
void HandleModelShutdown(object model, ShutdownEventArgs reason);
106103
}
107104
}

projects/RabbitMQ.Client/client/api/IBasicProperties.cs

-6
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,6 @@ namespace RabbitMQ.Client
3838
/// 0-8, 0-8qpid, 0-9 and 0-9-1 of AMQP.</summary>
3939
/// <remarks>
4040
/// <para>
41-
/// The specification code generator provides
42-
/// protocol-version-specific implementations of this interface. To
43-
/// obtain an implementation of this interface in a
44-
/// protocol-version-neutral way, use <see cref="IModel.CreateBasicProperties"/>.
45-
/// </para>
46-
/// <para>
4741
/// Each property is readable, writable and clearable: a cleared
4842
/// property will not be transmitted over the wire. Properties on a
4943
/// fresh instance are clear by default.

projects/RabbitMQ.Client/client/api/IConnection.cs

+7-7
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
using System.Collections.Generic;
3434
using System.IO;
3535
using System.Threading;
36-
36+
using System.Threading.Tasks;
37+
using RabbitMQ.Client.client.impl.Channel;
3738
using RabbitMQ.Client.Events;
3839
using RabbitMQ.Client.Exceptions;
39-
using RabbitMQ.Client.Impl;
4040

4141
namespace RabbitMQ.Client
4242
{
@@ -218,7 +218,7 @@ public interface IConnection : INetworkConnection, IDisposable
218218
/// Abort this connection and all its channels.
219219
/// </summary>
220220
/// <remarks>
221-
/// Note that all active channels, sessions, and models will be closed if this method is called.
221+
/// Note that all active channels, sessions, and consumers will be closed if this method is called.
222222
/// In comparison to normal <see cref="Close()"/> method, <see cref="Abort()"/> will not throw
223223
/// <see cref="IOException"/> during closing connection.
224224
///This method waits infinitely for the in-progress close operation to complete.
@@ -275,7 +275,7 @@ public interface IConnection : INetworkConnection, IDisposable
275275
/// Close this connection and all its channels.
276276
/// </summary>
277277
/// <remarks>
278-
/// Note that all active channels, sessions, and models will be
278+
/// Note that all active channels, sessions, and consumers will be
279279
/// closed if this method is called. It will wait for the in-progress
280280
/// close operation to complete. This method will not return to the caller
281281
/// until the shutdown is complete. If the connection is already closed
@@ -304,7 +304,7 @@ public interface IConnection : INetworkConnection, IDisposable
304304
/// and wait with a timeout for all the in-progress close operations to complete.
305305
/// </summary>
306306
/// <remarks>
307-
/// Note that all active channels, sessions, and models will be
307+
/// Note that all active channels, sessions, and consumers will be
308308
/// closed if this method is called. It will wait for the in-progress
309309
/// close operation to complete with a timeout. If the connection is
310310
/// already closed (or closing), then this method will do nothing.
@@ -336,9 +336,9 @@ public interface IConnection : INetworkConnection, IDisposable
336336
void Close(ushort reasonCode, string reasonText, TimeSpan timeout);
337337

338338
/// <summary>
339-
/// Create and return a fresh channel, session, and model.
339+
/// Create and return a fresh channel, session.
340340
/// </summary>
341-
IModel CreateModel();
341+
ValueTask<IChannel> CreateChannelAsync();
342342

343343
/// <summary>
344344
/// Handle incoming Connection.Blocked methods.

0 commit comments

Comments
 (0)