Skip to content

Commit

Permalink
Scala version working (given changes in #1094
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 7, 2023
1 parent d5bcedc commit 8c0b9be
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 44 deletions.
13 changes: 13 additions & 0 deletions samples/grpc/local-drone-control-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ Inspect the current state of the local delivery queue
grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue
```

Ask to charge the drone, with a charging station first created in the restaurant-drone-deliveries service:

```shell
grpcurl -d '{"drone_id":"drone1","charging_station_id":"station1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.GoCharge
```

Use the restaurant-drone-deliveries charge station inspection command to see the charging drones, and that they eventually
complete their charging:

```shell
grpcurl -d '{"charging_station_id":"station1"}' -plaintext localhost:8101 charging.ChargingStationService.GetChargingStationState
```

## Running the sample as a multi node service

It is also possible to run this sample service as a multi node Akka Cluster, for that you need to start a PostgreSQL
Expand Down
4 changes: 3 additions & 1 deletion samples/grpc/local-drone-control-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ val AkkaHttpVersion = "10.6.0"
val AkkaManagementVersion = "1.5.0"
val AkkaPersistenceR2dbcVersion = "1.2.0"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.1-M1")
sys.props.getOrElse(
"akka-projection.version",
"1.5.1-M1-11-d5bcedce-20231207-1512-SNAPSHOT")
val AkkaDiagnosticsVersion = "2.1.0"

enablePlugins(AkkaGrpcPlugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ option java_multiple_files = true;
option java_package = "local.drones.proto";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "common/coordinates.proto";

package local.drones;
Expand All @@ -16,6 +17,9 @@ service DroneService {
// deliveries
rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {}
rpc CompleteDelivery (CompleteDeliveryRequest) returns (google.protobuf.Empty) {}

// charging
rpc GoCharge (GoChargeRequest) returns (ChargingResponse) {}
}


Expand All @@ -38,4 +42,24 @@ message RequestNextDeliveryResponse {

message CompleteDeliveryRequest {
string delivery_id = 1;
}

message GoChargeRequest {
string drone_id = 1;
string charging_station_id = 2;
}

message ChargingResponse {
oneof response {
ChargingStarted started = 1;
ComeBackLater come_back_later = 2;
};
}

message ChargingStarted {
google.protobuf.Timestamp done_by = 1;
}

message ComeBackLater {
google.protobuf.Timestamp first_slot_free_at = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ charging-station {
parallel-updates = 1
# only list the cloud replica(s) we want to connect to
replicas: [{
replica-id = "cloud1"
replica-id = cloud1
number-of-consumers = 1
grpc.client {
# same as for producer in grpc.conf, so re-use config from there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object ChargingStation {
implicit system: ActorSystem[_]): EdgeReplication[Command] = {
val replicationSettings =
ReplicationSettings[Command](EntityType, R2dbcReplication())
.withSelfReplicaId(ReplicaId(locationId.replace("/", "_")))
.withSelfReplicaId(ReplicaId(locationId))
Replication.grpcEdgeReplication(replicationSettings)(ChargingStation.apply)
}

Expand All @@ -87,6 +87,9 @@ object ChargingStation {
Behaviors.setup[Command] { context =>
Behaviors.withTimers { timers =>
replicatedBehaviors.setup { replicationContext =>
context.log.info(
"Charging Station {} starting up",
replicationContext.entityId)
new ChargingStation(context, replicationContext, timers).behavior()
}
}
Expand Down Expand Up @@ -185,7 +188,12 @@ class ChargingStation(
}

case CompleteCharging(droneId) =>
Effect.persist(ChargingCompleted(droneId))
if (state.dronesCharging.exists(_.droneId == droneId)) {
context.log.info("Drone {} completed charging", droneId)
Effect.persist(ChargingCompleted(droneId))
} else {

}

case GetState(replyTo) =>
Effect.reply(replyTo)(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.actor.typed.{ ActorSystem, Behavior }
import akka.cluster.typed.{ ClusterSingleton, SingletonActor }
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import charging.ChargingStation

/**
* Main for starting the local-drone-control as a cluster rather than a single self-contained node. Requires
Expand Down Expand Up @@ -53,13 +54,20 @@ object ClusteredMain {
new DeliveriesQueueServiceImpl(settings, deliveriesQueue)(
context.system)

// replicated charging station entity
val chargingStationReplication =
ChargingStation.initEdge(settings.locationId)(context.system)

val grpcInterface =
context.system.settings.config
.getString("local-drone-control.grpc.interface")
val grpcPort =
context.system.settings.config.getInt("local-drone-control.grpc.port")
val droneService =
new DroneServiceImpl(deliveriesQueue, settings)(context.system)
new DroneServiceImpl(
deliveriesQueue,
chargingStationReplication.entityRefFactory,
settings)(context.system)
LocalDroneControlServer.start(
grpcInterface,
grpcPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import akka.Done
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.grpc.GrpcServiceException
import akka.util.Timeout
import charging.ChargingStation
import com.google.protobuf.empty.Empty
import com.google.protobuf.timestamp.Timestamp
import io.grpc.Status
import org.slf4j.LoggerFactory
import local.drones.proto
Expand All @@ -16,6 +19,8 @@ import scala.concurrent.TimeoutException

class DroneServiceImpl(
deliveriesQueue: ActorRef[DeliveriesQueue.Command],
chargingStationEntityRefFactory: String => EntityRef[
ChargingStation.Command],
settings: Settings)(implicit system: ActorSystem[_])
extends proto.DroneService {

Expand Down Expand Up @@ -82,6 +87,28 @@ class DroneServiceImpl(
.map(_ => Empty.defaultInstance)
}

override def goCharge(
in: proto.GoChargeRequest): Future[proto.ChargingResponse] = {
logger.info(
"Requesting charge of {} from {}",
in.droneId,
in.chargingStationId)
val entityRef = chargingStationEntityRefFactory(in.chargingStationId)
entityRef
.ask[ChargingStation.StartChargingResponse](
ChargingStation.StartCharging(in.droneId, _))
.map {
case ChargingStation.ChargingStarted(_, chargeComplete) =>
proto.ChargingResponse(
ChargingResponse.Response.Started(
proto.ChargingStarted(Some(Timestamp(chargeComplete)))))
case ChargingStation.AllSlotsBusy(comeBackAt) =>
proto.ChargingResponse(
ChargingResponse.Response.ComeBackLater(
proto.ComeBackLater(Some(Timestamp(comeBackAt)))))
}
}

private def convertError[T](response: Future[T]): Future[T] = {
response.recoverWith {
case _: TimeoutException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ object Main {
context.system)

// replicated charging station entity
ChargingStation.initEdge(settings.locationId)(context.system)
val chargingStationReplication =
ChargingStation.initEdge(settings.locationId)(context.system)

val grpcInterface =
context.system.settings.config
.getString("local-drone-control.grpc.interface")
val grpcPort =
context.system.settings.config.getInt("local-drone-control.grpc.port")
val droneService =
new DroneServiceImpl(deliveriesQueue, settings)(context.system)
new DroneServiceImpl(
deliveriesQueue,
chargingStationReplication.entityRefFactory,
settings)(context.system)
LocalDroneControlServer.start(
grpcInterface,
grpcPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name := "restaurant-drone-deliveries-service"

organization := "com.lightbend.akka.samples"
organizationHomepage := Some(url("https://akka.io"))
licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))
licenses := Seq(
("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))

resolvers += "Akka library repository".at("https://repo.akka.io/maven")

Expand Down Expand Up @@ -33,7 +34,9 @@ val AkkaHttpVersion = "10.6.0"
val AkkaManagementVersion = "1.5.0"
val AkkaPersistenceR2dbcVersion = "1.2.0"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.1-M1")
sys.props.getOrElse(
"akka-projection.version",
"1.5.1-M1-11-d5bcedce-20231207-1512-SNAPSHOT")
val AkkaDiagnosticsVersion = "2.1.0"

enablePlugins(AkkaGrpcPlugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ akka.projection.grpc.replication.accept-edge-replication = on
# is the same as the ChargingStation.EntityType.

charging-station {
# Note: the replica id must be the same that the edge system has defined for connecting here
self-replica-id = cloud1
self-replica-id = ${?SELF_REPLICA_ID}
entity-event-replication-timeout = 10s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ object DroneDeliveriesServer {
port: Int,
droneOverviewService: central.drones.proto.DroneOverviewService,
restaurantDeliveriesService: central.deliveries.proto.RestaurantDeliveriesService,
deliveryEventsProducerService: PartialFunction[
HttpRequest,
Future[HttpResponse]],
pushedDroneEventsHandler: PartialFunction[
HttpRequest,
Future[HttpResponse]],
chargingStationService: charging.proto.ChargingStationService)(
chargingStationService: charging.proto.ChargingStationService,
eventPullHandler: PartialFunction[HttpRequest, Future[HttpResponse]],
eventPushHandler: PartialFunction[HttpRequest, Future[HttpResponse]])(
implicit system: ActorSystem[_]): Unit = {
import system.executionContext

Expand All @@ -40,8 +36,8 @@ object DroneDeliveriesServer {
DroneOverviewServiceHandler.partial(droneOverviewService),
RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService),
ChargingStationServiceHandler.partial(chargingStationService),
deliveryEventsProducerService,
pushedDroneEventsHandler,
eventPullHandler,
eventPushHandler,
ServerReflection.partial(
List(
DroneOverviewService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.SpawnProtocol
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination
import akka.projection.grpc.producer.scaladsl.EventProducer
import central.deliveries.DeliveryEvents
import central.deliveries.RestaurantDeliveries
import central.deliveries.RestaurantDeliveriesServiceImpl
import central.drones.{Drone, DroneOverviewServiceImpl, LocalDroneEvents}
import central.drones.{ Drone, DroneOverviewServiceImpl, LocalDroneEvents }
import charging.ChargingStation
import charging.ChargingStationServiceImpl
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -46,32 +47,39 @@ object Main {
val port = system.settings.config
.getInt("restaurant-drone-deliveries-service.grpc.port")

val pushedDroneEventsHandler =
LocalDroneEvents.pushedEventsGrpcHandler(system)
val pushedEventsDestination =
LocalDroneEvents.pushedEventsDestination(system)
val deliveryEventsProducerSource =
DeliveryEvents.eventProducerSource(system)
val droneOverviewService = new DroneOverviewServiceImpl(system, settings)
val restaurantDeliveriesService =
new RestaurantDeliveriesServiceImpl(system, settings)

val chargingStationService = new ChargingStationServiceImpl(chargingStationReplication.entityRefFactory)
val chargingStationService = new ChargingStationServiceImpl(
chargingStationReplication.entityRefFactory)

// delivery events and charging station replication both are Akka Projection gRPC push destinations
// and needs to be combined into a single gRPC service handling both:
// FIXME shouldn't this rather combine with pushedDroneEvents handler? Hmmmm.
val eventProducerService = EventProducer.grpcServiceHandler(
// delivery events and charging station replication both are Akka Projection gRPC event
// producers (pulled by the local drone control) and needs to be combined into a single gRPC service handling both:
val eventPullHandler = EventProducer.grpcServiceHandler(
Set(
deliveryEventsProducerSource,
chargingStationReplication.eventProducerService))
chargingStationReplication.eventProducerSource))

// the drone events from edge and the charging station replicated entity are both Akka Projection gRPC
// event push destinations (pushed by local drone control) and needs to be combined into a single gRPC service handling both:
val eventPushHandler = EventProducerPushDestination.grpcServiceHandler(
Set(
pushedEventsDestination,
chargingStationReplication.eventProducerPushDestination.get))(system)

DroneDeliveriesServer.start(
interface,
port,
droneOverviewService,
restaurantDeliveriesService,
eventProducerService,
pushedDroneEventsHandler,
chargingStationService)
chargingStationService,
eventPullHandler,
eventPushHandler)

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
package central.drones

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{
ClusterSharding,
ShardedDaemonProcess
}
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.persistence.typed.PersistenceId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.{ Handler, SourceProvider }
import akka.projection.{ Projection, ProjectionBehavior, ProjectionId }
import akka.projection.scaladsl.Handler
import akka.projection.scaladsl.SourceProvider
import akka.projection.Projection
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.util.Timeout
import central.CoarseGrainedCoordinates
import central.Main.logger

import scala.concurrent.Future
import scala.jdk.DurationConverters.JavaDurationOps

/**
Expand All @@ -34,19 +33,16 @@ object LocalDroneEvents {
// use it here as well when we consume the events
private val ProducerEntityType = "Drone"

def pushedEventsGrpcHandler(implicit system: ActorSystem[_])
: PartialFunction[HttpRequest, Future[HttpResponse]] = {
val destination = EventProducerPushDestination(
def pushedEventsDestination(
implicit system: ActorSystem[_]): EventProducerPushDestination =
EventProducerPushDestination(
DroneEventStreamId,
local.drones.proto.DroneEventsProto.javaDescriptor.getFile :: Nil)
.withTransformationForOrigin((origin, _) =>
EventProducerPushDestination.Transformation.empty
// tag all events with the location name of the local control it came from)
.registerTagMapper[local.drones.proto.CoarseDroneLocation](_ =>
Set("location:" + origin)))

EventProducerPushDestination.grpcServiceHandler(destination)(system)
}
// #eventConsumer

// #eventProjection
Expand Down

0 comments on commit 8c0b9be

Please sign in to comment.