Skip to content

Commit

Permalink
feat: Local projections starting from snapshots (#1047)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Oct 19, 2023
1 parent 5a540ab commit bb271d5
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/*
* Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.r2dbc

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.r2dbc.scaladsl.R2dbcHandler
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.r2dbc.scaladsl.R2dbcSession
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

import java.util.UUID
import scala.concurrent.Future

object StartFromSnapshotEndToEndSpec {

val config: Config = ConfigFactory
.parseString("""
akka.persistence.snapshot-store.plugin = "akka.persistence.r2dbc.snapshot"
akka.persistence.r2dbc {
query {
refresh-interval = 500 millis
# stress more by using a small buffer (sql limit)
buffer-size = 10
backtracking.behind-current-time = 5 seconds
start-from-snapshot.enabled = true
}
}
""")
.withFallback(TestConfig.config)

object Persister {
sealed trait Command
final case class PersistWithAck(payload: String, replyTo: ActorRef[Done]) extends Command
final case class Ping(replyTo: ActorRef[Done]) extends Command
final case class Stop(replyTo: ActorRef[Done]) extends Command

def apply(pid: PersistenceId): Behavior[Command] = {
Behaviors.setup { context =>
EventSourcedBehavior[Command, String, String](
persistenceId = pid,
"", { (_, command) =>
command match {
case command: PersistWithAck =>
context.log.debugN(
"Persist [{}], pid [{}], seqNr [{}]",
command.payload,
pid.id,
EventSourcedBehavior.lastSequenceNumber(context) + 1)
Effect.persist(command.payload).thenRun(_ => command.replyTo ! Done)
case Ping(replyTo) =>
replyTo ! Done
Effect.none
case Stop(replyTo) =>
replyTo ! Done
Effect.stop()
}
},
(state, evt) => if (state.isBlank) evt else s"$state,$evt").snapshotWhen((_, evt, _) =>
evt.endsWith("snapit!"))
}
}
}

sealed trait HandlerEvt
final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String]) extends HandlerEvt
final case object Stopped extends HandlerEvt

class TestHandler(projectionId: ProjectionId, probe: ActorRef[HandlerEvt])
extends R2dbcHandler[EventEnvelope[String]] {
private val log = LoggerFactory.getLogger(getClass)

override def process(session: R2dbcSession, envelope: EventEnvelope[String]): Future[Done] = {
log.debug2("{} Processed {}", projectionId.key, envelope.event)
probe ! Processed(projectionId, envelope)
Future.successful(Done)
}

override def stop(): Future[Done] = {
probe ! Stopped
Future.successful(Done)
}
}

}

class StartFromSnapshotEndToEndSpec
extends ScalaTestWithActorTestKit(StartFromSnapshotEndToEndSpec.config)
with AnyWordSpecLike
with TestDbLifecycle
with TestData
with LogCapturing {
import StartFromSnapshotEndToEndSpec._

override def typedSystem: ActorSystem[_] = system

private val projectionSettings = R2dbcProjectionSettings(system)

override protected def beforeAll(): Unit = {
super.beforeAll()
}

private def startProjections(
entityType: String,
projectionName: String,
nrOfProjections: Int,
processedProbe: ActorRef[HandlerEvt]): Vector[ActorRef[ProjectionBehavior.Command]] = {
val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, nrOfProjections)

sliceRanges.map { range =>
val projectionId = ProjectionId(projectionName, s"${range.min}-${range.max}")
val sourceProvider =
EventSourcedProvider
.eventsBySlicesStartingFromSnapshots[String, String](
system,
R2dbcReadJournal.Identifier,
entityType,
range.min,
range.max,
identity)
val projection = R2dbcProjection
.exactlyOnce(
projectionId,
Some(projectionSettings),
sourceProvider = sourceProvider,
handler = () => new TestHandler(projectionId, processedProbe.ref))
spawn(ProjectionBehavior(projection))
}.toVector
}

s"A R2DBC projection starting from snapshots (dialect ${r2dbcSettings.dialectName})" must {

"work when no previous events seen" in {
val entityType = nextEntityType()

val persistenceId = PersistenceId(entityType, s"p1")
val entity = spawn(Persister(persistenceId), s"$entityType-p1")

// write some before starting the projections
val ackProbe = createTestProbe[Done]()

(1 to 5).foreach { n =>
entity ! Persister.PersistWithAck(n.toString, ackProbe.ref)
}
entity ! Persister.PersistWithAck("6-snapit!", ackProbe.ref)

ackProbe.receiveMessages(6)

val projectionName = UUID.randomUUID().toString
val processedProbe = createTestProbe[HandlerEvt]()
val projections = startProjections(entityType, projectionName, nrOfProjections = 1, processedProbe.ref)

val firstSeenEnvelope = processedProbe.expectMessageType[Processed].envelope
// full state
firstSeenEnvelope.event should ===("1,2,3,4,5,6-snapit!")
firstSeenEnvelope.sequenceNr should ===(6L)

// persist events after snapshot
entity ! Persister.PersistWithAck("7", ackProbe.ref)
ackProbe.receiveMessage()

val afterSnap = processedProbe.expectMessageType[Processed]
afterSnap.envelope.event should ===("7")
afterSnap.envelope.sequenceNr should ===(7L)

projections.foreach(_ ! ProjectionBehavior.Stop)
processedProbe.expectMessage(Stopped)
}

"work when previous events seen" in {
val entityType = nextEntityType()

val persistenceId = PersistenceId(entityType, s"p1")
val entity = spawn(Persister(persistenceId), s"$entityType-p1")

// write some before starting the projections
val ackProbe = createTestProbe[Done]()

(1 to 5).foreach { n =>
entity ! Persister.PersistWithAck(n.toString, ackProbe.ref)
}
ackProbe.receiveMessages(5)

val projectionName = UUID.randomUUID().toString
val handlerProbe = createTestProbe[HandlerEvt]()
val projections = startProjections(entityType, projectionName, nrOfProjections = 1, handlerProbe.ref)

handlerProbe.receiveMessages(5)

// pause projection
projections.foreach(_ ! ProjectionBehavior.Stop)
handlerProbe.expectMessage(Stopped)

// trigger snapshot
entity ! Persister.PersistWithAck("6", ackProbe.ref)
ackProbe.receiveMessage()
entity ! Persister.PersistWithAck("7snapit!", ackProbe.ref)
ackProbe.receiveMessage()

// restart projection
val secondIncarnationOfProjections =
startProjections(entityType, projectionName, nrOfProjections = 1, handlerProbe.ref)

val afterSnap = handlerProbe.expectMessageType[Processed]
// we now started with snap event even though there was one inbetween (seqNr 6 lost)
afterSnap.envelope.event should ===("1,2,3,4,5,6,7snapit!")
afterSnap.envelope.sequenceNr should ===(7L)

secondIncarnationOfProjections.foreach(_ ! ProjectionBehavior.Stop)
handlerProbe.expectMessage(Stopped)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# added flag for events from snapshots
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.this")
ProblemFilters.exclude[MissingTypesProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore$RecordWithOffset$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.apply")
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ private[projection] object R2dbcOffsetStore {
offset: TimestampOffset,
strictSeqNr: Boolean,
fromBacktracking: Boolean,
fromPubSub: Boolean)
fromPubSub: Boolean,
fromSnapshot: Boolean)
final case class RecordWithProjectionKey(record: Record, projectionKey: String)

object State {
Expand Down Expand Up @@ -617,6 +618,9 @@ private[projection] class R2dbcOffsetStore(
// currentInFlight contains those that have been processed or about to be processed in Flow,
// but offset not saved yet => ok to handle as duplicate
FutureDuplicate
} else if (recordWithOffset.fromSnapshot) {
// snapshots will mean we are starting from some arbitrary offset after last seen offset
FutureAccepted
} else if (!recordWithOffset.fromBacktracking) {
logUnexpected()
FutureRejectedSeqNr
Expand All @@ -628,6 +632,9 @@ private[projection] class R2dbcOffsetStore(
} else if (seqNr == 1) {
// always accept first event if no other event for that pid has been seen
FutureAccepted
} else if (recordWithOffset.fromSnapshot) {
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
// Haven't see seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
Expand Down Expand Up @@ -862,7 +869,8 @@ private[projection] class R2dbcOffsetStore(
timestampOffset,
strictSeqNr = true,
fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope),
fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope)))
fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope),
fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope)))
case change: UpdatedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
val slice = persistenceExt.sliceForPersistenceId(change.persistenceId)
Expand All @@ -872,7 +880,8 @@ private[projection] class R2dbcOffsetStore(
timestampOffset,
strictSeqNr = false,
fromBacktracking = EnvelopeOrigin.fromBacktracking(change),
fromPubSub = false))
fromPubSub = false,
fromSnapshot = false))
case change: DeletedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
val slice = persistenceExt.sliceForPersistenceId(change.persistenceId)
Expand All @@ -882,7 +891,8 @@ private[projection] class R2dbcOffsetStore(
timestampOffset,
strictSeqNr = false,
fromBacktracking = false,
fromPubSub = false))
fromPubSub = false,
fromSnapshot = false))
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object Dependencies {
val akka = sys.props.getOrElse("build.akka.version", "2.9.0-M3")
val akkaPersistenceCassandra = "1.2.0-M1"
val akkaPersistenceJdbc = "5.3.0-M1"
val akkaPersistenceR2dbc = "1.2.0-M6"
val akkaPersistenceR2dbc = "1.2.0-M7"
val alpakka = "7.0.0-M2"
val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "5.0.0-M1")
val slick = "3.4.1"
Expand Down
2 changes: 1 addition & 1 deletion samples/grpc/iot-service-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Global / cancelable := false // ctrl-c
val AkkaVersion = "2.9.0-M3"
val AkkaHttpVersion = "10.6.0-M2"
val AkkaManagementVersion = "1.5.0-M1"
val AkkaPersistenceR2dbcVersion = "1.2.0-M6"
val AkkaPersistenceR2dbcVersion = "1.2.0-M7"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.0-M5")
val AkkaDiagnosticsVersion = "2.1.0-M1"
Expand Down
2 changes: 1 addition & 1 deletion samples/grpc/local-drone-control-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.9.0-M3</akka.version>
<akka-projection.version>1.5.0-M5</akka-projection.version>
<akka-persistence-r2dbc.version>1.2.0-M6</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.2.0-M7</akka-persistence-r2dbc.version>
<akka-management.version>1.5.0-M1</akka-management.version>
<akka-diagnostics.version>2.1.0-M1</akka-diagnostics.version>
<akka-http.version>10.5.1</akka-http.version>
Expand Down
14 changes: 9 additions & 5 deletions samples/grpc/local-drone-control-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name := "local-drone-control"

organization := "com.lightbend.akka.samples"
organizationHomepage := Some(url("https://akka.io"))
licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))
licenses := Seq(
("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))

resolvers += "Akka library repository".at("https://repo.akka.io/maven")

Expand Down Expand Up @@ -33,7 +34,7 @@ Global / cancelable := false // ctrl-c
val AkkaVersion = "2.9.0-M3"
val AkkaHttpVersion = "10.6.0-M2"
val AkkaManagementVersion = "1.5.0-M1"
val AkkaPersistenceR2dbcVersion = "1.2.0-M6"
val AkkaPersistenceR2dbcVersion = "1.2.0-M7"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.0-M5")
val AkkaDiagnosticsVersion = "2.1.0-M1"
Expand Down Expand Up @@ -102,9 +103,12 @@ nativeImageOptions := Seq(
"-Dlogback.configurationFile=logback-native-image.xml" // configured at build time
)

NativeImage / mainClass := sys.props.get("native.mode").collect {
case "clustered" => "local.drones.ClusteredMain"
}.orElse((Compile / run / mainClass).value)
NativeImage / mainClass := sys.props
.get("native.mode")
.collect { case "clustered" =>
"local.drones.ClusteredMain"
}
.orElse((Compile / run / mainClass).value)

// silence warnings for these keys (used in dynamic task)
Global / excludeLintKeys ++= Set(nativeImageJvm, nativeImageVersion)
2 changes: 1 addition & 1 deletion samples/grpc/shopping-analytics-service-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.9.0-M3</akka.version>
<akka-projection.version>1.5.0-M5</akka-projection.version>
<akka-persistence-r2dbc.version>1.2.0-M6</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.2.0-M7</akka-persistence-r2dbc.version>
<akka-management.version>1.5.0-M1</akka-management.version>
<akka-diagnostics.version>2.1.0-M1</akka-diagnostics.version>
<akka-grpc.version>2.4.0-M2</akka-grpc.version>
Expand Down
2 changes: 1 addition & 1 deletion samples/grpc/shopping-analytics-service-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Global / cancelable := false // ctrl-c
val AkkaVersion = "2.9.0-M3"
val AkkaHttpVersion = "10.6.0-M2"
val AkkaManagementVersion = "1.5.0-M1"
val AkkaPersistenceR2dbcVersion = "1.2.0-M6"
val AkkaPersistenceR2dbcVersion = "1.2.0-M7"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.0-M5")
val AkkaDiagnosticsVersion = "2.1.0-M1"
Expand Down
2 changes: 1 addition & 1 deletion samples/grpc/shopping-cart-service-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.9.0-M3</akka.version>
<akka-projection.version>1.5.0-M5</akka-projection.version>
<akka-persistence-r2dbc.version>1.2.0-M6</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.2.0-M7</akka-persistence-r2dbc.version>
<akka-management.version>1.5.0-M1</akka-management.version>
<akka-diagnostics.version>2.1.0-M1</akka-diagnostics.version>
<akka-grpc.version>2.4.0-M2</akka-grpc.version>
Expand Down
Loading

0 comments on commit bb271d5

Please sign in to comment.