Skip to content

Commit

Permalink
Started on the Java RES impl
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 6, 2023
1 parent f732ad5 commit 14f9a85
Show file tree
Hide file tree
Showing 4 changed files with 423 additions and 0 deletions.
47 changes: 47 additions & 0 deletions samples/grpc/restaurant-drone-deliveries-service-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<logback.version>1.3.6</logback.version>
<junit.version>4.13.1</junit.version>
<scala.binary.version>2.13</scala.binary.version>
<protobuf-java.version>3.22.2</protobuf-java.version>
<!-- needs to be defined to allow for overriding through mvn exec:exec -DAPP_CONFIG=local1.conf -->
<APP_CONFIG>application.conf</APP_CONFIG>
<!-- Version of the Docker image is derived from git commit.
Expand Down Expand Up @@ -162,6 +163,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf-java.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down Expand Up @@ -203,12 +210,52 @@
</configuration>
</plugin>

<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<!-- provides the jars of the classpath as properties inside of maven
so that we can refer to one of the jars in the exec plugin config below -->
<goal>properties</goal>
</goals>
</execution>
<execution>
<id>unpack</id>
<phase>generate-sources</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf-java.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/proto</outputDirectory>
<includes>**/*.proto</includes>
</artifactItem>
</artifactItems>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>com.lightbend.akka.grpc</groupId>
<artifactId>akka-grpc-maven-plugin</artifactId>
<version>${akka-grpc-maven-plugin.version}</version>
<configuration>
<language>Java</language>
<protoPaths>
<protoPath>target/proto</protoPath>
<protoPath>src/main/protobuf</protoPath>
</protoPaths>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
package charging;

import static akka.Done.done;

import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.TimerScheduler;
import akka.persistence.typed.javadsl.*;
import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors;
import akka.serialization.jackson.CborSerializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class ChargingStation
extends ReplicatedEventSourcedBehavior<
ChargingStation.Command, ChargingStation.Event, ChargingStation.State> {

// commands and replies
public interface Command extends CborSerializable {}

public static final class Create implements Command {
public final String locationId;
public final int chargingSlots;
public final ActorRef<Done> replyTo;

public Create(String locationId, int chargingSlots, ActorRef<Done> replyTo) {
this.locationId = locationId;
this.chargingSlots = chargingSlots;
this.replyTo = replyTo;
}
}

public static final class StartCharging implements Command {
public final String droneId;
public final ActorRef<StartChargingResponse> replyTo;

public StartCharging(String droneId, ActorRef<StartChargingResponse> replyTo) {
this.droneId = droneId;
this.replyTo = replyTo;
}
}

interface StartChargingResponse extends CborSerializable {}

public static final class AllSlotsBusy implements StartChargingResponse {
public final Instant firstSlotFreeAt;

public AllSlotsBusy(Instant firstSlotFreeAt) {
this.firstSlotFreeAt = firstSlotFreeAt;
}
}

public static final class GetState implements Command {
public final ActorRef<State> replyTo;

public GetState(ActorRef<State> replyTo) {
this.replyTo = replyTo;
}
}

private static final class CompleteCharging implements Command {
final String droneId;

public CompleteCharging(String droneId) {
this.droneId = droneId;
}
}

// events
public interface Event extends CborSerializable {}

public static final class Created implements Event {
public final String locationId;
public final int chargingSlots;

public Created(String locationId, int chargingSlots) {
this.locationId = locationId;
this.chargingSlots = chargingSlots;
}
}

public static final class ChargingStarted implements Event, StartChargingResponse {
public final String droneId;
public final Instant chargeComplete;

public ChargingStarted(String droneId, Instant chargeComplete) {
this.droneId = droneId;
this.chargeComplete = chargeComplete;
}
}

public static final class ChargingCompleted implements Event {
public final String droneId;

public ChargingCompleted(String droneId) {
this.droneId = droneId;
}
}

public static final class ChargingDrone {
public final String droneId;
public final Instant chargingDone;
public final String replicaId;

public ChargingDrone(String droneId, Instant chargingDone, String replicaId) {
this.droneId = droneId;
this.chargingDone = chargingDone;
this.replicaId = replicaId;
}
}

public static final class State implements CborSerializable {
public final int chargingSlots;
public final Set<ChargingDrone> dronesCharging;
public final String stationLocationId;

public State(int chargingSlots, Set<ChargingDrone> dronesCharging, String stationLocationId) {
this.chargingSlots = chargingSlots;
this.dronesCharging = dronesCharging;
this.stationLocationId = stationLocationId;
}
}

public static final String ENTITY_TYPE = "charging-station";

private static final Duration FULL_CHARGE_TIME = Duration.ofMinutes(5);

public static Behavior<Command> create(
ReplicatedBehaviors<Command, Event, State> replicatedBehaviors) {
return Behaviors.setup(
(ActorContext<Command> context) ->
Behaviors.withTimers(
(TimerScheduler<Command> timers) ->
replicatedBehaviors.setup(
replicationContext ->
new ChargingStation(context, replicationContext, timers))));
}

private static Duration durationUntil(Instant instant) {
return Duration.ofSeconds(instant.getEpochSecond() - Instant.now().getEpochSecond());
}

private final ActorContext<Command> context;
private final TimerScheduler<Command> timers;

public ChargingStation(
ActorContext<Command> context,
ReplicationContext replicationContext,
TimerScheduler<Command> timers) {
super(replicationContext);
this.context = context;
this.timers = timers;
}

@Override
public State emptyState() {
return null;
}

@Override
public CommandHandler<Command, Event, State> commandHandler() {

var noStateHandler =
newCommandHandlerBuilder()
.forNullState()
.onCommand(
Create.class,
(state, create) ->
Effect()
.persist(new Created(create.locationId, create.chargingSlots))
.thenReply(create.replyTo, stateAfter -> done()))
// FIXME no catch all for particular state in Java?
.onCommand(
command -> true,
unexpected -> {
context
.getLog()
.warn(
"Got an unexpected command {} but charging station with id {} not initialized",
unexpected.getClass(),
getReplicationContext().entityId());
return Effect().none();
});

var initializedHandler =
newCommandHandlerBuilder()
.forNonNullState()
.onCommand(
Create.class,
command -> {
context
.getLog()
.warn(
"Got a create command, but station id {} was already created, ignoring",
getReplicationContext().entityId());
return Effect().none();
})
.onCommand(StartCharging.class, this::handleStartCharging)
.onCommand(
CompleteCharging.class,
completeCharging ->
Effect().persist(new ChargingCompleted(completeCharging.droneId)))
.onCommand(
GetState.class, (state, getState) -> Effect().reply(getState.replyTo, state));

return noStateHandler.orElse(initializedHandler).build();
}

private Effect<Event, State> handleStartCharging(State state, StartCharging startCharging) {
if (state.dronesCharging.stream()
.anyMatch(charging -> charging.droneId.equals(startCharging.droneId))) {
context
.getLog()
.warn(
"Drone {} requested charging but is already charging. Ignoring.",
startCharging.droneId);
return Effect().none();
} else if (state.dronesCharging.size() >= state.chargingSlots) {
var earliestFreeSlot =
state.dronesCharging.stream()
.min(Comparator.comparing(chargingDrone -> chargingDrone.chargingDone))
.get()
.chargingDone;
context
.getLog()
.info(
"Drone {} requested charging but all stations busy, earliest free slot {}",
startCharging.droneId,
earliestFreeSlot);
return Effect().reply(startCharging.replyTo, new AllSlotsBusy(earliestFreeSlot));
} else {
// charge
var chargeCompletedBy = Instant.now().plus(FULL_CHARGE_TIME);
context
.getLog()
.info(
"Drone {} requested charging, will complete charging at {}",
startCharging.droneId,
chargeCompletedBy);
var event = new ChargingStarted(startCharging.droneId, chargeCompletedBy);
return Effect()
.persist(event)
.thenRun(
newState -> {
timers.startSingleTimer(
new CompleteCharging(startCharging.droneId), durationUntil(chargeCompletedBy));
// Note: The event is also the reply
startCharging.replyTo.tell(event);
});
}
}

@Override
public EventHandler<State, Event> eventHandler() {
var noStateHandler =
newEventHandlerBuilder()
.forNullState()
.onEvent(
Created.class,
created ->
new State(created.chargingSlots, Collections.emptySet(), created.locationId));

var initializedStateHandler =
newEventHandlerBuilder()
.forNonNullState()
.onEvent(
Created.class,
(state, event) -> {
context.getLog().warn("Saw a second created event, ignoring");
return state;
})
.onEvent(
ChargingStarted.class,
(state, event) -> {
var newSet = new HashSet<>(state.dronesCharging);
newSet.add(
new ChargingDrone(
event.droneId,
event.chargeComplete,
getReplicationContext().origin().id()));
return new State(
state.chargingSlots,
Collections.unmodifiableSet(newSet),
state.stationLocationId);
})
.onEvent(
ChargingCompleted.class,
(state, event) -> {
var newSet =
state.dronesCharging.stream()
.filter(charging -> !charging.droneId.equals(event.droneId))
.collect(Collectors.toSet());
return new State(
state.chargingSlots,
Collections.unmodifiableSet(newSet),
state.stationLocationId);
});

return noStateHandler.orElse(initializedStateHandler).build();
}
}
Loading

0 comments on commit 14f9a85

Please sign in to comment.