Skip to content

Commit 1617300

Browse files
committed
Add delayed message support for RabbitMQ.
1 parent cb9625b commit 1617300

File tree

6 files changed

+99
-61
lines changed

6 files changed

+99
-61
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Globalization;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using DotNetCore.CAP.Messages;
7+
8+
// ReSharper disable once CheckNamespace
9+
namespace DotNetCore.CAP
10+
{
11+
public static class CapPublisherExtensions
12+
{
13+
/// <summary>
14+
/// Schedule a message to be published at the feature time with callback name.
15+
/// <para>SHOULD BE ENABLE RabbitMQ <b>rabbitmq_delayed_message_exchange</b> PLUGINS.</para>
16+
/// </summary>
17+
/// <typeparam name="T">content object</typeparam>
18+
/// <param name="delayTime">The delay for message to published.</param>
19+
/// <param name="name">the topic name or exchange router key.</param>
20+
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
21+
/// <param name="callbackName">callback subscriber name.</param>
22+
/// <param name="cancellationToken"></param>
23+
/// <param name="publisher"></param>
24+
public static async Task PublishDelayAsync<T>(this ICapPublisher publisher, TimeSpan delayTime,
25+
string name, T? contentObj, string? callbackName = null, CancellationToken cancellationToken = default)
26+
{
27+
var dic = new Dictionary<string, string?>
28+
{
29+
{Headers.CallbackName, callbackName},
30+
{"x-delay", delayTime.TotalMilliseconds.ToString(CultureInfo.InvariantCulture)}
31+
};
32+
33+
await publisher.PublishAsync(name, contentObj, dic, cancellationToken).ConfigureAwait(false);
34+
}
35+
36+
/// <summary>
37+
/// Schedule a message to be published at the feature time with custom headers.
38+
/// <para>SHOULD BE ENABLE RabbitMQ <b>rabbitmq_delayed_message_exchange</b> PLUGINS.</para>
39+
/// </summary>
40+
/// <typeparam name="T">content object</typeparam>
41+
/// <param name="publisher"></param>
42+
/// <param name="delayTime">The delay for message to published.</param>
43+
/// <param name="name">the topic name or exchange router key.</param>
44+
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
45+
/// <param name="headers">message additional headers.</param>
46+
/// <param name="cancellationToken"></param>
47+
public static async Task PublishDelayAsync<T>(this ICapPublisher publisher, TimeSpan delayTime,
48+
string name, T? contentObj, Dictionary<string, string?> headers, CancellationToken cancellationToken = default)
49+
{
50+
headers.Add("x-delay", delayTime.TotalMilliseconds.ToString(CultureInfo.InvariantCulture));
51+
52+
await publisher.PublishAsync(name, contentObj, headers, cancellationToken).ConfigureAwait(false);
53+
}
54+
}
55+
}

src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Diagnostics;
77
using System.Reflection;
88
using System.Threading;
9+
using System.Threading.Channels;
910
using Microsoft.Extensions.Logging;
1011
using Microsoft.Extensions.Options;
1112
using RabbitMQ.Client;
@@ -129,6 +130,8 @@ public virtual IModel Rent()
129130
try
130131
{
131132
model = GetConnection().CreateModel();
133+
model.ExchangeDeclare(Exchange, RabbitMQOptions.ExchangeType, true);
134+
model.ConfirmSelect();
132135
}
133136
catch (Exception e)
134137
{

src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Linq;
67
using System.Threading.Tasks;
78
using DotNetCore.CAP.Internal;
@@ -27,7 +28,7 @@ public RabbitMQTransport(
2728
_exchange = _connectionChannelPool.Exchange;
2829
}
2930

30-
public BrokerAddress BrokerAddress => new ("RabbitMQ", _connectionChannelPool.HostAddress);
31+
public BrokerAddress BrokerAddress => new("RabbitMQ", _connectionChannelPool.HostAddress);
3132

3233
public Task<OperateResult> SendAsync(TransportMessage message)
3334
{
@@ -36,17 +37,28 @@ public Task<OperateResult> SendAsync(TransportMessage message)
3637
{
3738
channel = _connectionChannelPool.Rent();
3839

39-
//channel.ConfirmSelect();
40-
4140
var props = channel.CreateBasicProperties();
4241
props.DeliveryMode = 2;
4342
props.Headers = message.Headers.ToDictionary(x => x.Key, x => (object?)x.Value);
4443

45-
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
44+
if (message.Headers.ContainsKey("x-delay"))
45+
{
46+
var delayExchangeName = _exchange + "_delayed";
47+
channel.ExchangeDeclare(exchange: delayExchangeName, type: "x-delayed-message", durable: true, autoDelete: false,
48+
new Dictionary<string, object>
49+
{
50+
["x-delayed-type"] = "topic"
51+
});
52+
channel.ExchangeBind(_exchange, delayExchangeName, message.GetName());
4653

47-
channel.BasicPublish(_exchange, message.GetName(), props, message.Body);
54+
channel.BasicPublish(delayExchangeName, message.GetName(), props, message.Body);
55+
}
56+
else
57+
{
58+
channel.BasicPublish(_exchange, message.GetName(), props, message.Body);
59+
}
4860

49-
//channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
61+
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
5062

5163
_logger.LogInformation("CAP message '{0}' published, internal id '{1}'", message.GetName(), message.GetId());
5264

src/DotNetCore.CAP/Internal/IMessageSender.Default.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public async Task<OperateResult> SendAsync(MediumMessage message)
4949
{
5050
var executedResult = await SendWithoutRetryAsync(message);
5151
result = executedResult.Item2;
52-
if (result == OperateResult.Success)
52+
if (result.Equals(OperateResult.Success))
5353
{
5454
return result;
5555
}

src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ internal class SubscribeDispatcher : ISubscribeDispatcher
2323
private readonly ILogger _logger;
2424
private readonly IServiceProvider _provider;
2525
private readonly CapOptions _options;
26+
private readonly string? _hostName;
2627

2728
// diagnostics listener
2829
// ReSharper disable once InconsistentNaming
@@ -39,6 +40,7 @@ public SubscribeDispatcher(
3940

4041
_dataStorage = _provider.GetRequiredService<IDataStorage>();
4142
Invoker = _provider.GetRequiredService<ISubscribeInvoker>();
43+
_hostName = GenerateHostnameInstanceId();
4244
}
4345

4446
private ISubscribeInvoker Invoker { get; }
@@ -66,13 +68,13 @@ public async Task<OperateResult> DispatchAsync(MediumMessage message, ConsumerEx
6668
OperateResult result;
6769

6870
//record instance id
69-
message.Origin.Headers[Headers.ExecutionInstanceId] = GenerateHostnameInstanceId();
71+
message.Origin.Headers[Headers.ExecutionInstanceId] = _hostName;
7072

7173
do
7274
{
7375
var (shouldRetry, operateResult) = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken);
7476
result = operateResult;
75-
if (result == OperateResult.Success)
77+
if (result.Equals(OperateResult.Success))
7678
{
7779
return result;
7880
}

src/DotNetCore.CAP/OperateResult.cs

Lines changed: 18 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,78 +3,44 @@
33

44
using System;
55
using System.Collections.Generic;
6-
using System.Linq;
76

87
namespace DotNetCore.CAP
98
{
109
/// <summary>
1110
/// Represents the result of an consistent message operation.
1211
/// </summary>
13-
public class OperateResult
12+
public struct OperateResult : IEqualityComparer<OperateResult>
1413
{
15-
// ReSharper disable once InconsistentNaming
14+
private readonly OperateError? _operateError = null;
1615

17-
// ReSharper disable once FieldCanBeMadeReadOnly.Local
18-
private readonly List<OperateError> _errors = new List<OperateError>();
16+
public OperateResult(bool succeeded, Exception? exception = null, OperateError? error = null)
17+
{
18+
Succeeded = succeeded;
19+
Exception = exception;
20+
_operateError = error;
21+
}
1922

20-
/// <summary>
21-
/// Flag indicating whether if the operation succeeded or not.
22-
/// </summary>
2323
public bool Succeeded { get; set; }
2424

2525
public Exception? Exception { get; set; }
2626

27-
/// <summary>
28-
/// An <see cref="IEnumerable{T}" /> of <see cref="OperateError" />s containing an errors
29-
/// that occurred during the operation.
30-
/// </summary>
31-
/// <value>An <see cref="IEnumerable{T}" /> of <see cref="OperateError" />s.</value>
32-
public IEnumerable<OperateError> Errors => _errors;
27+
public static OperateResult Success => new(true);
3328

34-
/// <summary>
35-
/// Returns an <see cref="OperateResult" /> indicating a successful identity operation.
36-
/// </summary>
37-
/// <returns>An <see cref="OperateResult" /> indicating a successful operation.</returns>
38-
public static OperateResult Success { get; } = new OperateResult {Succeeded = true};
29+
public static OperateResult Failed(Exception ex, OperateError? errors = null) => new(false, ex, errors);
3930

40-
/// <summary>
41-
/// Creates an <see cref="OperateResult" /> indicating a failed operation, with a list of <paramref name="errors" /> if
42-
/// applicable.
43-
/// </summary>
44-
/// <param name="ex">Operate Result exception</param>
45-
/// <param name="errors">An optional array of <see cref="OperateError" />s which caused the operation to fail.</param>
46-
/// <returns>
47-
/// An <see cref="OperateResult" /> indicating a failed operation, with a list of <paramref name="errors" /> if
48-
/// applicable.
49-
/// </returns>
50-
public static OperateResult Failed(Exception ex, params OperateError[] errors)
31+
public override string ToString()
5132
{
52-
var result = new OperateResult
53-
{
54-
Succeeded = false,
55-
Exception = ex
56-
};
57-
if (errors != null)
58-
{
59-
result._errors.AddRange(errors);
60-
}
33+
return Succeeded ? "Succeeded" : $"Failed : {_operateError?.Code}";
34+
}
6135

62-
return result;
36+
public bool Equals(OperateResult x, OperateResult y)
37+
{
38+
return x.Succeeded == y.Succeeded;
6339
}
6440

65-
/// <summary>
66-
/// Converts the value of the current <see cref="OperateResult" /> object to its equivalent string representation.
67-
/// </summary>
68-
/// <returns>A string representation of the current <see cref="OperateResult" /> object.</returns>
69-
/// <remarks>
70-
/// If the operation was successful the ToString() will return "Succeeded" otherwise it returned
71-
/// "Failed : " followed by a comma delimited list of error codes from its <see cref="Errors" /> collection, if any.
72-
/// </remarks>
73-
public override string ToString()
41+
public int GetHashCode(OperateResult obj)
7442
{
75-
return Succeeded
76-
? "Succeeded"
77-
: string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList()));
43+
return HashCode.Combine(obj._operateError, obj.Succeeded, obj.Exception);
7844
}
7945
}
8046

0 commit comments

Comments
 (0)