diff --git a/projects/Benchmarks/WireFormatting/MethodFraming.cs b/projects/Benchmarks/WireFormatting/MethodFraming.cs index 7e66796e5f..6fc40f3097 100644 --- a/projects/Benchmarks/WireFormatting/MethodFraming.cs +++ b/projects/Benchmarks/WireFormatting/MethodFraming.cs @@ -19,7 +19,7 @@ public class MethodFramingBasicAck public ushort Channel { get; set; } [Benchmark] - internal RentedMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel); + internal RentedOutgoingMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel); } [Config(typeof(Config))] @@ -41,13 +41,13 @@ public class MethodFramingBasicPublish public int FrameMax { get; set; } [Benchmark] - internal RentedMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax); + internal RentedOutgoingMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax); [Benchmark] - internal RentedMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); + internal RentedOutgoingMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); [Benchmark] - internal RentedMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); + internal RentedOutgoingMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax); } [Config(typeof(Config))] @@ -60,6 +60,6 @@ public class MethodFramingChannelClose public ushort Channel { get; set; } [Benchmark] - internal RentedMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel); + internal RentedOutgoingMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel); } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 239e593250..947b4e9d5d 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -2,6 +2,7 @@ abstract RabbitMQ.Client.Exceptions.ProtocolException.ReplyCode.get -> ushort const RabbitMQ.Client.AmqpTcpEndpoint.DefaultAmqpSslPort = 5671 -> int const RabbitMQ.Client.AmqpTcpEndpoint.UseDefaultPort = -1 -> int const RabbitMQ.Client.ConnectionFactory.DefaultChannelMax = 2047 -> ushort +const RabbitMQ.Client.ConnectionFactory.DefaultCopyBodyToMemoryThreshold = 2147483647 -> int const RabbitMQ.Client.ConnectionFactory.DefaultFrameMax = 0 -> uint const RabbitMQ.Client.ConnectionFactory.DefaultMaxMessageSize = 134217728 -> uint const RabbitMQ.Client.ConnectionFactory.DefaultPass = "guest" -> string @@ -212,6 +213,8 @@ RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void +RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.get -> int +RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.set -> void RabbitMQ.Client.ConnectionFactory.CreateConnection() -> RabbitMQ.Client.IConnection RabbitMQ.Client.ConnectionFactory.CreateConnection(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName) -> RabbitMQ.Client.IConnection RabbitMQ.Client.ConnectionFactory.CreateConnection(string clientProvidedName) -> RabbitMQ.Client.IConnection @@ -507,8 +510,10 @@ RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool r RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler RabbitMQ.Client.IChannel.BasicPublish(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void RabbitMQ.Client.IChannel.BasicPublish(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void -RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence body = default(System.Buffers.ReadOnlySequence), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence body = default(System.Buffers.ReadOnlySequence), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicQos(uint prefetchSize, ushort prefetchCount, bool global) -> void RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicReject(ulong deliveryTag, bool requeue) -> void @@ -583,6 +588,7 @@ RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler System.EventHandler RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler +RabbitMQ.Client.IConnection.CopyBodyToMemoryThreshold.get -> int RabbitMQ.Client.IConnection.CreateChannel() -> RabbitMQ.Client.IChannel RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint @@ -880,6 +886,7 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan +readonly RabbitMQ.Client.ConnectionConfig.CopyBodyToMemoryThreshold -> int readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumersAsync -> bool readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan @@ -953,11 +960,11 @@ static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> void +static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> void static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory body) -> void -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel) -> void static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> void diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs new file mode 100644 index 0000000000..6469806ddb --- /dev/null +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -0,0 +1,117 @@ +#nullable enable + +using System; +using System.Buffers; +using System.Diagnostics; +using System.IO.Pipelines; +using System.Threading.Tasks; +using RabbitMQ.Client.Impl; + +namespace RabbitMQ.Client +{ + internal sealed class RentedOutgoingMemory : IDisposable + { + private readonly TaskCompletionSource? _sendCompletionSource; + private bool _disposedValue; + private byte[]? _rentedArray; + private ReadOnlySequence _data; + + public RentedOutgoingMemory(ReadOnlyMemory data, byte[]? rentedArray = null, bool waitSend = false) + : this(new ReadOnlySequence(data), rentedArray, waitSend) + { + } + + public RentedOutgoingMemory(ReadOnlySequence data, byte[]? rentedArray = null, bool waitSend = false) + { + _data = data; + _rentedArray = rentedArray; + + if (waitSend) + { + _sendCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + } + + internal int Size => (int)Data.Length; + + public int RentedArraySize => _rentedArray?.Length ?? 0; + + internal ReadOnlySequence Data + { + get + { + if (_disposedValue) + { + throw new ObjectDisposedException(nameof(RentedOutgoingMemory)); + } + + return _data; + } + } + + /// + /// Mark the data as sent. + /// + /// true if the object can be disposed, false if the is waiting for the data to be sent. + public bool DidSend() + { + if (_sendCompletionSource is null) + { + return true; + } + + _sendCompletionSource.SetResult(true); + return false; + } + + /// + /// Wait for the data to be sent. + /// + /// true if the data was sent and the object can be disposed. + public ValueTask WaitForDataSendAsync() + { + return _sendCompletionSource is null ? new ValueTask(false) : WaitForFinishCore(); + + async ValueTask WaitForFinishCore() + { + await _sendCompletionSource.Task.ConfigureAwait(false); + return true; + } + } + + public void WriteTo(PipeWriter pipeWriter) + { + foreach (ReadOnlyMemory memory in Data) + { + pipeWriter.Write(memory.Span); + } + } + + private void Dispose(bool disposing) + { + if (_disposedValue) + { + return; + } + + Debug.Assert(_sendCompletionSource is null or { Task.IsCompleted: true }, "The send task should be completed before disposing."); + _disposedValue = true; + + if (disposing) + { + _data = default; + + if (_rentedArray != null) + { + ClientArrayPool.Return(_rentedArray); + _rentedArray = null; + } + } + } + + public void Dispose() + { + Dispose(disposing: true); + } + } +} diff --git a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs index 3aba1f9ab2..451dd73209 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs @@ -143,6 +143,16 @@ public sealed class ConnectionConfig /// public readonly int DispatchConsumerConcurrency; + /// + /// The threshold for when to copy the body to a temporary array. + /// + /// + /// When the body is larger than this threshold it will reuse the same buffer. Because of this + /// the buffer cannot be modified by the application. This causes + /// the socket () to block until the frame is sent. + /// + public readonly int CopyBodyToMemoryThreshold; + internal readonly Func> FrameHandlerFactoryAsync; internal ConnectionConfig(string virtualHost, string userName, string password, @@ -153,7 +163,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler, TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout, bool dispatchConsumersAsync, int dispatchConsumerConcurrency, - Func> frameHandlerFactoryAsync) + Func> frameHandlerFactoryAsync, int copyBodyToMemoryThreshold) { VirtualHost = virtualHost; UserName = userName; @@ -176,6 +186,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, DispatchConsumersAsync = dispatchConsumersAsync; DispatchConsumerConcurrency = dispatchConsumerConcurrency; FrameHandlerFactoryAsync = frameHandlerFactoryAsync; + CopyBodyToMemoryThreshold = copyBodyToMemoryThreshold; } } } diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 7a50ae9e85..ced8372ca6 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -138,6 +138,11 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor /// public const string DefaultVHost = "/"; + /// + /// Default value for the copy body to memory threshold. + /// + public const int DefaultCopyBodyToMemoryThreshold = int.MaxValue; + /// /// TLS versions enabled by default: TLSv1.2, v1.1, v1.0. /// @@ -361,6 +366,16 @@ public AmqpTcpEndpoint Endpoint /// public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize; + /// + /// The threshold for when to copy the body to a temporary array. + /// + /// + /// When the body is larger than this threshold it will reuse the same buffer. Because of this + /// the buffer cannot be modified by the application. This causes + /// the socket () to block until the frame is sent. + /// + public int CopyBodyToMemoryThreshold { get; set; } = DefaultCopyBodyToMemoryThreshold; + /// /// The uri to use for the connection. /// @@ -748,7 +763,12 @@ public async ValueTask CreateConnectionAsync(IEndpointResolver endp } } - private ConnectionConfig CreateConfig(string clientProvidedName) + internal ConnectionConfig CreateConfig() + { + return CreateConfig(ClientProvidedName); + } + + internal ConnectionConfig CreateConfig(string clientProvidedName) { return new ConnectionConfig( VirtualHost, @@ -771,7 +791,8 @@ private ConnectionConfig CreateConfig(string clientProvidedName) RequestedConnectionTimeout, DispatchConsumersAsync, ConsumerDispatchConcurrency, - CreateFrameHandlerAsync); + CreateFrameHandlerAsync, + CopyBodyToMemoryThreshold); } internal async Task CreateFrameHandlerAsync( diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 9d3248d22a..e65874c27f 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -258,7 +259,7 @@ void BasicPublish(CachedString exchange, CachedString routingKey, i /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; /// @@ -269,7 +270,29 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, in /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool? copyBody = null) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + + /// + /// Asynchronously publishes a message. + /// + /// + /// + /// Routing key must be shorter than 255 bytes. + /// + /// + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader; + + /// + /// Asynchronously publishes a message. + /// + /// + /// + /// Routing key must be shorter than 255 bytes. + /// + /// + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body = default, bool mandatory = false, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; #nullable disable diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index 74edd17fb2..f7ea8dcd80 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -140,14 +140,14 @@ public static ValueTask BasicPublishAsync(this IChannel channel, PublicationA public static void BasicPublish(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) => channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); - public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) + => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody); - public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) + public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) => channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); - public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false, bool? copyBody = null) + => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody); #nullable disable /// diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index 11653032a1..0df5edacb5 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -125,6 +125,11 @@ public interface IConnection : INetworkConnection, IDisposable /// IEnumerable ShutdownReport { get; } + /// + /// The threshold for when to copy the body to a temporary array. + /// + int CopyBodyToMemoryThreshold { get; } + /// /// Application-specific connection name, will be displayed in the management UI /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index d37f7a4764..5f9baba2a1 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; @@ -327,13 +328,21 @@ public void BasicPublish(CachedString exchange, CachedString routin where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory); - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody); public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 56962d8edc..9bc9830591 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -50,7 +50,7 @@ internal sealed partial class AutorecoveringConnection : IConnection private Connection _innerConnection; private bool _disposed; - private Connection InnerConnection + internal Connection InnerConnection { get { @@ -181,6 +181,8 @@ public event EventHandler RecoveringConsumer public IEnumerable ShutdownReport => InnerConnection.ShutdownReport; + public int CopyBodyToMemoryThreshold => InnerConnection.CopyBodyToMemoryThreshold; + public IProtocol Protocol => Endpoint.Protocol; public RecoveryAwareChannel CreateNonRecoveringChannel() diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 4aa0c31f16..6e665227be 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -507,7 +508,7 @@ protected void ChannelSend(in TMethod method, in THeader heade } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body) + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlySequence body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -516,7 +517,7 @@ protected ValueTask ModelSendAsync(in TMethod method, in THead _flowControlBlock.Wait(); } - return Session.TransmitAsync(in method, in header, body); + return Session.TransmitAsync(in method, in header, body, copyBody); } internal void OnCallbackException(CallbackExceptionEventArgs args) @@ -1258,7 +1259,7 @@ public void BasicPublish(CachedString exchange, CachedString routin ChannelSend(in cmd, in basicProperties, body); } - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1270,10 +1271,16 @@ public ValueTask BasicPublishAsync(string exchange, string routingK } var cmd = new BasicPublish(exchange, routingKey, mandatory, default); - return ModelSendAsync(in cmd, in basicProperties, body); + return ModelSendAsync(in cmd, in basicProperties, body, copyBody); } - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence(body), mandatory, copyBody); + } + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence body, bool mandatory, bool? copyBody = null) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1285,7 +1292,13 @@ public ValueTask BasicPublishAsync(CachedString exchange, CachedStr } var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - return ModelSendAsync(in cmd, in basicProperties, body); + return ModelSendAsync(in cmd, in basicProperties, body, copyBody); + } + + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory, bool? copyBody = null) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence(body), mandatory, copyBody); } public void UpdateSecret(string newSecret, string reason) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 4417d991e3..a60a28a87c 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -58,6 +58,9 @@ internal sealed partial class Connection : IConnection private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + internal bool TrackRentedBytes = false; + internal uint RentedBytes; + internal Connection(ConnectionConfig config, IFrameHandler frameHandler) { _config = config; @@ -101,6 +104,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) public IDictionary? ServerProperties { get; private set; } + public int CopyBodyToMemoryThreshold => _config.CopyBodyToMemoryThreshold; + public IEnumerable ShutdownReport => _shutdownReport; private ShutdownReportEntry[] _shutdownReport = Array.Empty(); @@ -539,7 +544,7 @@ internal void OnCallbackException(CallbackExceptionEventArgs args) _callbackExceptionWrapper.Invoke(this, args); } - internal void Write(RentedMemory frames) + internal void Write(RentedOutgoingMemory frames) { ValueTask task = _frameHandler.WriteAsync(frames); if (!task.IsCompletedSuccessfully) @@ -548,11 +553,25 @@ internal void Write(RentedMemory frames) } } - internal ValueTask WriteAsync(RentedMemory frames) + internal ValueTask WriteAsync(RentedOutgoingMemory frames) { + TrackRented(frames.RentedArraySize); + return _frameHandler.WriteAsync(frames); } + private void TrackRented(int size) + { + if (TrackRentedBytes && size > 0) + { +#if NET + Interlocked.Add(ref RentedBytes, (uint)size); +#else + Interlocked.Add(ref Unsafe.As(ref RentedBytes), size); +#endif + } + } + public void Dispose() { if (_disposed) diff --git a/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs b/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs index 7868556dce..5547d413a1 100644 --- a/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs +++ b/projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs @@ -8,7 +8,7 @@ namespace RabbitMQ.Client.client.impl #nullable enable internal readonly struct EmptyBasicProperty : IReadOnlyBasicProperties, IAmqpHeader { - internal static readonly EmptyBasicProperty Empty; + internal static readonly EmptyBasicProperty Empty = default; ushort IAmqpHeader.ProtocolClassId => ClassConstants.Basic; diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index ad5eab4c19..98cd735b87 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -113,13 +113,29 @@ internal static class BodySegment public const int FrameSize = BaseFrameSize; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int WriteTo(Span span, ushort channel, ReadOnlySpan body) + public static int WriteTo(ref SequenceBuilder data, Memory buffer, ushort channel, ReadOnlySequence body, bool copyBody) { const int StartBodyArgument = StartPayload; - NetworkOrderSerializer.WriteUInt64(ref span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8)); - body.CopyTo(span.Slice(StartBodyArgument)); - span[StartPayload + body.Length] = Constants.FrameEnd; - return body.Length + BaseFrameSize; + NetworkOrderSerializer.WriteUInt64(ref buffer.Span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8)); + + if (copyBody) + { + int length = (int)body.Length; + Span span = buffer.Span; + + body.CopyTo(span.Slice(StartBodyArgument)); + span[StartPayload + length] = Constants.FrameEnd; + + data.Append(buffer.Slice(0, length + BaseFrameSize)); + + return length + BaseFrameSize; + } + + data.Append(buffer.Slice(0, StartBodyArgument)); + data.Append(body); + buffer.Span[StartBodyArgument] = Constants.FrameEnd; + data.Append(buffer.Slice(StartBodyArgument, 1)); + return BaseFrameSize; } } @@ -139,18 +155,18 @@ internal static class Heartbeat /// private static ReadOnlySpan Payload => new byte[] { Constants.FrameHeartbeat, 0, 0, 0, 0, 0, 0, Constants.FrameEnd }; - public static RentedMemory GetHeartbeatFrame() + public static RentedOutgoingMemory GetHeartbeatFrame() { // Is returned by SocketFrameHandler.WriteLoop byte[] buffer = ClientArrayPool.Rent(FrameSize); Payload.CopyTo(buffer); var mem = new ReadOnlyMemory(buffer, 0, FrameSize); - return new RentedMemory(mem, buffer); + return new RentedOutgoingMemory(mem, buffer); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static RentedMemory SerializeToFrames(ref T method, ushort channelNumber) + public static RentedOutgoingMemory SerializeToFrames(ref T method, ushort channelNumber) where T : struct, IOutgoingAmqpMethod { int size = Method.FrameSize + method.GetRequiredBufferSize(); @@ -161,35 +177,57 @@ public static RentedMemory SerializeToFrames(ref T method, ushort channelNumb System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); var mem = new ReadOnlyMemory(array, 0, size); - return new RentedMemory(mem, array); + return new RentedOutgoingMemory(mem, array); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static RentedMemory SerializeToFrames(ref TMethod method, ref THeader header, ReadOnlyMemory body, ushort channelNumber, int maxBodyPayloadBytes) + public static RentedOutgoingMemory SerializeToFrames(ref TMethod method, ref THeader header, ReadOnlySequence body, ushort channelNumber, int maxBodyPayloadBytes, bool copyBody = true) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { - int remainingBodyBytes = body.Length; + int remainingBodyBytes = (int)body.Length; int size = Method.FrameSize + Header.FrameSize + method.GetRequiredBufferSize() + header.GetRequiredBufferSize() + - BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes) + remainingBodyBytes; + BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes); + + if (copyBody) + { + size += remainingBodyBytes; + } // Will be returned by SocketFrameWriter.WriteLoop byte[] array = ClientArrayPool.Rent(size); + SequenceBuilder sequenceBuilder = new SequenceBuilder(); + Memory buffer = array.AsMemory(); + int offset = Method.WriteTo(array, channelNumber, ref method); offset += Header.WriteTo(array.AsSpan(offset), channelNumber, ref header, remainingBodyBytes); - ReadOnlySpan bodySpan = body.Span; + + sequenceBuilder.Append(buffer.Slice(0, offset)); + buffer = buffer.Slice(offset); + + ReadOnlySequence remainingBody = body; while (remainingBodyBytes > 0) { int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes; - offset += BodySegment.WriteTo(array.AsSpan(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize)); + int segmentSize = BodySegment.WriteTo(ref sequenceBuilder, buffer, channelNumber, remainingBody.Slice(remainingBody.Length - remainingBodyBytes, frameSize), copyBody); + + buffer = buffer.Slice(segmentSize); + offset += segmentSize; remainingBodyBytes -= frameSize; } System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); - var mem = new ReadOnlyMemory(array, 0, size); - return new RentedMemory(mem, array); + return new RentedOutgoingMemory(sequenceBuilder.Build(), array, waitSend: !copyBody); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static RentedOutgoingMemory SerializeToFrames(ref TMethod method, ref THeader header, ReadOnlyMemory body, ushort channelNumber, int maxBodyPayloadBytes) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader + { + return SerializeToFrames(ref method, ref header, new ReadOnlySequence(body), channelNumber, maxBodyPayloadBytes); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs index beeaf85de2..df07a7e1ee 100644 --- a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs @@ -73,6 +73,6 @@ internal interface IFrameHandler Task SendProtocolHeaderAsync(CancellationToken cancellationToken); // TODO cancellation token for write timeout / cancellation? - ValueTask WriteAsync(RentedMemory frames); + ValueTask WriteAsync(RentedOutgoingMemory frames); } } diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 1d5a34f76b..ca8076b439 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Threading.Tasks; using RabbitMQ.Client.Framing.Impl; @@ -79,13 +80,21 @@ internal interface ISession void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod; + void Transmit(in TMethod cmd, in THeader header, ReadOnlySequence body) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader; + void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader; ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingAmqpMethod; - ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body) + ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool? copyBody = null) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader; + + ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader; } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index a433626878..fe078ff338 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -151,7 +152,7 @@ public virtual ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingA return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber)); } - public void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) + public void Transmit(in TMethod cmd, in THeader header, ReadOnlySequence body) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -163,7 +164,14 @@ public void Transmit(in TMethod cmd, in THeader header, ReadOn Connection.Write(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); } - public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body) + public void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader + { + Transmit(cmd, header, new ReadOnlySequence(body)); + } + + public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlySequence body, bool? copyBody = null) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -172,7 +180,16 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); + copyBody ??= body.Length <= Connection.CopyBodyToMemoryThreshold; + + return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value)); + } + + public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body, bool? copyBody = null) + where TMethod : struct, IOutgoingAmqpMethod + where THeader : IAmqpHeader + { + return TransmitAsync(cmd, header, new ReadOnlySequence(body), copyBody); } private void ThrowAlreadyClosedException() diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index ef922b8c59..b319533390 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -48,8 +48,8 @@ internal sealed class SocketFrameHandler : IFrameHandler private readonly Func _socketFactory; private readonly TimeSpan _connectionTimeout; - private readonly ChannelWriter _channelWriter; - private readonly ChannelReader _channelReader; + private readonly ChannelWriter _channelWriter; + private readonly ChannelReader _channelReader; private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1); private IPAddress[] _amqpTcpEndpointAddresses; @@ -76,7 +76,7 @@ public SocketFrameHandler(AmqpTcpEndpoint amqpTcpEndpoint, _readTimeout = readTimeout; _writeTimeout = writeTimeout; - var channel = Channel.CreateBounded( + var channel = Channel.CreateBounded( new BoundedChannelOptions(128) { AllowSynchronousContinuations = false, @@ -296,7 +296,7 @@ await _pipeWriter.FlushAsync(cancellationToken) .ConfigureAwait(false); } - public async ValueTask WriteAsync(RentedMemory frames) + public async ValueTask WriteAsync(RentedOutgoingMemory frames) { if (_closed) { @@ -307,6 +307,14 @@ public async ValueTask WriteAsync(RentedMemory frames) { await _channelWriter.WriteAsync(frames) .ConfigureAwait(false); + + bool didSend = await frames.WaitForDataSendAsync() + .ConfigureAwait(false); + + if (didSend) + { + frames.Dispose(); + } } } @@ -332,17 +340,21 @@ private async Task WriteLoop() { while (await _channelReader.WaitToReadAsync().ConfigureAwait(false)) { - while (_channelReader.TryRead(out RentedMemory frames)) + while (_channelReader.TryRead(out RentedOutgoingMemory frames)) { try { - await _pipeWriter.WriteAsync(frames.Memory) + frames.WriteTo(_pipeWriter); + await _pipeWriter.FlushAsync() .ConfigureAwait(false); RabbitMqClientEventSource.Log.CommandSent(frames.Size); } finally { - frames.Dispose(); + if (frames.DidSend()) + { + frames.Dispose(); + } } } diff --git a/projects/RabbitMQ.Client/util/SequenceBuilder.cs b/projects/RabbitMQ.Client/util/SequenceBuilder.cs new file mode 100644 index 0000000000..6d32607d6a --- /dev/null +++ b/projects/RabbitMQ.Client/util/SequenceBuilder.cs @@ -0,0 +1,90 @@ +#nullable enable +using System; +using System.Buffers; +using System.Runtime.InteropServices; +using RabbitMQ.Client.Impl; + +namespace RabbitMQ.Util; + +internal ref struct SequenceBuilder +{ + private Segment? _first; + private Segment? _current; + + public SequenceBuilder() + { + _first = _current = null; + } + + public void Append(ReadOnlySequence sequence) + { + SequencePosition pos = sequence.Start; + while (sequence.TryGet(ref pos, out ReadOnlyMemory mem)) + { + Append(mem); + } + } + + public void Append(ReadOnlyMemory memory) + { + if (_current == null) + { + _first = _current = new Segment(memory); + } + else if (!_current.TryMerge(memory)) + { + _current = _current.Append(memory); + } + } + + public ReadOnlySequence Build() + { + if (_first == null || _current == null) + { + return default; + } + + return new ReadOnlySequence(_first, 0, _current, _current.Memory.Length); + } + + private sealed class Segment : ReadOnlySequenceSegment + { + public Segment(ReadOnlyMemory memory) + { + Memory = memory; + } + + /// + /// Try to merge the next memory into this chunk. + /// + /// + /// Used in that can write the same array when the body is being copied. + /// + /// The next memory. + /// true if the memory was merged; otherwise false. + public bool TryMerge(ReadOnlyMemory next) + { + if (MemoryMarshal.TryGetArray(Memory, out ArraySegment segment) && + MemoryMarshal.TryGetArray(next, out ArraySegment nextSegment) && + segment.Array == nextSegment.Array && + nextSegment.Offset == segment.Offset + segment.Count) + { + Memory = segment.Array.AsMemory(segment.Offset, segment.Count + nextSegment.Count); + return true; + } + + return false; + } + + public Segment Append(ReadOnlyMemory memory) + { + Segment nextChunk = new(memory) + { + RunningIndex = RunningIndex + Memory.Length + }; + + Next = nextChunk; + return nextChunk; + } + } +} diff --git a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs index e6a526a130..fc89f5ccc7 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs @@ -67,5 +67,76 @@ public async Task TestQueuePurgeAsync() Assert.True(await publishSyncSource.Task); Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q)); } + + [Fact] + public async Task TestNonCopyingBody() + { + const int size = 1024; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytesAsync()) + { + await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: false); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is smaller than the size of the body + // since we're not copying the body. Only the frame headers are rented. + Assert.True(rentedBytes < size); + } + + [Fact] + public async Task TestCopyingBody() + { + const int size = 1024; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytesAsync()) + { + await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); + } + + [Fact] + public async Task TestDefaultCopyingBody() + { + Assert.Equal(int.MaxValue, _conn.CopyBodyToMemoryThreshold); + + const int size = 1024; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytesAsync()) + { + await _channel.BasicPublishAsync(string.Empty, q, body); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); + } } } diff --git a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs new file mode 100644 index 0000000000..65e8e39000 --- /dev/null +++ b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs @@ -0,0 +1,66 @@ +using System.Threading.Tasks; +using RabbitMQ.Client; +using Xunit; +using Xunit.Abstractions; + +namespace Test.AsyncIntegration; + +public class TestBasicPublishCopyBodyAsync : AsyncIntegrationFixture +{ + public TestBasicPublishCopyBodyAsync(ITestOutputHelper output) : base(output) + { + } + + protected override ConnectionFactory CreateConnectionFactory() + { + var factory = base.CreateConnectionFactory(); + factory.CopyBodyToMemoryThreshold = 1024; + return factory; + } + + [Theory] + [InlineData(512)] + [InlineData(1024)] + public async Task TestNonCopyingBody(ushort size) + { + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytesAsync()) + { + await _channel.BasicPublishAsync(string.Empty, q, body); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); + } + + [Theory] + [InlineData(1025)] + [InlineData(2048)] + public async Task TestCopyingBody(ushort size) + { + QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null); + byte[] body = GetRandomBody(size); + + uint rentedBytes; + + using (var result = await TrackRentedBytesAsync()) + { + await _channel.BasicPublishAsync(string.Empty, q, body); + rentedBytes = result.RentedBytes; + } + + Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); + + // It is expected that the rented bytes is smaller than the size of the body + // since we're not copying the body. Only the frame headers are rented. + Assert.True(rentedBytes < size); + } +} diff --git a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs index 9626bf4fa8..4e431ecbdc 100644 --- a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -55,37 +55,28 @@ public override async Task InitializeAsync() _conn.ConnectionShutdown += HandleConnectionShutdown; } - [Fact] - public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync() + [Theory] + [InlineData(false)] + [InlineData(true)] + public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync(bool copyBody) { - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), 30); + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), copyBody, 30); } - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize64Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize256Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize1024Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024); - } - - private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30) + [Theory] + [InlineData(64, false)] + [InlineData(64, true)] + [InlineData(256, false)] + [InlineData(256, true)] + [InlineData(1024, false)] + [InlineData(1024, true)] + public Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, bool copyBody, int iterations = 30) { byte[] body = GetRandomBody(length); - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations); + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, copyBody, iterations); } - private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations) + private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, bool copyBody, int iterations) { return TestConcurrentChannelOperationsAsync(async (conn) => { @@ -128,7 +119,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); for (ushort j = 0; j < _messageCount; j++) { - await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true); + await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true, copyBody: copyBody); } Assert.True(await tcs.Task); diff --git a/projects/Test/Common/IntegrationFixtureBase.cs b/projects/Test/Common/IntegrationFixtureBase.cs index b177dd6e15..1777e424f6 100644 --- a/projects/Test/Common/IntegrationFixtureBase.cs +++ b/projects/Test/Common/IntegrationFixtureBase.cs @@ -38,6 +38,7 @@ using System.Reflection; using System.Text; using System.Threading; +using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; @@ -48,6 +49,8 @@ namespace Test { public abstract class IntegrationFixtureBase : IDisposable { + private readonly SemaphoreSlim _byteTrackingLock = new SemaphoreSlim(1, 1); + private static bool s_isRunningInCI = false; private static bool s_isWindows = false; private static bool s_isVerbose = false; @@ -371,7 +374,7 @@ protected void Wait(ManualResetEventSlim latch, TimeSpan timeSpan, string desc) $"waiting {timeSpan.TotalSeconds} seconds on a latch for '{desc}' timed out"); } - protected ConnectionFactory CreateConnectionFactory() + protected virtual ConnectionFactory CreateConnectionFactory() { string now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture); return new ConnectionFactory @@ -418,6 +421,39 @@ protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action a(args); } + protected async Task TrackRentedBytesAsync() + { + Connection connection; + + if (_conn is AutorecoveringConnection autorecoveringConnection) + { + connection = autorecoveringConnection.InnerConnection as Connection; + } + else + { + connection = _conn as Connection; + } + + if (connection is null) + { + throw new InvalidOperationException("Cannot track rented bytes without a connection"); + } + + await _byteTrackingLock.WaitAsync(); + + try + { + connection.RentedBytes = 0; + connection.TrackRentedBytes = true; + return new TrackRentedByteResult(connection, _byteTrackingLock); + } + catch + { + _byteTrackingLock.Release(); + throw; + } + } + private static void InitIsRunningInCI() { bool ci; diff --git a/projects/Test/Common/TrackRentedByteResult.cs b/projects/Test/Common/TrackRentedByteResult.cs new file mode 100644 index 0000000000..48b9a0e50f --- /dev/null +++ b/projects/Test/Common/TrackRentedByteResult.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading; +using RabbitMQ.Client.Framing.Impl; + +namespace Test; + +public sealed class TrackRentedByteResult : IDisposable +{ + private readonly Connection _connection; + private readonly SemaphoreSlim _byteTrackingLock; + + internal TrackRentedByteResult(Connection connection, SemaphoreSlim byteTrackingLock) + { + _connection = connection; + _byteTrackingLock = byteTrackingLock; + } + + public uint RentedBytes => _connection.RentedBytes; + + public void Dispose() + { + _byteTrackingLock.Release(); + } +} diff --git a/projects/Test/Unit/TestFrameFormatting.cs b/projects/Test/Unit/TestFrameFormatting.cs index 9bbb31e46d..268f52596c 100644 --- a/projects/Test/Unit/TestFrameFormatting.cs +++ b/projects/Test/Unit/TestFrameFormatting.cs @@ -30,9 +30,11 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers; using RabbitMQ.Client; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Client.Impl; +using RabbitMQ.Util; using Xunit; namespace Test.Unit @@ -42,12 +44,15 @@ public class TestFrameFormatting : WireFormattingFixture [Fact] public void HeartbeatFrame() { - RentedMemory sfc = Framing.Heartbeat.GetHeartbeatFrame(); - ReadOnlySpan frameSpan = sfc.Memory.Span; + RentedOutgoingMemory sfc = Framing.Heartbeat.GetHeartbeatFrame(); try { - Assert.Equal(8, frameSpan.Length); + Assert.Equal(8, sfc.Size); + + Span frameSpan = stackalloc byte[8]; + sfc.Data.CopyTo(frameSpan); + Assert.Equal(Constants.FrameHeartbeat, frameSpan[0]); Assert.Equal(0, frameSpan[1]); // channel Assert.Equal(0, frameSpan[2]); // channel @@ -59,7 +64,7 @@ public void HeartbeatFrame() } finally { - ClientArrayPool.Return(sfc.RentedArray); + sfc.Dispose(); } } @@ -133,14 +138,29 @@ public void MethodFrame() Assert.Equal(Constants.FrameEnd, frameBytes[18]); } - [Fact] - public void BodySegmentFrame() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void BodySegmentFrame(bool copyBody) { const int Channel = 3; byte[] payload = new byte[4]; - byte[] frameBytes = new byte[RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize + payload.Length]; - RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(frameBytes, Channel, payload); + byte[] buffer = new byte[RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize + (copyBody ? payload.Length : 0)]; + + SequenceBuilder builder = new SequenceBuilder(); + + RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(ref builder, buffer, Channel, new ReadOnlySequence(payload), copyBody); + + var sequence = builder.Build(); + + if (copyBody) + { + // When copying the body, the memory is sequential + Assert.True(sequence.IsSingleSegment); + } + + byte[] frameBytes = sequence.ToArray(); Assert.Equal(8, RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize); Assert.Equal(Constants.FrameBody, frameBytes[0]); diff --git a/projects/Test/Unit/TestRentedOutgoingMemory.cs b/projects/Test/Unit/TestRentedOutgoingMemory.cs new file mode 100644 index 0000000000..955d43d50c --- /dev/null +++ b/projects/Test/Unit/TestRentedOutgoingMemory.cs @@ -0,0 +1,65 @@ +using System.Threading.Tasks; +using RabbitMQ.Client; +using Xunit; + +namespace Test.Unit; + +public class TestRentedOutgoingMemory +{ + [Fact] + public async Task TestNonBlocking() + { + // Arrange + byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; + RentedOutgoingMemory rentedMemory = new RentedOutgoingMemory(buffer, waitSend: false); + + // Act + var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); + var timeoutTask = Task.Delay(100); + var completedTask = await Task.WhenAny(timeoutTask, waitTask); + bool didSend = rentedMemory.DidSend(); + + // Assert + Assert.Equal(waitTask, completedTask); + Assert.False(!waitTask.IsCompleted || await waitTask); + Assert.True(didSend); + } + + [Fact] + public async Task TestBlocking() + { + // Arrange + byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; + RentedOutgoingMemory rentedMemory = new RentedOutgoingMemory(buffer, waitSend: true); + + // Act + var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); + var timeoutTask = Task.Delay(100); + var completedTask = await Task.WhenAny(timeoutTask, waitTask); + + // Assert + Assert.Equal(timeoutTask, completedTask); + Assert.False(waitTask.IsCompleted); + } + + [Fact] + public async Task TestBlockingCompleted() + { + // Arrange + byte[] buffer = new byte[] { 1, 2, 3, 4, 5 }; + RentedOutgoingMemory rentedMemory = new RentedOutgoingMemory(buffer, waitSend: true); + + // Act + var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); + var timeoutTask = Task.Delay(100); + + bool didSend = rentedMemory.DidSend(); + + var completedTask = await Task.WhenAny(timeoutTask, waitTask); + + // Assert + Assert.Equal(waitTask, completedTask); + Assert.True(waitTask.IsCompleted && await waitTask); + Assert.False(didSend); + } +} diff --git a/projects/Test/Unit/TestSequenceBuilder.cs b/projects/Test/Unit/TestSequenceBuilder.cs new file mode 100644 index 0000000000..8bd8b7d8dd --- /dev/null +++ b/projects/Test/Unit/TestSequenceBuilder.cs @@ -0,0 +1,89 @@ +using System; +using System.Buffers; +using System.Runtime.InteropServices; +using RabbitMQ.Util; +using Xunit; + +namespace Test.Unit; + +public class TestSequenceBuilder +{ + [Fact] + public void TestSingleMemory() + { + // Arrange + byte[] array = new byte[] { 1, 2, 3, 4, 5, 6 }; + + // Act + SequenceBuilder builder = new SequenceBuilder(); + builder.Append(array); + + var sequence = builder.Build(); + + // Assert + Assert.Equal(6, sequence.Length); + Assert.True(sequence.IsSingleSegment); + Assert.True(MemoryMarshal.TryGetArray(sequence.First, out var segment)); + Assert.Equal(array, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(6, segment.Count); + } + + [Fact] + public void TestMerge() + { + // Arrange + byte[] array = new byte[] { 1, 2, 3, 4, 5, 6 }; + Memory first = array.AsMemory(0, 3); + Memory second = array.AsMemory(3, 3); + + // Act + SequenceBuilder builder = new SequenceBuilder(); + + builder.Append(first); + builder.Append(second); + + var sequence = builder.Build(); + + // Assert + Assert.Equal(6, sequence.Length); + Assert.True(sequence.IsSingleSegment); + Assert.True(MemoryMarshal.TryGetArray(sequence.First, out var segment)); + Assert.Equal(array, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(6, segment.Count); + } + + [Fact] + public void TestMultipleMemory() + { + // Arrange + byte[] first = new byte[] { 1, 2, 3 }; + byte[] second = new byte[] { 4, 5, 6 }; + + // Act + SequenceBuilder builder = new SequenceBuilder(); + + builder.Append(first); + builder.Append(second); + + var sequence = builder.Build(); + + // Assert + Assert.Equal(6, sequence.Length); + Assert.Equal(new byte[] { 1, 2, 3, 4, 5, 6 }, sequence.ToArray()); + + var enumerator = sequence.GetEnumerator(); + Assert.True(enumerator.MoveNext()); + Assert.True(MemoryMarshal.TryGetArray(enumerator.Current, out var segment)); + Assert.Equal(first, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(3, segment.Count); + + Assert.True(enumerator.MoveNext()); + Assert.True(MemoryMarshal.TryGetArray(enumerator.Current, out segment)); + Assert.Equal(second, segment.Array); + Assert.Equal(0, segment.Offset); + Assert.Equal(3, segment.Count); + } +}