From c609e02a8d031a47747456fefc30bc04cc3f837e Mon Sep 17 00:00:00 2001 From: Jenda Kolena Date: Wed, 22 Jun 2022 15:06:58 +0200 Subject: [PATCH 1/2] Configurable redaction of payload when logging --- README.md | 7 +++++ .../avast/clients/rabbitmq/ConsumerBase.scala | 28 ++++++++++++------- .../DefaultRabbitMQClientFactory.scala | 3 ++ .../clients/rabbitmq/configuration.scala | 3 ++ .../DefaultRabbitMQConsumerTest.scala | 1 + .../DefaultRabbitMQPullConsumerTest.scala | 1 + .../rabbitmq/RepublishStrategyTest.scala | 3 +- 7 files changed, 35 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index a431e582..38b5f7a0 100644 --- a/README.md +++ b/README.md @@ -308,6 +308,13 @@ myConsumer { } ``` +### Payload logging or redaction + +By default, the client logs received delivery (on the TRACE level, unless timeout or sth happens - it's on some higher levels then) for +better debugging experience. However, if you transfer some sensitive data and you don't want the delivery to be logged, you can easily +turn it off by using `redactPayload = true` parameter in consumer configs (note: producer doesn't log the delivery at all, just its +metadata like routing key and properties). + ### Caveats 1. `null` instead of converter instance diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala index 421a8f31..34f230c2 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala @@ -19,6 +19,7 @@ import scala.util._ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A]( consumerName: String, queueName: String, + redactPayload: Boolean, blocker: Blocker, consumerLogger: ImplicitContextLogger[F], consumerRootMonitor: Monitor[F])(implicit val contextShift: ContextShift[F], implicit val deliveryConverter: DeliveryConverter[A]) { @@ -37,24 +38,26 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A case Success(Right(a)) => val delivery = Delivery(a, fixedProperties.asScala, dctx.routingKey.value) - consumerLogger.trace(s"[$consumerName] Received delivery from queue '$queueName': ${delivery.copy(body = rawBody)}").as { - delivery - } + consumerLogger + .trace(s"[$consumerName] Received delivery from queue '$queueName': ${logIfAllowed(delivery.toString)}") + .as(delivery) case Success(Left(ce)) => val delivery = Delivery.MalformedContent(rawBody, fixedProperties.asScala, dctx.routingKey.value, ce) - consumerLogger.trace(s"[$consumerName] Received delivery from queue '$queueName' but could not convert it: $delivery").as { - delivery - } + consumerLogger + .trace( + s"[$consumerName] Received delivery from queue '$queueName' but could not convert it: ${logIfAllowed(delivery.toString)}" + ) + .as(delivery) case Failure(ce) => val ex = ConversionException("Unexpected failure", ce) val delivery = Delivery.MalformedContent(rawBody, fixedProperties.asScala, dctx.routingKey.value, ex) consumerLogger - .trace( - s"[$consumerName] Received delivery from queue '$queueName' but could not convert it as the convertor has failed: $delivery") + .trace(s"[$consumerName] Received delivery from queue '$queueName' but " + + s"could not convert it as the convertor has failed: ${logIfAllowed(delivery.toString)}") .as(delivery) } .map(DeliveryWithContext(_, dctx)) @@ -76,8 +79,9 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A consumerLogger.trace(e)(s"[$consumerName] Timeout for $messageId") >> timeoutsMeter.mark >> { - lazy val msg = - s"[$consumerName] Task timed-out after $processTimeout of processing delivery $messageId with routing key ${delivery.routingKey}, applying DeliveryResult.$timeoutAction. Delivery was:\n$delivery" + lazy val msg = s"[$consumerName] Task timed-out after $processTimeout of processing delivery $messageId " + + s"with routing key ${delivery.routingKey}, applying DeliveryResult.$timeoutAction. " + + s"Delivery was:\n${logIfAllowed(delivery.toString)}" (timeoutLogLevel match { case Level.ERROR => consumerLogger.error(msg) @@ -92,4 +96,8 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A } } else result } + + def logIfAllowed(f: => String): String = { + if (!redactPayload) f else "--redacted--" + } } diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala index f1eb8b31..e3d6ca79 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala @@ -118,6 +118,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim val base = new ConsumerBase[F, A]( name, queueName, + redactPayload, blocker, ImplicitContextLogger.createLogger[F, DefaultRabbitMQStreamingConsumer[F, A]], monitor @@ -178,6 +179,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim val base = new ConsumerBase[F, A]( name, queueName, + redactPayload, blocker, logger, monitor @@ -236,6 +238,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim val base = new ConsumerBase[F, A]( name, queueName, + redactPayload, blocker, logger, monitor diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala index 907ed698..83530d78 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala @@ -33,6 +33,7 @@ final case class ConsumerConfig(name: String, timeoutAction: DeliveryResult = DeliveryResult.Republish(), timeoutLogLevel: Level = Level.WARN, prefetchCount: Int = 100, + redactPayload: Boolean = false, declare: Option[AutoDeclareQueueConfig] = None, consumerTag: String = "Default", poisonedMessageHandling: Option[PoisonedMessageHandlingConfig] = None) @@ -45,6 +46,7 @@ final case class StreamingConsumerConfig(name: String, timeoutLogLevel: Level = Level.WARN, prefetchCount: Int = 100, queueBufferSize: Int = 100, + redactPayload: Boolean = false, declare: Option[AutoDeclareQueueConfig] = None, consumerTag: String = "Default", poisonedMessageHandling: Option[PoisonedMessageHandlingConfig] = None) @@ -52,6 +54,7 @@ final case class StreamingConsumerConfig(name: String, final case class PullConsumerConfig(name: String, queueName: String, bindings: immutable.Seq[AutoBindQueueConfig], + redactPayload: Boolean = false, declare: Option[AutoDeclareQueueConfig] = None, poisonedMessageHandling: Option[PoisonedMessageHandlingConfig] = None) diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala index e3cc7d1b..5d79aa61 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala @@ -437,6 +437,7 @@ class DefaultRabbitMQConsumerTest extends TestBase { val base = new ConsumerBase[Task, Bytes]( "test", "queueName", + false, TestBase.testBlocker, ImplicitContextLogger.createLogger, monitor diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala index 5ecac720..4c015fdc 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala @@ -266,6 +266,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase { val base = new ConsumerBase[Task, A]( "test", "queueName", + false, TestBase.testBlocker, ImplicitContextLogger.createLogger, Monitor.noOp() diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala index 27e2dc25..240c839e 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/RepublishStrategyTest.scala @@ -99,7 +99,8 @@ class RepublishStrategyTest extends TestBase { private def newConsumer(channel: ServerChannel, republishStrategy: RepublishStrategy[Task])( userAction: DeliveryReadAction[Task, Bytes]): DefaultRabbitMQConsumer[Task, Bytes] = { - val base = new ConsumerBase[Task, Bytes]("test", "queueName", TestBase.testBlocker, ImplicitContextLogger.createLogger, Monitor.noOp()) + val base = + new ConsumerBase[Task, Bytes]("test", "queueName", false, TestBase.testBlocker, ImplicitContextLogger.createLogger, Monitor.noOp()) val channelOps = new ConsumerChannelOps[Task, Bytes]( "test", From 9df19c315f49f0d6080269a1484a7aefff7c0290 Mon Sep 17 00:00:00 2001 From: Jenda Kolena Date: Fri, 15 Jul 2022 11:19:19 +0200 Subject: [PATCH 2/2] Making PoisonedMessageHandlerLiveTest more failure-prone --- .../rabbitmq/PoisonedMessageHandlerLiveTest.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerLiveTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerLiveTest.scala index e03c701f..49bb09f5 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerLiveTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerLiveTest.scala @@ -206,9 +206,13 @@ class PoisonedMessageHandlerLiveTest extends TestBase with ScalaFutures { // run async: ex.execute(() => { while (true) { - val PullResult.Ok(dwh) = cons.pull().await - processed.incrementAndGet() - dwh.handle(DeliveryResult.Republish()).await + cons.pull().await match { + case PullResult.Ok(dwh) => + processed.incrementAndGet() + dwh.handle(DeliveryResult.Republish()).await + + case PullResult.EmptyQueue => // 🤷‍ + } } }) @@ -318,7 +322,7 @@ class PoisonedMessageHandlerLiveTest extends TestBase with ScalaFutures { sender.send(initialRoutingKey, Bytes.copyFromUtf8(n.toString), Some(MessageProperties(messageId = Some(s"msg_${n}_")))).await } - eventually(timeout(Span(90, Seconds)), interval(Span(1, Seconds))) { + eventually(timeout(Span(120, Seconds)), interval(Span(1, Seconds))) { println(s"PROCESSED COUNT: ${processed.get()}") // we can't assert the `processed` here - some deliveries may have been cancelled before they were even executed assertResult(0)(testHelper.queue.getMessagesCount(queueName1))