Skip to content

Commit

Permalink
Clarify Akka cluster lifetime management by means of a Resource
Browse files Browse the repository at this point in the history
Thanks to a simple `AkkaCluster` wrapper type and delegated actor system creation to the resource acquisition, graceful shutdown trigger upon release and blocking of built-in SIGTERM hook, we can provide a smooth application scaffolding experience where the resource-based application has full control of the cluster node lifetime
  • Loading branch information
Jonas Chapuis authored and jchapuis committed Apr 17, 2023
1 parent 3db91a2 commit 1ace15f
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 130 deletions.
2 changes: 1 addition & 1 deletion documentation/src/main/paradox/runtime.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Akka runtime

Once required interpreters and typeclass instances have been defined, deploying an entity with Akka boils down to a single call to @scaladoc[deployEntity](endless.runtime.akka.deploy.Deployer). This naturally requires an actor system and the cluster sharding extension in implicit scope.
Once required interpreters and typeclass instances have been defined, deploying an entity with Akka boils down to a single call to @scaladoc[deployEntity](endless.runtime.akka.deploy.Deployer). This requires an actor system and the cluster sharding extension in implicit scope, bundled in the type @scaladoc[AkkaCluster](endless.runtime.akka.deploy.AkkaCluster). The recommended pattern is to use the built-in @scaladoc[managedResource](endless.runtime.akka.deploy.AkkaCluster.managedResource) helper method to obtain an instance of this class, which wraps actor system creation and shutdown with a cats effect @link:[Resource](https://typelevel.org/cats-effect/docs/std/resource) { open=new }.

## `deployEntity`
This function brings everything together and delivers a cats effect @link:[Resource](https://typelevel.org/cats-effect/docs/std/resource) { open=new } with the repository instance in context `F` bundled with the ref to the shard region actor returned by the call to Akka's @link:[ClusterSharding.init](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#basic-example) { open=new }.
Expand Down
69 changes: 39 additions & 30 deletions example/src/main/scala/endless/example/ExampleApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import endless.example.data.Vehicle.VehicleID
import endless.example.data._
import endless.example.logic._
import endless.example.protocol.{BookingCommandProtocol, VehicleCommandProtocol}
import endless.runtime.akka.deploy.AkkaCluster
import endless.runtime.akka.syntax.deploy._
import io.circe.generic.auto._
import org.http4s.blaze.server.BlazeServerBuilder
Expand All @@ -44,8 +45,7 @@ object ExampleApp {
final case class BookingPatch(origin: Option[LatLon], destination: Option[LatLon])

// #main
def apply(implicit actorSystem: ActorSystem[Nothing]): IO[Resource[IO, Server]] = {
implicit val clusterSharding: ClusterSharding = ClusterSharding(actorSystem)
def apply(implicit createActorSystem: => ActorSystem[Nothing]): Resource[IO, Server] = {
implicit val bookingCommandProtocol: BookingCommandProtocol = new BookingCommandProtocol
implicit val vehicleCommandProtocol: VehicleCommandProtocol = new VehicleCommandProtocol
implicit val eventApplier: BookingEventApplier = new BookingEventApplier
Expand All @@ -57,35 +57,44 @@ object ExampleApp {
EntityIDCodec(_.id.show, VehicleID.fromString)
implicit val askTimeout: Timeout = Timeout(10.seconds)

Slf4jLogger
.create[IO]
.map { implicit logger: Logger[IO] =>
Resource
.both(
deployEntity[IO, Booking, BookingEvent, BookingID, BookingAlg, BookingRepositoryAlg](
BookingEntity(_),
BookingRepository(_),
{ case (effector, _, _) => BookingEffector(effector) }
),
deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
VehicleEntity(_).pure[IO],
VehicleRepository(_).pure[IO],
{ case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
customizeBehavior = (_, behavior) => behavior.snapshotAdapter(new VehicleStateAdapter)
)
)
.map { case ((bookingRepository, _), (vehicleRepository, _)) =>
httpService(bookingRepository, vehicleRepository)
}
Resource
.eval(Slf4jLogger.create[IO])
.flatMap { implicit logger: Logger[IO] =>
AkkaCluster.managedResource[IO](createActorSystem).flatMap {
implicit cluster: AkkaCluster =>
Resource
.both(
deployEntity[
IO,
Booking,
BookingEvent,
BookingID,
BookingAlg,
BookingRepositoryAlg
](
BookingEntity(_),
BookingRepository(_),
{ case (effector, _, _) => BookingEffector(effector) }
),
deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
VehicleEntity(_).pure[IO],
VehicleRepository(_).pure[IO],
{ case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
customizeBehavior =
(_, behavior) => behavior.snapshotAdapter(new VehicleStateAdapter)
)
)
.map { case ((bookingRepository, _), (vehicleRepository, _)) =>
httpService(bookingRepository, vehicleRepository)
}
.flatMap(service =>
BlazeServerBuilder[IO]
.bindHttp(8080, "localhost")
.withHttpApp(service)
.resource
)
}
}
.map(
_.flatMap(service =>
BlazeServerBuilder[IO]
.bindHttp(8080, "localhost")
.withHttpApp(service)
.resource
)
)
}
// #main

Expand Down
8 changes: 2 additions & 6 deletions example/src/main/scala/endless/example/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import cats.effect._
import com.typesafe.config.ConfigFactory

object Main extends IOApp {
implicit val actorSystem: ActorSystem[Nothing] =
implicit def actorSystem: ActorSystem[Nothing] =
ActorSystem.wrap(
akka.actor.ActorSystem(
"example-as",
Expand All @@ -20,10 +20,6 @@ object Main extends IOApp {
)
)

def run(args: List[String]): IO[ExitCode] = {
ExampleApp.apply
.flatMap(_.use(_ => IO.fromFuture(IO(actorSystem.whenTerminated))))
.as(ExitCode.Success)
}
def run(args: List[String]): IO[ExitCode] = ExampleApp.apply.useForever.as(ExitCode.Success)

}
14 changes: 3 additions & 11 deletions example/src/test/scala/endless/example/ExampleAppSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import akka.persistence.testkit.{
PersistenceTestKitDurableStateStorePlugin,
PersistenceTestKitPlugin
}
import cats.effect.{IO, Temporal}
import cats.effect.kernel.Resource
import cats.syntax.applicative._
import cats.effect.IO
import cats.syntax.show._
import com.typesafe.config.ConfigFactory
import endless.example.ExampleApp._
Expand All @@ -26,7 +24,7 @@ import java.util.UUID
import scala.concurrent.duration._

class ExampleAppSuite extends munit.CatsEffectSuite {
implicit val actorSystem: ActorSystem[Nothing] =
implicit def actorSystem: ActorSystem[Nothing] =
ActorSystem.wrap(
akka.actor.ActorSystem(
"example-as",
Expand All @@ -37,13 +35,7 @@ class ExampleAppSuite extends munit.CatsEffectSuite {
)
)

private val server =
ResourceSuiteLocalFixture(
"booking-server",
ExampleApp.apply
.unsafeRunSync()
.flatMap(server => Resource.make(server.pure[IO])(_ => IO.delay(actorSystem.terminate())))
)
private val server = ResourceSuiteLocalFixture("booking-server", ExampleApp.apply)
private val client = ResourceSuiteLocalFixture("booking-client", BlazeClientBuilder[IO].resource)
private val baseUri = uri"http://localhost:8080"
private val baseBookingUri = baseUri / "booking"
Expand Down
1 change: 1 addition & 0 deletions runtime/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ akka {
"endless.runtime.akka.data.Reply" = akka-persistence-tagless-reply
}
}
coordinated-shutdown.run-by-jvm-shutdown-hook=off // ensure we have control of cluster shutdown via resource scoping
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package endless.runtime.akka.deploy

import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import cats.effect.kernel.{Async, Resource, Sync}
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.show._
import org.typelevel.log4cats.Logger

/** Actor system and cluster sharding extension.
* @param system
* actor system
* @param sharding
* cluster sharding extension
*/
final case class AkkaCluster(system: ActorSystem[_], sharding: ClusterSharding)

object AkkaCluster {

/** Create a resource that manages the lifetime of an Akka actor system with cluster sharding
* extension. The actor system is created when the resource is acquired and shutdown when the
* resource is released. Akka's built-in SIGTERM hook is disabled in the reference config, so
* that CoordinatedShutdown is only triggered here, in the release action.
*/
def managedResource[F[_]: Async: Logger](
createActorSystem: => ActorSystem[_]
): Resource[F, AkkaCluster] = Resource.make(initCluster(createActorSystem))(shutdownCluster[F])

private def initCluster[F[_]: Sync: Logger](
createActorSystem: => ActorSystem[_]
): F[AkkaCluster] =
Logger[F].debug("Joining Akka actor cluster") >> Sync[F]
.delay {
val actorSystem = createActorSystem
val sharding = ClusterSharding(actorSystem)
AkkaCluster(actorSystem, sharding)
}
.flatTap(cluster =>
Logger[F].info(show"Joined Akka actor cluster with system name ${cluster.system.name}")
)

private def shutdownCluster[F[_]: Async: Logger](cluster: AkkaCluster) =
Logger[F].info("Leaving Akka actor cluster") >> Async[F]
.fromFuture(
Async[F].delay(
CoordinatedShutdown(cluster.system).run(CoordinatedShutdown.ActorSystemTerminateReason)
)
)
.void >> Logger[F].info("Akka cluster exited and actor system shutdown complete")
}
43 changes: 31 additions & 12 deletions runtime/src/main/scala/endless/runtime/akka/deploy/Deployer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ trait Deployer {
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]] =
identity
)(implicit
sharding: ClusterSharding,
actorSystem: ActorSystem[_],
akkaCluster: AkkaCluster,
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[Alg],
eventApplier: EventApplier[S, E],
Expand Down Expand Up @@ -155,27 +154,47 @@ trait Deployer {
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]] =
identity
)(implicit
sharding: ClusterSharding,
actorSystem: ActorSystem[_],
akkaCluster: AkkaCluster,
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[Alg],
eventApplier: EventApplier[S, E],
askTimeout: Timeout
): Resource[F, (RepositoryAlg[F], ActorRef[ShardingEnvelope[Command]])] = {
implicit val commandRouter: CommandRouter[F, ID] = ShardingCommandRouter.apply
val repositoryT = RepositoryT.apply[F, ID, Alg]
): Resource[F, (RepositoryAlg[F], ActorRef[ShardingEnvelope[Command]])] =
deployF(createEntity, createRepository, createEffector, customizeBehavior, customizeEntity)

private def deployF[F[_]: Async: Logger, S, E, ID: EntityIDCodec, Alg[
_[_]
]: FunctorK, RepositoryAlg[
_[_]
]](
createEntity: Entity[EntityT[F, S, E, *], S, E] => F[Alg[EntityT[F, S, E, *]]],
createRepository: Repository[F, ID, Alg] => F[RepositoryAlg[F]],
createEffector: EffectorParameters[F, S, Alg, RepositoryAlg] => F[EffectorT[F, S, Alg, Unit]],
customizeBehavior: (
EntityContext[Command],
EventSourcedBehavior[Command, E, Option[S]]
) => Behavior[Command] =
(_: EntityContext[Command], behavior: EventSourcedBehavior[Command, E, Option[S]]) =>
behavior,
customizeEntity: akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[
Command
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]] =
identity
)(implicit
akkaCluster: AkkaCluster,
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[Alg],
eventApplier: EventApplier[S, E],
askTimeout: Timeout
): Resource[F, (RepositoryAlg[F], ActorRef[ShardingEnvelope[Command]])] =
for {
interpretedEntityAlg <- Resource.eval(createEntity(EntityT.instance))
repository <- Resource.eval(createRepository(repositoryT))
entity <- new EventSourcedShardedEntityDeployer(
interpretedEntityAlg,
repository,
repositoryT,
createEffector,
customizeBehavior
).deployShardedEntity(sharding, customizeEntity).map((repository, _))
).deployShardedRepository(createRepository, customizeEntity)
} yield entity
}
}

object Deployer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.util.Timeout
import cats.effect.kernel.{Async, Resource}
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.tagless.FunctorK
import endless.core.entity._
import endless.core.interpret.DurableEntityT.DurableEntityT
import endless.core.interpret.EffectorT._
import endless.core.interpret._
import endless.core.protocol.{CommandProtocol, CommandRouter, EntityIDCodec}
import endless.runtime.akka.ShardingCommandRouter
import endless.core.protocol.{CommandProtocol, EntityIDCodec}
import endless.runtime.akka.data._
import endless.runtime.akka.deploy.DurableDeployer.EffectorParameters
import endless.runtime.akka.deploy.internal.DurableShardedEntityDeployer
Expand Down Expand Up @@ -115,8 +112,7 @@ trait DurableDeployer {
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]] =
identity
)(implicit
sharding: ClusterSharding,
actorSystem: ActorSystem[_],
akkaCluster: AkkaCluster,
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[Alg],
askTimeout: Timeout
Expand Down Expand Up @@ -151,24 +147,39 @@ trait DurableDeployer {
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]] =
identity
)(implicit
sharding: ClusterSharding,
actorSystem: ActorSystem[_],
akkaCluster: AkkaCluster,
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[Alg],
askTimeout: Timeout
): Resource[F, (RepositoryAlg[F], ActorRef[ShardingEnvelope[Command]])] =
deployF(createEntity, createRepository, createEffector, customizeBehavior, customizeEntity)

private def deployF[F[_]: Async: Logger, S, ID: EntityIDCodec, Alg[_[_]]: FunctorK, RepositoryAlg[
_[_]
]](
createEntity: DurableEntity[DurableEntityT[F, S, *], S] => F[Alg[DurableEntityT[F, S, *]]],
createRepository: Repository[F, ID, Alg] => F[RepositoryAlg[F]],
createEffector: EffectorParameters[F, S, Alg, RepositoryAlg] => F[EffectorT[F, S, Alg, Unit]],
customizeBehavior: (
EntityContext[Command],
DurableStateBehavior[Command, Option[S]]
) => Behavior[Command],
customizeEntity: akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[
Command
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]]
)(implicit
akkaCluster: AkkaCluster,
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[Alg],
askTimeout: Timeout
): Resource[F, (RepositoryAlg[F], ActorRef[ShardingEnvelope[Command]])] = {
implicit val commandRouter: CommandRouter[F, ID] = ShardingCommandRouter.apply
val repositoryT = RepositoryT.apply[F, ID, Alg]
for {
interpretedEntityAlg <- Resource.eval(createEntity(DurableEntityT.instance))
repository <- Resource.eval(createRepository(repositoryT))
entity <- new DurableShardedEntityDeployer(
interpretedEntityAlg,
repository,
repositoryT,
createEffector,
customizeBehavior
).deployShardedEntity(sharding, customizeEntity).map((repository, _))
).deployShardedRepository(createRepository, customizeEntity)
} yield entity
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,18 @@ private[deploy] class DurableShardedEntityDeployer[F[_]: Async: Logger, S, ID: E
_[_]
]: FunctorK, RepositoryAlg[_[_]]](
interpretedEntityAlg: Alg[DurableEntityT[F, S, *]],
repository: RepositoryAlg[F],
repositoryT: RepositoryT[F, ID, Alg],
createEffector: EffectorParameters[F, S, Alg, RepositoryAlg] => F[EffectorT[F, S, Alg, Unit]],
customizeBehavior: (
EntityContext[Command],
DurableStateBehavior[Command, Option[S]]
) => Behavior[Command]
)(implicit val nameProvider: EntityNameProvider[ID], commandProtocol: CommandProtocol[Alg])
extends ShardingDeployer[F, ID] {
extends ShardedRepositoryDeployer[F, RepositoryAlg, Alg, ID] {

protected override def createBehavior()(implicit
protected override def createBehaviorFor(
repository: RepositoryAlg[F],
repositoryT: RepositoryT[F, ID, Alg]
)(implicit
dispatcher: Dispatcher[F],
actor: ActorContext[Command],
context: EntityContext[Command]
Expand Down
Loading

0 comments on commit 1ace15f

Please sign in to comment.