diff --git a/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md index cc5a6b167..2305acabd 100644 --- a/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md +++ b/akka-edge-docs/src/main/paradox/guide/2-drone-location-to-cloud-service.md @@ -45,6 +45,29 @@ Two important things to note: 3. The service defines a "location name" which is a unique identifier of the PoP in the format `country/city/part-of-city`, it is used as `originId` for the producer push stream, identifying where the stream of events come from. +### Snapshots as starting points + +One optimization to reduce the amount of events sent over the network if the local control service has been disconnected +from the central cloud service is to use snapshots as starting points. Only delivering the latest coarse grained +coordinate is enough, so we create a snapshot for each CoarseGrainedLocationChanged event: + +Scala +: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #startFromSnapshot } + +Java +: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #startFromSnapshot } + +The Projection for pushing the events is using `eventsBySlicesStartingFromSnapshots`: + +Scala +: @@snip [DroneEvents.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala) { #startFromSnapshot } + +Java +: @@snip [DroneEvents.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java) { #startFromSnapshot } + +Note that the `Drone.State`, i.e. the snapshot, is transformed to an event. This snapshot event should represent a possible +starting point for the consumer. In this case it represents the latest coarse grained coordinate of the drone. + ## Producer Push Destination The producer push destination is a gRPC service where producers push events, the events are persisted in a local journal @@ -59,10 +82,10 @@ producer `originId` on producer connection, and put it in a tag for the event. The setup logic looks like this: Scala -: @@snip [DroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer } +: @@snip [LocalDroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer } Java -: @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer } +: @@snip [LocalDroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer } The returned @scala[PartialFunction]@java[Function] is an Akka HTTP gRPC request handler that can be bound directly in an HTTP server or combined with multiple other request handlers and then bound as a single server: @@ -84,7 +107,6 @@ Scala Java : @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf) { } - ## Consuming the pushed events What we have set up only means that the pushed events are written into our local journal, to do something useful with the @@ -102,8 +124,6 @@ Scala Java : @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventProjection } -FIXME this is a lot in one snippet, split it up in multiple parts? - ## Durable State Drone Overview For the cloud representation of the drones, only containing the rough location, we use @extref[Durable State](akka:typed/durable-state/persistence.html), diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java index 57a95df0f..9617ab1f9 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java @@ -215,6 +215,13 @@ private Effect onGetCurrentPosition(State state, GetCurrentPositio .map(position -> Effect().reply(command.replyTo, StatusReply.success(position))) .orElse(Effect().reply(command.replyTo, StatusReply.error("Position of drone is unknown"))); } + // #commandHandler + // #startFromSnapshot + @Override + public boolean shouldSnapshot(State state, Event event, long sequenceNr) { + return event instanceof CoarseGrainedLocationChanged; + } + // #startFromSnapshot } diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java index cd410d591..d38b5c641 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java @@ -66,12 +66,17 @@ public static Behavior eventToCloudPushBehavior( R2dbcProjection.atLeastOnceFlow( ProjectionId.of("drone-event-push", "0-" + maxSlice), Optional.empty(), - EventSourcedProvider.eventsBySlices( + // #startFromSnapshot + EventSourcedProvider.eventsBySlicesStartingFromSnapshots( system, R2dbcReadJournal.Identifier(), eventProducer.eventProducerSource().entityType(), 0, - maxSlice), + maxSlice, + // start from latest drone snapshot and don't replay history + (Drone.State state) -> + new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())), + // #startFromSnapshot eventProducer.handler(system), system)); } @@ -129,12 +134,14 @@ private static Behavior projectionForPartition( R2dbcProjection.atLeastOnceFlow( ProjectionId.of("drone-event-push", minSlice + "-" + maxSlice), Optional.empty(), - EventSourcedProvider.eventsBySlices( + EventSourcedProvider.eventsBySlicesStartingFromSnapshots( system, R2dbcReadJournal.Identifier(), eventProducer.eventProducerSource().entityType(), minSlice, - maxSlice), + maxSlice, + (Drone.State state) -> + new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())), eventProducer.handler(system), system)); } diff --git a/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf b/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf index 4d27f6e45..ea806a02c 100644 --- a/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf +++ b/samples/grpc/local-drone-control-java/src/main/resources/persistence-h2.conf @@ -27,3 +27,7 @@ akka { } } } + +// #startFromSnapshot +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true +// #startFromSnapshot diff --git a/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf b/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf index 694f3ee02..4ad66bb14 100644 --- a/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf +++ b/samples/grpc/local-drone-control-java/src/main/resources/persistence-postgres.conf @@ -31,3 +31,5 @@ akka { } } } + +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf index 4d27f6e45..ea806a02c 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-h2.conf @@ -27,3 +27,7 @@ akka { } } } + +// #startFromSnapshot +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true +// #startFromSnapshot diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf index 694f3ee02..4ad66bb14 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/persistence-postgres.conf @@ -31,3 +31,5 @@ akka { } } } + +akka.persistence.r2dbc.query.start-from-snapshot.enabled = true diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala index a21f89022..f34c82e51 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala @@ -96,6 +96,11 @@ object Drone { .receiveSignal { case (_, PostStop) => telemetry.droneEntityPassivated() } + // #startFromSnapshot + .snapshotWhen { (_, event, _) => + event.isInstanceOf[CoarseGrainedLocationChanged] + } + // #startFromSnapshot } // #commandHandler diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala index 4a89af47c..dab58076e 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala @@ -62,12 +62,20 @@ object DroneEvents { R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]]( ProjectionId("drone-event-push", s"0-$maxSlice"), settings = None, - sourceProvider = EventSourcedProvider.eventsBySlices[Drone.Event]( - system, - R2dbcReadJournal.Identifier, - eventProducer.eventProducerSource.entityType, - 0, - maxSlice), + // #startFromSnapshot + sourceProvider = EventSourcedProvider + .eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event]( + system, + R2dbcReadJournal.Identifier, + eventProducer.eventProducerSource.entityType, + 0, + maxSlice, + // start from latest drone snapshot and don't replay history + { (state: Drone.State) => + Drone.CoarseGrainedLocationChanged( + state.coarseGrainedCoordinates.get) + }), + // #startFromSnapshot handler = eventProducer.handler())) } @@ -114,12 +122,18 @@ object DroneEvents { R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]]( ProjectionId("drone-event-push", s"$minSlice-$maxSlice"), settings = None, - sourceProvider = EventSourcedProvider.eventsBySlices[Drone.Event]( - system, - R2dbcReadJournal.Identifier, - eventProducer.eventProducerSource.entityType, - minSlice, - maxSlice), + sourceProvider = EventSourcedProvider + .eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event]( + system, + R2dbcReadJournal.Identifier, + eventProducer.eventProducerSource.entityType, + minSlice, + maxSlice, + // start from latest drone snapshot and don't replay history + { (state: Drone.State) => + Drone.CoarseGrainedLocationChanged( + state.coarseGrainedCoordinates.get) + }), handler = eventProducer.handler())) }