From 8c0b9be5bd495a3f64e832c16983a8224e888c16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 7 Dec 2023 16:33:25 +0100 Subject: [PATCH] Scala version working (given changes in #1094 --- .../grpc/local-drone-control-scala/README.md | 13 ++++++++ .../grpc/local-drone-control-scala/build.sbt | 4 ++- .../protobuf/local/drones/drone_api.proto | 24 ++++++++++++++ .../src/main/resources/replication.conf | 2 +- .../main/scala/charging/ChargingStation.scala | 12 +++++-- .../scala/local/drones/ClusteredMain.scala | 10 +++++- .../scala/local/drones/DroneServiceImpl.scala | 27 ++++++++++++++++ .../src/main/scala/local/drones/Main.scala | 8 +++-- .../build.sbt | 7 ++-- .../src/main/resources/replication.conf | 1 + .../scala/central/DroneDeliveriesServer.scala | 14 +++----- .../src/main/scala/central/Main.scala | 32 ++++++++++++------- .../central/drones/LocalDroneEvents.scala | 24 ++++++-------- 13 files changed, 134 insertions(+), 44 deletions(-) diff --git a/samples/grpc/local-drone-control-scala/README.md b/samples/grpc/local-drone-control-scala/README.md index 002277020..d96b4ac21 100644 --- a/samples/grpc/local-drone-control-scala/README.md +++ b/samples/grpc/local-drone-control-scala/README.md @@ -47,6 +47,19 @@ Inspect the current state of the local delivery queue grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue ``` +Ask to charge the drone, with a charging station first created in the restaurant-drone-deliveries service: + +```shell +grpcurl -d '{"drone_id":"drone1","charging_station_id":"station1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.GoCharge +``` + +Use the restaurant-drone-deliveries charge station inspection command to see the charging drones, and that they eventually +complete their charging: + +```shell +grpcurl -d '{"charging_station_id":"station1"}' -plaintext localhost:8101 charging.ChargingStationService.GetChargingStationState +``` + ## Running the sample as a multi node service It is also possible to run this sample service as a multi node Akka Cluster, for that you need to start a PostgreSQL diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index c443d849b..d96602f15 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -36,7 +36,9 @@ val AkkaHttpVersion = "10.6.0" val AkkaManagementVersion = "1.5.0" val AkkaPersistenceR2dbcVersion = "1.2.0" val AkkaProjectionVersion = - sys.props.getOrElse("akka-projection.version", "1.5.1-M1") + sys.props.getOrElse( + "akka-projection.version", + "1.5.1-M1-11-d5bcedce-20231207-1512-SNAPSHOT") val AkkaDiagnosticsVersion = "2.1.0" enablePlugins(AkkaGrpcPlugin) diff --git a/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto index 08e81a261..15a4e1e4c 100644 --- a/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto +++ b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto @@ -4,6 +4,7 @@ option java_multiple_files = true; option java_package = "local.drones.proto"; import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; import "common/coordinates.proto"; package local.drones; @@ -16,6 +17,9 @@ service DroneService { // deliveries rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {} rpc CompleteDelivery (CompleteDeliveryRequest) returns (google.protobuf.Empty) {} + + // charging + rpc GoCharge (GoChargeRequest) returns (ChargingResponse) {} } @@ -38,4 +42,24 @@ message RequestNextDeliveryResponse { message CompleteDeliveryRequest { string delivery_id = 1; +} + +message GoChargeRequest { + string drone_id = 1; + string charging_station_id = 2; +} + +message ChargingResponse { + oneof response { + ChargingStarted started = 1; + ComeBackLater come_back_later = 2; + }; +} + +message ChargingStarted { + google.protobuf.Timestamp done_by = 1; +} + +message ComeBackLater { + google.protobuf.Timestamp first_slot_free_at = 1; } \ No newline at end of file diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/replication.conf b/samples/grpc/local-drone-control-scala/src/main/resources/replication.conf index 6cefe04f9..378a3e730 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/replication.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/replication.conf @@ -8,7 +8,7 @@ charging-station { parallel-updates = 1 # only list the cloud replica(s) we want to connect to replicas: [{ - replica-id = "cloud1" + replica-id = cloud1 number-of-consumers = 1 grpc.client { # same as for producer in grpc.conf, so re-use config from there diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala b/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala index 220486925..39982bb4f 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala @@ -77,7 +77,7 @@ object ChargingStation { implicit system: ActorSystem[_]): EdgeReplication[Command] = { val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) - .withSelfReplicaId(ReplicaId(locationId.replace("/", "_"))) + .withSelfReplicaId(ReplicaId(locationId)) Replication.grpcEdgeReplication(replicationSettings)(ChargingStation.apply) } @@ -87,6 +87,9 @@ object ChargingStation { Behaviors.setup[Command] { context => Behaviors.withTimers { timers => replicatedBehaviors.setup { replicationContext => + context.log.info( + "Charging Station {} starting up", + replicationContext.entityId) new ChargingStation(context, replicationContext, timers).behavior() } } @@ -185,7 +188,12 @@ class ChargingStation( } case CompleteCharging(droneId) => - Effect.persist(ChargingCompleted(droneId)) + if (state.dronesCharging.exists(_.droneId == droneId)) { + context.log.info("Drone {} completed charging", droneId) + Effect.persist(ChargingCompleted(droneId)) + } else { + + } case GetState(replyTo) => Effect.reply(replyTo)(state) diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/ClusteredMain.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/ClusteredMain.scala index aa03d15db..1ed13259c 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/ClusteredMain.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/ClusteredMain.scala @@ -5,6 +5,7 @@ import akka.actor.typed.{ ActorSystem, Behavior } import akka.cluster.typed.{ ClusterSingleton, SingletonActor } import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement +import charging.ChargingStation /** * Main for starting the local-drone-control as a cluster rather than a single self-contained node. Requires @@ -53,13 +54,20 @@ object ClusteredMain { new DeliveriesQueueServiceImpl(settings, deliveriesQueue)( context.system) + // replicated charging station entity + val chargingStationReplication = + ChargingStation.initEdge(settings.locationId)(context.system) + val grpcInterface = context.system.settings.config .getString("local-drone-control.grpc.interface") val grpcPort = context.system.settings.config.getInt("local-drone-control.grpc.port") val droneService = - new DroneServiceImpl(deliveriesQueue, settings)(context.system) + new DroneServiceImpl( + deliveriesQueue, + chargingStationReplication.entityRefFactory, + settings)(context.system) LocalDroneControlServer.start( grpcInterface, grpcPort, diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala index 15610481b..de38b051e 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala @@ -4,9 +4,12 @@ import akka.Done import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem } import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.grpc.GrpcServiceException import akka.util.Timeout +import charging.ChargingStation import com.google.protobuf.empty.Empty +import com.google.protobuf.timestamp.Timestamp import io.grpc.Status import org.slf4j.LoggerFactory import local.drones.proto @@ -16,6 +19,8 @@ import scala.concurrent.TimeoutException class DroneServiceImpl( deliveriesQueue: ActorRef[DeliveriesQueue.Command], + chargingStationEntityRefFactory: String => EntityRef[ + ChargingStation.Command], settings: Settings)(implicit system: ActorSystem[_]) extends proto.DroneService { @@ -82,6 +87,28 @@ class DroneServiceImpl( .map(_ => Empty.defaultInstance) } + override def goCharge( + in: proto.GoChargeRequest): Future[proto.ChargingResponse] = { + logger.info( + "Requesting charge of {} from {}", + in.droneId, + in.chargingStationId) + val entityRef = chargingStationEntityRefFactory(in.chargingStationId) + entityRef + .ask[ChargingStation.StartChargingResponse]( + ChargingStation.StartCharging(in.droneId, _)) + .map { + case ChargingStation.ChargingStarted(_, chargeComplete) => + proto.ChargingResponse( + ChargingResponse.Response.Started( + proto.ChargingStarted(Some(Timestamp(chargeComplete))))) + case ChargingStation.AllSlotsBusy(comeBackAt) => + proto.ChargingResponse( + ChargingResponse.Response.ComeBackLater( + proto.ComeBackLater(Some(Timestamp(comeBackAt))))) + } + } + private def convertError[T](response: Future[T]): Future[T] = { response.recoverWith { case _: TimeoutException => diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala index 6766e702c..9e8b7d905 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala @@ -41,7 +41,8 @@ object Main { context.system) // replicated charging station entity - ChargingStation.initEdge(settings.locationId)(context.system) + val chargingStationReplication = + ChargingStation.initEdge(settings.locationId)(context.system) val grpcInterface = context.system.settings.config @@ -49,7 +50,10 @@ object Main { val grpcPort = context.system.settings.config.getInt("local-drone-control.grpc.port") val droneService = - new DroneServiceImpl(deliveriesQueue, settings)(context.system) + new DroneServiceImpl( + deliveriesQueue, + chargingStationReplication.entityRefFactory, + settings)(context.system) LocalDroneControlServer.start( grpcInterface, grpcPort, diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt index 2e555893d..4ab293487 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt @@ -2,7 +2,8 @@ name := "restaurant-drone-deliveries-service" organization := "com.lightbend.akka.samples" organizationHomepage := Some(url("https://akka.io")) -licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) +licenses := Seq( + ("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) resolvers += "Akka library repository".at("https://repo.akka.io/maven") @@ -33,7 +34,9 @@ val AkkaHttpVersion = "10.6.0" val AkkaManagementVersion = "1.5.0" val AkkaPersistenceR2dbcVersion = "1.2.0" val AkkaProjectionVersion = - sys.props.getOrElse("akka-projection.version", "1.5.1-M1") + sys.props.getOrElse( + "akka-projection.version", + "1.5.1-M1-11-d5bcedce-20231207-1512-SNAPSHOT") val AkkaDiagnosticsVersion = "2.1.0" enablePlugins(AkkaGrpcPlugin) diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/replication.conf b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/replication.conf index 8f8b95248..5d0d960d3 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/replication.conf +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/replication.conf @@ -5,6 +5,7 @@ akka.projection.grpc.replication.accept-edge-replication = on # is the same as the ChargingStation.EntityType. charging-station { + # Note: the replica id must be the same that the edge system has defined for connecting here self-replica-id = cloud1 self-replica-id = ${?SELF_REPLICA_ID} entity-event-replication-timeout = 10s diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala index e94aed645..ea69d7b4c 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala @@ -25,13 +25,9 @@ object DroneDeliveriesServer { port: Int, droneOverviewService: central.drones.proto.DroneOverviewService, restaurantDeliveriesService: central.deliveries.proto.RestaurantDeliveriesService, - deliveryEventsProducerService: PartialFunction[ - HttpRequest, - Future[HttpResponse]], - pushedDroneEventsHandler: PartialFunction[ - HttpRequest, - Future[HttpResponse]], - chargingStationService: charging.proto.ChargingStationService)( + chargingStationService: charging.proto.ChargingStationService, + eventPullHandler: PartialFunction[HttpRequest, Future[HttpResponse]], + eventPushHandler: PartialFunction[HttpRequest, Future[HttpResponse]])( implicit system: ActorSystem[_]): Unit = { import system.executionContext @@ -40,8 +36,8 @@ object DroneDeliveriesServer { DroneOverviewServiceHandler.partial(droneOverviewService), RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService), ChargingStationServiceHandler.partial(chargingStationService), - deliveryEventsProducerService, - pushedDroneEventsHandler, + eventPullHandler, + eventPushHandler, ServerReflection.partial( List( DroneOverviewService, diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala index 5cde387b7..f45a97d84 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala @@ -4,11 +4,12 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.SpawnProtocol import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement +import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination import akka.projection.grpc.producer.scaladsl.EventProducer import central.deliveries.DeliveryEvents import central.deliveries.RestaurantDeliveries import central.deliveries.RestaurantDeliveriesServiceImpl -import central.drones.{Drone, DroneOverviewServiceImpl, LocalDroneEvents} +import central.drones.{ Drone, DroneOverviewServiceImpl, LocalDroneEvents } import charging.ChargingStation import charging.ChargingStationServiceImpl import org.slf4j.LoggerFactory @@ -46,32 +47,39 @@ object Main { val port = system.settings.config .getInt("restaurant-drone-deliveries-service.grpc.port") - val pushedDroneEventsHandler = - LocalDroneEvents.pushedEventsGrpcHandler(system) + val pushedEventsDestination = + LocalDroneEvents.pushedEventsDestination(system) val deliveryEventsProducerSource = DeliveryEvents.eventProducerSource(system) val droneOverviewService = new DroneOverviewServiceImpl(system, settings) val restaurantDeliveriesService = new RestaurantDeliveriesServiceImpl(system, settings) - val chargingStationService = new ChargingStationServiceImpl(chargingStationReplication.entityRefFactory) + val chargingStationService = new ChargingStationServiceImpl( + chargingStationReplication.entityRefFactory) - // delivery events and charging station replication both are Akka Projection gRPC push destinations - // and needs to be combined into a single gRPC service handling both: - // FIXME shouldn't this rather combine with pushedDroneEvents handler? Hmmmm. - val eventProducerService = EventProducer.grpcServiceHandler( + // delivery events and charging station replication both are Akka Projection gRPC event + // producers (pulled by the local drone control) and needs to be combined into a single gRPC service handling both: + val eventPullHandler = EventProducer.grpcServiceHandler( Set( deliveryEventsProducerSource, - chargingStationReplication.eventProducerService)) + chargingStationReplication.eventProducerSource)) + + // the drone events from edge and the charging station replicated entity are both Akka Projection gRPC + // event push destinations (pushed by local drone control) and needs to be combined into a single gRPC service handling both: + val eventPushHandler = EventProducerPushDestination.grpcServiceHandler( + Set( + pushedEventsDestination, + chargingStationReplication.eventProducerPushDestination.get))(system) DroneDeliveriesServer.start( interface, port, droneOverviewService, restaurantDeliveriesService, - eventProducerService, - pushedDroneEventsHandler, - chargingStationService) + chargingStationService, + eventPullHandler, + eventPushHandler) } diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala index b1d6035bf..e55134cf2 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala @@ -1,11 +1,8 @@ package central.drones import akka.actor.typed.ActorSystem -import akka.cluster.sharding.typed.scaladsl.{ - ClusterSharding, - ShardedDaemonProcess -} -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess import akka.persistence.query.Offset import akka.persistence.query.typed.EventEnvelope import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal @@ -13,13 +10,15 @@ import akka.persistence.typed.PersistenceId import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination import akka.projection.r2dbc.scaladsl.R2dbcProjection -import akka.projection.scaladsl.{ Handler, SourceProvider } -import akka.projection.{ Projection, ProjectionBehavior, ProjectionId } +import akka.projection.scaladsl.Handler +import akka.projection.scaladsl.SourceProvider +import akka.projection.Projection +import akka.projection.ProjectionBehavior +import akka.projection.ProjectionId import akka.util.Timeout import central.CoarseGrainedCoordinates import central.Main.logger -import scala.concurrent.Future import scala.jdk.DurationConverters.JavaDurationOps /** @@ -34,9 +33,9 @@ object LocalDroneEvents { // use it here as well when we consume the events private val ProducerEntityType = "Drone" - def pushedEventsGrpcHandler(implicit system: ActorSystem[_]) - : PartialFunction[HttpRequest, Future[HttpResponse]] = { - val destination = EventProducerPushDestination( + def pushedEventsDestination( + implicit system: ActorSystem[_]): EventProducerPushDestination = + EventProducerPushDestination( DroneEventStreamId, local.drones.proto.DroneEventsProto.javaDescriptor.getFile :: Nil) .withTransformationForOrigin((origin, _) => @@ -44,9 +43,6 @@ object LocalDroneEvents { // tag all events with the location name of the local control it came from) .registerTagMapper[local.drones.proto.CoarseDroneLocation](_ => Set("location:" + origin))) - - EventProducerPushDestination.grpcServiceHandler(destination)(system) - } // #eventConsumer // #eventProjection