Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: Start from snapshot for drone push projection #1044

Merged
merged 6 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ Two important things to note:
3. The service defines a "location name" which is a unique identifier of the PoP in the format `country/city/part-of-city`,
it is used as `originId` for the producer push stream, identifying where the stream of events come from.

### Snapshots as starting points

One optimization to reduce the amount of events sent over the network if the local control service has been disconnected
from the central cloud service is to use snapshots as starting points. Only delivering the latest coarse grained
coordinate is enough, so we create a snapshot for each CoarseGrainedLocationChanged event:

Scala
: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #startFromSnapshot }

Java
: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #startFromSnapshot }

The Projection for pushing the events is using `eventsBySlicesStartingFromSnapshots`:

Scala
: @@snip [DroneEvents.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala) { #startFromSnapshot }

Java
: @@snip [DroneEvents.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java) { #startFromSnapshot }

Note that the `Drone.State`, i.e. the snapshot, is transformed to an event. This snapshot event should represent a possible
starting point for the consumer. In this case it represents the latest coarse grained coordinate of the drone.

## Producer Push Destination

The producer push destination is a gRPC service where producers push events, the events are persisted in a local journal
Expand All @@ -59,10 +82,10 @@ producer `originId` on producer connection, and put it in a tag for the event.
The setup logic looks like this:

Scala
: @@snip [DroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer }
: @@snip [LocalDroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer }

Java
: @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer }
: @@snip [LocalDroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer }

The returned @scala[PartialFunction]@java[Function] is an Akka HTTP gRPC request handler that can be bound directly in an HTTP server or
combined with multiple other request handlers and then bound as a single server:
Expand All @@ -84,7 +107,6 @@ Scala
Java
: @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf) { }


## Consuming the pushed events

What we have set up only means that the pushed events are written into our local journal, to do something useful with the
Expand All @@ -102,8 +124,6 @@ Scala
Java
: @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventProjection }

FIXME this is a lot in one snippet, split it up in multiple parts?

## Durable State Drone Overview

For the cloud representation of the drones, only containing the rough location, we use @extref[Durable State](akka:typed/durable-state/persistence.html),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ private Effect<Event, State> onGetCurrentPosition(State state, GetCurrentPositio
.map(position -> Effect().reply(command.replyTo, StatusReply.success(position)))
.orElse(Effect().reply(command.replyTo, StatusReply.error("Position of drone is unknown")));
}

// #commandHandler

// #startFromSnapshot
@Override
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
return event instanceof CoarseGrainedLocationChanged;
}
// #startFromSnapshot
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,17 @@ public static Behavior<ProjectionBehavior.Command> eventToCloudPushBehavior(
R2dbcProjection.atLeastOnceFlow(
ProjectionId.of("drone-event-push", "0-" + maxSlice),
Optional.empty(),
EventSourcedProvider.eventsBySlices(
// #startFromSnapshot
EventSourcedProvider.eventsBySlicesStartingFromSnapshots(
system,
R2dbcReadJournal.Identifier(),
eventProducer.eventProducerSource().entityType(),
0,
maxSlice),
maxSlice,
// start from latest drone snapshot and don't replay history
(Drone.State state) ->
new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())),
// #startFromSnapshot
eventProducer.handler(system),
system));
}
Expand Down Expand Up @@ -129,12 +134,14 @@ private static Behavior<ProjectionBehavior.Command> projectionForPartition(
R2dbcProjection.atLeastOnceFlow(
ProjectionId.of("drone-event-push", minSlice + "-" + maxSlice),
Optional.empty(),
EventSourcedProvider.eventsBySlices(
EventSourcedProvider.eventsBySlicesStartingFromSnapshots(
system,
R2dbcReadJournal.Identifier(),
eventProducer.eventProducerSource().entityType(),
minSlice,
maxSlice),
maxSlice,
(Drone.State state) ->
new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())),
eventProducer.handler(system),
system));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ akka {
}
}
}

// #startFromSnapshot
akka.persistence.r2dbc.query.start-from-snapshot.enabled = true
// #startFromSnapshot
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ akka {
}
}
}

akka.persistence.r2dbc.query.start-from-snapshot.enabled = true
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ akka {
}
}
}

// #startFromSnapshot
akka.persistence.r2dbc.query.start-from-snapshot.enabled = true
// #startFromSnapshot
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ akka {
}
}
}

akka.persistence.r2dbc.query.start-from-snapshot.enabled = true
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ object Drone {
.receiveSignal { case (_, PostStop) =>
telemetry.droneEntityPassivated()
}
// #startFromSnapshot
.snapshotWhen { (_, event, _) =>
event.isInstanceOf[CoarseGrainedLocationChanged]
}
// #startFromSnapshot
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tag for every coarse grained so we always have such a starting point

}

// #commandHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,20 @@ object DroneEvents {
R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]](
ProjectionId("drone-event-push", s"0-$maxSlice"),
settings = None,
sourceProvider = EventSourcedProvider.eventsBySlices[Drone.Event](
system,
R2dbcReadJournal.Identifier,
eventProducer.eventProducerSource.entityType,
0,
maxSlice),
// #startFromSnapshot
sourceProvider = EventSourcedProvider
.eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event](
system,
R2dbcReadJournal.Identifier,
eventProducer.eventProducerSource.entityType,
0,
maxSlice,
// start from latest drone snapshot and don't replay history
{ (state: Drone.State) =>
Drone.CoarseGrainedLocationChanged(
state.coarseGrainedCoordinates.get)
}),
// #startFromSnapshot
handler = eventProducer.handler()))
}

Expand Down Expand Up @@ -114,12 +122,18 @@ object DroneEvents {
R2dbcProjection.atLeastOnceFlow[Offset, EventEnvelope[Drone.Event]](
ProjectionId("drone-event-push", s"$minSlice-$maxSlice"),
settings = None,
sourceProvider = EventSourcedProvider.eventsBySlices[Drone.Event](
system,
R2dbcReadJournal.Identifier,
eventProducer.eventProducerSource.entityType,
minSlice,
maxSlice),
sourceProvider = EventSourcedProvider
.eventsBySlicesStartingFromSnapshots[Drone.State, Drone.Event](
system,
R2dbcReadJournal.Identifier,
eventProducer.eventProducerSource.entityType,
minSlice,
maxSlice,
// start from latest drone snapshot and don't replay history
{ (state: Drone.State) =>
Drone.CoarseGrainedLocationChanged(
state.coarseGrainedCoordinates.get)
}),
handler = eventProducer.handler()))

}
Expand Down