Dead lettering in quorum queues #1820
-
Hello everybody! Got a problem understanding the dead-lettering mechanism in quorum queues. From the doc: Here is my publisher: using System.Text;
using System.Text.Json;
using QuorumQueues.Publisher;
using RabbitMQ.Client;
var connectionFactory = new ConnectionFactory() { HostName = "localhost", Port = 5672};
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync("user.create.dlx", ExchangeType.Direct, durable: true, autoDelete: false);
var arguments = new Dictionary<string, object?>()
{
{ "x-queue-type", "quorum" },
{"x-dead-letter-exchange", "user.create.dlx"},
{"x-dead-letter-routing-key", string.Empty},
{"x-dead-letter-strategy", "at-least-once"},
{"x-overflow", "reject-publish"}
};
await channel.QueueDeclareAsync("user.create", durable: true, exclusive: false, autoDelete: false, arguments);
await channel.QueueDeclareAsync("user.create.dlx.queue", durable: true, exclusive: false, autoDelete: false);
await channel.QueueBindAsync("user.create.dlx.queue", "user.create.dlx", string.Empty);
var user = new User()
{
UserName = "foo",
Password = "bar"
};
var userJson = JsonSerializer.Serialize(user);
var userBody = Encoding.UTF8.GetBytes(userJson);
var props = new BasicProperties()
{
Persistent = true
};
Console.WriteLine($"Publishing the message for user creation: {user.UserName}");
await channel.BasicPublishAsync(string.Empty, "user.create", mandatory: false, props, userBody); and consumer using System.Text;
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using QuorumQueues.User_Create.Consumer;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var connectionFactory = new ConnectionFactory() {HostName = "localhost", Port = 5672};
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = Encoding.UTF8.GetString(ea.Body.ToArray());
var user = JsonSerializer.Deserialize<User>(body);
if (user != null)
{
var newChannel = ((AsyncEventingBasicConsumer)model).Channel;
await using var db = new ApplicationContext();
var existingUser = await db.Users.FirstOrDefaultAsync(u => u.UserName == user.UserName);
if (existingUser != null)
{
Console.WriteLine($"Attempt to create the user with the existing username: {user.UserName}");
await newChannel.BasicRejectAsync(ea.DeliveryTag, requeue: false); // this is where i expect dead-letter to happen
}
else
{
db.Users.Add(user);
await db.SaveChangesAsync();
Console.WriteLine($"Successfully created user: {user.UserName}");
await newChannel.BasicAckAsync(ea.DeliveryTag, multiple: true);
}
}
};
await channel.BasicConsumeAsync("user.create", autoAck: false, consumer); So the question is: why doesn't the message get dead-lettered after it was rejected in the following block: if (existingUser != null)
{
Console.WriteLine($"Attempt to create the user with the existing username: {user.UserName}");
await newChannel.BasicRejectAsync(ea.DeliveryTag, requeue: false);
} Maybe i missed something in the code ? Or dead-lettering works slightly differently in quorum queues ? Publisher and Consumer csproj: <ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="7.1.2" />
</ItemGroup> p.s: the |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
@blendereru because you use While at it, consider never using Why policies exist and are superior to hardcoding arguments in your applications is documented in multiple places. |
Beta Was this translation helpful? Give feedback.
@blendereru because you use
reject-publish
. It cannot be used for cases where there is no publisher present, e.g. when a message is negatively acknowledged by a consumer or the message expires. Usedrop-head
and see the myriad of dead-lettering options quorum queues support.While at it, consider never using
x-arguments
in your application code unless that's the only way to configure a setting. The DLX guide uses a policy in the very first example.Why policies exist and are superior to hardcoding arguments in your applications is documented in multiple places.