diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 946e443d3..2d8ae583f 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -217,7 +217,7 @@ private[projection] class DynamoDBOffsetStore( } private val logger = LoggerFactory.getLogger(this.getClass) - private val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:" + val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:" private val dao = new OffsetStoreDao(system, settings, projectionId, client) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index bf8284e83..ee09de564 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -203,12 +203,15 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - replayIfPossible(envelope).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible(envelope).map { + case true => Done + case false => throwRejectedEnvelopeAfterFailedReplay("query", sourceProvider, envelope) + }(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => replayIfPossible(envelope).map { case true => Done - case false => throwRejectedEnvelope(sourceProvider, envelope) - } + case false => throwRejectedEnvelopeAfterFailedReplay("backtracking", sourceProvider, envelope) + }(ExecutionContext.parasitic) } } @@ -257,13 +260,14 @@ private[projection] object DynamoDBProjectionImpl { } .runFold(0) { case (acc, _) => acc + 1 } .map { count => - val expected = toSeqNr - fromSeqNr + val expected = toSeqNr - fromSeqNr + 1 if (count == expected) { true } else { // it's expected to find all events, otherwise fail the replay attempt log.warn( - "Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, count, expected, persistenceId, @@ -274,7 +278,8 @@ private[projection] object DynamoDBProjectionImpl { } .recoverWith { exc => log.warn( - "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr, @@ -514,6 +519,20 @@ private[projection] object DynamoDBProjectionImpl { } } + private def throwRejectedEnvelopeAfterFailedReplay[Offset, Envelope]( + source: String, + sourceProvider: SourceProvider[Offset, Envelope], + envelope: Envelope): Nothing = { + extractOffsetPidSeqNr(sourceProvider, envelope) match { + case OffsetPidSeqNr(_, Some((pid, seqNr))) => + throw new RejectedEnvelope( + s"Replay failed, after rejected envelope from $source, persistenceId [$pid], seqNr [$seqNr], due to unexpected sequence number.") + case OffsetPidSeqNr(_, None) => + throw new RejectedEnvelope( + s"Replay failed, after rejected envelope from $source, due to unexpected sequence number.") + } + } + @nowarn("msg=never used") abstract class AdaptedHandler[E](val delegate: Handler[E])(implicit ec: ExecutionContext, system: ActorSystem[_]) extends Handler[E] {