From bbbbea3ba3cc00dddef30aa70a20e4da282f4d94 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 25 Oct 2023 11:48:39 +0200 Subject: [PATCH] fix: EventWriter snapshot event (#1048) * The EventWriter.Write now takes a isSnapshotEvent parameter, which means that sequence number gaps before the snapshot will be filled with filtered event payloads * The EventEnvelope.source is "SN" for snapshot events * No dependency to akka-persistence-r2dbc from akka-projection-grpc so hardcoding source "SN" (can't use EnvelopeOrigin) * Akka 2.9.0 --- .../EventPusherConsumerServiceImpl.scala | 17 ++++++++++++++++- project/Dependencies.scala | 2 +- samples/grpc/iot-service-scala/build.sbt | 2 +- samples/grpc/local-drone-control-java/pom.xml | 2 +- .../grpc/local-drone-control-scala/build.sbt | 2 +- .../pom.xml | 2 +- .../build.sbt | 2 +- .../shopping-analytics-service-java/pom.xml | 2 +- .../shopping-analytics-service-scala/build.sbt | 2 +- samples/grpc/shopping-cart-service-java/pom.xml | 2 +- .../grpc/shopping-cart-service-scala/build.sbt | 2 +- .../shopping-cart-service-java/pom.xml | 2 +- .../shopping-cart-service-scala/build.sbt | 2 +- 13 files changed, 28 insertions(+), 13 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 786033495..0c023a177 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -25,10 +25,23 @@ import akka.projection.grpc.internal.proto.EventConsumerServicePowerApi import akka.stream.scaladsl.Source import io.grpc.Status import org.slf4j.LoggerFactory - import scala.concurrent.ExecutionContext import scala.concurrent.Promise +import akka.persistence.query.typed.EventEnvelope + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object EventPusherConsumerServiceImpl { + // See akka.persistence.r2dbc.internal.EnvelopeOrigin, but we don't have a dependency + // to akka-persistence-r2dbc here + def fromSnapshot(env: EventEnvelope[_]): Boolean = + env.source == "SN" + +} + /** * INTERNAL API * @@ -40,6 +53,7 @@ private[akka] final class EventPusherConsumerServiceImpl( preferProtobuf: ProtoAnySerialization.Prefer)(implicit system: ActorSystem[_]) extends EventConsumerServicePowerApi { + import EventPusherConsumerServiceImpl.fromSnapshot import ProtobufProtocolConversions._ private val logger = LoggerFactory.getLogger(classOf[EventPusherConsumerServiceImpl]) @@ -134,6 +148,7 @@ private[akka] final class EventPusherConsumerServiceImpl( transformedEventEnvelope.persistenceId, transformedEventEnvelope.sequenceNr, transformedEventEnvelope.eventOption.getOrElse(FilteredPayload), + isSnapshotEvent = fromSnapshot(transformedEventEnvelope), transformedEventEnvelope.eventMetadata, transformedEventEnvelope.tags, _))(destination.eventProducerPushDestination.settings.journalWriteTimeout, system.scheduler) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4d9d33b86..352b1853e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,7 +19,7 @@ object Dependencies { val AkkaProjectionVersionInDocs = "1.5" object Versions { - val akka = sys.props.getOrElse("build.akka.version", "2.9.0-M3") + val akka = sys.props.getOrElse("build.akka.version", "2.9.0") val akkaPersistenceCassandra = "1.2.0-M1" val akkaPersistenceJdbc = "5.3.0-M1" val akkaPersistenceR2dbc = "1.2.0-M7" diff --git a/samples/grpc/iot-service-scala/build.sbt b/samples/grpc/iot-service-scala/build.sbt index 2fff10770..fe6b8ec66 100644 --- a/samples/grpc/iot-service-scala/build.sbt +++ b/samples/grpc/iot-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/grpc/local-drone-control-java/pom.xml b/samples/grpc/local-drone-control-java/pom.xml index 334b0375f..48adfd8e7 100644 --- a/samples/grpc/local-drone-control-java/pom.xml +++ b/samples/grpc/local-drone-control-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index 2879e738c..e5a652e2c 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -31,7 +31,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml index 11d581e24..e8fe78fcf 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml +++ b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0 1.5.0-M5 1.2.0-M6 1.5.0-M1 diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt index c96b158cd..f337693f0 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M6" diff --git a/samples/grpc/shopping-analytics-service-java/pom.xml b/samples/grpc/shopping-analytics-service-java/pom.xml index d3e6f518e..f6401d7bf 100644 --- a/samples/grpc/shopping-analytics-service-java/pom.xml +++ b/samples/grpc/shopping-analytics-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/grpc/shopping-analytics-service-scala/build.sbt b/samples/grpc/shopping-analytics-service-scala/build.sbt index 7ef402bb1..496155f56 100644 --- a/samples/grpc/shopping-analytics-service-scala/build.sbt +++ b/samples/grpc/shopping-analytics-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/grpc/shopping-cart-service-java/pom.xml b/samples/grpc/shopping-cart-service-java/pom.xml index 3831b892d..5a215b321 100644 --- a/samples/grpc/shopping-cart-service-java/pom.xml +++ b/samples/grpc/shopping-cart-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/grpc/shopping-cart-service-scala/build.sbt b/samples/grpc/shopping-cart-service-scala/build.sbt index 54912c5ab..5b75beb20 100644 --- a/samples/grpc/shopping-cart-service-scala/build.sbt +++ b/samples/grpc/shopping-cart-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/replicated/shopping-cart-service-java/pom.xml b/samples/replicated/shopping-cart-service-java/pom.xml index 918c7df3a..16f68f3c3 100644 --- a/samples/replicated/shopping-cart-service-java/pom.xml +++ b/samples/replicated/shopping-cart-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/replicated/shopping-cart-service-scala/build.sbt b/samples/replicated/shopping-cart-service-scala/build.sbt index 1aa75bcaf..3315ce651 100644 --- a/samples/replicated/shopping-cart-service-scala/build.sbt +++ b/samples/replicated/shopping-cart-service-scala/build.sbt @@ -29,7 +29,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = sys.props.getOrElse("akka.version", "2.9.0-M3") +val AkkaVersion = sys.props.getOrElse("akka.version", "2.9.0") val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7"