From 362e54348633a8249fd10c7d54d4e1b2ed2cd8d1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 15:43:56 +0100 Subject: [PATCH] no need to loadEnvelope of replayed --- .../dynamodb/internal/DynamoDBProjectionImpl.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 350c518d8..be43a509a 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -598,19 +598,17 @@ private[projection] object DynamoDBProjectionImpl { import DynamoDBOffsetStore.Validation._ offsetStore .validate(envelope) - .flatMap { + .map { case Accepted => if (isFilteredEvent(envelope) && settings.warnAboutFilteredEventsInFlow) { log.info( "atLeastOnceFlow doesn't support skipping envelopes. Envelope [{}] still emitted.", envelope) } - loadEnvelope(envelope.asInstanceOf[Envelope], sourceProvider).map { loadedEnvelope => - offsetStore.addInflight(loadedEnvelope) - Some(loadedEnvelope) - } + offsetStore.addInflight(envelope) + Some(envelope.asInstanceOf[Envelope]) case Duplicate => - Future.successful(None) + None case RejectedSeqNr => // this shouldn't happen throw new RejectedEnvelope(