diff --git a/api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala b/api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala index e217114b..5fcfd98d 100644 --- a/api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala +++ b/api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala @@ -4,3 +4,5 @@ import java.io.IOException case class ConversionException(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause) case class ChannelNotRecoveredException(desc: String, cause: Throwable = null) extends IOException(desc, cause) + +case class TooBigMessage(desc: String, cause: Throwable = null) extends IllegalArgumentException(desc, cause) diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index e3d6ca79..12437a45 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -286,6 +286,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim channel, defaultProperties, producerConfig.reportUnroutable, + producerConfig.sizeLimitBytes, blocker, logger, monitor diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala index 8126ccfd..0e1d6124 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala @@ -19,6 +19,7 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String, channel: ServerChannel, defaultProperties: MessageProperties, reportUnroutable: Boolean, + sizeLimitBytes: Option[Int], blocker: Blocker, logger: ImplicitContextLogger[F], monitor: Monitor[F])(implicit F: Effect[F], cs: ContextShift[F]) @@ -54,7 +55,8 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String, } private def send(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = { - logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >> + checkSize(body, routingKey) >> + logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >> blocker .delay { sendLock.synchronized { @@ -76,6 +78,20 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String, } } + private def checkSize(bytes: Bytes, routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = { + sizeLimitBytes match { + case Some(limit) => + val size = bytes.size() + if (size >= limit) { + logger.warn { + s"[$name] Will not send message with $size B to exchange $exchangeName with routing key '$routingKey' as it is over the limit $limit B" + } >> F.raiseError[Unit](TooBigMessage(s"Message too big ($size/$limit)")) + } else F.unit + + case None => F.unit + } + } + // scalastyle:off private object LoggingReturnListener extends ReturnListener { override def handleReturn(replyCode: Int, diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala b/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala index 0c586714..0582c00e 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala @@ -62,11 +62,14 @@ object DeadQueuePoisonedMessageHandler { connection: RabbitMQConnection[F], monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = { val dqpc = c.deadQueueProducer - val pc = ProducerConfig(name = dqpc.name, - exchange = dqpc.exchange, - declare = dqpc.declare, - reportUnroutable = dqpc.reportUnroutable, - properties = dqpc.properties) + val pc = ProducerConfig( + name = dqpc.name, + exchange = dqpc.exchange, + declare = dqpc.declare, + reportUnroutable = dqpc.reportUnroutable, + sizeLimitBytes = dqpc.sizeLimitBytes, + properties = dqpc.properties + ) connection.newProducer[Bytes](pc, monitor.named("deadQueueProducer")).map { producer => new DeadQueuePoisonedMessageHandler[F, A](c.maxAttempts)((d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => { diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala index 83530d78..1e3d97e0 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala @@ -78,6 +78,7 @@ final case class ProducerConfig(name: String, exchange: String, declare: Option[AutoDeclareExchangeConfig] = None, reportUnroutable: Boolean = true, + sizeLimitBytes: Option[Int] = None, properties: ProducerPropertiesConfig = ProducerPropertiesConfig()) final case class ProducerPropertiesConfig(deliveryMode: Int = 2, @@ -120,6 +121,7 @@ final case class DeadQueueProducerConfig(name: String, routingKey: String, declare: Option[AutoDeclareExchangeConfig] = None, reportUnroutable: Boolean = true, + sizeLimitBytes: Option[Int] = None, properties: ProducerPropertiesConfig = ProducerPropertiesConfig()) case object NoOpPoisonedMessageHandling extends PoisonedMessageHandlingConfig diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala index 7f9c2cc3..2363c8ad 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala @@ -1,7 +1,7 @@ package com.avast.clients.rabbitmq import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.api.{CorrelationIdStrategy, MessageProperties} +import com.avast.clients.rabbitmq.api._ import com.avast.clients.rabbitmq.logging.ImplicitContextLogger import com.avast.metrics.scalaeffectapi.Monitor import com.rabbitmq.client.AMQP @@ -31,6 +31,7 @@ class DefaultRabbitMQProducerTest extends TestBase { monitor = Monitor.noOp(), defaultProperties = MessageProperties.empty, reportUnroutable = false, + sizeLimitBytes = None, blocker = TestBase.testBlocker, logger = ImplicitContextLogger.createLogger ) @@ -72,6 +73,7 @@ class DefaultRabbitMQProducerTest extends TestBase { monitor = Monitor.noOp(), defaultProperties = MessageProperties.empty, reportUnroutable = false, + sizeLimitBytes = None, blocker = TestBase.testBlocker, logger = ImplicitContextLogger.createLogger ) @@ -113,6 +115,7 @@ class DefaultRabbitMQProducerTest extends TestBase { monitor = Monitor.noOp(), defaultProperties = MessageProperties.empty, reportUnroutable = false, + sizeLimitBytes = None, blocker = TestBase.testBlocker, logger = ImplicitContextLogger.createLogger ) @@ -151,6 +154,7 @@ class DefaultRabbitMQProducerTest extends TestBase { monitor = Monitor.noOp(), defaultProperties = MessageProperties.empty, reportUnroutable = false, + sizeLimitBytes = None, blocker = TestBase.testBlocker, logger = ImplicitContextLogger.createLogger ) @@ -169,4 +173,36 @@ class DefaultRabbitMQProducerTest extends TestBase { // check that some CID was generated assert(captor.getValue.getCorrelationId != null) } + + test("too big message is denied") { + val exchangeName = Random.nextString(10) + val routingKey = Random.nextString(10) + + val limit = 500 + + val channel = mock[AutorecoveringChannel] + + val producer = new DefaultRabbitMQProducer[Task, Bytes]( + name = "test", + exchangeName = exchangeName, + channel = channel, + monitor = Monitor.noOp(), + defaultProperties = MessageProperties.empty, + reportUnroutable = false, + sizeLimitBytes = Some(limit), + blocker = TestBase.testBlocker, + logger = ImplicitContextLogger.createLogger + ) + + // don't test anything except it doesn't fail + producer.send(routingKey, Bytes.copyFrom(Array.fill(499)(32.toByte))).await + + assertThrows[TooBigMessage] { + producer.send(routingKey, Bytes.copyFrom(Array.fill(501)(32.toByte))).await + } + + assertThrows[TooBigMessage] { + producer.send(routingKey, Bytes.copyFrom(Array.fill(Random.nextInt(1000) + 500)(32.toByte))).await + } + } }