Skip to content

Commit

Permalink
Merge pull request #183 from avast/ConfigurablePayloadLogging
Browse files Browse the repository at this point in the history
Configurable redaction of payload when logging
  • Loading branch information
jendakol authored Jul 15, 2022
2 parents 23c3699 + 9df19c3 commit 334480e
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 15 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ myConsumer {
}
```

### Payload logging or redaction

By default, the client logs received delivery (on the TRACE level, unless timeout or sth happens - it's on some higher levels then) for
better debugging experience. However, if you transfer some sensitive data and you don't want the delivery to be logged, you can easily
turn it off by using `redactPayload = true` parameter in consumer configs (note: producer doesn't log the delivery at all, just its
metadata like routing key and properties).

### Caveats

1. `null` instead of converter instance
Expand Down
28 changes: 18 additions & 10 deletions core/src/main/scala/com/avast/clients/rabbitmq/ConsumerBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import scala.util._
final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A](
consumerName: String,
queueName: String,
redactPayload: Boolean,
blocker: Blocker,
consumerLogger: ImplicitContextLogger[F],
consumerRootMonitor: Monitor[F])(implicit val contextShift: ContextShift[F], implicit val deliveryConverter: DeliveryConverter[A]) {
Expand All @@ -37,24 +38,26 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A
case Success(Right(a)) =>
val delivery = Delivery(a, fixedProperties.asScala, dctx.routingKey.value)

consumerLogger.trace(s"[$consumerName] Received delivery from queue '$queueName': ${delivery.copy(body = rawBody)}").as {
delivery
}
consumerLogger
.trace(s"[$consumerName] Received delivery from queue '$queueName': ${logIfAllowed(delivery.toString)}")
.as(delivery)

case Success(Left(ce)) =>
val delivery = Delivery.MalformedContent(rawBody, fixedProperties.asScala, dctx.routingKey.value, ce)

consumerLogger.trace(s"[$consumerName] Received delivery from queue '$queueName' but could not convert it: $delivery").as {
delivery
}
consumerLogger
.trace(
s"[$consumerName] Received delivery from queue '$queueName' but could not convert it: ${logIfAllowed(delivery.toString)}"
)
.as(delivery)

case Failure(ce) =>
val ex = ConversionException("Unexpected failure", ce)
val delivery = Delivery.MalformedContent(rawBody, fixedProperties.asScala, dctx.routingKey.value, ex)

consumerLogger
.trace(
s"[$consumerName] Received delivery from queue '$queueName' but could not convert it as the convertor has failed: $delivery")
.trace(s"[$consumerName] Received delivery from queue '$queueName' but " +
s"could not convert it as the convertor has failed: ${logIfAllowed(delivery.toString)}")
.as(delivery)
}
.map(DeliveryWithContext(_, dctx))
Expand All @@ -76,8 +79,9 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A
consumerLogger.trace(e)(s"[$consumerName] Timeout for $messageId") >>
timeoutsMeter.mark >> {

lazy val msg =
s"[$consumerName] Task timed-out after $processTimeout of processing delivery $messageId with routing key ${delivery.routingKey}, applying DeliveryResult.$timeoutAction. Delivery was:\n$delivery"
lazy val msg = s"[$consumerName] Task timed-out after $processTimeout of processing delivery $messageId " +
s"with routing key ${delivery.routingKey}, applying DeliveryResult.$timeoutAction. " +
s"Delivery was:\n${logIfAllowed(delivery.toString)}"

(timeoutLogLevel match {
case Level.ERROR => consumerLogger.error(msg)
Expand All @@ -92,4 +96,8 @@ final private[rabbitmq] case class ConsumerBase[F[_]: ConcurrentEffect: Timer, A
}
} else result
}

def logIfAllowed(f: => String): String = {
if (!redactPayload) f else "--redacted--"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
val base = new ConsumerBase[F, A](
name,
queueName,
redactPayload,
blocker,
ImplicitContextLogger.createLogger[F, DefaultRabbitMQStreamingConsumer[F, A]],
monitor
Expand Down Expand Up @@ -178,6 +179,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
val base = new ConsumerBase[F, A](
name,
queueName,
redactPayload,
blocker,
logger,
monitor
Expand Down Expand Up @@ -236,6 +238,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
val base = new ConsumerBase[F, A](
name,
queueName,
redactPayload,
blocker,
logger,
monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ final case class ConsumerConfig(name: String,
timeoutAction: DeliveryResult = DeliveryResult.Republish(),
timeoutLogLevel: Level = Level.WARN,
prefetchCount: Int = 100,
redactPayload: Boolean = false,
declare: Option[AutoDeclareQueueConfig] = None,
consumerTag: String = "Default",
poisonedMessageHandling: Option[PoisonedMessageHandlingConfig] = None)
Expand All @@ -45,13 +46,15 @@ final case class StreamingConsumerConfig(name: String,
timeoutLogLevel: Level = Level.WARN,
prefetchCount: Int = 100,
queueBufferSize: Int = 100,
redactPayload: Boolean = false,
declare: Option[AutoDeclareQueueConfig] = None,
consumerTag: String = "Default",
poisonedMessageHandling: Option[PoisonedMessageHandlingConfig] = None)

final case class PullConsumerConfig(name: String,
queueName: String,
bindings: immutable.Seq[AutoBindQueueConfig],
redactPayload: Boolean = false,
declare: Option[AutoDeclareQueueConfig] = None,
poisonedMessageHandling: Option[PoisonedMessageHandlingConfig] = None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ class DefaultRabbitMQConsumerTest extends TestBase {
val base = new ConsumerBase[Task, Bytes](
"test",
"queueName",
false,
TestBase.testBlocker,
ImplicitContextLogger.createLogger,
monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class DefaultRabbitMQPullConsumerTest extends TestBase {
val base = new ConsumerBase[Task, A](
"test",
"queueName",
false,
TestBase.testBlocker,
ImplicitContextLogger.createLogger,
Monitor.noOp()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,13 @@ class PoisonedMessageHandlerLiveTest extends TestBase with ScalaFutures {
// run async:
ex.execute(() => {
while (true) {
val PullResult.Ok(dwh) = cons.pull().await
processed.incrementAndGet()
dwh.handle(DeliveryResult.Republish()).await
cons.pull().await match {
case PullResult.Ok(dwh) =>
processed.incrementAndGet()
dwh.handle(DeliveryResult.Republish()).await

case PullResult.EmptyQueue => // 🤷‍
}
}
})

Expand Down Expand Up @@ -318,7 +322,7 @@ class PoisonedMessageHandlerLiveTest extends TestBase with ScalaFutures {
sender.send(initialRoutingKey, Bytes.copyFromUtf8(n.toString), Some(MessageProperties(messageId = Some(s"msg_${n}_")))).await
}

eventually(timeout(Span(90, Seconds)), interval(Span(1, Seconds))) {
eventually(timeout(Span(120, Seconds)), interval(Span(1, Seconds))) {
println(s"PROCESSED COUNT: ${processed.get()}")
// we can't assert the `processed` here - some deliveries may have been cancelled before they were even executed
assertResult(0)(testHelper.queue.getMessagesCount(queueName1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class RepublishStrategyTest extends TestBase {

private def newConsumer(channel: ServerChannel, republishStrategy: RepublishStrategy[Task])(
userAction: DeliveryReadAction[Task, Bytes]): DefaultRabbitMQConsumer[Task, Bytes] = {
val base = new ConsumerBase[Task, Bytes]("test", "queueName", TestBase.testBlocker, ImplicitContextLogger.createLogger, Monitor.noOp())
val base =
new ConsumerBase[Task, Bytes]("test", "queueName", false, TestBase.testBlocker, ImplicitContextLogger.createLogger, Monitor.noOp())

val channelOps = new ConsumerChannelOps[Task, Bytes](
"test",
Expand Down

0 comments on commit 334480e

Please sign in to comment.