Skip to content

Commit

Permalink
Configurable size limit for producer
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenda Kolena committed Jul 15, 2022
1 parent 334480e commit e5ac157
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ import java.io.IOException
case class ConversionException(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)

case class ChannelNotRecoveredException(desc: String, cause: Throwable = null) extends IOException(desc, cause)

case class TooBigMessage(desc: String, cause: Throwable = null) extends IllegalArgumentException(desc, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
channel,
defaultProperties,
producerConfig.reportUnroutable,
producerConfig.sizeLimitBytes,
blocker,
logger,
monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
channel: ServerChannel,
defaultProperties: MessageProperties,
reportUnroutable: Boolean,
sizeLimitBytes: Option[Int],
blocker: Blocker,
logger: ImplicitContextLogger[F],
monitor: Monitor[F])(implicit F: Effect[F], cs: ContextShift[F])
Expand Down Expand Up @@ -54,7 +55,8 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
}

private def send(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >>
checkSize(body, routingKey) >>
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >>
blocker
.delay {
sendLock.synchronized {
Expand All @@ -76,6 +78,20 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
}
}

private def checkSize(bytes: Bytes, routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
sizeLimitBytes match {
case Some(limit) =>
val size = bytes.size()
if (size >= limit) {
logger.warn {
s"[$name] Will not send message with $size B to exchange $exchangeName with routing key '$routingKey' as it is over the limit $limit B"
} >> F.raiseError[Unit](TooBigMessage(s"Message too big ($size/$limit)"))
} else F.unit

case None => F.unit
}
}

// scalastyle:off
private object LoggingReturnListener extends ReturnListener {
override def handleReturn(replyCode: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ object DeadQueuePoisonedMessageHandler {
connection: RabbitMQConnection[F],
monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
val dqpc = c.deadQueueProducer
val pc = ProducerConfig(name = dqpc.name,
exchange = dqpc.exchange,
declare = dqpc.declare,
reportUnroutable = dqpc.reportUnroutable,
properties = dqpc.properties)
val pc = ProducerConfig(
name = dqpc.name,
exchange = dqpc.exchange,
declare = dqpc.declare,
reportUnroutable = dqpc.reportUnroutable,
sizeLimitBytes = dqpc.sizeLimitBytes,
properties = dqpc.properties
)

connection.newProducer[Bytes](pc, monitor.named("deadQueueProducer")).map { producer =>
new DeadQueuePoisonedMessageHandler[F, A](c.maxAttempts)((d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ final case class ProducerConfig(name: String,
exchange: String,
declare: Option[AutoDeclareExchangeConfig] = None,
reportUnroutable: Boolean = true,
sizeLimitBytes: Option[Int] = None,
properties: ProducerPropertiesConfig = ProducerPropertiesConfig())

final case class ProducerPropertiesConfig(deliveryMode: Int = 2,
Expand Down Expand Up @@ -120,6 +121,7 @@ final case class DeadQueueProducerConfig(name: String,
routingKey: String,
declare: Option[AutoDeclareExchangeConfig] = None,
reportUnroutable: Boolean = true,
sizeLimitBytes: Option[Int] = None,
properties: ProducerPropertiesConfig = ProducerPropertiesConfig())

case object NoOpPoisonedMessageHandling extends PoisonedMessageHandlingConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.avast.clients.rabbitmq

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api.{CorrelationIdStrategy, MessageProperties}
import com.avast.clients.rabbitmq.api._
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
import com.avast.metrics.scalaeffectapi.Monitor
import com.rabbitmq.client.AMQP
Expand Down Expand Up @@ -31,6 +31,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
monitor = Monitor.noOp(),
defaultProperties = MessageProperties.empty,
reportUnroutable = false,
sizeLimitBytes = None,
blocker = TestBase.testBlocker,
logger = ImplicitContextLogger.createLogger
)
Expand Down Expand Up @@ -72,6 +73,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
monitor = Monitor.noOp(),
defaultProperties = MessageProperties.empty,
reportUnroutable = false,
sizeLimitBytes = None,
blocker = TestBase.testBlocker,
logger = ImplicitContextLogger.createLogger
)
Expand Down Expand Up @@ -113,6 +115,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
monitor = Monitor.noOp(),
defaultProperties = MessageProperties.empty,
reportUnroutable = false,
sizeLimitBytes = None,
blocker = TestBase.testBlocker,
logger = ImplicitContextLogger.createLogger
)
Expand Down Expand Up @@ -151,6 +154,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
monitor = Monitor.noOp(),
defaultProperties = MessageProperties.empty,
reportUnroutable = false,
sizeLimitBytes = None,
blocker = TestBase.testBlocker,
logger = ImplicitContextLogger.createLogger
)
Expand All @@ -169,4 +173,36 @@ class DefaultRabbitMQProducerTest extends TestBase {
// check that some CID was generated
assert(captor.getValue.getCorrelationId != null)
}

test("too big message is denied") {
val exchangeName = Random.nextString(10)
val routingKey = Random.nextString(10)

val limit = 500

val channel = mock[AutorecoveringChannel]

val producer = new DefaultRabbitMQProducer[Task, Bytes](
name = "test",
exchangeName = exchangeName,
channel = channel,
monitor = Monitor.noOp(),
defaultProperties = MessageProperties.empty,
reportUnroutable = false,
sizeLimitBytes = Some(limit),
blocker = TestBase.testBlocker,
logger = ImplicitContextLogger.createLogger
)

// don't test anything except it doesn't fail
producer.send(routingKey, Bytes.copyFrom(Array.fill(499)(32.toByte))).await

assertThrows[TooBigMessage] {
producer.send(routingKey, Bytes.copyFrom(Array.fill(501)(32.toByte))).await
}

assertThrows[TooBigMessage] {
producer.send(routingKey, Bytes.copyFrom(Array.fill(Random.nextInt(1000) + 500)(32.toByte))).await
}
}
}

0 comments on commit e5ac157

Please sign in to comment.