Skip to content

Commit ae566e0

Browse files
authored
Merge pull request #185 from avast/ProducerSizeLimit
Configurable size limit for producer
2 parents 334480e + e5ac157 commit ae566e0

File tree

6 files changed

+67
-7
lines changed

6 files changed

+67
-7
lines changed

api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ import java.io.IOException
44
case class ConversionException(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)
55

66
case class ChannelNotRecoveredException(desc: String, cause: Throwable = null) extends IOException(desc, cause)
7+
8+
case class TooBigMessage(desc: String, cause: Throwable = null) extends IllegalArgumentException(desc, cause)

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
286286
channel,
287287
defaultProperties,
288288
producerConfig.reportUnroutable,
289+
producerConfig.sizeLimitBytes,
289290
blocker,
290291
logger,
291292
monitor

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
1919
channel: ServerChannel,
2020
defaultProperties: MessageProperties,
2121
reportUnroutable: Boolean,
22+
sizeLimitBytes: Option[Int],
2223
blocker: Blocker,
2324
logger: ImplicitContextLogger[F],
2425
monitor: Monitor[F])(implicit F: Effect[F], cs: ContextShift[F])
@@ -54,7 +55,8 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
5455
}
5556

5657
private def send(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
57-
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >>
58+
checkSize(body, routingKey) >>
59+
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >>
5860
blocker
5961
.delay {
6062
sendLock.synchronized {
@@ -76,6 +78,20 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
7678
}
7779
}
7880

81+
private def checkSize(bytes: Bytes, routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
82+
sizeLimitBytes match {
83+
case Some(limit) =>
84+
val size = bytes.size()
85+
if (size >= limit) {
86+
logger.warn {
87+
s"[$name] Will not send message with $size B to exchange $exchangeName with routing key '$routingKey' as it is over the limit $limit B"
88+
} >> F.raiseError[Unit](TooBigMessage(s"Message too big ($size/$limit)"))
89+
} else F.unit
90+
91+
case None => F.unit
92+
}
93+
}
94+
7995
// scalastyle:off
8096
private object LoggingReturnListener extends ReturnListener {
8197
override def handleReturn(replyCode: Int,

core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,14 @@ object DeadQueuePoisonedMessageHandler {
6262
connection: RabbitMQConnection[F],
6363
monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
6464
val dqpc = c.deadQueueProducer
65-
val pc = ProducerConfig(name = dqpc.name,
66-
exchange = dqpc.exchange,
67-
declare = dqpc.declare,
68-
reportUnroutable = dqpc.reportUnroutable,
69-
properties = dqpc.properties)
65+
val pc = ProducerConfig(
66+
name = dqpc.name,
67+
exchange = dqpc.exchange,
68+
declare = dqpc.declare,
69+
reportUnroutable = dqpc.reportUnroutable,
70+
sizeLimitBytes = dqpc.sizeLimitBytes,
71+
properties = dqpc.properties
72+
)
7073

7174
connection.newProducer[Bytes](pc, monitor.named("deadQueueProducer")).map { producer =>
7275
new DeadQueuePoisonedMessageHandler[F, A](c.maxAttempts)((d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => {

core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ final case class ProducerConfig(name: String,
7878
exchange: String,
7979
declare: Option[AutoDeclareExchangeConfig] = None,
8080
reportUnroutable: Boolean = true,
81+
sizeLimitBytes: Option[Int] = None,
8182
properties: ProducerPropertiesConfig = ProducerPropertiesConfig())
8283

8384
final case class ProducerPropertiesConfig(deliveryMode: Int = 2,
@@ -120,6 +121,7 @@ final case class DeadQueueProducerConfig(name: String,
120121
routingKey: String,
121122
declare: Option[AutoDeclareExchangeConfig] = None,
122123
reportUnroutable: Boolean = true,
124+
sizeLimitBytes: Option[Int] = None,
123125
properties: ProducerPropertiesConfig = ProducerPropertiesConfig())
124126

125127
case object NoOpPoisonedMessageHandling extends PoisonedMessageHandlingConfig

core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.avast.clients.rabbitmq
22

33
import com.avast.bytes.Bytes
4-
import com.avast.clients.rabbitmq.api.{CorrelationIdStrategy, MessageProperties}
4+
import com.avast.clients.rabbitmq.api._
55
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
66
import com.avast.metrics.scalaeffectapi.Monitor
77
import com.rabbitmq.client.AMQP
@@ -31,6 +31,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
3131
monitor = Monitor.noOp(),
3232
defaultProperties = MessageProperties.empty,
3333
reportUnroutable = false,
34+
sizeLimitBytes = None,
3435
blocker = TestBase.testBlocker,
3536
logger = ImplicitContextLogger.createLogger
3637
)
@@ -72,6 +73,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
7273
monitor = Monitor.noOp(),
7374
defaultProperties = MessageProperties.empty,
7475
reportUnroutable = false,
76+
sizeLimitBytes = None,
7577
blocker = TestBase.testBlocker,
7678
logger = ImplicitContextLogger.createLogger
7779
)
@@ -113,6 +115,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
113115
monitor = Monitor.noOp(),
114116
defaultProperties = MessageProperties.empty,
115117
reportUnroutable = false,
118+
sizeLimitBytes = None,
116119
blocker = TestBase.testBlocker,
117120
logger = ImplicitContextLogger.createLogger
118121
)
@@ -151,6 +154,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
151154
monitor = Monitor.noOp(),
152155
defaultProperties = MessageProperties.empty,
153156
reportUnroutable = false,
157+
sizeLimitBytes = None,
154158
blocker = TestBase.testBlocker,
155159
logger = ImplicitContextLogger.createLogger
156160
)
@@ -169,4 +173,36 @@ class DefaultRabbitMQProducerTest extends TestBase {
169173
// check that some CID was generated
170174
assert(captor.getValue.getCorrelationId != null)
171175
}
176+
177+
test("too big message is denied") {
178+
val exchangeName = Random.nextString(10)
179+
val routingKey = Random.nextString(10)
180+
181+
val limit = 500
182+
183+
val channel = mock[AutorecoveringChannel]
184+
185+
val producer = new DefaultRabbitMQProducer[Task, Bytes](
186+
name = "test",
187+
exchangeName = exchangeName,
188+
channel = channel,
189+
monitor = Monitor.noOp(),
190+
defaultProperties = MessageProperties.empty,
191+
reportUnroutable = false,
192+
sizeLimitBytes = Some(limit),
193+
blocker = TestBase.testBlocker,
194+
logger = ImplicitContextLogger.createLogger
195+
)
196+
197+
// don't test anything except it doesn't fail
198+
producer.send(routingKey, Bytes.copyFrom(Array.fill(499)(32.toByte))).await
199+
200+
assertThrows[TooBigMessage] {
201+
producer.send(routingKey, Bytes.copyFrom(Array.fill(501)(32.toByte))).await
202+
}
203+
204+
assertThrows[TooBigMessage] {
205+
producer.send(routingKey, Bytes.copyFrom(Array.fill(Random.nextInt(1000) + 500)(32.toByte))).await
206+
}
207+
}
172208
}

0 commit comments

Comments
 (0)