Skip to content

Commit

Permalink
Java sample, and include in docs
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 25, 2023
1 parent 6e12ccb commit f4ad00a
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ Two important things to note:
3. The service defines a "location name" which is a unique identifier of the PoP in the format `country/city/part-of-city`,
it is used as `originId` for the producer push stream, identifying where the stream of events come from.

### Snapshots as starting points

One optimization to reduce the amount of events sent over the network if the local control service has been disconnected
from the central cloud service is to use snapshots as starting points. Only delivering the latest coarse grained
coordinate is enough, so we create a snapshot for each CoarseGrainedLocationChanged event:

Scala
: @@snip [Drone.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala) { #startFromSnapshot }

Java
: @@snip [Drone.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/Drone.java) { #startFromSnapshot }

The Projection for pushing the events is using `eventsBySlicesStartingFromSnapshots`:

Scala
: @@snip [DroneEvents.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala) { #startFromSnapshot }

Java
: @@snip [DroneEvents.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneEvents.java) { #startFromSnapshot }

Note that the `Drone.State`, i.e. the snapshot, is transformed to an event. This snapshot event should represent a possible
starting point for the consumer. In this case it represents the latest coarse grained coordinate of the drone.

## Producer Push Destination

The producer push destination is a gRPC service where producers push events, the events are persisted in a local journal
Expand All @@ -59,10 +82,10 @@ producer `originId` on producer connection, and put it in a tag for the event.
The setup logic looks like this:

Scala
: @@snip [DroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer }
: @@snip [LocalDroneEvents.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala) { #eventConsumer }

Java
: @@snip [DroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer }
: @@snip [LocalDroneEvents.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java) { #eventConsumer }

The returned @scala[PartialFunction]@java[Function] is an Akka HTTP gRPC request handler that can be bound directly in an HTTP server or
combined with multiple other request handlers and then bound as a single server:
Expand All @@ -84,7 +107,6 @@ Scala
Java
: @@snip [persistence.conf](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf) { }


## Consuming the pushed events

What we have set up only means that the pushed events are written into our local journal, to do something useful with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ private Effect<Event, State> onGetCurrentPosition(State state, GetCurrentPositio
.map(position -> Effect().reply(command.replyTo, StatusReply.success(position)))
.orElse(Effect().reply(command.replyTo, StatusReply.error("Position of drone is unknown")));
}

// #commandHandler

// #startFromSnapshot
@Override
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
return event instanceof CoarseGrainedLocationChanged;
}
// #startFromSnapshot
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,17 @@ public static Behavior<ProjectionBehavior.Command> eventToCloudPushBehavior(
R2dbcProjection.atLeastOnceFlow(
ProjectionId.of("drone-event-push", "0-" + maxSlice),
Optional.empty(),
EventSourcedProvider.eventsBySlices(
// #startFromSnapshot
EventSourcedProvider.eventsBySlicesStartingFromSnapshots(
system,
R2dbcReadJournal.Identifier(),
eventProducer.eventProducerSource().entityType(),
0,
maxSlice),
maxSlice,
// start from latest drone snapshot and don't replay history
(Drone.State state) ->
new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())),
// #startFromSnapshot
eventProducer.handler(system),
system));
}
Expand Down Expand Up @@ -129,12 +134,14 @@ private static Behavior<ProjectionBehavior.Command> projectionForPartition(
R2dbcProjection.atLeastOnceFlow(
ProjectionId.of("drone-event-push", minSlice + "-" + maxSlice),
Optional.empty(),
EventSourcedProvider.eventsBySlices(
EventSourcedProvider.eventsBySlicesStartingFromSnapshots(
system,
R2dbcReadJournal.Identifier(),
eventProducer.eventProducerSource().entityType(),
minSlice,
maxSlice),
maxSlice,
(Drone.State state) ->
new Drone.CoarseGrainedLocationChanged(state.coarseGrainedCoordinates().get())),
eventProducer.handler(system),
system));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ akka {
}
}
}

// #startFromSnapshot
akka.persistence.r2dbc.query.start-from-snapshot.enabled = true
// #startFromSnapshot
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ akka {
}
}
}

akka.persistence.r2dbc.query.start-from-snapshot.enabled = true

0 comments on commit f4ad00a

Please sign in to comment.