Skip to content

Commit

Permalink
Review feedback addressed
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 14, 2023
1 parent 7b27295 commit 4308ce8
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ public Created(String locationId, int chargingSlots) {

public static final class ChargingStarted implements Event, StartChargingResponse {
public final String droneId;
public final Instant chargeComplete;
public final Instant expectedComplete;

public ChargingStarted(String droneId, Instant chargeComplete) {
public ChargingStarted(String droneId, Instant expectedComplete) {
this.droneId = droneId;
this.chargeComplete = chargeComplete;
this.expectedComplete = expectedComplete;
}
}

Expand All @@ -116,12 +116,12 @@ public ChargingCompleted(String droneId) {

public static final class ChargingDrone {
public final String droneId;
public final Instant chargingDone;
public final Instant expectedComplete;
public final String replicaId;

public ChargingDrone(String droneId, Instant chargingDone, String replicaId) {
public ChargingDrone(String droneId, Instant expectedComplete, String replicaId) {
this.droneId = droneId;
this.chargingDone = chargingDone;
this.expectedComplete = expectedComplete;
this.replicaId = replicaId;
}
}
Expand Down Expand Up @@ -252,25 +252,7 @@ public CommandHandler<Command, Event, State> commandHandler() {
+ getReplicationContext().entityId()
+ " was already created")))
.onCommand(StartCharging.class, this::handleStartCharging)
.onCommand(
CompleteCharging.class,
(state, completeCharging) -> {
context.getLog().info("Drone {} completed charging", completeCharging.droneId);
if (state.dronesCharging.stream()
.anyMatch(
chargingDrone -> chargingDrone.droneId.equals(completeCharging.droneId)))
return Effect()
.persist(new ChargingCompleted(completeCharging.droneId))
.thenReply(completeCharging.replyTo, newState -> StatusReply.ack());
else
return Effect()
.reply(
completeCharging.replyTo,
StatusReply.error(
"Drone "
+ completeCharging.droneId
+ " is not currently charging."));
})
.onCommand(CompleteCharging.class, this::handleCompleteCharging)
.onCommand(
GetState.class,
(state, getState) -> Effect().reply(getState.replyTo, StatusReply.success(state)));
Expand All @@ -285,9 +267,9 @@ private Effect<Event, State> handleStartCharging(State state, StartCharging star
} else if (state.dronesCharging.size() >= state.chargingSlots) {
var earliestFreeSlot =
state.dronesCharging.stream()
.min(Comparator.comparing(chargingDrone -> chargingDrone.chargingDone))
.min(Comparator.comparing(chargingDrone -> chargingDrone.expectedComplete))
.get()
.chargingDone;
.expectedComplete;
context
.getLog()
.info(
Expand All @@ -298,20 +280,36 @@ private Effect<Event, State> handleStartCharging(State state, StartCharging star
.reply(startCharging.replyTo, StatusReply.success(new AllSlotsBusy(earliestFreeSlot)));
} else {
// charge
var chargeCompletedBy = Instant.now().plus(FULL_CHARGE_TIME);
var expectedComplete = Instant.now().plus(FULL_CHARGE_TIME);
context
.getLog()
.info(
"Drone {} requested charging, will complete charging at {}",
startCharging.droneId,
chargeCompletedBy);
var event = new ChargingStarted(startCharging.droneId, chargeCompletedBy);
expectedComplete);
var event = new ChargingStarted(startCharging.droneId, expectedComplete);
return Effect()
.persist(event)
.thenReply(startCharging.replyTo, newState -> StatusReply.success(event));
}
}

private Effect<Event, State> handleCompleteCharging(
State state, CompleteCharging completeCharging) {
context.getLog().info("Drone {} completed charging", completeCharging.droneId);
if (state.dronesCharging.stream()
.anyMatch(chargingDrone -> chargingDrone.droneId.equals(completeCharging.droneId)))
return Effect()
.persist(new ChargingCompleted(completeCharging.droneId))
.thenReply(completeCharging.replyTo, newState -> StatusReply.ack());
else
return Effect()
.reply(
completeCharging.replyTo,
StatusReply.error(
"Drone " + completeCharging.droneId + " is not currently charging."));
}

@Override
public EventHandler<State, Event> eventHandler() {
var noStateHandler =
Expand All @@ -338,7 +336,7 @@ public EventHandler<State, Event> eventHandler() {
newSet.add(
new ChargingDrone(
event.droneId,
event.chargeComplete,
event.expectedComplete,
getReplicationContext().origin().id()));
return new State(
state.chargingSlots,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ private static Behavior<Void> rootBehavior() {
new DroneServiceImpl(
context.getSystem(),
deliveriesQueue,
// FIXME wrong function type
id -> chargingStationReplication.entityRefFactory().apply(id),
chargingStationReplication.entityRefFactory(),
settings);

var grpcInterface =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ public CompletionStage<ChargingResponse> goCharge(GoChargeRequest in) {
chargingStationResponse.thenApply(
message -> {
if (message instanceof ChargingStation.ChargingStarted) {
var chargeComplete = ((ChargingStation.ChargingStarted) message).chargeComplete;
var expectedComplete = ((ChargingStation.ChargingStarted) message).expectedComplete;
return ChargingResponse.newBuilder()
.setStarted(
ChargingStarted.newBuilder()
.setDoneBy(instantToProtoTimestamp(chargeComplete))
.setExpectedComplete(instantToProtoTimestamp(expectedComplete))
.build())
.build();
} else if (message instanceof ChargingStation.AllSlotsBusy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static void start(
ActorSystem<?> system,
DroneService droneService,
DeliveriesQueueService deliveriesQueueService) {
@SuppressWarnings("unchecked")
var service =
ServiceHandler.concatOrNotFound(
DroneServiceHandlerFactory.create(droneService, system),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ private static Behavior<Void> rootBehavior() {
new DroneServiceImpl(
context.getSystem(),
deliveriesQueue,
// FIXME wrong function type
id -> chargingStationReplication.entityRefFactory().apply(id),
chargingStationReplication.entityRefFactory(),
settings);

var grpcInterface =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ message ChargingResponse {
}

message ChargingStarted {
google.protobuf.Timestamp done_by = 1;
google.protobuf.Timestamp expected_complete = 1;
}

message ComeBackLater {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ akka.projection.grpc {
producer {
query-plugin-id = "akka.persistence.r2dbc.query"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ message ChargingResponse {
}

message ChargingStarted {
google.protobuf.Timestamp done_by = 1;
google.protobuf.Timestamp expected_complete = 1;
}

message ComeBackLater {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ object ChargingStation {
// events
sealed trait Event extends CborSerializable
case class Created(locationId: String, chargingSlots: Int) extends Event
case class ChargingStarted(droneId: String, chargeComplete: Instant)
case class ChargingStarted(droneId: String, expectedComplete: Instant)
extends Event
with StartChargingResponse

case class ChargingCompleted(droneId: String) extends Event

case class ChargingDrone(
droneId: String,
chargingDone: Instant,
expectedComplete: Instant,
replicaId: String)
case class State(
chargingSlots: Int,
Expand Down Expand Up @@ -179,7 +179,8 @@ class ChargingStation(
droneId)
Effect.none
} else if (state.dronesCharging.size >= state.chargingSlots) {
val earliestFreeSlot = state.dronesCharging.map(_.chargingDone).min
val earliestFreeSlot =
state.dronesCharging.map(_.expectedComplete).min
context.log.info(
"Drone {} requested charging but all stations busy, earliest free slot {}",
droneId,
Expand All @@ -188,13 +189,13 @@ class ChargingStation(
StatusReply.Success(AllSlotsBusy(earliestFreeSlot)))
} else {
// charge
val chargeCompletedBy =
val expectedComplete =
Instant.now().plusSeconds(FullChargeTime.toSeconds)
context.log.info(
"Drone {} requested charging, expected to complete charging at {}",
droneId,
chargeCompletedBy)
val event = ChargingStarted(droneId, chargeCompletedBy)
expectedComplete)
val event = ChargingStarted(droneId, expectedComplete)
Effect
.persist(event)
.thenReply(replyTo)(_ => StatusReply.Success(event))
Expand Down Expand Up @@ -233,11 +234,11 @@ class ChargingStation(
case Created(_, _) =>
context.log.warn("Saw a second created event, ignoring")
Some(state)
case ChargingStarted(droneId, chargeComplete) =>
case ChargingStarted(droneId, expectedComplete) =>
Some(
state.copy(dronesCharging = state.dronesCharging + ChargingDrone(
droneId,
chargeComplete,
expectedComplete,
replicationContext.origin.id)))
case ChargingCompleted(droneId) =>
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ class DroneServiceImpl(
.askWithStatus[ChargingStation.StartChargingResponse](
ChargingStation.StartCharging(in.droneId, _))
.map {
case ChargingStation.ChargingStarted(_, chargeComplete) =>
case ChargingStation.ChargingStarted(_, expectedComplete) =>
proto.ChargingResponse(
proto.ChargingResponse.Response
.Started(proto.ChargingStarted(Some(Timestamp(chargeComplete)))))
proto.ChargingResponse.Response.Started(
proto.ChargingStarted(Some(Timestamp(expectedComplete)))))
case ChargingStation.AllSlotsBusy(comeBackAt) =>
proto.ChargingResponse(
proto.ChargingResponse.Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ public Created(String locationId, int chargingSlots) {

public static final class ChargingStarted implements Event, StartChargingResponse {
public final String droneId;
public final Instant chargeComplete;
public final Instant expectedComplete;

public ChargingStarted(String droneId, Instant chargeComplete) {
public ChargingStarted(String droneId, Instant expectedComplete) {
this.droneId = droneId;
this.chargeComplete = chargeComplete;
this.expectedComplete = expectedComplete;
}
}

Expand All @@ -116,12 +116,12 @@ public ChargingCompleted(String droneId) {

public static final class ChargingDrone {
public final String droneId;
public final Instant chargingDone;
public final Instant expectedComplete;
public final String replicaId;

public ChargingDrone(String droneId, Instant chargingDone, String replicaId) {
public ChargingDrone(String droneId, Instant expectedComplete, String replicaId) {
this.droneId = droneId;
this.chargingDone = chargingDone;
this.expectedComplete = expectedComplete;
this.replicaId = replicaId;
}
}
Expand Down Expand Up @@ -252,25 +252,7 @@ public CommandHandler<Command, Event, State> commandHandler() {
+ getReplicationContext().entityId()
+ " was already created")))
.onCommand(StartCharging.class, this::handleStartCharging)
.onCommand(
CompleteCharging.class,
(state, completeCharging) -> {
context.getLog().info("Drone {} completed charging", completeCharging.droneId);
if (state.dronesCharging.stream()
.anyMatch(
chargingDrone -> chargingDrone.droneId.equals(completeCharging.droneId)))
return Effect()
.persist(new ChargingCompleted(completeCharging.droneId))
.thenReply(completeCharging.replyTo, newState -> StatusReply.ack());
else
return Effect()
.reply(
completeCharging.replyTo,
StatusReply.error(
"Drone "
+ completeCharging.droneId
+ " is not currently charging."));
})
.onCommand(CompleteCharging.class, this::handleCompleteCharging)
.onCommand(
GetState.class,
(state, getState) -> Effect().reply(getState.replyTo, StatusReply.success(state)));
Expand All @@ -285,9 +267,9 @@ private Effect<Event, State> handleStartCharging(State state, StartCharging star
} else if (state.dronesCharging.size() >= state.chargingSlots) {
var earliestFreeSlot =
state.dronesCharging.stream()
.min(Comparator.comparing(chargingDrone -> chargingDrone.chargingDone))
.min(Comparator.comparing(chargingDrone -> chargingDrone.expectedComplete))
.get()
.chargingDone;
.expectedComplete;
context
.getLog()
.info(
Expand All @@ -298,20 +280,36 @@ private Effect<Event, State> handleStartCharging(State state, StartCharging star
.reply(startCharging.replyTo, StatusReply.success(new AllSlotsBusy(earliestFreeSlot)));
} else {
// charge
var chargeCompletedBy = Instant.now().plus(FULL_CHARGE_TIME);
var expectedComplete = Instant.now().plus(FULL_CHARGE_TIME);
context
.getLog()
.info(
"Drone {} requested charging, will complete charging at {}",
startCharging.droneId,
chargeCompletedBy);
var event = new ChargingStarted(startCharging.droneId, chargeCompletedBy);
expectedComplete);
var event = new ChargingStarted(startCharging.droneId, expectedComplete);
return Effect()
.persist(event)
.thenReply(startCharging.replyTo, newState -> StatusReply.success(event));
}
}

private Effect<Event, State> handleCompleteCharging(
State state, CompleteCharging completeCharging) {
context.getLog().info("Drone {} completed charging", completeCharging.droneId);
if (state.dronesCharging.stream()
.anyMatch(chargingDrone -> chargingDrone.droneId.equals(completeCharging.droneId)))
return Effect()
.persist(new ChargingCompleted(completeCharging.droneId))
.thenReply(completeCharging.replyTo, newState -> StatusReply.ack());
else
return Effect()
.reply(
completeCharging.replyTo,
StatusReply.error(
"Drone " + completeCharging.droneId + " is not currently charging."));
}

@Override
public EventHandler<State, Event> eventHandler() {
var noStateHandler =
Expand All @@ -338,7 +336,7 @@ public EventHandler<State, Event> eventHandler() {
newSet.add(
new ChargingDrone(
event.droneId,
event.chargeComplete,
event.expectedComplete,
getReplicationContext().origin().id()));
return new State(
state.chargingSlots,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public CompletionStage<GetChargingStationStateResponse> getChargingStationState(
d ->
ChargingDrone.newBuilder()
.setDroneId(d.droneId)
.setChargingComplete(
instantToProtoTimestamp(d.chargingDone))
.setExpectedComplete(
instantToProtoTimestamp(d.expectedComplete))
.build())
.collect(Collectors.toList()))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ message GetChargingStationStateResponse {
message ChargingDrone {
string drone_id = 1;
// timestamp when charging is estimated to complete
google.protobuf.Timestamp charging_complete = 2;
google.protobuf.Timestamp expected_complete = 2;
}
Loading

0 comments on commit 4308ce8

Please sign in to comment.