Skip to content

Commit d46f851

Browse files
author
Karel Fajkus
committed
refactor + documentation
1 parent 7f44787 commit d46f851

File tree

6 files changed

+205
-116
lines changed

6 files changed

+205
-116
lines changed

README.md

+19
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,25 @@ If you don't specify the strategy by yourself, `CorrelationIdStrategy.FromProper
517517
properties (or headers) and generate a new one if it doesn't succeed. In any way, the CID will be part of both logs and resulting (outgoing)
518518
RabbitMQ message.
519519

520+
#### Publisher confirms
521+
522+
By using following configuration
523+
```hocon
524+
producer {
525+
properties {
526+
confirms {
527+
enabled = true
528+
sendAttempts = 2
529+
}
530+
}
531+
}
532+
```
533+
clients can enable [publisher confirms](https://www.rabbitmq.com/confirms.html#publisher-confirms). Each `send` call will wait for ack/nack from broker.
534+
This wait is of course non-blocking. `sendAttempts` is number of all attempts including initial one. If number of `sendAttempts` is greater than 1 it will try to resend messages again
535+
right after it obtains nack from broker.
536+
537+
From implementation point of view, it uses asynchronous acks/nacks combined with [Deferred](https://typelevel.org/cats-effect/docs/std/deferred) from cats library.
538+
520539
#### Consumers
521540

522541
You can also get the CorrelationId from the message properties on the consumer side. The CID is taken from both AMQP properties

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala

+35-18
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.avast.bytes.Bytes
77
import com.avast.clients.rabbitmq.DefaultRabbitMQClientFactory.startConsumingQueue
88
import com.avast.clients.rabbitmq.api._
99
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
10+
import com.avast.clients.rabbitmq.publisher.{BaseRabbitMQProducer, DefaultRabbitMQProducer, PublishConfirmsRabbitMQProducer}
1011
import com.avast.metrics.scalaeffectapi.Monitor
1112
import com.rabbitmq.client.Consumer
1213

@@ -27,7 +28,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
2728

2829
object Producer {
2930

30-
def create[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor[F]): Resource[F, DefaultRabbitMQProducer[F, A]] = {
31+
def create[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor[F]): Resource[F, BaseRabbitMQProducer[F, A]] = {
3132
prepareProducer[A](producerConfig, connection, monitor)
3233
}
3334
}
@@ -264,37 +265,53 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
264265

265266
private def prepareProducer[A: ProductConverter](producerConfig: ProducerConfig,
266267
connection: RabbitMQConnection[F],
267-
monitor: Monitor[F]): Resource[F, DefaultRabbitMQProducer[F, A]] = {
268-
val logger = ImplicitContextLogger.createLogger[F, DefaultRabbitMQProducer[F, A]]
268+
monitor: Monitor[F]): Resource[F, BaseRabbitMQProducer[F, A]] = {
269+
val logger = ImplicitContextLogger.createLogger[F, BaseRabbitMQProducer[F, A]]
269270

270271
connection
271272
.newChannel()
272273
.evalTap { channel =>
273274
// auto declare exchange; if configured
274275
producerConfig.declare.map { declareExchange(producerConfig.exchange, channel, _)(logger) }.getOrElse(F.unit)
275276
}
276-
.evalMap { channel =>
277+
.evalMap[F, BaseRabbitMQProducer[F, A]] { channel =>
277278
val defaultProperties = MessageProperties(
278279
deliveryMode = DeliveryMode.fromCode(producerConfig.properties.deliveryMode),
279280
contentType = producerConfig.properties.contentType,
280281
contentEncoding = producerConfig.properties.contentEncoding,
281282
priority = producerConfig.properties.priority.map(Integer.valueOf)
282283
)
283284

284-
producerConfig.properties.confirms.traverse(_ => Ref.of(Map.empty[Long, Deferred[F, Either[Throwable, Unit]]]))
285-
.map(new DefaultRabbitMQProducer[F, A](
286-
producerConfig.name,
287-
producerConfig.exchange,
288-
channel,
289-
defaultProperties,
290-
_,
291-
producerConfig.properties.confirms,
292-
producerConfig.reportUnroutable,
293-
producerConfig.sizeLimitBytes,
294-
blocker,
295-
logger,
296-
monitor
297-
))
285+
producerConfig.properties.confirms match {
286+
case Some(PublisherConfirmsConfig(true, sendAttempts)) =>
287+
Ref.of(Map.empty[Long, Deferred[F, Either[Throwable, Unit]]])
288+
.map {
289+
new PublishConfirmsRabbitMQProducer[F, A](
290+
producerConfig.name,
291+
producerConfig.exchange,
292+
channel,
293+
defaultProperties,
294+
_,
295+
sendAttempts,
296+
producerConfig.reportUnroutable,
297+
producerConfig.sizeLimitBytes,
298+
blocker,
299+
logger,
300+
monitor)
301+
}
302+
case _ =>
303+
F.pure {
304+
new DefaultRabbitMQProducer[F, A](producerConfig.name,
305+
producerConfig.exchange,
306+
channel,
307+
defaultProperties,
308+
producerConfig.reportUnroutable,
309+
producerConfig.sizeLimitBytes,
310+
blocker,
311+
logger,
312+
monitor)
313+
}
314+
}
298315
}
299316
}
300317

Original file line numberDiff line numberDiff line change
@@ -1,52 +1,44 @@
1-
package com.avast.clients.rabbitmq
1+
package com.avast.clients.rabbitmq.publisher
22

3-
import cats.effect.concurrent.{Deferred, Ref}
43
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync}
54
import cats.syntax.applicativeError._
65
import cats.syntax.flatMap._
7-
import cats.syntax.foldable._
86
import cats.syntax.functor._
97
import com.avast.bytes.Bytes
10-
import com.avast.clients.rabbitmq.DefaultRabbitMQProducer.SentMessages
11-
import com.avast.clients.rabbitmq.JavaConverters._
128
import com.avast.clients.rabbitmq.api.CorrelationIdStrategy.FromPropertiesOrRandomNew
139
import com.avast.clients.rabbitmq.api._
10+
import com.avast.clients.rabbitmq.JavaConverters._
1411
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
12+
import com.avast.clients.rabbitmq.{startAndForget, CorrelationId, ProductConverter, ServerChannel}
1513
import com.avast.metrics.scalaeffectapi.Monitor
1614
import com.rabbitmq.client.AMQP.BasicProperties
17-
import com.rabbitmq.client.{AlreadyClosedException, ConfirmListener, ReturnListener}
15+
import com.rabbitmq.client.{AlreadyClosedException, ReturnListener}
1816

1917
import java.util.UUID
2018
import scala.util.control.NonFatal
2119

22-
class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
23-
exchangeName: String,
24-
channel: ServerChannel,
25-
defaultProperties: MessageProperties,
26-
sentMessages: Option[SentMessages[F]],
27-
publisherConfirmsConfig: Option[PublisherConfirmsConfig],
28-
reportUnroutable: Boolean,
29-
sizeLimitBytes: Option[Int],
30-
blocker: Blocker,
31-
logger: ImplicitContextLogger[F],
32-
monitor: Monitor[F])(implicit F: ConcurrentEffect[F], cs: ContextShift[F])
20+
abstract class BaseRabbitMQProducer[F[_], A: ProductConverter](name: String,
21+
exchangeName: String,
22+
channel: ServerChannel,
23+
defaultProperties: MessageProperties,
24+
reportUnroutable: Boolean,
25+
sizeLimitBytes: Option[Int],
26+
blocker: Blocker,
27+
logger: ImplicitContextLogger[F],
28+
monitor: Monitor[F])(implicit F: ConcurrentEffect[F], cs: ContextShift[F])
3329
extends RabbitMQProducer[F, A] {
3430

3531
private val sentMeter = monitor.meter("sent")
3632
private val sentFailedMeter = monitor.meter("sentFailed")
3733
private val unroutableMeter = monitor.meter("unroutable")
38-
private val acked = monitor.meter("acked")
39-
private val nacked = monitor.meter("nacked")
4034

4135
private val converter = implicitly[ProductConverter[A]]
4236

4337
private val sendLock = new Object
4438

4539
channel.addReturnListener(if (reportUnroutable) LoggingReturnListener else NoOpReturnListener)
46-
publisherConfirmsConfig.foreach(cfg => if (cfg.enabled) {
47-
channel.confirmSelect()
48-
channel.addConfirmListener(DefaultConfirmListener)
49-
})
40+
41+
def sendMessage(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit]
5042

5143
override def send(routingKey: String, body: A, properties: Option[MessageProperties] = None)(
5244
implicit cidStrategy: CorrelationIdStrategy = FromPropertiesOrRandomNew(properties)): F[Unit] = {
@@ -65,17 +57,13 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
6557
case Right(convertedBody) =>
6658
for {
6759
_ <- checkSize(convertedBody, routingKey)
68-
result = publisherConfirmsConfig match {
69-
case Some(cfg @ PublisherConfirmsConfig(true, _)) => sendWithAck(routingKey, convertedBody, finalProperties, 1, cfg)
70-
case _ => send(routingKey, convertedBody, finalProperties)
71-
}
72-
_ <- logErrors(result, routingKey)
60+
_ <- logErrors(sendMessage(routingKey, convertedBody, finalProperties), routingKey)
7361
} yield ()
7462
case Left(ce) => Sync[F].raiseError(ce)
7563
}
7664
}
7765

78-
private def send(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
66+
protected def basicSend(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
7967
for {
8068
_ <- logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties")
8169
_ <- blocker.delay {
@@ -102,38 +90,6 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
10290
}
10391
}
10492

105-
private def sendWithAck(routingKey: String, body: Bytes, properties: MessageProperties, attemptCount: Int, publisherConfirmsConfig: PublisherConfirmsConfig)(
106-
implicit correlationId: CorrelationId): F[Unit] = {
107-
108-
if (attemptCount > publisherConfirmsConfig.sendAttempts) {
109-
F.raiseError(MaxAttempts("Exhausted max number of attempts"))
110-
} else {
111-
val messageId = channel.getNextPublishSeqNo
112-
for {
113-
defer <- Deferred.apply[F, Either[Throwable, Unit]]
114-
_ <- sentMessages.traverse_(_.update(_ + (messageId -> defer)))
115-
_ <- send(routingKey, body, properties)
116-
result <- defer.get
117-
_ <- result match {
118-
case Left(err) =>
119-
val sendResult = if (publisherConfirmsConfig.sendAttempts > 1) {
120-
clearProcessedMessage(messageId) >> sendWithAck(routingKey, body, properties, attemptCount + 1, publisherConfirmsConfig)
121-
} else {
122-
F.raiseError(NotAcknowledgedPublish(s"Broker did not acknowledge publish of message $messageId", err))
123-
}
124-
125-
nacked.mark >> sendResult // TODO: markovat kazdy nack nebo az pokud se to nepovede?
126-
case Right(_) =>
127-
acked.mark >> clearProcessedMessage(messageId)
128-
}
129-
} yield ()
130-
}
131-
}
132-
133-
private def clearProcessedMessage(messageId: Long): F[Unit] = {
134-
sentMessages.traverse_(_.update(_ - messageId))
135-
}
136-
13793
private def checkSize(bytes: Bytes, routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
13894
sizeLimitBytes match {
13995
case Some(limit) =>
@@ -178,26 +134,4 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
178134
}
179135
}
180136

181-
private object DefaultConfirmListener extends ConfirmListener {
182-
import cats.syntax.foldable._
183-
override def handleAck(deliveryTag: Long, multiple: Boolean): Unit = {
184-
startAndForget {
185-
logger.plainTrace(s"Acked $deliveryTag") >> completeDefer(deliveryTag, Right())
186-
}
187-
}
188-
override def handleNack(deliveryTag: Long, multiple: Boolean): Unit = {
189-
startAndForget {
190-
logger.plainTrace(s"Not acked $deliveryTag") >> completeDefer(deliveryTag, Left(new Exception(s"Message $deliveryTag not acknowledged by broker")))
191-
}
192-
}
193-
194-
private def completeDefer(deliveryTag: Long, result: Either[Throwable, Unit]): F[Unit] = {
195-
sentMessages.traverse_(_.get.flatMap(_.get(deliveryTag).traverse_(_.complete(result))))
196-
}
197-
}
198-
199-
}
200-
201-
object DefaultRabbitMQProducer {
202-
type SentMessages[F[_]] = Ref[F, Map[Long, Deferred[F, Either[Throwable, Unit]]]]
203137
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.avast.clients.rabbitmq.publisher
2+
3+
import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
4+
import com.avast.bytes.Bytes
5+
import com.avast.clients.rabbitmq.api.MessageProperties
6+
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
7+
import com.avast.clients.rabbitmq.{CorrelationId, ProductConverter, ServerChannel}
8+
import com.avast.metrics.scalaeffectapi.Monitor
9+
10+
class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
11+
exchangeName: String,
12+
channel: ServerChannel,
13+
defaultProperties: MessageProperties,
14+
reportUnroutable: Boolean,
15+
sizeLimitBytes: Option[Int],
16+
blocker: Blocker,
17+
logger: ImplicitContextLogger[F],
18+
monitor: Monitor[F])(implicit F: ConcurrentEffect[F], cs: ContextShift[F])
19+
extends BaseRabbitMQProducer[F, A](name,
20+
exchangeName,
21+
channel,
22+
defaultProperties,
23+
reportUnroutable,
24+
sizeLimitBytes,
25+
blocker,
26+
logger,
27+
monitor) {
28+
override def sendMessage(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] =
29+
basicSend(routingKey, body, properties)
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.avast.clients.rabbitmq.publisher
2+
3+
import cats.effect.concurrent.{Deferred, Ref}
4+
import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
5+
import cats.syntax.flatMap._
6+
import cats.syntax.functor._
7+
import com.avast.bytes.Bytes
8+
import com.avast.clients.rabbitmq.api.{MaxAttempts, MessageProperties, NotAcknowledgedPublish}
9+
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
10+
import com.avast.clients.rabbitmq.publisher.PublishConfirmsRabbitMQProducer.SentMessages
11+
import com.avast.clients.rabbitmq.{CorrelationId, ProductConverter, ServerChannel, startAndForget}
12+
import com.avast.metrics.scalaeffectapi.Monitor
13+
import com.rabbitmq.client.ConfirmListener
14+
15+
class PublishConfirmsRabbitMQProducer[F[_], A: ProductConverter](name: String,
16+
exchangeName: String,
17+
channel: ServerChannel,
18+
defaultProperties: MessageProperties,
19+
sentMessages: SentMessages[F],
20+
sendAttempts: Int,
21+
reportUnroutable: Boolean,
22+
sizeLimitBytes: Option[Int],
23+
blocker: Blocker,
24+
logger: ImplicitContextLogger[F],
25+
monitor: Monitor[F])(implicit F: ConcurrentEffect[F], cs: ContextShift[F])
26+
extends BaseRabbitMQProducer[F, A](name,
27+
exchangeName,
28+
channel,
29+
defaultProperties,
30+
reportUnroutable,
31+
sizeLimitBytes,
32+
blocker,
33+
logger,
34+
monitor) {
35+
36+
channel.confirmSelect()
37+
channel.addConfirmListener(DefaultConfirmListener)
38+
39+
private val acked = monitor.meter("acked")
40+
private val nacked = monitor.meter("nacked")
41+
override def sendMessage(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] =
42+
sendWithAck(routingKey, body, properties, 1)
43+
private def sendWithAck(routingKey: String, body: Bytes, properties: MessageProperties, attemptCount: Int)(
44+
implicit correlationId: CorrelationId): F[Unit] = {
45+
46+
if (attemptCount > sendAttempts) {
47+
F.raiseError(MaxAttempts("Exhausted max number of attempts"))
48+
} else {
49+
val messageId = channel.getNextPublishSeqNo
50+
for {
51+
defer <- Deferred.apply[F, Either[Throwable, Unit]]
52+
_ <- sentMessages.update(_ + (messageId -> defer))
53+
_ <- basicSend(routingKey, body, properties)
54+
result <- defer.get
55+
_ <- result match {
56+
case Left(err) =>
57+
val sendResult = if (sendAttempts > 1) {
58+
clearProcessedMessage(messageId) >> sendWithAck(routingKey, body, properties, attemptCount + 1)
59+
} else {
60+
F.raiseError(NotAcknowledgedPublish(s"Broker did not acknowledge publish of message $messageId", err))
61+
}
62+
63+
nacked.mark >> sendResult
64+
case Right(_) =>
65+
acked.mark >> clearProcessedMessage(messageId)
66+
}
67+
} yield ()
68+
}
69+
}
70+
71+
private def clearProcessedMessage(messageId: Long): F[Unit] = {
72+
sentMessages.update(_ - messageId)
73+
}
74+
75+
private object DefaultConfirmListener extends ConfirmListener {
76+
import cats.syntax.foldable._
77+
override def handleAck(deliveryTag: Long, multiple: Boolean): Unit = {
78+
startAndForget {
79+
logger.plainTrace(s"Acked $deliveryTag") >> completeDefer(deliveryTag, Right(()))
80+
}
81+
}
82+
override def handleNack(deliveryTag: Long, multiple: Boolean): Unit = {
83+
startAndForget {
84+
logger.plainTrace(s"Not acked $deliveryTag") >> completeDefer(
85+
deliveryTag,
86+
Left(new Exception(s"Message $deliveryTag not acknowledged by broker")))
87+
}
88+
}
89+
90+
private def completeDefer(deliveryTag: Long, result: Either[Throwable, Unit]): F[Unit] = {
91+
sentMessages.get.flatMap(_.get(deliveryTag).traverse_(_.complete(result)))
92+
}
93+
}
94+
95+
}
96+
97+
object PublishConfirmsRabbitMQProducer {
98+
type SentMessages[F[_]] = Ref[F, Map[Long, Deferred[F, Either[Throwable, Unit]]]]
99+
}

0 commit comments

Comments
 (0)