diff --git a/core/src/main/scala/pl/iterators/stir/server/directives/DebuggingDirectives.scala b/core/src/main/scala/pl/iterators/stir/server/directives/DebuggingDirectives.scala index b589006..7c1d113 100644 --- a/core/src/main/scala/pl/iterators/stir/server/directives/DebuggingDirectives.scala +++ b/core/src/main/scala/pl/iterators/stir/server/directives/DebuggingDirectives.scala @@ -17,38 +17,31 @@ trait DebuggingDirectives { */ def logRequest(logHeaders: Boolean = true, logBody: Boolean = true, redactHeadersWhen: CIString => Boolean = Headers.SensitiveHeaders.contains, + maxLogLength: Int = Int.MaxValue, logAction: Option[String => IO[Unit]] = None): Directive0 = { - val log = logAction.getOrElse { (s: String) => + val log = trimLog(maxLogLength).andThen(logAction.getOrElse { (s: String) => DebuggingDirectives.logger.info(s) - } + }) + Directive { inner => ctx => - if (logBody) { + if (logBody && !ctx.request.isChunked) { IO.ref(Vector.empty[Chunk[Byte]]) .flatMap { vec => - val collectChunks: Pipe[IO, Byte, Nothing] = - _.chunks.flatMap(c => Stream.exec(vec.update(_ :+ c))) - val pipe: Pipe[IO, Byte, Byte] = _.observe(collectChunks) - val bodyForLog = Stream.eval(vec.get).flatMap(v => Stream.emits(v)).unchunks - val logRequest = Logger.logMessage[IO, Request[IO]](ctx.request.withBodyStream(bodyForLog))(logHeaders, - logBody, redactHeadersWhen)(log) - val bodyForRouting = pipe(ctx.request.body) - val newCtx = ctx.copy(request = ctx.request.withBodyStream(bodyForRouting)) - inner(())(newCtx).flatMap { - case RouteResult.Complete(response) => - IO.pure(RouteResult.Complete(response.withBodyStream(response.body.onFinalizeWeak(logRequest)))) - case RouteResult.Rejected(rejections) => - logRequest.as(RouteResult.Rejected(rejections)) - } + val newBody = Stream.eval(vec.get) + .flatMap(chunks => Stream.emits(chunks).covary[IO]) + .flatMap(chunks => Stream.chunk(chunks).covary[IO]) + val newRequest = ctx.request.withBodyStream( + ctx.request.body.observe(_.chunks.flatMap(chunk => Stream.eval(vec.update(_ :+ chunk)).drain))) + + val newCtx = ctx.copy(request = ctx.request.withBodyStream(newBody)) + Logger.logMessage[IO, Request[IO]](newRequest)(logHeaders, logBody = true, redactHeadersWhen)(log).flatMap( + _ => + inner(())(newCtx)) } } else { - inner(())(ctx).flatMap { - case RouteResult.Complete(response) => - Logger.logMessage[IO, Request[IO]](ctx.request)(logHeaders, logBody, redactHeadersWhen)(log) - .as(RouteResult.Complete(response)) - case RouteResult.Rejected(rejections) => - Logger.logMessage[IO, Request[IO]](ctx.request)(logHeaders, logBody, redactHeadersWhen)(log) - .as(RouteResult.Rejected(rejections)) - } + Logger.logMessage[IO, Request[IO]](ctx.request)(logHeaders, logBody = false, redactHeadersWhen)(log).flatMap( + _ => + inner(())(ctx)) } } } @@ -60,25 +53,20 @@ trait DebuggingDirectives { */ def logResult(logHeaders: Boolean = true, logBody: Boolean = true, redactHeadersWhen: CIString => Boolean = Headers.SensitiveHeaders.contains, + maxLogLength: Int = Int.MaxValue, logAction: Option[String => IO[Unit]] = None): Directive0 = { - val log = logAction.getOrElse { (s: String) => + val log = trimLog(maxLogLength).andThen(logAction.getOrElse { (s: String) => DebuggingDirectives.logger.info(s) - } + }) + Directive { inner => ctx => inner(())(ctx).flatMap { case RouteResult.Complete(response) => - if (logBody) { - IO.ref(Vector.empty[Chunk[Byte]]).map { vec => - val newBody = Stream.eval(vec.get).flatMap(v => Stream.emits(v)).unchunks - // Cannot Be Done Asynchronously - Otherwise All Chunks May Not Be Appended Previous to Finalization - val logPipe: Pipe[IO, Byte, Byte] = - _.observe(_.chunks.flatMap(c => Stream.exec(vec.update(_ :+ c)))) - .onFinalizeWeak(Logger.logMessage[IO, Response[IO]](response.withBodyStream(newBody))(logHeaders, - logBody, redactHeadersWhen)(log)) - RouteResult.Complete(response.withBodyStream(logPipe(response.body))) - } + if (logBody && !response.isChunked) { + Logger.logMessage[IO, Response[IO]](response)(logHeaders, logBody = true, redactHeadersWhen)(log).as( + RouteResult.Complete(response)) } else { - Logger.logMessage[IO, Response[IO]](response)(logHeaders, logBody, redactHeadersWhen)(log).as( + Logger.logMessage[IO, Response[IO]](response)(logHeaders, logBody = false, redactHeadersWhen)(log).as( RouteResult.Complete(response)) } case RouteResult.Rejected(rejections) => @@ -95,10 +83,17 @@ trait DebuggingDirectives { */ def logRequestResult(logHeaders: Boolean = true, logBody: Boolean = true, redactHeadersWhen: CIString => Boolean = Headers.SensitiveHeaders.contains, + maxLogLength: Int = Int.MaxValue, logAction: Option[String => IO[Unit]] = None): Directive0 = { - logResult(logHeaders, logBody, redactHeadersWhen, logAction) & logRequest(logHeaders, logBody, redactHeadersWhen, + logResult(logHeaders, logBody, redactHeadersWhen, maxLogLength, logAction) & logRequest(logHeaders, logBody, + redactHeadersWhen, + maxLogLength, logAction) } + + private def trimLog(maxLogLength: Int): String => String = { log => + if (log.length > maxLogLength) log.take(maxLogLength) + "..." else log + } } object DebuggingDirectives extends DebuggingDirectives {