From f172c54b9b80dd3dee3ea05ac0589ffea9396aa2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 12:13:47 +0100 Subject: [PATCH 01/28] fix: DynamoDB Opt-in to replay rejected events --- .../internal/DynamoDBProjectionImpl.scala | 69 ++++++++++++++++++- .../javadsl/EventSourcedProvider.scala | 19 ++++- .../scaladsl/EventSourcedProvider.scala | 32 ++++++++- 3 files changed, 116 insertions(+), 4 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index a60d574df..5effd1b2a 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -37,6 +37,7 @@ import akka.projection.StatusObserver import akka.projection.dynamodb.DynamoDBProjectionSettings import akka.projection.dynamodb.internal.DynamoDBOffsetStore.RejectedEnvelope import akka.projection.dynamodb.scaladsl.DynamoDBTransactHandler +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.internal.ActorHandlerInit import akka.projection.internal.AtLeastOnce import akka.projection.internal.AtMostOnce @@ -202,9 +203,10 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map(_ => Done)( + ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map { + replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) } @@ -420,6 +422,69 @@ private[projection] object DynamoDBProjectionImpl { } } + private def replayIfPossible[Offset, Envelope]( + sourceProvider: SourceProvider[Offset, Envelope], + offsetStore: DynamoDBOffsetStore, + originalEnvelope: Envelope, + handler: Handler[Envelope])(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = { + originalEnvelope match { + case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + sourceProvider match { + // FIXME config to make this case opt in + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import DynamoDBOffsetStore.Validation._ + offsetStore + .validate(envelope) + .flatMap { + case Accepted => + if (isFilteredEvent(envelope)) { + offsetStore.addInflight(envelope) + FutureDone + } else { + handler + .process(envelope.asInstanceOf[Envelope]) + .map { _ => + offsetStore.addInflight(envelope) + Done + } + } + case Duplicate => + FutureDone + case RejectedSeqNr => + throwRejectedEnvelope(sourceProvider, envelope.asInstanceOf[Envelope]) + case RejectedBacktrackingSeqNr => + throwRejectedEnvelope(sourceProvider, envelope.asInstanceOf[Envelope]) + } + } + .run() + .map(_ => true) + .recoverWith { exc => + log.warn( + "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr) + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case None => FutureFalse + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } + private def throwRejectedEnvelope[Offset, Envelope]( sourceProvider: SourceProvider[Offset, Envelope], envelope: Envelope): Nothing = { diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala index c67a849ac..563286e65 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala @@ -22,12 +22,14 @@ import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery import akka.persistence.query.javadsl.EventsByTagQuery import akka.persistence.query.javadsl.ReadJournal +import akka.persistence.query.typed.javadsl.CurrentEventsByPersistenceIdTypedQuery import akka.persistence.query.typed.javadsl.EventTimestampQuery import akka.persistence.query.typed.javadsl.EventsBySliceQuery import akka.persistence.query.typed.javadsl.EventsBySliceStartingFromSnapshotsQuery import akka.persistence.query.typed.javadsl.LoadEventQuery import akka.projection.BySlicesSourceProvider import akka.projection.eventsourced.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.internal.CanTriggerReplay import akka.projection.javadsl import akka.projection.javadsl.SourceProvider @@ -277,7 +279,8 @@ object EventSourcedProvider { extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] with BySlicesSourceProvider with EventTimestampQuerySourceProvider - with LoadEventQuerySourceProvider { + with LoadEventQuerySourceProvider + with LoadEventsByPersistenceIdSourceProvider[Event] { override def readJournal: ReadJournal = eventsBySlicesQuery @@ -296,6 +299,20 @@ object EventSourcedProvider { override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long = envelope.timestamp + /** + * INTERNAL API + */ + @InternalApi override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long) + : Option[akka.stream.scaladsl.Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = { + eventsBySlicesQuery match { + case q: CurrentEventsByPersistenceIdTypedQuery => + Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr).asScala) + case _ => None // not supported by this query + } + } } /** diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala index ecdd38985..6ebd02ba4 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala @@ -12,11 +12,13 @@ import scala.concurrent.Future import akka.NotUsed import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery import akka.persistence.query.scaladsl.EventsByTagQuery import akka.persistence.query.scaladsl.ReadJournal +import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery import akka.persistence.query.typed.scaladsl.EventTimestampQuery import akka.persistence.query.typed.scaladsl.EventsBySliceQuery import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery @@ -251,7 +253,8 @@ object EventSourcedProvider { extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] with BySlicesSourceProvider with EventTimestampQuerySourceProvider - with LoadEventQuerySourceProvider { + with LoadEventQuerySourceProvider + with LoadEventsByPersistenceIdSourceProvider[Event] { implicit val executionContext: ExecutionContext = system.executionContext override def readJournal: ReadJournal = eventsBySlicesQuery @@ -271,6 +274,19 @@ object EventSourcedProvider { override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long = envelope.timestamp + /** + * INTERNAL API + */ + @InternalApi override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = { + eventsBySlicesQuery match { + case q: CurrentEventsByPersistenceIdTypedQuery => + Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr)) + case _ => None // not supported by this query + } + } } private class EventsBySlicesStartingFromSnapshotsSourceProvider[Snapshot, Event]( @@ -341,4 +357,18 @@ object EventSourcedProvider { } } + /** + * INTERNAL API + */ + @InternalApi private[akka] trait LoadEventsByPersistenceIdSourceProvider[Event] { + + /** + * INTERNAL API + */ + @InternalApi private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] + } + } From 85b2450b71dce4f1cc0694c825c4b6fedcd7b458 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 13:13:33 +0100 Subject: [PATCH 02/28] better recover --- .../dynamodb/internal/DynamoDBProjectionImpl.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 5effd1b2a..1581ef316 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -458,9 +458,13 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - throwRejectedEnvelope(sourceProvider, envelope.asInstanceOf[Envelope]) + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") case RejectedBacktrackingSeqNr => - throwRejectedEnvelope(sourceProvider, envelope.asInstanceOf[Envelope]) + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") } } .run() @@ -470,8 +474,9 @@ private[projection] object DynamoDBProjectionImpl { "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", persistenceId, fromSeqNr, - originalEventEnvelope.sequenceNr) - triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + originalEventEnvelope.sequenceNr, + exc) + Future.failed(exc) } case None => FutureFalse } From 56b55525a255735eb0daf5d1755cbe268a00fc61 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 13:40:19 +0100 Subject: [PATCH 03/28] more providers --- .../javadsl/EventSourcedProvider.scala | 18 +++++++++++++++++- .../scaladsl/EventSourcedProvider.scala | 17 ++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala index 563286e65..b2b0e0d47 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala @@ -331,7 +331,8 @@ object EventSourcedProvider { extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] with BySlicesSourceProvider with EventTimestampQuerySourceProvider - with LoadEventQuerySourceProvider { + with LoadEventQuerySourceProvider + with LoadEventsByPersistenceIdSourceProvider[Event] { override def readJournal: ReadJournal = eventsBySlicesQuery @@ -355,6 +356,21 @@ object EventSourcedProvider { override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long = envelope.timestamp + /** + * INTERNAL API + */ + @InternalApi override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long) + : Option[akka.stream.scaladsl.Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = { + eventsBySlicesQuery match { + case q: CurrentEventsByPersistenceIdTypedQuery => + Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr).asScala) + case _ => None // not supported by this query + } + } + } /** diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala index 6ebd02ba4..7c8e5f06d 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala @@ -300,7 +300,8 @@ object EventSourcedProvider { extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] with BySlicesSourceProvider with EventTimestampQuerySourceProvider - with LoadEventQuerySourceProvider { + with LoadEventQuerySourceProvider + with LoadEventsByPersistenceIdSourceProvider[Event] { implicit val executionContext: ExecutionContext = system.executionContext override def readJournal: ReadJournal = eventsBySlicesQuery @@ -325,6 +326,20 @@ object EventSourcedProvider { override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long = envelope.timestamp + /** + * INTERNAL API + */ + @InternalApi override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = { + eventsBySlicesQuery match { + case q: CurrentEventsByPersistenceIdTypedQuery => + Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr)) + case _ => None // not supported by this query + } + } + } private trait EventTimestampQuerySourceProvider extends EventTimestampQuery { From 84244507dd500646ceb2fc83b6fb8a2fdaac3734 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 18:09:13 +0100 Subject: [PATCH 04/28] bump: akka-persistence-dynamodb 2.0.3 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a12453044..1ebe51dfc 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,7 +40,7 @@ object Dependencies { case Seq(major, minor, _*) => s"$major.$minor" } - val AkkaPersistenceDynamodb = "2.0.1" + val AkkaPersistenceDynamodb = "2.0.3" val AkkaPersistenceDynamodbVersionInDocs = VersionNumber(AkkaPersistenceDynamodb).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } From cb9063c729c53dafec94920338bcd9682c079b90 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 18:09:33 +0100 Subject: [PATCH 05/28] test --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 71 +++++++++++++++++-- .../TestSourceProviderWithInput.scala | 26 ++++++- 2 files changed, 91 insertions(+), 6 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index d9fe789e7..ce7a3a620 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -52,6 +52,7 @@ import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Pid import akka.projection.dynamodb.internal.DynamoDBOffsetStore.SeqNr import akka.projection.dynamodb.scaladsl.DynamoDBProjection import akka.projection.dynamodb.scaladsl.DynamoDBTransactHandler +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.scaladsl.Handler import akka.projection.scaladsl.SourceProvider import akka.projection.testkit.scaladsl.ProjectionTestKit @@ -104,11 +105,13 @@ object DynamoDBTimestampOffsetProjectionSpec { class TestTimestampSourceProvider( envelopes: immutable.IndexedSeq[EventEnvelope[String]], testSourceProvider: TestSourceProvider[Offset, EventEnvelope[String]], - override val maxSlice: Int) + override val maxSlice: Int, + enableCurrentEventsByPersistenceId: Boolean) extends SourceProvider[Offset, EventEnvelope[String]] with BySlicesSourceProvider with EventTimestampQuery - with LoadEventQuery { + with LoadEventQuery + with LoadEventsByPersistenceIdSourceProvider[String] { override def source(offset: () => Future[Option[Offset]]): Future[Source[EventEnvelope[String], NotUsed]] = testSourceProvider.source(offset) @@ -142,6 +145,18 @@ object DynamoDBTimestampOffsetProjectionSpec { s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found.")) } } + + override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = { + if (enableCurrentEventsByPersistenceId) + Some(Source(envelopes.filter { env => + env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr + })) + else + None + } } // test model is as simple as a text that gets other string concatenated to it @@ -282,6 +297,15 @@ class DynamoDBTimestampOffsetProjectionSpec def createSourceProvider( envelopes: immutable.IndexedSeq[EventEnvelope[String]], complete: Boolean = true): TestTimestampSourceProvider = { + createSourceProviderWithMoreEnvelopes(envelopes, envelopes, enableCurrentEventsByPersistenceId = false, complete) + } + + // envelopes are emitted by the "query" source, but allEnvelopes can be loaded + def createSourceProviderWithMoreEnvelopes( + envelopes: immutable.IndexedSeq[EventEnvelope[String]], + allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]], + enableCurrentEventsByPersistenceId: Boolean, + complete: Boolean = true): TestTimestampSourceProvider = { val sp = TestSourceProvider[Offset, EventEnvelope[String]](Source(envelopes), _.offset) .withStartSourceFrom { @@ -294,7 +318,11 @@ class DynamoDBTimestampOffsetProjectionSpec } .withAllowCompletion(complete) - new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1) + new TestTimestampSourceProvider( + allEnvelopes, + sp, + persistenceExt.numberOfSlices - 1, + enableCurrentEventsByPersistenceId) } def createBacktrackingSourceProvider( @@ -304,7 +332,11 @@ class DynamoDBTimestampOffsetProjectionSpec TestSourceProvider[Offset, EventEnvelope[String]](Source(envelopes), _.offset) .withStartSourceFrom { (_, _) => false } // include all .withAllowCompletion(complete) - new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1) + new TestTimestampSourceProvider( + envelopes, + sp, + persistenceExt.numberOfSlices - 1, + enableCurrentEventsByPersistenceId = false) } private def latestOffsetShouldBe(expected: Any)(implicit offsetStore: DynamoDBOffsetStore) = { @@ -857,6 +889,37 @@ class DynamoDBTimestampOffsetProjectionSpec latestOffsetShouldBe(envelopes.last.offset) } } + + "replay rejected sequence numbers" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val projectionRef = spawn( + ProjectionBehavior(DynamoDBProjection + .atLeastOnce(projectionId, Some(settings), sourceProvider, handler = () => new ConcatHandler(repository)))) + + eventually { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1) + projectedValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! ProjectionBehavior.Stop + } } "A DynamoDB exactly-once projection with TimestampOffset" must { diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestSourceProviderWithInput.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestSourceProviderWithInput.scala index 117bdd879..5f0b73758 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestSourceProviderWithInput.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/TestSourceProviderWithInput.scala @@ -23,15 +23,19 @@ import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.EventTimestampQuery import akka.persistence.query.typed.scaladsl.LoadEventQuery import akka.projection.BySlicesSourceProvider +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.scaladsl.SourceProvider import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source -class TestSourceProviderWithInput()(implicit val system: ActorSystem[_]) +class TestSourceProviderWithInput(enableCurrentEventsByPersistenceId: Boolean)(implicit val system: ActorSystem[_]) extends SourceProvider[TimestampOffset, EventEnvelope[String]] with BySlicesSourceProvider with EventTimestampQuery - with LoadEventQuery { + with LoadEventQuery + with LoadEventsByPersistenceIdSourceProvider[String] { + + def this()(implicit system: ActorSystem[_]) = this(enableCurrentEventsByPersistenceId = false) private implicit val ec: ExecutionContext = system.executionContext private val persistenceExt = Persistence(system) @@ -96,4 +100,22 @@ class TestSourceProviderWithInput()(implicit val system: ActorSystem[_]) s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found.")) } } + + override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = { + if (enableCurrentEventsByPersistenceId) + Some( + Source( + envelopes + .iterator() + .asScala + .filter { env => + env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr + } + .toVector)) + else + None + } } From a882e015fdd43434c784b4ee3404ba9d23b3ced3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 18:11:29 +0100 Subject: [PATCH 06/28] verify that it finds all events --- .../internal/DynamoDBProjectionImpl.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 1581ef316..a7cc52f36 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -435,7 +435,8 @@ private[projection] object DynamoDBProjectionImpl { val persistenceId = originalEventEnvelope.persistenceId offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 - provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr) match { + val toSeqNr = originalEventEnvelope.sequenceNr + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource .mapAsync(1) { envelope => @@ -467,8 +468,23 @@ private[projection] object DynamoDBProjectionImpl { s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") } } - .run() - .map(_ => true) + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + if (count == expected) { + true + } else { + // it's expected to find all events, otherwise fail the replay attempt + log.warn( + "Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + false + } + } .recoverWith { exc => log.warn( "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", From 52817891442755e0fa9cdc4cf9fc02ce120f18cb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 20:58:42 +0100 Subject: [PATCH 07/28] unrelated test fail --- .../dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index ce7a3a620..c102c785a 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -1176,6 +1176,7 @@ class DynamoDBTimestampOffsetProjectionSpec offsetShouldBeEmpty() projectionTestKit.run(projection) { projectedTestValueShouldBe("e1|e2|e5") + offsetStore.storedSeqNr(pid).futureValue shouldBe 6 } latestOffsetShouldBe(envelopes.last.offset) } From df900913d601b7ff3b3c1ea8ef1f5d555c7281c1 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 16:28:47 +1300 Subject: [PATCH 08/28] add test for clock skew on event writes --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index c102c785a..165cc5431 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -296,8 +296,9 @@ class DynamoDBTimestampOffsetProjectionSpec def createSourceProvider( envelopes: immutable.IndexedSeq[EventEnvelope[String]], + enableCurrentEventsByPersistenceId: Boolean = false, complete: Boolean = true): TestTimestampSourceProvider = { - createSourceProviderWithMoreEnvelopes(envelopes, envelopes, enableCurrentEventsByPersistenceId = false, complete) + createSourceProviderWithMoreEnvelopes(envelopes, envelopes, enableCurrentEventsByPersistenceId, complete) } // envelopes are emitted by the "query" source, but allEnvelopes can be loaded @@ -920,6 +921,59 @@ class DynamoDBTimestampOffsetProjectionSpec } projectionRef ! ProjectionBehavior.Stop } + + "replay rejected sequence numbers due to clock skew on event write" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val projectionRef = spawn( + ProjectionBehavior(DynamoDBProjection + .atLeastOnce(projectionId, Some(settings), sourceProvider, handler = () => new ConcatHandler(repository)))) + + eventually { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + latestOffsetShouldBe(envelopes.last.offset) + } + + projectionRef ! ProjectionBehavior.Stop + } } "A DynamoDB exactly-once projection with TimestampOffset" must { From 5984826e8b161478cfb69f8949c3c7bd1082bc0b Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 18:45:51 +1300 Subject: [PATCH 09/28] move replayIfPossible to adapted handler implementation --- .../internal/DynamoDBProjectionImpl.scala | 170 +++++++++--------- 1 file changed, 83 insertions(+), 87 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index a7cc52f36..1a9b23dd6 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -203,15 +203,95 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map(_ => Done)( - ExecutionContext.parasitic) + replayIfPossible(envelope).map(_ => Done)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map { + replayIfPossible(envelope).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) } } } + + private def replayIfPossible( + originalEnvelope: Envelope)(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = { + originalEnvelope match { + case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + sourceProvider match { + // FIXME config to make this case opt in + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + val toSeqNr = originalEventEnvelope.sequenceNr + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import DynamoDBOffsetStore.Validation._ + offsetStore + .validate(envelope) + .flatMap { + case Accepted => + if (isFilteredEvent(envelope)) { + offsetStore.addInflight(envelope) + FutureDone + } else { + delegate + .process(envelope.asInstanceOf[Envelope]) + .map { _ => + offsetStore.addInflight(envelope) + Done + } + } + case Duplicate => + FutureDone + case RejectedSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + case RejectedBacktrackingSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + } + } + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + if (count == expected) { + true + } else { + // it's expected to find all events, otherwise fail the replay attempt + log.warn( + "Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + false + } + } + .recoverWith { exc => + log.warn( + "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + exc) + Future.failed(exc) + } + case None => FutureFalse + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } } } @@ -422,90 +502,6 @@ private[projection] object DynamoDBProjectionImpl { } } - private def replayIfPossible[Offset, Envelope]( - sourceProvider: SourceProvider[Offset, Envelope], - offsetStore: DynamoDBOffsetStore, - originalEnvelope: Envelope, - handler: Handler[Envelope])(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = { - originalEnvelope match { - case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => - sourceProvider match { - // FIXME config to make this case opt in - case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] => - val persistenceId = originalEventEnvelope.persistenceId - offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => - val fromSeqNr = storedSeqNr + 1 - val toSeqNr = originalEventEnvelope.sequenceNr - provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { - case Some(querySource) => - querySource - .mapAsync(1) { envelope => - import DynamoDBOffsetStore.Validation._ - offsetStore - .validate(envelope) - .flatMap { - case Accepted => - if (isFilteredEvent(envelope)) { - offsetStore.addInflight(envelope) - FutureDone - } else { - handler - .process(envelope.asInstanceOf[Envelope]) - .map { _ => - offsetStore.addInflight(envelope) - Done - } - } - case Duplicate => - FutureDone - case RejectedSeqNr => - // this shouldn't happen - throw new RejectedEnvelope( - s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") - case RejectedBacktrackingSeqNr => - // this shouldn't happen - throw new RejectedEnvelope( - s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") - } - } - .runFold(0) { case (acc, _) => acc + 1 } - .map { count => - val expected = toSeqNr - fromSeqNr - if (count == expected) { - true - } else { - // it's expected to find all events, otherwise fail the replay attempt - log.warn( - "Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", - count, - expected, - persistenceId, - fromSeqNr, - toSeqNr) - false - } - } - .recoverWith { exc => - log.warn( - "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", - persistenceId, - fromSeqNr, - originalEventEnvelope.sequenceNr, - exc) - Future.failed(exc) - } - case None => FutureFalse - } - } - - case _ => - triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) - } - case _ => - FutureFalse // no replay support for non typed envelopes - } - } - private def throwRejectedEnvelope[Offset, Envelope]( sourceProvider: SourceProvider[Offset, Envelope], envelope: Envelope): Nothing = { From 5d9dcb81994b34bf9bddc6cdfc80163c495b1e1a Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 19:02:13 +1300 Subject: [PATCH 10/28] add opt-in config setting for replay --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 19 ++++++++--- .../src/main/resources/reference.conf | 3 ++ .../dynamodb/DynamoDBProjectionSettings.scala | 32 +++++++++++++++---- .../internal/DynamoDBOffsetStore.scala | 2 +- .../internal/DynamoDBProjectionImpl.scala | 4 +-- 5 files changed, 47 insertions(+), 13 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index 165cc5431..159c5743e 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -904,12 +904,18 @@ class DynamoDBTimestampOffsetProjectionSpec val sourceProvider = createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + implicit val offsetStore: DynamoDBOffsetStore = new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) val projectionRef = spawn( - ProjectionBehavior(DynamoDBProjection - .atLeastOnce(projectionId, Some(settings), sourceProvider, handler = () => new ConcatHandler(repository)))) + ProjectionBehavior( + DynamoDBProjection + .atLeastOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new ConcatHandler(repository)))) eventually { projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1) @@ -960,8 +966,13 @@ class DynamoDBTimestampOffsetProjectionSpec new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) val projectionRef = spawn( - ProjectionBehavior(DynamoDBProjection - .atLeastOnce(projectionId, Some(settings), sourceProvider, handler = () => new ConcatHandler(repository)))) + ProjectionBehavior( + DynamoDBProjection + .atLeastOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new ConcatHandler(repository)))) eventually { projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) diff --git a/akka-projection-dynamodb/src/main/resources/reference.conf b/akka-projection-dynamodb/src/main/resources/reference.conf index ef1047d41..b32d13c87 100644 --- a/akka-projection-dynamodb/src/main/resources/reference.conf +++ b/akka-projection-dynamodb/src/main/resources/reference.conf @@ -44,6 +44,9 @@ akka.projection.dynamodb { } } + # Replay missed events for a particular persistence id when a sequence number is rejected by validation. + replay-on-rejected-sequence-numbers = off + # By default it shares DynamoDB client with akka-persistence-dynamodb (write side). # To use a separate client for projections this can be # set to another config path that defines the config based on diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala index d97278e5d..020cef036 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala @@ -48,7 +48,8 @@ object DynamoDBProjectionSettings { offsetBatchSize = config.getInt("offset-store.offset-batch-size"), offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"), timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")), - retrySettings = RetrySettings(config.getConfig("offset-store.retries"))) + retrySettings = RetrySettings(config.getConfig("offset-store.retries")), + replayOnRejectedSequenceNumbers = config.getBoolean("replay-on-rejected-sequence-numbers")) } /** @@ -72,7 +73,8 @@ final class DynamoDBProjectionSettings private ( val offsetBatchSize: Int, val offsetSliceReadParallelism: Int, val timeToLiveSettings: TimeToLiveSettings, - val retrySettings: RetrySettings) { + val retrySettings: RetrySettings, + val replayOnRejectedSequenceNumbers: Boolean) { // 25 is a hard limit of batch writes in DynamoDB require(offsetBatchSize <= 25, s"offset-batch-size must be <= 25, was [$offsetBatchSize]") @@ -122,6 +124,9 @@ final class DynamoDBProjectionSettings private ( def withRetrySettings(retrySettings: RetrySettings): DynamoDBProjectionSettings = copy(retrySettings = retrySettings) + def withReplayOnRejectedSequenceNumbers(replayOnRejectedSequenceNumbers: Boolean): DynamoDBProjectionSettings = + copy(replayOnRejectedSequenceNumbers = replayOnRejectedSequenceNumbers) + @nowarn("msg=deprecated") private def copy( timestampOffsetTable: String = timestampOffsetTable, @@ -132,7 +137,8 @@ final class DynamoDBProjectionSettings private ( offsetBatchSize: Int = offsetBatchSize, offsetSliceReadParallelism: Int = offsetSliceReadParallelism, timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings, - retrySettings: RetrySettings = retrySettings) = + retrySettings: RetrySettings = retrySettings, + replayOnRejectedSequenceNumbers: Boolean = replayOnRejectedSequenceNumbers) = new DynamoDBProjectionSettings( timestampOffsetTable, useClient, @@ -144,10 +150,24 @@ final class DynamoDBProjectionSettings private ( offsetBatchSize, offsetSliceReadParallelism, timeToLiveSettings, - retrySettings) + retrySettings, + replayOnRejectedSequenceNumbers) - override def toString = - s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $backtrackingWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)" + @nowarn("msg=deprecated") + override def toString: String = + s"DynamoDBProjectionSettings(" + + s"timestampOffsetTable=$timestampOffsetTable, " + + s"useClient=$useClient, " + + s"timeWindow=$timeWindow, " + + s"backtrackingWindow=$backtrackingWindow, " + + s"keepNumberOfEntries=$keepNumberOfEntries, " + + s"evictInterval=$evictInterval, " + + s"warnAboutFilteredEventsInFlow=$warnAboutFilteredEventsInFlow, " + + s"offsetBatchSize=$offsetBatchSize, " + + s"offsetSliceReadParallelism=$offsetSliceReadParallelism, " + + s"timeToLiveSettings=$timeToLiveSettings, " + + s"retrySettings=$retrySettings, " + + s"replayOnRejectedSequenceNumbers=$replayOnRejectedSequenceNumbers)" } object TimeToLiveSettings { diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 932893e55..946e443d3 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -203,7 +203,7 @@ private[projection] class DynamoDBOffsetStore( projectionId: ProjectionId, sourceProvider: Option[BySlicesSourceProvider], system: ActorSystem[_], - settings: DynamoDBProjectionSettings, + val settings: DynamoDBProjectionSettings, client: DynamoDbAsyncClient, clock: Clock = Clock.systemUTC()) { diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 1a9b23dd6..bf8284e83 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -217,8 +217,8 @@ private[projection] object DynamoDBProjectionImpl { originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { - // FIXME config to make this case opt in - case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] => + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] + if offsetStore.settings.replayOnRejectedSequenceNumbers => val persistenceId = originalEventEnvelope.persistenceId offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 From 05e65ecb6036ee24bcf3b124aec5942d7b7cc7a5 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 19:20:35 +1300 Subject: [PATCH 11/28] throw on replay failed, fix off-by-one, add offset store log prefix --- .../internal/DynamoDBOffsetStore.scala | 2 +- .../internal/DynamoDBProjectionImpl.scala | 31 +++++++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 946e443d3..2d8ae583f 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -217,7 +217,7 @@ private[projection] class DynamoDBOffsetStore( } private val logger = LoggerFactory.getLogger(this.getClass) - private val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:" + val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:" private val dao = new OffsetStoreDao(system, settings, projectionId, client) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index bf8284e83..ee09de564 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -203,12 +203,15 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - replayIfPossible(envelope).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible(envelope).map { + case true => Done + case false => throwRejectedEnvelopeAfterFailedReplay("query", sourceProvider, envelope) + }(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => replayIfPossible(envelope).map { case true => Done - case false => throwRejectedEnvelope(sourceProvider, envelope) - } + case false => throwRejectedEnvelopeAfterFailedReplay("backtracking", sourceProvider, envelope) + }(ExecutionContext.parasitic) } } @@ -257,13 +260,14 @@ private[projection] object DynamoDBProjectionImpl { } .runFold(0) { case (acc, _) => acc + 1 } .map { count => - val expected = toSeqNr - fromSeqNr + val expected = toSeqNr - fromSeqNr + 1 if (count == expected) { true } else { // it's expected to find all events, otherwise fail the replay attempt log.warn( - "Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, count, expected, persistenceId, @@ -274,7 +278,8 @@ private[projection] object DynamoDBProjectionImpl { } .recoverWith { exc => log.warn( - "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr, @@ -514,6 +519,20 @@ private[projection] object DynamoDBProjectionImpl { } } + private def throwRejectedEnvelopeAfterFailedReplay[Offset, Envelope]( + source: String, + sourceProvider: SourceProvider[Offset, Envelope], + envelope: Envelope): Nothing = { + extractOffsetPidSeqNr(sourceProvider, envelope) match { + case OffsetPidSeqNr(_, Some((pid, seqNr))) => + throw new RejectedEnvelope( + s"Replay failed, after rejected envelope from $source, persistenceId [$pid], seqNr [$seqNr], due to unexpected sequence number.") + case OffsetPidSeqNr(_, None) => + throw new RejectedEnvelope( + s"Replay failed, after rejected envelope from $source, due to unexpected sequence number.") + } + } + @nowarn("msg=never used") abstract class AdaptedHandler[E](val delegate: Handler[E])(implicit ec: ExecutionContext, system: ActorSystem[_]) extends Handler[E] { From 3bd2dc825bcf6294ac1ba1894275803f9c85634c Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 19:28:48 +1300 Subject: [PATCH 12/28] only fail replay for queries on missing replay (unexpected count) --- .../dynamodb/internal/DynamoDBProjectionImpl.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index ee09de564..66e78798a 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -203,20 +203,18 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - replayIfPossible(envelope).map { - case true => Done - case false => throwRejectedEnvelopeAfterFailedReplay("query", sourceProvider, envelope) - }(ExecutionContext.parasitic) + replayIfPossible("query", envelope).map(_ => Done)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - replayIfPossible(envelope).map { + replayIfPossible("backtracking", envelope).map { case true => Done case false => throwRejectedEnvelopeAfterFailedReplay("backtracking", sourceProvider, envelope) }(ExecutionContext.parasitic) } } - private def replayIfPossible( - originalEnvelope: Envelope)(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = { + private def replayIfPossible(source: String, originalEnvelope: Envelope)( + implicit ec: ExecutionContext, + system: ActorSystem[_]): Future[Boolean] = { originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -273,7 +271,7 @@ private[projection] object DynamoDBProjectionImpl { persistenceId, fromSeqNr, toSeqNr) - false + throwRejectedEnvelopeAfterFailedReplay(source, sourceProvider, originalEnvelope) } } .recoverWith { exc => From 886fba0582842d4e36f6b682c2d766d4db39a235 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 20:21:40 +1300 Subject: [PATCH 13/28] add initial replay and tests for at-least-once grouped --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 120 ++++++++++++++++++ .../internal/DynamoDBProjectionImpl.scala | 105 ++++++++++++++- 2 files changed, 222 insertions(+), 3 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index 159c5743e..d2e2931c9 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -1565,6 +1565,126 @@ class DynamoDBTimestampOffsetProjectionSpec latestOffsetShouldBe(envelopes.last.offset) } } + + "replay rejected sequence numbers for at-least-once grouped" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val results = new ConcurrentHashMap[String, String]() + + val handler: Handler[Seq[EventEnvelope[String]]] = + (envelopes: Seq[EventEnvelope[String]]) => { + Future { + envelopes.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done) + } + + val projection = + DynamoDBProjection + .atLeastOnceGroupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => handler) + .withGroup(2, 3.seconds) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|" + results.get(pid2) shouldBe "|e1|e2|e3|" + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + } + + "replay rejected sequence numbers due to clock skew on event write for at-least-once grouped" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val results = new ConcurrentHashMap[String, String]() + + val handler: Handler[Seq[EventEnvelope[String]]] = + (envelopes: Seq[EventEnvelope[String]]) => { + Future { + envelopes.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done) + } + + val projection = + DynamoDBProjection + .atLeastOnceGroupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => handler) + .withGroup(2, 3.seconds) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + results.get(pid2) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + } } "A DynamoDB flow projection with TimestampOffset" must { diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 66e78798a..34964e3a1 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -207,7 +207,7 @@ private[projection] object DynamoDBProjectionImpl { case RejectedBacktrackingSeqNr => replayIfPossible("backtracking", envelope).map { case true => Done - case false => throwRejectedEnvelopeAfterFailedReplay("backtracking", sourceProvider, envelope) + case false => throwRejectedEnvelope(sourceProvider, envelope) }(ExecutionContext.parasitic) } } @@ -224,6 +224,12 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr + log.debug( + s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].", + offsetStore.logPrefix, + persistenceId, + fromSeqNr, + toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -406,9 +412,9 @@ private[projection] object DynamoDBProjectionImpl { val replayDone = Future.sequence(isAcceptedEnvelopes.map { case (env, RejectedSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible("query", env).map(_ => Done)(ExecutionContext.parasitic) case (env, RejectedBacktrackingSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map { + replayIfPossible("backtracking", env).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, env) } @@ -441,6 +447,99 @@ private[projection] object DynamoDBProjectionImpl { } } } + + private def replayIfPossible(source: String, originalEnvelope: Envelope)( + implicit ec: ExecutionContext, + system: ActorSystem[_]): Future[Boolean] = { + originalEnvelope match { + case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + sourceProvider match { + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] + if offsetStore.settings.replayOnRejectedSequenceNumbers => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + val toSeqNr = originalEventEnvelope.sequenceNr + log.debug( + s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].", + offsetStore.logPrefix, + persistenceId, + fromSeqNr, + toSeqNr) + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import DynamoDBOffsetStore.Validation._ + offsetStore + .validate(envelope) + .flatMap { + case Accepted => + // FIXME: should we be grouping the replayed envelopes? based on what? + loadEnvelope(envelope.asInstanceOf[Envelope], sourceProvider).flatMap { + loadedEnvelope => + val offset = extractOffsetPidSeqNr(sourceProvider, loadedEnvelope) + if (isFilteredEvent(loadedEnvelope)) { + offsetStore.saveOffset(offset) + } else { + delegate + .process(Seq(loadedEnvelope)) + .flatMap { _ => + offsetStore.saveOffset(offset) + } + } + } + case Duplicate => + FutureDone + case RejectedSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + case RejectedBacktrackingSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + } + } + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + 1 + if (count == expected) { + true + } else { + // it's expected to find all events, otherwise fail the replay attempt + log.warn( + "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + throwRejectedEnvelopeAfterFailedReplay(source, sourceProvider, originalEnvelope) + } + } + .recoverWith { exc => + log.warn( + "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + exc) + Future.failed(exc) + } + case None => FutureFalse + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } } } From fa7f1d7cfda81d6b0887fb2ef67329605ac668ac Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 20:49:31 +1300 Subject: [PATCH 14/28] remove unnecessary implicits --- .../dynamodb/internal/DynamoDBProjectionImpl.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 34964e3a1..a66408c6f 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -212,9 +212,7 @@ private[projection] object DynamoDBProjectionImpl { } } - private def replayIfPossible(source: String, originalEnvelope: Envelope)( - implicit ec: ExecutionContext, - system: ActorSystem[_]): Future[Boolean] = { + private def replayIfPossible(source: String, originalEnvelope: Envelope): Future[Boolean] = { originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -448,9 +446,7 @@ private[projection] object DynamoDBProjectionImpl { } } - private def replayIfPossible(source: String, originalEnvelope: Envelope)( - implicit ec: ExecutionContext, - system: ActorSystem[_]): Future[Boolean] = { + private def replayIfPossible(source: String, originalEnvelope: Envelope): Future[Boolean] = { originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { From 67fa7def0272ce96489c6b1bea89e4315d289612 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 21:09:56 +1300 Subject: [PATCH 15/28] add initial replay implementation and tests for flow handler --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 107 ++++++++++++++++++ .../internal/DynamoDBProjectionImpl.scala | 100 +++++++++++++++- 2 files changed, 205 insertions(+), 2 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index d2e2931c9..26cca873a 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -1830,6 +1830,113 @@ class DynamoDBTimestampOffsetProjectionSpec latestOffsetShouldBe(envelopes.last.offset) } } + + "replay rejected sequence numbers for flow projection" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + offsetShouldBeEmpty() + + val flowHandler = + FlowWithContext[EventEnvelope[String], ProjectionContext] + .mapAsync(1) { env => + repository.concatToText(env.persistenceId, env.event) + } + + val projection = + DynamoDBProjection + .atLeastOnceFlow( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = flowHandler) + .withSaveOffset(1, 1.minute) + + projectionTestKit.run(projection) { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1) + projectedValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + } + + "replay rejected sequence numbers due to clock skew for flow projection" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + offsetShouldBeEmpty() + + val flowHandler = + FlowWithContext[EventEnvelope[String], ProjectionContext] + .mapAsync(1) { env => + repository.concatToText(env.persistenceId, env.event) + } + + val projection = + DynamoDBProjection + .atLeastOnceFlow( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = flowHandler) + .withSaveOffset(1, 1.minute) + + projectionTestKit.run(projection) { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + } } // FIXME see more tests in R2dbcTimestampOffsetProjectionSpec diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index a66408c6f..7a5070708 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -548,6 +548,102 @@ private[projection] object DynamoDBProjectionImpl { system: ActorSystem[_]): FlowWithContext[Envelope, ProjectionContext, Done, ProjectionContext, _] = { import DynamoDBOffsetStore.Validation._ implicit val ec: ExecutionContext = system.executionContext + + def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + originalEnvelope match { + case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + sourceProvider match { + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] + if offsetStore.settings.replayOnRejectedSequenceNumbers => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + val toSeqNr = originalEventEnvelope.sequenceNr + log.debug( + s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].", + offsetStore.logPrefix, + persistenceId, + fromSeqNr, + toSeqNr) + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import DynamoDBOffsetStore.Validation._ + offsetStore + .validate(envelope) + .flatMap { + case Accepted => + if (isFilteredEvent(envelope) && settings.warnAboutFilteredEventsInFlow) { + log.info( + "atLeastOnceFlow doesn't support skipping envelopes. Envelope [{}] still emitted.", + envelope) + } + loadEnvelope(envelope.asInstanceOf[Envelope], sourceProvider).map { loadedEnvelope => + offsetStore.addInflight(loadedEnvelope) + Some(loadedEnvelope) + } + case Duplicate => + Future.successful(None) + case RejectedSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + case RejectedBacktrackingSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + } + } + .collect { + case Some(env) => + // FIXME: should we supply a projection context? + // FIXME: add projection telemetry to all replays? (with a new envelope source?) + env -> ProjectionContextImpl(sourceProvider.extractOffset(env), env, null) + } + .via(handler.asFlow) + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + 1 + if (count == expected) { + true + } else { + // FIXME: filtered envelopes are not passed through, so we can't expect all to be replayed here + // and handler could also filter out envelopes + log.warn( + "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + // throwRejectedEnvelopeAfterFailedReplay(source, sourceProvider, originalEnvelope) + true + } + } + .recoverWith { exc => + log.warn( + "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + exc) + Future.failed(exc) + } + case None => FutureFalse + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } + FlowWithContext[Envelope, ProjectionContext] .mapAsync(1) { env => offsetStore @@ -564,9 +660,9 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => Future.successful(None) case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => None)(ExecutionContext.parasitic) + replayIfPossible(env).map(_ => None)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map { + replayIfPossible(env).map { case true => None case false => throwRejectedEnvelope(sourceProvider, env) } From 257639068a8dc640f291c5c4aff4152505fb1571 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 22:36:00 +1300 Subject: [PATCH 16/28] downgrade expected replay count warning to debug for flow handler --- .../projection/dynamodb/internal/DynamoDBProjectionImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 7a5070708..76c81a3dd 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -610,7 +610,7 @@ private[projection] object DynamoDBProjectionImpl { } else { // FIXME: filtered envelopes are not passed through, so we can't expect all to be replayed here // and handler could also filter out envelopes - log.warn( + log.debug( "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", offsetStore.logPrefix, count, From df0217fd9785da752a2384df36013768d1fea2f7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 11:12:16 +0100 Subject: [PATCH 17/28] cleanup envelope source, log --- .../internal/DynamoDBProjectionImpl.scala | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 76c81a3dd..8834b68d6 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -17,6 +17,7 @@ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.event.Logging import akka.event.LoggingAdapter +import akka.persistence.dynamodb.internal.EnvelopeOrigin import akka.persistence.query.DeletedDurableState import akka.persistence.query.DurableStateChange import akka.persistence.query.UpdatedDurableState @@ -203,16 +204,16 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - replayIfPossible("query", envelope).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible(envelope).map(_ => Done)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - replayIfPossible("backtracking", envelope).map { + replayIfPossible(envelope).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) }(ExecutionContext.parasitic) } } - private def replayIfPossible(source: String, originalEnvelope: Envelope): Future[Boolean] = { + private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -275,7 +276,7 @@ private[projection] object DynamoDBProjectionImpl { persistenceId, fromSeqNr, toSeqNr) - throwRejectedEnvelopeAfterFailedReplay(source, sourceProvider, originalEnvelope) + throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => @@ -410,9 +411,9 @@ private[projection] object DynamoDBProjectionImpl { val replayDone = Future.sequence(isAcceptedEnvelopes.map { case (env, RejectedSeqNr) => - replayIfPossible("query", env).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible(env).map(_ => Done)(ExecutionContext.parasitic) case (env, RejectedBacktrackingSeqNr) => - replayIfPossible("backtracking", env).map { + replayIfPossible(env).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, env) } @@ -446,7 +447,7 @@ private[projection] object DynamoDBProjectionImpl { } } - private def replayIfPossible(source: String, originalEnvelope: Envelope): Future[Boolean] = { + private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -512,7 +513,7 @@ private[projection] object DynamoDBProjectionImpl { persistenceId, fromSeqNr, toSeqNr) - throwRejectedEnvelopeAfterFailedReplay(source, sourceProvider, originalEnvelope) + throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => @@ -709,9 +710,9 @@ private[projection] object DynamoDBProjectionImpl { } private def throwRejectedEnvelopeAfterFailedReplay[Offset, Envelope]( - source: String, sourceProvider: SourceProvider[Offset, Envelope], envelope: Envelope): Nothing = { + val source = envelopeSourceName(envelope) extractOffsetPidSeqNr(sourceProvider, envelope) match { case OffsetPidSeqNr(_, Some((pid, seqNr))) => throw new RejectedEnvelope( @@ -722,6 +723,18 @@ private[projection] object DynamoDBProjectionImpl { } } + private def envelopeSourceName[Envelope](envelope: Envelope): String = { + envelope match { + case env: EventEnvelope[Any @unchecked] => + if (EnvelopeOrigin.fromQuery(env)) "query" + else if (EnvelopeOrigin.fromPubSub(env)) "pubsub" + else if (EnvelopeOrigin.fromBacktracking(env)) "backtracking" + else if (EnvelopeOrigin.fromSnapshot(env)) "snapshot" + else env.source + case _ => "unknown" + } + } + @nowarn("msg=never used") abstract class AdaptedHandler[E](val delegate: Handler[E])(implicit ec: ExecutionContext, system: ActorSystem[_]) extends Handler[E] { From 5bdf207795a9232ccd4ce7bc143bfaf8887fdf97 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 11:49:06 +0100 Subject: [PATCH 18/28] replay sequentially in grouped at-least-once --- .../internal/DynamoDBProjectionImpl.scala | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 8834b68d6..b84a03e82 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -408,18 +408,23 @@ private[projection] object DynamoDBProjectionImpl { override def process(envelopes: Seq[Envelope]): Future[Done] = { import DynamoDBOffsetStore.Validation._ offsetStore.validateAll(envelopes).flatMap { isAcceptedEnvelopes => + // FIXME improvement would be to only replay the last for each pid val replayDone = - Future.sequence(isAcceptedEnvelopes.map { - case (env, RejectedSeqNr) => - replayIfPossible(env).map(_ => Done)(ExecutionContext.parasitic) - case (env, RejectedBacktrackingSeqNr) => - replayIfPossible(env).map { - case true => Done - case false => throwRejectedEnvelope(sourceProvider, env) + isAcceptedEnvelopes.foldLeft(FutureDone) { + case (previous, (env, RejectedSeqNr)) => + previous.flatMap { _ => + replayIfPossible(env).map(_ => Done)(ExecutionContext.parasitic) } - case _ => - FutureDone - }) + case (previous, (env, RejectedBacktrackingSeqNr)) => + previous.flatMap { _ => + replayIfPossible(env).map { + case true => Done + case false => throwRejectedEnvelope(sourceProvider, env) + } + } + case (previous, _) => + previous + } replayDone.flatMap { _ => val acceptedEnvelopes = isAcceptedEnvelopes.collect { @@ -473,17 +478,14 @@ private[projection] object DynamoDBProjectionImpl { .flatMap { case Accepted => // FIXME: should we be grouping the replayed envelopes? based on what? - loadEnvelope(envelope.asInstanceOf[Envelope], sourceProvider).flatMap { - loadedEnvelope => - val offset = extractOffsetPidSeqNr(sourceProvider, loadedEnvelope) - if (isFilteredEvent(loadedEnvelope)) { + val offset = extractOffsetPidSeqNr(sourceProvider, envelope.asInstanceOf[Envelope]) + if (isFilteredEvent(envelope)) { + offsetStore.saveOffset(offset) + } else { + delegate + .process(Seq(envelope.asInstanceOf[Envelope])) + .flatMap { _ => offsetStore.saveOffset(offset) - } else { - delegate - .process(Seq(loadedEnvelope)) - .flatMap { _ => - offsetStore.saveOffset(offset) - } } } case Duplicate => From 3e0425ed4697da8f21a59db8c26dc56562f80900 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 15:32:40 +0100 Subject: [PATCH 19/28] additional validation to avoid duplicates for grouped at-least-once --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 8 ++-- .../internal/DynamoDBProjectionImpl.scala | 41 +++++++++++++++---- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index 26cca873a..4c50838f7 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -1571,9 +1571,9 @@ class DynamoDBTimestampOffsetProjectionSpec val pid2 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() - val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3) val envelopes = allEnvelopes.filterNot { env => - (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) || + (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5 || env.sequenceNr == 7 || env.sequenceNr == 9)) || (env.persistenceId == pid2 && (env.sequenceNr == 1)) } @@ -1602,12 +1602,12 @@ class DynamoDBTimestampOffsetProjectionSpec Some(settings.withReplayOnRejectedSequenceNumbers(true)), sourceProvider, handler = () => handler) - .withGroup(2, 3.seconds) + .withGroup(8, 3.seconds) offsetShouldBeEmpty() projectionTestKit.run(projection) { - results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|" + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|e10|" results.get(pid2) shouldBe "|e1|e2|e3|" } diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index b84a03e82..350c518d8 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -427,16 +427,12 @@ private[projection] object DynamoDBProjectionImpl { } replayDone.flatMap { _ => - val acceptedEnvelopes = isAcceptedEnvelopes.collect { - case (env, Accepted) => - env - } - if (acceptedEnvelopes.isEmpty) { - FutureDone - } else { - Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { - loadedEnvelopes => + def processAcceptedEnvelopes(envelopes: Seq[Envelope]): Future[Done] = { + if (envelopes.isEmpty) { + FutureDone + } else { + Future.sequence(envelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { loadedEnvelopes => val offsets = loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector val filteredEnvelopes = loadedEnvelopes.filterNot(isFilteredEvent) if (filteredEnvelopes.isEmpty) { @@ -446,8 +442,35 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.saveOffsets(offsets) } } + } } } + + val acceptedEnvelopes = isAcceptedEnvelopes.collect { + case (env, Accepted) => + env + } + val hasRejected = + isAcceptedEnvelopes.exists { + case (_, RejectedSeqNr) => true + case (_, RejectedBacktrackingSeqNr) => true + case _ => false + } + + if (hasRejected) { + // need second validation after replay to remove duplicates + offsetStore + .validateAll(acceptedEnvelopes) + .flatMap { isAcceptedEnvelopes2 => + val acceptedEnvelopes2 = isAcceptedEnvelopes2.collect { + case (env, Accepted) => env + } + processAcceptedEnvelopes(acceptedEnvelopes2) + } + } else { + processAcceptedEnvelopes(acceptedEnvelopes) + } + } } } From 362e54348633a8249fd10c7d54d4e1b2ed2cd8d1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 15:43:56 +0100 Subject: [PATCH 20/28] no need to loadEnvelope of replayed --- .../dynamodb/internal/DynamoDBProjectionImpl.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 350c518d8..be43a509a 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -598,19 +598,17 @@ private[projection] object DynamoDBProjectionImpl { import DynamoDBOffsetStore.Validation._ offsetStore .validate(envelope) - .flatMap { + .map { case Accepted => if (isFilteredEvent(envelope) && settings.warnAboutFilteredEventsInFlow) { log.info( "atLeastOnceFlow doesn't support skipping envelopes. Envelope [{}] still emitted.", envelope) } - loadEnvelope(envelope.asInstanceOf[Envelope], sourceProvider).map { loadedEnvelope => - offsetStore.addInflight(loadedEnvelope) - Some(loadedEnvelope) - } + offsetStore.addInflight(envelope) + Some(envelope.asInstanceOf[Envelope]) case Duplicate => - Future.successful(None) + None case RejectedSeqNr => // this shouldn't happen throw new RejectedEnvelope( From fa031e1c289e9787a20964e7e34941e4ee2795b4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 15:51:05 +0100 Subject: [PATCH 21/28] minor test cleanup --- .../dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index 4c50838f7..75fe37b5b 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -897,8 +897,9 @@ class DynamoDBTimestampOffsetProjectionSpec val projectionId = genRandomProjectionId() val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L) val envelopes = allEnvelopes.filterNot { env => - (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) || + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || (env.persistenceId == pid2 && (env.sequenceNr == 1)) } @@ -1572,8 +1573,9 @@ class DynamoDBTimestampOffsetProjectionSpec val projectionId = genRandomProjectionId() val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L) val envelopes = allEnvelopes.filterNot { env => - (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5 || env.sequenceNr == 7 || env.sequenceNr == 9)) || + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || (env.persistenceId == pid2 && (env.sequenceNr == 1)) } @@ -1837,8 +1839,9 @@ class DynamoDBTimestampOffsetProjectionSpec val projectionId = genRandomProjectionId() val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L) val envelopes = allEnvelopes.filterNot { env => - (env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) || + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || (env.persistenceId == pid2 && (env.sequenceNr == 1)) } From c68d93de4c4b89b9eaae8567b6dd5febdbdf6d13 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 16:02:10 +0100 Subject: [PATCH 22/28] enable by default --- akka-projection-dynamodb/src/main/resources/reference.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-projection-dynamodb/src/main/resources/reference.conf b/akka-projection-dynamodb/src/main/resources/reference.conf index b32d13c87..d10181756 100644 --- a/akka-projection-dynamodb/src/main/resources/reference.conf +++ b/akka-projection-dynamodb/src/main/resources/reference.conf @@ -45,7 +45,7 @@ akka.projection.dynamodb { } # Replay missed events for a particular persistence id when a sequence number is rejected by validation. - replay-on-rejected-sequence-numbers = off + replay-on-rejected-sequence-numbers = on # By default it shares DynamoDB client with akka-persistence-dynamodb (write side). # To use a separate client for projections this can be From 673e606602f7452fc343af3f03ade204a183c7d4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 16:12:26 +0100 Subject: [PATCH 23/28] comments in validation --- .../dynamodb/internal/DynamoDBOffsetStore.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 2d8ae583f..c35a21712 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -605,10 +605,14 @@ private[projection] class DynamoDBOffsetStore( FutureAccepted } else if (!recordWithOffset.fromBacktracking) { logUnexpected() + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. FutureRejectedSeqNr } else { logUnexpected() - // This will result in projection restart (with normal configuration) + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. + // Otherwise this will result in projection restart (with normal configuration). FutureRejectedBacktrackingSeqNr } } else if (seqNr == 1) { @@ -618,6 +622,7 @@ private[projection] class DynamoDBOffsetStore( // always accept starting from snapshots when there was no previous event seen FutureAccepted } else { + println(s"# validateEventTimestamp $recordWithOffset") // FIXME validateEventTimestamp(currentState, recordWithOffset) } } else { @@ -682,6 +687,8 @@ private[projection] class DynamoDBOffsetStore( previousTimestamp, currentState.startTimestampBySlice(slice), settings.backtrackingWindow) + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. RejectedBacktrackingSeqNr } else { // This may happen rather frequently when using `publish-events`, after reconnecting and such. @@ -690,6 +697,8 @@ private[projection] class DynamoDBOffsetStore( seqNr, pid, recordWithOffset.offset) + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. // Backtracking will emit missed event again. RejectedSeqNr } From 2f237b9e1ff08a94f1432b42592100b2cccbf339 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 16:28:18 +0100 Subject: [PATCH 24/28] log warning every 1000 --- .../internal/DynamoDBOffsetStore.scala | 1 - .../internal/DynamoDBProjectionImpl.scala | 36 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index c35a21712..b09aa3dca 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -622,7 +622,6 @@ private[projection] class DynamoDBOffsetStore( // always accept starting from snapshots when there was no previous event seen FutureAccepted } else { - println(s"# validateEventTimestamp $recordWithOffset") // FIXME validateEventTimestamp(currentState, recordWithOffset) } } else { diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index be43a509a..a107b1370 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -87,6 +87,7 @@ private[projection] object DynamoDBProjectionImpl { } private val loadEnvelopeCounter = new AtomicLong + private val replayRejectedCounter = new AtomicLong def loadEnvelope[Envelope](env: Envelope, sourceProvider: SourceProvider[_, Envelope])( implicit @@ -223,12 +224,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - log.debug( - s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - toSeqNr) + logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -485,12 +481,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - log.debug( - s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - toSeqNr) + logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -585,12 +576,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - log.debug( - s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - toSeqNr) + logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -699,6 +685,20 @@ private[projection] object DynamoDBProjectionImpl { .via(handler) } + private def logReplayRejected( + offsetStore: DynamoDBOffsetStore, + persistenceId: String, + fromSeqNr: Long, + toSeqNr: Long): Unit = { + val msg = + "{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]." + val c = replayRejectedCounter.incrementAndGet() + if (c == 1 || c % 1000 == 0) + log.warn(msg, offsetStore.logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + else + log.debug(msg, offsetStore.logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + } + private def triggerReplayIfPossible[Offset, Envelope]( sourceProvider: SourceProvider[Offset, Envelope], offsetStore: DynamoDBOffsetStore, From ecf092c039d069b99e1413defd8b34466a45b961 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 16:52:50 +0100 Subject: [PATCH 25/28] exactly once --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 96 +++++++++++++++++++ .../internal/DynamoDBProjectionImpl.scala | 86 ++++++++++++++++- 2 files changed, 179 insertions(+), 3 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index 75fe37b5b..288c77588 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -1246,6 +1246,102 @@ class DynamoDBTimestampOffsetProjectionSpec } latestOffsetShouldBe(envelopes.last.offset) } + + "replay rejected sequence numbers" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val projectionRef = spawn( + ProjectionBehavior( + DynamoDBProjection + .exactlyOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new TransactConcatHandler))) + + eventually { + projectedTestValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1) + projectedTestValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! ProjectionBehavior.Stop + } + + "replay rejected sequence numbers due to clock skew on event write" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val projectionRef = spawn( + ProjectionBehavior( + DynamoDBProjection + .exactlyOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new TransactConcatHandler))) + + eventually { + projectedTestValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedTestValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + latestOffsetShouldBe(envelopes.last.offset) + } + + projectionRef ! ProjectionBehavior.Stop + } } "A DynamoDB grouped projection with TimestampOffset" must { diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index a107b1370..ad9b516a8 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -328,14 +328,94 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible(envelope).map(_ => Done)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map { + replayIfPossible(envelope).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) - } + }(ExecutionContext.parasitic) } } + + private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + originalEnvelope match { + case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + sourceProvider match { + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] + if offsetStore.settings.replayOnRejectedSequenceNumbers => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + val toSeqNr = originalEventEnvelope.sequenceNr + logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import DynamoDBOffsetStore.Validation._ + offsetStore + .validate(envelope) + .flatMap { + case Accepted => + val offset = extractOffsetPidSeqNr(sourceProvider, envelope.asInstanceOf[Envelope]) + if (isFilteredEvent(envelope)) { + offsetStore.saveOffset(offset) + } else { + delegate.process(envelope.asInstanceOf[Envelope]).flatMap { writeItems => + offsetStore.transactSaveOffset(writeItems, offset) + } + } + case Duplicate => + FutureDone + case RejectedSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + case RejectedBacktrackingSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + } + } + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + 1 + if (count == expected) { + true + } else { + // it's expected to find all events, otherwise fail the replay attempt + log.warn( + "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) + } + } + .recoverWith { exc => + log.warn( + "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + exc) + Future.failed(exc) + } + case None => FutureFalse + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } } } From ccfbf4b435aefd5aa966087638d92ad5f79dfc97 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 17:26:39 +0100 Subject: [PATCH 26/28] exactly once grouped --- ...ynamoDBTimestampOffsetProjectionSpec.scala | 101 +++++++++++ .../internal/DynamoDBProjectionImpl.scala | 158 +++++++++++++++--- 2 files changed, 237 insertions(+), 22 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index 288c77588..617a5ad29 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -1488,6 +1488,107 @@ class DynamoDBTimestampOffsetProjectionSpec latestOffsetShouldBe(envelopes.last.offset) } + "replay rejected sequence numbers for exactly-once grouped" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val handlerProbe = createTestProbe[String]("calls-to-handler") + + val projectionRef = spawn( + ProjectionBehavior( + DynamoDBProjection + .exactlyOnceGroupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new TransactGroupedConcatHandler(handlerProbe.ref)) + .withGroup(8, 3.seconds))) + + eventually { + projectedTestValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9|e10")(pid1) + projectedTestValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! ProjectionBehavior.Stop + } + + "replay rejected sequence numbers due to clock skew on event write for exactly-once grouped" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val handlerProbe = createTestProbe[String]("calls-to-handler") + + val projectionRef = spawn( + ProjectionBehavior( + DynamoDBProjection + .exactlyOnceGroupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new TransactGroupedConcatHandler(handlerProbe.ref)) + .withGroup(8, 3.seconds))) + + eventually { + projectedTestValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedTestValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! ProjectionBehavior.Stop + } + "handle at-least-once grouped projection" in { val pid1 = UUID.randomUUID().toString val pid2 = UUID.randomUUID().toString diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index ad9b516a8..a5f66f1ad 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -431,30 +431,32 @@ private[projection] object DynamoDBProjectionImpl { override def process(envelopes: Seq[Envelope]): Future[Done] = { import DynamoDBOffsetStore.Validation._ offsetStore.validateAll(envelopes).flatMap { isAcceptedEnvelopes => + // For simplicity we process the replayed envelopes one by one (group of 1), and also store the + // offset for each separately. + // Important to replay them sequentially to avoid concurrent processing and offset storage. val replayDone = - Future.sequence(isAcceptedEnvelopes.map { - case (env, RejectedSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => Done)(ExecutionContext.parasitic) - case (env, RejectedBacktrackingSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map { - case true => Done - case false => throwRejectedEnvelope(sourceProvider, env) + isAcceptedEnvelopes.foldLeft(FutureDone) { + case (previous, (env, RejectedSeqNr)) => + previous.flatMap { _ => + replayIfPossible(env).map(_ => Done)(ExecutionContext.parasitic) } - case _ => - FutureDone - }) - - replayDone.flatMap { _ => - val acceptedEnvelopes = isAcceptedEnvelopes.collect { - case (env, Accepted) => - env + case (previous, (env, RejectedBacktrackingSeqNr)) => + previous.flatMap { _ => + replayIfPossible(env).map { + case true => Done + case false => throwRejectedEnvelope(sourceProvider, env) + } + } + case (previous, _) => + previous } - if (acceptedEnvelopes.isEmpty) { - FutureDone - } else { - Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { - loadedEnvelopes => + replayDone.flatMap { _ => + def processAcceptedEnvelopes(envelopes: Seq[Envelope]): Future[Done] = { + if (envelopes.isEmpty) { + FutureDone + } else { + Future.sequence(envelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { loadedEnvelopes => val offsets = loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector val filteredEnvelopes = loadedEnvelopes.filterNot(isFilteredEvent) if (filteredEnvelopes.isEmpty) { @@ -464,11 +466,119 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.transactSaveOffsets(writeItems, offsets) } } + } + } + } + + val acceptedEnvelopes = isAcceptedEnvelopes.collect { + case (env, Accepted) => + env + } + val hasRejected = + isAcceptedEnvelopes.exists { + case (_, RejectedSeqNr) => true + case (_, RejectedBacktrackingSeqNr) => true + case _ => false } + + if (hasRejected) { + // need second validation after replay to remove duplicates + offsetStore + .validateAll(acceptedEnvelopes) + .flatMap { isAcceptedEnvelopes2 => + val acceptedEnvelopes2 = isAcceptedEnvelopes2.collect { + case (env, Accepted) => env + } + processAcceptedEnvelopes(acceptedEnvelopes2) + } + } else { + processAcceptedEnvelopes(acceptedEnvelopes) } } } } + + private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + originalEnvelope match { + case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + sourceProvider match { + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] + if offsetStore.settings.replayOnRejectedSequenceNumbers => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + val toSeqNr = originalEventEnvelope.sequenceNr + logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import DynamoDBOffsetStore.Validation._ + offsetStore + .validate(envelope) + .flatMap { + case Accepted => + val offset = extractOffsetPidSeqNr(sourceProvider, envelope.asInstanceOf[Envelope]) + if (isFilteredEvent(envelope)) { + offsetStore.saveOffset(offset) + } else { + delegate + .process(Seq(envelope.asInstanceOf[Envelope])) + .flatMap { writeItems => + offsetStore.transactSaveOffset(writeItems, offset) + } + } + case Duplicate => + FutureDone + case RejectedSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + case RejectedBacktrackingSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + } + } + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + 1 + if (count == expected) { + true + } else { + // it's expected to find all events, otherwise fail the replay attempt + log.warn( + "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) + } + } + .recoverWith { exc => + log.warn( + "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + offsetStore.logPrefix, + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + exc) + Future.failed(exc) + } + case None => FutureFalse + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } } } @@ -484,7 +594,9 @@ private[projection] object DynamoDBProjectionImpl { override def process(envelopes: Seq[Envelope]): Future[Done] = { import DynamoDBOffsetStore.Validation._ offsetStore.validateAll(envelopes).flatMap { isAcceptedEnvelopes => - // FIXME improvement would be to only replay the last for each pid + // For simplicity we process the replayed envelopes one by one (group of 1), and also store the + // offset for each separately. + // Important to replay them sequentially to avoid concurrent processing and offset storage. val replayDone = isAcceptedEnvelopes.foldLeft(FutureDone) { case (previous, (env, RejectedSeqNr)) => @@ -571,7 +683,6 @@ private[projection] object DynamoDBProjectionImpl { .validate(envelope) .flatMap { case Accepted => - // FIXME: should we be grouping the replayed envelopes? based on what? val offset = extractOffsetPidSeqNr(sourceProvider, envelope.asInstanceOf[Envelope]) if (isFilteredEvent(envelope)) { offsetStore.saveOffset(offset) @@ -779,6 +890,9 @@ private[projection] object DynamoDBProjectionImpl { log.debug(msg, offsetStore.logPrefix, persistenceId, fromSeqNr, toSeqNr, c) } + /** + * This replay mechanism is used by GrpcReadJournal + */ private def triggerReplayIfPossible[Offset, Envelope]( sourceProvider: SourceProvider[Offset, Envelope], offsetStore: DynamoDBOffsetStore, From ce483e5f465bf8878ebd37455806f52ec36675db Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 17:56:40 +0100 Subject: [PATCH 27/28] extract some trivial logging --- .../internal/DynamoDBProjectionImpl.scala | 136 +++++++----------- 1 file changed, 55 insertions(+), 81 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index a5f66f1ad..8c8fef6b4 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -215,6 +215,7 @@ private[projection] object DynamoDBProjectionImpl { } private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -224,7 +225,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -264,25 +265,12 @@ private[projection] object DynamoDBProjectionImpl { true } else { // it's expected to find all events, otherwise fail the replay attempt - log.warn( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - count, - expected, - persistenceId, - fromSeqNr, - toSeqNr) + logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected) throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => - log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - originalEventEnvelope.sequenceNr, - exc) + logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) Future.failed(exc) } case None => FutureFalse @@ -338,6 +326,7 @@ private[projection] object DynamoDBProjectionImpl { } private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -347,7 +336,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -384,25 +373,12 @@ private[projection] object DynamoDBProjectionImpl { true } else { // it's expected to find all events, otherwise fail the replay attempt - log.warn( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - count, - expected, - persistenceId, - fromSeqNr, - toSeqNr) + logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected) throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => - log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - originalEventEnvelope.sequenceNr, - exc) + logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) Future.failed(exc) } case None => FutureFalse @@ -499,6 +475,7 @@ private[projection] object DynamoDBProjectionImpl { } private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -508,7 +485,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -547,25 +524,12 @@ private[projection] object DynamoDBProjectionImpl { true } else { // it's expected to find all events, otherwise fail the replay attempt - log.warn( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - count, - expected, - persistenceId, - fromSeqNr, - toSeqNr) + logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected) throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => - log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - originalEventEnvelope.sequenceNr, - exc) + logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) Future.failed(exc) } case None => FutureFalse @@ -664,6 +628,7 @@ private[projection] object DynamoDBProjectionImpl { } private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -673,7 +638,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -712,25 +677,12 @@ private[projection] object DynamoDBProjectionImpl { true } else { // it's expected to find all events, otherwise fail the replay attempt - log.warn( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - count, - expected, - persistenceId, - fromSeqNr, - toSeqNr) + logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected) throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => - log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - originalEventEnvelope.sequenceNr, - exc) + logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) Future.failed(exc) } case None => FutureFalse @@ -758,6 +710,7 @@ private[projection] object DynamoDBProjectionImpl { implicit val ec: ExecutionContext = system.executionContext def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => sourceProvider match { @@ -767,7 +720,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -813,24 +766,17 @@ private[projection] object DynamoDBProjectionImpl { // and handler could also filter out envelopes log.debug( "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, + logPrefix, count, expected, persistenceId, fromSeqNr, toSeqNr) - // throwRejectedEnvelopeAfterFailedReplay(source, sourceProvider, originalEnvelope) true } } .recoverWith { exc => - log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", - offsetStore.logPrefix, - persistenceId, - fromSeqNr, - originalEventEnvelope.sequenceNr, - exc) + logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) Future.failed(exc) } case None => FutureFalse @@ -876,18 +822,46 @@ private[projection] object DynamoDBProjectionImpl { .via(handler) } - private def logReplayRejected( - offsetStore: DynamoDBOffsetStore, - persistenceId: String, - fromSeqNr: Long, - toSeqNr: Long): Unit = { + private def logReplayRejected(logPrefix: String, persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Unit = { val msg = "{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]." val c = replayRejectedCounter.incrementAndGet() if (c == 1 || c % 1000 == 0) - log.warn(msg, offsetStore.logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + log.warn(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) else - log.debug(msg, offsetStore.logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + log.debug(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + } + + private def logReplayInvalidCount( + logPrefix: String, + persistenceId: String, + fromSeqNr: Long, + toSeqNr: Long, + count: Int, + expected: Long): Unit = { + log.warn( + "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + logPrefix, + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + } + + private def logReplayException( + logPrefix: String, + persistenceId: String, + fromSeqNr: Long, + toSeqNr: Long, + exc: Throwable): Unit = { + log.warn( + "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + logPrefix, + persistenceId, + fromSeqNr, + toSeqNr, + exc) } /** From 2951934c66ff73d9112565063f47c813f0d2fc8a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2024 18:00:40 +0100 Subject: [PATCH 28/28] enable mima --- .../mima-filters/1.6.3.backwards.excludes/offset-store.excludes | 2 ++ build.sbt | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 akka-projection-dynamodb/src/main/mima-filters/1.6.3.backwards.excludes/offset-store.excludes diff --git a/akka-projection-dynamodb/src/main/mima-filters/1.6.3.backwards.excludes/offset-store.excludes b/akka-projection-dynamodb/src/main/mima-filters/1.6.3.backwards.excludes/offset-store.excludes new file mode 100644 index 000000000..f0ea8a80d --- /dev/null +++ b/akka-projection-dynamodb/src/main/mima-filters/1.6.3.backwards.excludes/offset-store.excludes @@ -0,0 +1,2 @@ +# internal +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#State.evict") diff --git a/build.sbt b/build.sbt index 52cd2827d..c273c886d 100644 --- a/build.sbt +++ b/build.sbt @@ -180,8 +180,6 @@ lazy val dynamodb = .settings(Dependencies.dynamodb) .dependsOn(core, eventsourced) .disablePlugins(CiReleasePlugin) - // FIXME: No previous artifact, disable MiMa until first release - .settings(mimaPreviousArtifacts := Set.empty) lazy val dynamodbIntegration = Project(id = "akka-projection-dynamodb-integration", base = file("akka-projection-dynamodb-integration"))