From 59c06e70782b1153f863ccc37326c32306c2d379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 18 Oct 2023 14:07:26 +0200 Subject: [PATCH 1/6] docs: Start from snapshot for drone push projection --- .../src/main/resources/persistence-h2.conf | 4 ++++ .../src/main/resources/persistence-postgres.conf | 2 ++ .../src/main/scala/local/drones/Drone.scala | 5 +++++ .../src/main/scala/local/drones/DroneEvents.scala | 10 +++++++++- 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf index 4d27f6e45..ea806a02c 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf @@ -27,3 +27,7 @@ akka { } } } + +// #startFromSnapshot +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true +// #startFromSnapshot diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf index 694f3ee02..4ad66bb14 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf @@ -31,3 +31,5 @@ akka { } } } + +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala index a21f89022..f34c82e51 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala @@ -96,6 +96,11 @@ object Drone { .receiveSignal { case (_, PostStop) => telemetry.droneEntityPassivated() } + // #startFromSnapshot + .snapshotWhen { (_, event, _) => + event.isInstanceOf[CoarseGrainedLocationChanged] + } + // #startFromSnapshot } // #commandHandler diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala index 4a89af47c..79f178525 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala @@ -101,7 +101,15 @@ object DroneEvents { EventProducerSettings(system), // only push coarse grained coordinate changes producerFilter = envelope => - envelope.event.isInstanceOf[Drone.CoarseGrainedLocationChanged]), + envelope.event.isInstanceOf[Drone.CoarseGrainedLocationChanged]) + // #startFromSnapshot + // start from latest drone snapshot and don't replay history + .withStartingFromSnapshots[ + Drone.State, + Drone.CoarseGrainedLocationChanged](state => + Drone.CoarseGrainedLocationChanged( + state.coarseGrainedCoordinates.get)), + // #startFromSnapshot GrpcClientSettings.fromConfig("central-drone-control")) def projectionForPartition( From df9698375984a2578e80a6b22a84367f43a3ec4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 18 Oct 2023 14:27:08 +0200 Subject: [PATCH 2/6] Right place to put the start from snapshot transform --- .../main/scala/local/drones/DroneEvents.scala | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala index 79f178525..f81307099 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala @@ -101,15 +101,7 @@ object DroneEvents { EventProducerSettings(system), // only push coarse grained coordinate changes producerFilter = envelope => - envelope.event.isInstanceOf[Drone.CoarseGrainedLocationChanged]) - // #startFromSnapshot - // start from latest drone snapshot and don't replay history - .withStartingFromSnapshots[ - Drone.State, - Drone.CoarseGrainedLocationChanged](state => - Drone.CoarseGrainedLocationChanged( - state.coarseGrainedCoordinates.get)), - // #startFromSnapshot + envelope.event.isInstanceOf[Drone.CoarseGrainedLocationChanged]), GrpcClientSettings.fromConfig("central-drone-control")) def projectionForPartition( @@ -122,12 +114,21 @@ object DroneEvents { R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]]( ProjectionId("drone-event-push", s"$minSlice-$maxSlice"), settings = None, - sourceProvider = EventSourcedProvider.eventsBySlices[Drone.Event]( - system, - R2dbcReadJournal.Identifier, - eventProducer.eventProducerSource.entityType, - minSlice, - maxSlice), + // FIXME EventProducerSettings.EventProducerSource.startingFromSnapshots not used and no public accessor to pick it up from there, confusing. + // #startFromSnapshot + sourceProvider = EventSourcedProvider + .eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event]( + system, + R2dbcReadJournal.Identifier, + eventProducer.eventProducerSource.entityType, + minSlice, + maxSlice, + // start from latest drone snapshot and don't replay history + { (state: Drone.State) => + Drone.CoarseGrainedLocationChanged( + state.coarseGrainedCoordinates.get) + }), + // #startFromSnapshot handler = eventProducer.handler())) } From 46e5d98814b096c50ba30ecac3925fe673eea3b9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 20 Oct 2023 09:48:20 +0200 Subject: [PATCH 3/6] eventsBySlicesStartingFromSnapshots also in the single node setup --- .../main/scala/local/drones/DroneEvents.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala index f81307099..bc503c778 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala @@ -62,12 +62,21 @@ object DroneEvents { R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]]( ProjectionId("drone-event-push", s"0-$maxSlice"), settings = None, - sourceProvider = EventSourcedProvider.eventsBySlices[Drone.Event]( - system, - R2dbcReadJournal.Identifier, - eventProducer.eventProducerSource.entityType, - 0, - maxSlice), + // FIXME EventProducerSettings.EventProducerSource.startingFromSnapshots not used and no public accessor to pick it up from there, confusing. + // #startFromSnapshot + sourceProvider = EventSourcedProvider + .eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event]( + system, + R2dbcReadJournal.Identifier, + eventProducer.eventProducerSource.entityType, + 0, + maxSlice, + // start from latest drone snapshot and don't replay history + { (state: Drone.State) => + Drone.CoarseGrainedLocationChanged( + state.coarseGrainedCoordinates.get) + }), + // #startFromSnapshot handler = eventProducer.handler())) } @@ -114,8 +123,6 @@ object DroneEvents { R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]]( ProjectionId("drone-event-push", s"$minSlice-$maxSlice"), settings = None, - // FIXME EventProducerSettings.EventProducerSource.startingFromSnapshots not used and no public accessor to pick it up from there, confusing. - // #startFromSnapshot sourceProvider = EventSourcedProvider .eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event]( system, @@ -128,7 +135,6 @@ object DroneEvents { Drone.CoarseGrainedLocationChanged( state.coarseGrainedCoordinates.get) }), - // #startFromSnapshot handler = eventProducer.handler())) } From 6e12ccb3144b75c05b6daf3a89882b9e9acdb700 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 20 Oct 2023 11:17:29 +0200 Subject: [PATCH 4/6] remove fixme --- .../src/main/scala/local/drones/DroneEvents.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala index bc503c778..dab58076e 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala @@ -62,7 +62,6 @@ object DroneEvents { R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]]( ProjectionId("drone-event-push", s"0-$maxSlice"), settings = None, - // FIXME EventProducerSettings.EventProducerSource.startingFromSnapshots not used and no public accessor to pick it up from there, confusing. // #startFromSnapshot sourceProvider = EventSourcedProvider .eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event]( From f4ad00a5b1d0c314fa127be11f4ff76b459da4f3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 25 Oct 2023 13:42:45 +0200 Subject: [PATCH 5/6] Java sample, and include in docs --- .../2-drone-location-to-cloud-service.md | 28 +++++++++++++++++-- .../src/main/java/local/drones/Drone.java | 7 +++++ .../main/java/local/drones/DroneEvents.java | 15 +++++++--- .../src/main/resources/persistence-h2.conf | 4 +++ .../main/resources/persistence-postgres.conf | 2 ++ 5 files changed, 49 insertions(+), 7 deletions(-) diff --git a/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md index cc5a6b167..6c6680444 100644 --- a/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md +++ b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md @@ -45,6 +45,29 @@ Two important things to note: 3. The service defines a "location name" which is a unique identifier of the PoP in the format `country/city/part-of-city`, it is used as `originId` for the producer push stream, identifying where the stream of events come from. +### Snapshots as starting points + +One optimization to reduce the amount of events sent over the network if the local control service has been disconnected +from the central cloud service is to use snapshots as starting points. Only delivering the latest coarse grained +coordinate is enough, so we create a snapshot for each CoarseGrainedLocationChanged event: + +Scala +: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #startFromSnapshot } + +Java +: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #startFromSnapshot } + +The Projection for pushing the events is using `eventsBySlicesStartingFromSnapshots`: + +Scala +: @@snip [DroneEvents.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala) { #startFromSnapshot } + +Java +: @@snip [DroneEvents.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java) { #startFromSnapshot } + +Note that the `Drone.State`, i.e. the snapshot, is transformed to an event. This snapshot event should represent a possible +starting point for the consumer. In this case it represents the latest coarse grained coordinate of the drone. + ## Producer Push Destination The producer push destination is a gRPC service where producers push events, the events are persisted in a local journal @@ -59,10 +82,10 @@ producer `originId` on producer connection, and put it in a tag for the event. The setup logic looks like this: Scala -: @@snip [DroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer } +: @@snip [LocalDroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer } Java -: @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer } +: @@snip [LocalDroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer } The returned @scala[PartialFunction]@java[Function] is an Akka HTTP gRPC request handler that can be bound directly in an HTTP server or combined with multiple other request handlers and then bound as a single server: @@ -84,7 +107,6 @@ Scala Java : @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf) { } - ## Consuming the pushed events What we have set up only means that the pushed events are written into our local journal, to do something useful with the diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java index 57a95df0f..9617ab1f9 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java @@ -215,6 +215,13 @@ private Effect onGetCurrentPosition(State state, GetCurrentPositio .map(position -> Effect().reply(command.replyTo, StatusReply.success(position))) .orElse(Effect().reply(command.replyTo, StatusReply.error("Position of drone is unknown"))); } + // #commandHandler + // #startFromSnapshot + @Override + public boolean shouldSnapshot(State state, Event event, long sequenceNr) { + return event instanceof CoarseGrainedLocationChanged; + } + // #startFromSnapshot } diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java index cd410d591..d38b5c641 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java @@ -66,12 +66,17 @@ public static Behavior eventToCloudPushBehavior( R2dbcProjection.atLeastOnceFlow( ProjectionId.of("drone-event-push", "0-" + maxSlice), Optional.empty(), - EventSourcedProvider.eventsBySlices( + // #startFromSnapshot + EventSourcedProvider.eventsBySlicesStartingFromSnapshots( system, R2dbcReadJournal.Identifier(), eventProducer.eventProducerSource().entityType(), 0, - maxSlice), + maxSlice, + // start from latest drone snapshot and don't replay history + (Drone.State state) -> + new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())), + // #startFromSnapshot eventProducer.handler(system), system)); } @@ -129,12 +134,14 @@ private static Behavior projectionForPartition( R2dbcProjection.atLeastOnceFlow( ProjectionId.of("drone-event-push", minSlice + "-" + maxSlice), Optional.empty(), - EventSourcedProvider.eventsBySlices( + EventSourcedProvider.eventsBySlicesStartingFromSnapshots( system, R2dbcReadJournal.Identifier(), eventProducer.eventProducerSource().entityType(), minSlice, - maxSlice), + maxSlice, + (Drone.State state) -> + new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())), eventProducer.handler(system), system)); } diff --git a/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf b/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf index 4d27f6e45..ea806a02c 100644 --- a/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf +++ b/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf @@ -27,3 +27,7 @@ akka { } } } + +// #startFromSnapshot +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true +// #startFromSnapshot diff --git a/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf b/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf index 694f3ee02..4ad66bb14 100644 --- a/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf +++ b/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf @@ -31,3 +31,5 @@ akka { } } } + +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true From 11dee030c4084e84e71b4baece18869f64356d5a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 25 Oct 2023 13:43:09 +0200 Subject: [PATCH 6/6] remove doc fixme --- .../src/main/paradox/guide/2-drone-location-to-cloud-service.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md index 6c6680444..2305acabd 100644 --- a/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md +++ b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md @@ -124,8 +124,6 @@ Scala Java : @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventProjection } -FIXME this is a lot in one snippet, split it up in multiple parts? - ## Durable State Drone Overview For the cloud representation of the drones, only containing the rough location, we use @extref[Durable State](akka:typed/durable-state/persistence.html),