Skip to content

Commit ec8e552

Browse files
authored
Merge pull request #1224 from rabbitmq/rabbitmq-dotnet-client-1223-6.x
Better integrate max message size
2 parents f69cbbf + 5930d91 commit ec8e552

File tree

11 files changed

+210
-10
lines changed

11 files changed

+210
-10
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
_site/
2+
13
###################
24
## Generated files
35
###################

CHANGELOG.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,26 @@
1+
## Changes Between 6.3.1 and 6.4.0
2+
3+
This release adds the ability to specify a maximum message size when receiving data. The default
4+
values are:
5+
6+
* RabbitMQ .NET client 7.0.0 and beyond: 128MiB
7+
* RabbitMQ .NET client 6.4.0 up to 7.0.0: no limit by default
8+
9+
Receiving a frame that specifies a content larger than the limit will throw an execption. This is to
10+
help prevent situations as described in [this discussion](https://github.com/rabbitmq/rabbitmq-dotnet-client/discussions/1213).
11+
12+
To set a limit, use the set `MaxMessageSize` on your `ConnectionFactory` before opening connections:
13+
14+
```
15+
// This sets the limit to 512MiB
16+
var cf = new ConnectionFactory();
17+
cf.MaxMessageSize = 536870912;
18+
var conn = cf.CreateConnection()`
19+
```
20+
21+
GitHub milestone: [`6.4.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/58?closed=1)
22+
Diff: [link](https://github.com/rabbitmq/rabbitmq-dotnet-client/compare/v6.3.1...v6.4.0)
23+
124
## Changes Between 6.3.0 and 6.3.1
225

326
GitHub milestone: [`6.3.1`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/57?closed=1)

projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,32 @@ public class AmqpTcpEndpoint// : ICloneable
6060

6161
private int _port;
6262

63+
private readonly uint _maxMessageSize;
64+
6365
/// <summary>
6466
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
6567
/// </summary>
6668
/// <param name="hostName">Hostname.</param>
6769
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
6870
/// <param name="ssl">Ssl option.</param>
69-
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl)
71+
/// <param name="maxMessageSize">Maximum message size from RabbitMQ. 0 means "unlimited"</param>
72+
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint maxMessageSize)
7073
{
7174
HostName = hostName;
7275
_port = portOrMinusOne;
7376
Ssl = ssl;
77+
_maxMessageSize = maxMessageSize;
78+
}
79+
80+
/// <summary>
81+
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
82+
/// </summary>
83+
/// <param name="hostName">Hostname.</param>
84+
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
85+
/// <param name="ssl">Ssl option.</param>
86+
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl) :
87+
this(hostName, portOrMinusOne, ssl, 0)
88+
{
7489
}
7590

7691
/// <summary>
@@ -116,7 +131,7 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port)
116131
/// <returns>A copy with the same hostname, port, and TLS settings</returns>
117132
public object Clone()
118133
{
119-
return new AmqpTcpEndpoint(HostName, _port, Ssl);
134+
return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxMessageSize);
120135
}
121136

122137
/// <summary>
@@ -126,7 +141,7 @@ public object Clone()
126141
/// <returns>A copy with the provided hostname and port/TLS settings of this endpoint</returns>
127142
public AmqpTcpEndpoint CloneWithHostname(string hostname)
128143
{
129-
return new AmqpTcpEndpoint(hostname, _port, Ssl);
144+
return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxMessageSize);
130145
}
131146

132147
/// <summary>
@@ -176,9 +191,12 @@ public IProtocol Protocol
176191
public SslOption Ssl { get; set; }
177192

178193
/// <summary>
179-
/// Set the maximum size for a message in bytes. The default value is 0 (unlimited)
194+
/// Get the maximum size for a message in bytes. The default value is 0 (unlimited)
180195
/// </summary>
181-
public uint MaxMessageSize { get; set; }
196+
public uint MaxMessageSize
197+
{
198+
get { return _maxMessageSize; }
199+
}
182200

183201
/// <summary>
184202
/// Construct an instance from a protocol and an address in "hostname:port" format.

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ namespace RabbitMQ.Client
5757
/// factory.VirtualHost = ConnectionFactory.DefaultVHost;
5858
/// factory.HostName = hostName;
5959
/// factory.Port = AmqpTcpEndpoint.UseDefaultPort;
60+
/// factory.MaxMessageSize = 512 * 1024 * 1024;
6061
/// //
6162
/// IConnection conn = factory.CreateConnection();
6263
/// //
@@ -103,6 +104,13 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF
103104
/// </summary>
104105
public const uint DefaultFrameMax = 0;
105106

107+
/// <summary>
108+
/// Default value for the maximum allowed message size, in bytes, from RabbitMQ.
109+
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
110+
/// Note: the default is 0 which means "unlimited".
111+
/// </summary>
112+
public const uint DefaultMaxMessageSize = 0;
113+
106114
/// <summary>
107115
/// Default value for desired heartbeat interval. Default is 60 seconds,
108116
/// TimeSpan.Zero means "heartbeats are disabled".
@@ -276,12 +284,13 @@ public ConnectionFactory()
276284
/// </summary>
277285
public AmqpTcpEndpoint Endpoint
278286
{
279-
get { return new AmqpTcpEndpoint(HostName, Port, Ssl); }
287+
get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxMessageSize); }
280288
set
281289
{
282290
Port = value.Port;
283291
HostName = value.HostName;
284292
Ssl = value.Ssl;
293+
MaxMessageSize = value.MaxMessageSize;
285294
}
286295
}
287296

@@ -325,6 +334,12 @@ public AmqpTcpEndpoint Endpoint
325334
/// </summary>
326335
public string VirtualHost { get; set; } = DefaultVHost;
327336

337+
/// <summary>
338+
/// Maximum allowed message size, in bytes, from RabbitMQ.
339+
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
340+
/// </summary>
341+
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;
342+
328343
/// <summary>
329344
/// The uri to use for the connection.
330345
/// </summary>

projects/RabbitMQ.Client/client/exceptions/HardProtocolException.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,20 @@ namespace RabbitMQ.Client.Exceptions
3535
///requiring a connection.close.</summary>
3636
public abstract class HardProtocolException : ProtocolException
3737
{
38+
protected readonly bool _canShutdownCleanly = true;
39+
3840
protected HardProtocolException(string message) : base(message)
3941
{
4042
}
43+
44+
protected HardProtocolException(string message, bool canShutdownCleanly) : base(message)
45+
{
46+
_canShutdownCleanly= canShutdownCleanly;
47+
}
48+
49+
public bool CanShutdownCleanly
50+
{
51+
get { return _canShutdownCleanly; }
52+
}
4153
}
4254
}

projects/RabbitMQ.Client/client/exceptions/MalformedFrameException.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public MalformedFrameException(string message) : base(message)
4646
{
4747
}
4848

49+
public MalformedFrameException(string message, bool canShutdownCleanly) :
50+
base(message, canShutdownCleanly)
51+
{
52+
}
53+
4954
public override ushort ReplyCode
5055
{
5156
get { return Constants.FrameError; }

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe)
479479
hpe.ShutdownReason.ReplyCode,
480480
hpe.ShutdownReason.ReplyText));
481481
}
482-
return true;
482+
return hpe.CanShutdownCleanly;
483483
}
484484
catch (IOException ioe)
485485
{

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, A
246246
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4));
247247
if ((maxMessageSize > 0) && (payloadSize > maxMessageSize))
248248
{
249-
throw new MalformedFrameException($"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes");
249+
string msg = $"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes";
250+
throw new MalformedFrameException(message: msg, canShutdownCleanly: false);
250251
}
251252

252253
const int EndMarkerLength = 1;

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ namespace RabbitMQ.Client
1111
public AmqpTcpEndpoint(string hostName, int portOrMinusOne = -1) { }
1212
public AmqpTcpEndpoint(System.Uri uri, RabbitMQ.Client.SslOption ssl) { }
1313
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) { }
14+
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxMessageSize) { }
1415
public System.Net.Sockets.AddressFamily AddressFamily { get; set; }
1516
public string HostName { get; set; }
16-
public uint MaxMessageSize { get; set; }
17+
public uint MaxMessageSize { get; }
1718
public int Port { get; set; }
1819
public RabbitMQ.Client.IProtocol Protocol { get; }
1920
public RabbitMQ.Client.SslOption Ssl { get; set; }
@@ -72,6 +73,7 @@ namespace RabbitMQ.Client
7273
{
7374
public const ushort DefaultChannelMax = 2047;
7475
public const uint DefaultFrameMax = 0u;
76+
public const uint DefaultMaxMessageSize = 0u;
7577
public const string DefaultPass = "guest";
7678
public const string DefaultUser = "guest";
7779
public const string DefaultVHost = "/";
@@ -91,6 +93,7 @@ namespace RabbitMQ.Client
9193
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
9294
public System.TimeSpan HandshakeContinuationTimeout { get; set; }
9395
public string HostName { get; set; }
96+
public uint MaxMessageSize { get; set; }
9497
public System.Buffers.ArrayPool<byte> MemoryPool { get; set; }
9598
public System.TimeSpan NetworkRecoveryInterval { get; set; }
9699
public string Password { get; set; }
@@ -728,11 +731,15 @@ namespace RabbitMQ.Client.Exceptions
728731
}
729732
public abstract class HardProtocolException : RabbitMQ.Client.Exceptions.ProtocolException
730733
{
734+
protected readonly bool _canShutdownCleanly;
731735
protected HardProtocolException(string message) { }
736+
protected HardProtocolException(string message, bool canShutdownCleanly) { }
737+
public bool CanShutdownCleanly { get; }
732738
}
733739
public class MalformedFrameException : RabbitMQ.Client.Exceptions.HardProtocolException
734740
{
735741
public MalformedFrameException(string message) { }
742+
public MalformedFrameException(string message, bool canShutdownCleanly) { }
736743
public override ushort ReplyCode { get; }
737744
}
738745
[System.Serializable]

projects/Unit/TestBasicPublish.cs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Text;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using NUnit.Framework;
@@ -101,5 +102,96 @@ public void CanNotModifyPayloadAfterPublish()
101102
m.BasicCancel(tag);
102103
}
103104
}
105+
106+
[Test]
107+
public void TestMaxMessageSize()
108+
{
109+
var re = new ManualResetEventSlim();
110+
const ushort maxMsgSize = 1024;
111+
112+
int count = 0;
113+
byte[] msg0 = Encoding.UTF8.GetBytes("hi");
114+
115+
var r = new System.Random();
116+
byte[] msg1 = new byte[maxMsgSize * 2];
117+
r.NextBytes(msg1);
118+
119+
var cf = new ConnectionFactory();
120+
cf.AutomaticRecoveryEnabled = false;
121+
cf.TopologyRecoveryEnabled = false;
122+
cf.MaxMessageSize = maxMsgSize;
123+
124+
bool sawConnectionShutdown = false;
125+
bool sawModelShutdown = false;
126+
bool sawConsumerRegistered = false;
127+
bool sawConsumerCancelled = false;
128+
129+
using (IConnection c = cf.CreateConnection())
130+
{
131+
c.ConnectionShutdown += (o, a) =>
132+
{
133+
sawConnectionShutdown= true;
134+
};
135+
136+
Assert.AreEqual(maxMsgSize, cf.MaxMessageSize);
137+
Assert.AreEqual(maxMsgSize, cf.Endpoint.MaxMessageSize);
138+
Assert.AreEqual(maxMsgSize, c.Endpoint.MaxMessageSize);
139+
140+
using (IModel m = c.CreateModel())
141+
{
142+
m.ModelShutdown += (o, a) =>
143+
{
144+
sawModelShutdown= true;
145+
};
146+
147+
m.CallbackException += (o, a) =>
148+
{
149+
Assert.Fail("Unexpected m.CallbackException");
150+
};
151+
152+
QueueDeclareOk q = m.QueueDeclare();
153+
IBasicProperties bp = m.CreateBasicProperties();
154+
155+
var consumer = new EventingBasicConsumer(m);
156+
157+
consumer.Shutdown += (o, a) =>
158+
{
159+
re.Set();
160+
};
161+
162+
consumer.Registered += (o, a) =>
163+
{
164+
sawConsumerRegistered = true;
165+
};
166+
167+
consumer.Unregistered += (o, a) =>
168+
{
169+
Assert.Fail("Unexpected consumer.Unregistered");
170+
};
171+
172+
consumer.ConsumerCancelled += (o, a) =>
173+
{
174+
sawConsumerCancelled = true;
175+
};
176+
177+
consumer.Received += (o, a) =>
178+
{
179+
Interlocked.Increment(ref count);
180+
};
181+
182+
string tag = m.BasicConsume(q.QueueName, true, consumer);
183+
184+
m.BasicPublish("", q.QueueName, bp, msg0);
185+
m.BasicPublish("", q.QueueName, bp, msg1);
186+
Assert.IsTrue(re.Wait(TimeSpan.FromSeconds(5)));
187+
188+
Assert.AreEqual(1, count);
189+
Assert.IsTrue(sawConnectionShutdown);
190+
Assert.IsTrue(sawModelShutdown);
191+
Assert.IsTrue(sawConsumerRegistered);
192+
Assert.IsTrue(sawConsumerCancelled);
193+
}
194+
}
195+
}
104196
}
105197
}

0 commit comments

Comments
 (0)