diff --git a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/StartFromSnapshotEndToEndSpec.scala b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/StartFromSnapshotEndToEndSpec.scala new file mode 100644 index 000000000..53e676fa8 --- /dev/null +++ b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/StartFromSnapshotEndToEndSpec.scala @@ -0,0 +1,235 @@ +/* + * Copyright (C) 2022 - 2023 Lightbend Inc. + */ + +package akka.projection.r2dbc + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.LoggerOps +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.projection.ProjectionBehavior +import akka.projection.ProjectionId +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.r2dbc.scaladsl.R2dbcHandler +import akka.projection.r2dbc.scaladsl.R2dbcProjection +import akka.projection.r2dbc.scaladsl.R2dbcSession +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +import java.util.UUID +import scala.concurrent.Future + +object StartFromSnapshotEndToEndSpec { + + val config: Config = ConfigFactory + .parseString(""" + akka.persistence.snapshot-store.plugin = "akka.persistence.r2dbc.snapshot" + akka.persistence.r2dbc { + query { + refresh-interval = 500 millis + # stress more by using a small buffer (sql limit) + buffer-size = 10 + + backtracking.behind-current-time = 5 seconds + + start-from-snapshot.enabled = true + } + } + """) + .withFallback(TestConfig.config) + + object Persister { + sealed trait Command + final case class PersistWithAck(payload: String, replyTo: ActorRef[Done]) extends Command + final case class Ping(replyTo: ActorRef[Done]) extends Command + final case class Stop(replyTo: ActorRef[Done]) extends Command + + def apply(pid: PersistenceId): Behavior[Command] = { + Behaviors.setup { context => + EventSourcedBehavior[Command, String, String]( + persistenceId = pid, + "", { (_, command) => + command match { + case command: PersistWithAck => + context.log.debugN( + "Persist [{}], pid [{}], seqNr [{}]", + command.payload, + pid.id, + EventSourcedBehavior.lastSequenceNumber(context) + 1) + Effect.persist(command.payload).thenRun(_ => command.replyTo ! Done) + case Ping(replyTo) => + replyTo ! Done + Effect.none + case Stop(replyTo) => + replyTo ! Done + Effect.stop() + } + }, + (state, evt) => if (state.isBlank) evt else s"$state,$evt").snapshotWhen((_, evt, _) => + evt.endsWith("snapit!")) + } + } + } + + sealed trait HandlerEvt + final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String]) extends HandlerEvt + final case object Stopped extends HandlerEvt + + class TestHandler(projectionId: ProjectionId, probe: ActorRef[HandlerEvt]) + extends R2dbcHandler[EventEnvelope[String]] { + private val log = LoggerFactory.getLogger(getClass) + + override def process(session: R2dbcSession, envelope: EventEnvelope[String]): Future[Done] = { + log.debug2("{} Processed {}", projectionId.key, envelope.event) + probe ! Processed(projectionId, envelope) + Future.successful(Done) + } + + override def stop(): Future[Done] = { + probe ! Stopped + Future.successful(Done) + } + } + +} + +class StartFromSnapshotEndToEndSpec + extends ScalaTestWithActorTestKit(StartFromSnapshotEndToEndSpec.config) + with AnyWordSpecLike + with TestDbLifecycle + with TestData + with LogCapturing { + import StartFromSnapshotEndToEndSpec._ + + override def typedSystem: ActorSystem[_] = system + + private val projectionSettings = R2dbcProjectionSettings(system) + + override protected def beforeAll(): Unit = { + super.beforeAll() + } + + private def startProjections( + entityType: String, + projectionName: String, + nrOfProjections: Int, + processedProbe: ActorRef[HandlerEvt]): Vector[ActorRef[ProjectionBehavior.Command]] = { + val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, nrOfProjections) + + sliceRanges.map { range => + val projectionId = ProjectionId(projectionName, s"${range.min}-${range.max}") + val sourceProvider = + EventSourcedProvider + .eventsBySlicesStartingFromSnapshots[String, String]( + system, + R2dbcReadJournal.Identifier, + entityType, + range.min, + range.max, + identity) + val projection = R2dbcProjection + .exactlyOnce( + projectionId, + Some(projectionSettings), + sourceProvider = sourceProvider, + handler = () => new TestHandler(projectionId, processedProbe.ref)) + spawn(ProjectionBehavior(projection)) + }.toVector + } + + s"A R2DBC projection starting from snapshots (dialect ${r2dbcSettings.dialectName})" must { + + "work when no previous events seen" in { + val entityType = nextEntityType() + + val persistenceId = PersistenceId(entityType, s"p1") + val entity = spawn(Persister(persistenceId), s"$entityType-p1") + + // write some before starting the projections + val ackProbe = createTestProbe[Done]() + + (1 to 5).foreach { n => + entity ! Persister.PersistWithAck(n.toString, ackProbe.ref) + } + entity ! Persister.PersistWithAck("6-snapit!", ackProbe.ref) + + ackProbe.receiveMessages(6) + + val projectionName = UUID.randomUUID().toString + val processedProbe = createTestProbe[HandlerEvt]() + val projections = startProjections(entityType, projectionName, nrOfProjections = 1, processedProbe.ref) + + val firstSeenEnvelope = processedProbe.expectMessageType[Processed].envelope + // full state + firstSeenEnvelope.event should ===("1,2,3,4,5,6-snapit!") + firstSeenEnvelope.sequenceNr should ===(6L) + + // persist events after snapshot + entity ! Persister.PersistWithAck("7", ackProbe.ref) + ackProbe.receiveMessage() + + val afterSnap = processedProbe.expectMessageType[Processed] + afterSnap.envelope.event should ===("7") + afterSnap.envelope.sequenceNr should ===(7L) + + projections.foreach(_ ! ProjectionBehavior.Stop) + processedProbe.expectMessage(Stopped) + } + + "work when previous events seen" in { + val entityType = nextEntityType() + + val persistenceId = PersistenceId(entityType, s"p1") + val entity = spawn(Persister(persistenceId), s"$entityType-p1") + + // write some before starting the projections + val ackProbe = createTestProbe[Done]() + + (1 to 5).foreach { n => + entity ! Persister.PersistWithAck(n.toString, ackProbe.ref) + } + ackProbe.receiveMessages(5) + + val projectionName = UUID.randomUUID().toString + val handlerProbe = createTestProbe[HandlerEvt]() + val projections = startProjections(entityType, projectionName, nrOfProjections = 1, handlerProbe.ref) + + handlerProbe.receiveMessages(5) + + // pause projection + projections.foreach(_ ! ProjectionBehavior.Stop) + handlerProbe.expectMessage(Stopped) + + // trigger snapshot + entity ! Persister.PersistWithAck("6", ackProbe.ref) + ackProbe.receiveMessage() + entity ! Persister.PersistWithAck("7snapit!", ackProbe.ref) + ackProbe.receiveMessage() + + // restart projection + val secondIncarnationOfProjections = + startProjections(entityType, projectionName, nrOfProjections = 1, handlerProbe.ref) + + val afterSnap = handlerProbe.expectMessageType[Processed] + // we now started with snap event even though there was one inbetween (seqNr 6 lost) + afterSnap.envelope.event should ===("1,2,3,4,5,6,7snapit!") + afterSnap.envelope.sequenceNr should ===(7L) + + secondIncarnationOfProjections.foreach(_ ! ProjectionBehavior.Stop) + handlerProbe.expectMessage(Stopped) + } + } + +} diff --git a/akka-projection-r2dbc/src/main/mima-filters/1.5.0-M5.backwards.excludes/snapshot-events.backwards.excludes b/akka-projection-r2dbc/src/main/mima-filters/1.5.0-M5.backwards.excludes/snapshot-events.backwards.excludes new file mode 100644 index 000000000..8a354cc7b --- /dev/null +++ b/akka-projection-r2dbc/src/main/mima-filters/1.5.0-M5.backwards.excludes/snapshot-events.backwards.excludes @@ -0,0 +1,5 @@ +# added flag for events from snapshots +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.this") +ProblemFilters.exclude[MissingTypesProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore$RecordWithOffset$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.apply") \ No newline at end of file diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index 6e839aa63..aee1c07c6 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -54,7 +54,8 @@ private[projection] object R2dbcOffsetStore { offset: TimestampOffset, strictSeqNr: Boolean, fromBacktracking: Boolean, - fromPubSub: Boolean) + fromPubSub: Boolean, + fromSnapshot: Boolean) final case class RecordWithProjectionKey(record: Record, projectionKey: String) object State { @@ -617,6 +618,9 @@ private[projection] class R2dbcOffsetStore( // currentInFlight contains those that have been processed or about to be processed in Flow, // but offset not saved yet => ok to handle as duplicate FutureDuplicate + } else if (recordWithOffset.fromSnapshot) { + // snapshots will mean we are starting from some arbitrary offset after last seen offset + FutureAccepted } else if (!recordWithOffset.fromBacktracking) { logUnexpected() FutureRejectedSeqNr @@ -628,6 +632,9 @@ private[projection] class R2dbcOffsetStore( } else if (seqNr == 1) { // always accept first event if no other event for that pid has been seen FutureAccepted + } else if (recordWithOffset.fromSnapshot) { + // always accept starting from snapshots when there was no previous event seen + FutureAccepted } else { // Haven't see seen this pid within the time window. Since events can be missed // when read at the tail we will only accept it if the event with previous seqNr has timestamp @@ -862,7 +869,8 @@ private[projection] class R2dbcOffsetStore( timestampOffset, strictSeqNr = true, fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope), - fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope))) + fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope), + fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope))) case change: UpdatedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] => val timestampOffset = change.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(change.persistenceId) @@ -872,7 +880,8 @@ private[projection] class R2dbcOffsetStore( timestampOffset, strictSeqNr = false, fromBacktracking = EnvelopeOrigin.fromBacktracking(change), - fromPubSub = false)) + fromPubSub = false, + fromSnapshot = false)) case change: DeletedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] => val timestampOffset = change.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(change.persistenceId) @@ -882,7 +891,8 @@ private[projection] class R2dbcOffsetStore( timestampOffset, strictSeqNr = false, fromBacktracking = false, - fromPubSub = false)) + fromPubSub = false, + fromSnapshot = false)) case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] => // in case additional types are added throw new IllegalArgumentException( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ae95d431b..4d9d33b86 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -22,7 +22,7 @@ object Dependencies { val akka = sys.props.getOrElse("build.akka.version", "2.9.0-M3") val akkaPersistenceCassandra = "1.2.0-M1" val akkaPersistenceJdbc = "5.3.0-M1" - val akkaPersistenceR2dbc = "1.2.0-M6" + val akkaPersistenceR2dbc = "1.2.0-M7" val alpakka = "7.0.0-M2" val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "5.0.0-M1") val slick = "3.4.1" diff --git a/samples/grpc/iot-service-scala/build.sbt b/samples/grpc/iot-service-scala/build.sbt index 05db547c8..2fff10770 100644 --- a/samples/grpc/iot-service-scala/build.sbt +++ b/samples/grpc/iot-service-scala/build.sbt @@ -31,7 +31,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = "2.9.0-M3" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" -val AkkaPersistenceR2dbcVersion = "1.2.0-M6" +val AkkaPersistenceR2dbcVersion = "1.2.0-M7" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M5") val AkkaDiagnosticsVersion = "2.1.0-M1" diff --git a/samples/grpc/local-drone-control-java/pom.xml b/samples/grpc/local-drone-control-java/pom.xml index 2652555db..334b0375f 100644 --- a/samples/grpc/local-drone-control-java/pom.xml +++ b/samples/grpc/local-drone-control-java/pom.xml @@ -19,7 +19,7 @@ UTF-8 2.9.0-M3 1.5.0-M5 - 1.2.0-M6 + 1.2.0-M7 1.5.0-M1 2.1.0-M1 10.5.1 diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index cfa59ae83..2879e738c 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -2,7 +2,8 @@ name := "local-drone-control" organization := "com.lightbend.akka.samples" organizationHomepage := Some(url("https://akka.io")) -licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) +licenses := Seq( + ("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) resolvers += "Akka library repository".at("https://repo.akka.io/maven") @@ -33,7 +34,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = "2.9.0-M3" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" -val AkkaPersistenceR2dbcVersion = "1.2.0-M6" +val AkkaPersistenceR2dbcVersion = "1.2.0-M7" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M5") val AkkaDiagnosticsVersion = "2.1.0-M1" @@ -102,9 +103,12 @@ nativeImageOptions := Seq( "-Dlogback.configurationFile=logback-native-image.xml" // configured at build time ) -NativeImage / mainClass := sys.props.get("native.mode").collect { - case "clustered" => "local.drones.ClusteredMain" -}.orElse((Compile / run / mainClass).value) +NativeImage / mainClass := sys.props + .get("native.mode") + .collect { case "clustered" => + "local.drones.ClusteredMain" + } + .orElse((Compile / run / mainClass).value) // silence warnings for these keys (used in dynamic task) Global / excludeLintKeys ++= Set(nativeImageJvm, nativeImageVersion) diff --git a/samples/grpc/shopping-analytics-service-java/pom.xml b/samples/grpc/shopping-analytics-service-java/pom.xml index a34368e9e..d3e6f518e 100644 --- a/samples/grpc/shopping-analytics-service-java/pom.xml +++ b/samples/grpc/shopping-analytics-service-java/pom.xml @@ -19,7 +19,7 @@ UTF-8 2.9.0-M3 1.5.0-M5 - 1.2.0-M6 + 1.2.0-M7 1.5.0-M1 2.1.0-M1 2.4.0-M2 diff --git a/samples/grpc/shopping-analytics-service-scala/build.sbt b/samples/grpc/shopping-analytics-service-scala/build.sbt index 1392218d1..7ef402bb1 100644 --- a/samples/grpc/shopping-analytics-service-scala/build.sbt +++ b/samples/grpc/shopping-analytics-service-scala/build.sbt @@ -31,7 +31,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = "2.9.0-M3" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" -val AkkaPersistenceR2dbcVersion = "1.2.0-M6" +val AkkaPersistenceR2dbcVersion = "1.2.0-M7" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M5") val AkkaDiagnosticsVersion = "2.1.0-M1" diff --git a/samples/grpc/shopping-cart-service-java/pom.xml b/samples/grpc/shopping-cart-service-java/pom.xml index 0ce2cc175..3831b892d 100644 --- a/samples/grpc/shopping-cart-service-java/pom.xml +++ b/samples/grpc/shopping-cart-service-java/pom.xml @@ -19,7 +19,7 @@ UTF-8 2.9.0-M3 1.5.0-M5 - 1.2.0-M6 + 1.2.0-M7 1.5.0-M1 2.1.0-M1 2.4.0-M2 diff --git a/samples/grpc/shopping-cart-service-scala/build.sbt b/samples/grpc/shopping-cart-service-scala/build.sbt index 36b213fd1..54912c5ab 100644 --- a/samples/grpc/shopping-cart-service-scala/build.sbt +++ b/samples/grpc/shopping-cart-service-scala/build.sbt @@ -31,7 +31,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = "2.9.0-M3" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" -val AkkaPersistenceR2dbcVersion = "1.2.0-M6" +val AkkaPersistenceR2dbcVersion = "1.2.0-M7" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M5") val AkkaDiagnosticsVersion = "2.1.0-M1" diff --git a/samples/replicated/shopping-cart-service-java/pom.xml b/samples/replicated/shopping-cart-service-java/pom.xml index 9586472ce..918c7df3a 100644 --- a/samples/replicated/shopping-cart-service-java/pom.xml +++ b/samples/replicated/shopping-cart-service-java/pom.xml @@ -19,7 +19,7 @@ UTF-8 2.9.0-M3 1.5.0-M5 - 1.2.0-M6 + 1.2.0-M7 1.5.0-M1 2.1.0-M1 2.4.0-M2 diff --git a/samples/replicated/shopping-cart-service-scala/build.sbt b/samples/replicated/shopping-cart-service-scala/build.sbt index fc97b265c..1aa75bcaf 100644 --- a/samples/replicated/shopping-cart-service-scala/build.sbt +++ b/samples/replicated/shopping-cart-service-scala/build.sbt @@ -32,7 +32,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = sys.props.getOrElse("akka.version", "2.9.0-M3") val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" -val AkkaPersistenceR2dbcVersion = "1.2.0-M6" +val AkkaPersistenceR2dbcVersion = "1.2.0-M7" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M5") val AkkaDiagnosticsVersion = "2.1.0-M1"