From 1bb613dec38bfaff0b638a92bac2f56277ae86d5 Mon Sep 17 00:00:00 2001 From: Jonas Chapuis Date: Wed, 4 Sep 2024 17:36:38 +0200 Subject: [PATCH 1/6] Update multiple dependencies --- build.sbt | 4 ++-- .../endless/core/interpret/EntityRunFunctions.scala | 1 - .../main/scala/endless/core/interpret/EntityT.scala | 1 - .../core/interpret/EntityTLiftInstance.scala | 1 - .../example/protocol/BookingCommandProtocol.scala | 1 - project/Dependencies.scala | 13 ++++++++----- project/build.properties | 2 +- project/plugins.sbt | 2 +- .../endless/scodec/ScodecCommandProtocolSuite.scala | 6 ++++-- 9 files changed, 16 insertions(+), 15 deletions(-) diff --git a/build.sbt b/build.sbt index f3346788..08cb3cae 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ import Dependencies.* import sbtversionpolicy.Compatibility.None -val scala213 = "2.13.12" +val scala213 = "2.13.14" val scala3 = "3.4.2" val commonSettings = Seq( @@ -88,7 +88,7 @@ lazy val pekkoRuntime = (project in file("pekko-runtime")) .settings(commonSettings *) .dependsOn(core) .settings( - libraryDependencies ++= catsEffectStd ++ pekkoProvided ++ log4cats ++ scalapbCustomizations ++ (mUnit :+ pekkoTypedTestkit % pekkoVersion) + libraryDependencies ++= catsEffectStd ++ pekkoProvided ++ log4cats ++ scalapbCustomizations ++ (mUnit ++ logback ++ log4catsSlf4j :+ pekkoTypedTestkit % pekkoVersion) .map(_ % Test) ) .settings( diff --git a/core/src/main/scala/endless/core/interpret/EntityRunFunctions.scala b/core/src/main/scala/endless/core/interpret/EntityRunFunctions.scala index 3f614e95..42ab11a4 100644 --- a/core/src/main/scala/endless/core/interpret/EntityRunFunctions.scala +++ b/core/src/main/scala/endless/core/interpret/EntityRunFunctions.scala @@ -1,6 +1,5 @@ package endless.core.interpret -import cats.conversions.all.* import cats.data.{Chain, NonEmptyChain} import cats.syntax.applicative.* import cats.syntax.either.* diff --git a/core/src/main/scala/endless/core/interpret/EntityT.scala b/core/src/main/scala/endless/core/interpret/EntityT.scala index 8801986f..bdc4ab4f 100644 --- a/core/src/main/scala/endless/core/interpret/EntityT.scala +++ b/core/src/main/scala/endless/core/interpret/EntityT.scala @@ -1,6 +1,5 @@ package endless.core.interpret -import cats.conversions.all.* import cats.data.{Chain, NonEmptyChain} import cats.effect.kernel.Clock import cats.syntax.applicative.* diff --git a/core/src/main/scala/endless/core/interpret/EntityTLiftInstance.scala b/core/src/main/scala/endless/core/interpret/EntityTLiftInstance.scala index ed75a80b..11872ee7 100644 --- a/core/src/main/scala/endless/core/interpret/EntityTLiftInstance.scala +++ b/core/src/main/scala/endless/core/interpret/EntityTLiftInstance.scala @@ -1,7 +1,6 @@ package endless.core.interpret import cats.Monad -import cats.conversions.all.* import cats.data.NonEmptyChain import cats.syntax.applicative.* import cats.syntax.either.* diff --git a/example/src/main/scala/endless/example/protocol/BookingCommandProtocol.scala b/example/src/main/scala/endless/example/protocol/BookingCommandProtocol.scala index 44d653c2..f9ac68cc 100644 --- a/example/src/main/scala/endless/example/protocol/BookingCommandProtocol.scala +++ b/example/src/main/scala/endless/example/protocol/BookingCommandProtocol.scala @@ -1,6 +1,5 @@ package endless.example.protocol -import cats.conversions.all.* import cats.syntax.show.* import com.google.protobuf.timestamp.Timestamp import endless.\/ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2d51168e..e09a967c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -84,17 +84,20 @@ object Dependencies { ) lazy val log4catsTesting = Seq("org.typelevel" %% "log4cats-testing" % log4catsVersion) - lazy val mUnitVersion = "1.0.0" + lazy val mUnitVersion = "1.0.1" lazy val disciplineMUnitVersion = "2.0.0" + lazy val mUnitScalacheckVersion = "1.0.0" lazy val mUnit = - Seq("org.scalameta" %% "munit", "org.scalameta" %% "munit-scalacheck").map( - _ % mUnitVersion - ) ++ Seq("org.typelevel" %% "discipline-munit" % disciplineMUnitVersion) + Seq( + "org.scalameta" %% "munit" % mUnitVersion, + "org.scalameta" %% "munit-scalacheck" % mUnitScalacheckVersion, + "org.typelevel" %% "discipline-munit" % disciplineMUnitVersion + ) lazy val catsEffectMUnitVersion = "2.0.0" lazy val catsEffectMUnit = Seq("org.typelevel" %% "munit-cats-effect" % catsEffectMUnitVersion) - lazy val scalacheckEffectVersion = "1.0.4" + lazy val scalacheckEffectVersion = "2.0.0-M2" lazy val scalacheckEffect = Seq( "org.typelevel" %% "scalacheck-effect-munit" % scalacheckEffectVersion ) diff --git a/project/build.properties b/project/build.properties index 04267b14..ee4c672c 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.9 +sbt.version=1.10.1 diff --git a/project/plugins.sbt b/project/plugins.sbt index f7320077..41f1f2f5 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -addSbtPlugin("org.wartremover" % "sbt-wartremover" % "3.1.8") +addSbtPlugin("org.wartremover" % "sbt-wartremover" % "3.2.0") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.1.1") addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1") addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.6.1") diff --git a/scodec/src/test/scala/endless/scodec/ScodecCommandProtocolSuite.scala b/scodec/src/test/scala/endless/scodec/ScodecCommandProtocolSuite.scala index eb2c1976..c74a962a 100644 --- a/scodec/src/test/scala/endless/scodec/ScodecCommandProtocolSuite.scala +++ b/scodec/src/test/scala/endless/scodec/ScodecCommandProtocolSuite.scala @@ -15,10 +15,12 @@ class ScodecCommandProtocolSuite extends munit.ScalaCheckSuite { } val dummyProtocol = new ScodecCommandProtocol[String, DummyAlg] { - def server[F[_]]: Decoder[IncomingCommand[F, DummyAlg]] = - ScodecDecoder(DummyCommand.scodecDecoder).map { case DummyCommand(x, y) => + def server[F[_]]: Decoder[IncomingCommand[F, DummyAlg]] = { + implicit val decoder = DummyCommand.scodecDecoder + ScodecDecoder.apply.map { case DummyCommand(x, y) => handleCommand[F, Boolean](_.dummy(x, y)) } + } def clientFor[F[_]](id: ID)(implicit sender: CommandSender[F, ID]): DummyAlg[F] = (x: Int, y: String) => sendCommand[F, DummyCommand, Boolean](id, DummyCommand(x, y)) From 6d41979abc8e9d4158707e530e338e20a0ce1985 Mon Sep 17 00:00:00 2001 From: Jonas Chapuis Date: Wed, 4 Sep 2024 17:37:14 +0200 Subject: [PATCH 2/6] Ensure JVM doesn't exit after test runs involving Akka/Pekko --- .../endless/runtime/akka/protobuf/ScalaPbSerializerSuite.scala | 1 + example/src/main/resources/akka.conf | 2 ++ example/src/main/resources/pekko.conf | 2 ++ .../src/test/scala/endless/example/AkkaExampleAppSuite.scala | 3 ++- example/src/test/scala/endless/example/ExampleAppSuite.scala | 3 ++- .../src/test/scala/endless/example/PekkoExampleAppSuite.scala | 3 ++- .../runtime/pekko/protobuf/ScalaPbSerializerSuite.scala | 3 ++- 7 files changed, 13 insertions(+), 4 deletions(-) diff --git a/akka-runtime/src/test/scala/endless/runtime/akka/protobuf/ScalaPbSerializerSuite.scala b/akka-runtime/src/test/scala/endless/runtime/akka/protobuf/ScalaPbSerializerSuite.scala index 5aacf5f9..dd427fe0 100644 --- a/akka-runtime/src/test/scala/endless/runtime/akka/protobuf/ScalaPbSerializerSuite.scala +++ b/akka-runtime/src/test/scala/endless/runtime/akka/protobuf/ScalaPbSerializerSuite.scala @@ -56,6 +56,7 @@ object ScalaPbSerializerSuite { | "endless.runtime.akka.protobuf.ScalaPbSerializer" = 1111 | } | } + | coordinated-shutdown.exit-jvm = off |} """.stripMargin } diff --git a/example/src/main/resources/akka.conf b/example/src/main/resources/akka.conf index 51a03ec3..d6f6cd3d 100644 --- a/example/src/main/resources/akka.conf +++ b/example/src/main/resources/akka.conf @@ -39,4 +39,6 @@ akka { passivate-idle-entity-after = off } } + + coordinated-shutdown.exit-jvm = off } \ No newline at end of file diff --git a/example/src/main/resources/pekko.conf b/example/src/main/resources/pekko.conf index 5afd54b0..a2be5628 100644 --- a/example/src/main/resources/pekko.conf +++ b/example/src/main/resources/pekko.conf @@ -39,4 +39,6 @@ pekko { passivate-idle-entity-after = off } } + + coordinated-shutdown.exit-jvm = off } \ No newline at end of file diff --git a/example/src/test/scala/endless/example/AkkaExampleAppSuite.scala b/example/src/test/scala/endless/example/AkkaExampleAppSuite.scala index affd72fd..219a42c1 100644 --- a/example/src/test/scala/endless/example/AkkaExampleAppSuite.scala +++ b/example/src/test/scala/endless/example/AkkaExampleAppSuite.scala @@ -1,9 +1,10 @@ package endless.example import endless.example.app.akka.AkkaApp +import munit.AnyFixture class AkkaExampleAppSuite extends munit.CatsEffectSuite with ExampleAppSuite { lazy val port: Int = 8080 private val akkaServer = ResourceSuiteLocalFixture("akka-server", AkkaApp(port)) - override def munitFixtures: Seq[Fixture[?]] = List(akkaServer, client) + override def munitFixtures: Seq[AnyFixture[?]] = List(akkaServer, client) } diff --git a/example/src/test/scala/endless/example/ExampleAppSuite.scala b/example/src/test/scala/endless/example/ExampleAppSuite.scala index 9dcbeda8..b9e1a541 100644 --- a/example/src/test/scala/endless/example/ExampleAppSuite.scala +++ b/example/src/test/scala/endless/example/ExampleAppSuite.scala @@ -7,6 +7,7 @@ import endless.example.data.Booking.BookingID import endless.example.data.Vehicle.VehicleID import endless.example.data.{Booking, LatLon, Speed} import io.circe.generic.auto.* +import munit.catseffect.IOFixture import org.http4s.Method.* import org.http4s.Uri import org.http4s.blaze.client.BlazeClientBuilder @@ -19,7 +20,7 @@ import java.util.UUID import scala.concurrent.duration.* trait ExampleAppSuite { self: munit.CatsEffectSuite => - protected val client: Fixture[Client[IO]] = + protected val client: IOFixture[Client[IO]] = ResourceSuiteLocalFixture("booking-client", BlazeClientBuilder[IO].resource) def port: Int private lazy val baseUri = Uri.unsafeFromString(s"http://localhost:$port") diff --git a/example/src/test/scala/endless/example/PekkoExampleAppSuite.scala b/example/src/test/scala/endless/example/PekkoExampleAppSuite.scala index 04f07bce..1d637440 100644 --- a/example/src/test/scala/endless/example/PekkoExampleAppSuite.scala +++ b/example/src/test/scala/endless/example/PekkoExampleAppSuite.scala @@ -1,9 +1,10 @@ package endless.example import endless.example.app.pekko.PekkoApp +import munit.AnyFixture class PekkoExampleAppSuite extends munit.CatsEffectSuite with ExampleAppSuite { lazy val port: Int = 8081 private val pekkoServer = ResourceSuiteLocalFixture("pekko-server", PekkoApp(port)) - override def munitFixtures: Seq[Fixture[?]] = List(pekkoServer, client) + override def munitFixtures: Seq[AnyFixture[?]] = List(pekkoServer, client) } diff --git a/pekko-runtime/src/test/scala/endless/runtime/pekko/protobuf/ScalaPbSerializerSuite.scala b/pekko-runtime/src/test/scala/endless/runtime/pekko/protobuf/ScalaPbSerializerSuite.scala index fefa40f8..24cd3856 100644 --- a/pekko-runtime/src/test/scala/endless/runtime/pekko/protobuf/ScalaPbSerializerSuite.scala +++ b/pekko-runtime/src/test/scala/endless/runtime/pekko/protobuf/ScalaPbSerializerSuite.scala @@ -41,8 +41,8 @@ object ScalaPbSerializerSuite { val serializationConfig: String = """ |pekko { - | provider = local | actor { + | provider = local | serializers { | scalapb = "endless.runtime.pekko.protobuf.ScalaPbSerializer" | } @@ -55,6 +55,7 @@ object ScalaPbSerializerSuite { | "endless.runtime.pekko.protobuf.ScalaPbSerializer" = 1111 | } | } + | coordinated-shutdown.exit-jvm = off |} """.stripMargin } From b8cfdb41188ece7f9dfe96c102b7ffde1871c0ea Mon Sep 17 00:00:00 2001 From: Jonas Chapuis Date: Thu, 5 Sep 2024 10:48:58 +0200 Subject: [PATCH 3/6] Add support for differentiated run modes for side-effects A new `runModeFor` method is introduced in `SideEffect`, with the default set to `Async` to ensure backward compatibility. With `Async` mode, it triggers in *run & forget* mode so that command reply is not delayed by any lengthy side-effect. With `Sync` mode, the side-effect runs to completion before the next command is processed by the entity: this can be useful to simplify the side-effect logic as it precludes accounting for concurrency, but can hurt the system's responsiveness. --- .../DurableShardedRepositoryDeployer.scala | 39 +++++++++------- ...ventSourcedShardedRepositoryDeployer.scala | 36 ++++++++------- .../scala/endless/core/entity/Deployer.scala | 5 ++- .../endless/core/entity/DurableDeployer.scala | 5 ++- .../endless/core/entity/SideEffect.scala | 41 +++++++++++++++-- .../interpret/SideEffectInterpreter.scala | 2 +- documentation/src/main/paradox/side-effect.md | 6 ++- .../example/logic/BookingSideEffect.scala | 1 + .../DurableShardedEntityDeployer.scala | 45 ++++++++++--------- .../EventSourcedShardedEntityDeployer.scala | 27 +++++------ 10 files changed, 131 insertions(+), 76 deletions(-) diff --git a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala index 0b72e720..c2308ac3 100644 --- a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala +++ b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala @@ -13,6 +13,7 @@ import cats.syntax.flatMap.* import cats.syntax.functor.* import cats.syntax.show.* import endless.core.entity.* +import endless.core.entity.SideEffect.RunMode import endless.core.interpret.DurableEntityT.{DurableEntityT, State} import endless.core.interpret.* import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec} @@ -58,11 +59,11 @@ private[deploy] class DurableShardedRepositoryDeployer[F[ ) .receiveSignal { case (state, RecoveryCompleted) => - dispatcher.unsafeRunAndForget( - Logger[F].info( - show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) + dispatcher.unsafeRunSync( + Logger[F] + .info(show"Recovery of ${nameProvider()} entity ${context.entityId} completed") ) + handleSideEffect(state, SideEffect.Trigger.AfterRecovery) case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( Logger[F].warn( @@ -76,13 +77,22 @@ private[deploy] class DurableShardedRepositoryDeployer[F[ private def handleSideEffect( state: Option[S], trigger: SideEffect.Trigger - )(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = { - for { + )(implicit + sideEffect: SideEffect[F, S, Alg], + entity: Alg[F], + passivator: EntityPassivator[F], + dispatcher: Dispatcher[F] + ): Unit = { + val effect = for { effector <- Effector[F, S, Alg](entity, state) _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () + dispatcher.unsafeRunSync(sideEffect.runModeFor(trigger, state)) match { + case RunMode.Sync => dispatcher.unsafeRunSync(effect) + case RunMode.Async => dispatcher.unsafeRunAndForget(effect) + } } private def handleCommand(state: Option[S], command: Command)(implicit @@ -108,16 +118,13 @@ private[deploy] class DurableShardedRepositoryDeployer[F[ case State.Updated(state) => Effect.persist(Option(state)) }) .thenRun((state: Option[S]) => - // run the effector asynchronously, as it can describe long-running processes - dispatcher.unsafeRunAndForget( - handleSideEffect( - state, - outcome match { - case State.None => SideEffect.Trigger.AfterRead - case State.Existing(_) => SideEffect.Trigger.AfterRead - case State.Updated(_) => SideEffect.Trigger.AfterPersistence - } - ) + handleSideEffect( + state, + outcome match { + case State.None => SideEffect.Trigger.AfterRead + case State.Existing(_) => SideEffect.Trigger.AfterRead + case State.Updated(_) => SideEffect.Trigger.AfterPersistence + } ) ) .thenReply(command.replyTo) { (_: Option[S]) => diff --git a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala index 6ca8e99c..851d0e1c 100644 --- a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala +++ b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala @@ -11,6 +11,7 @@ import cats.syntax.applicative.* import cats.syntax.flatMap.* import cats.syntax.functor.* import cats.syntax.show.* +import endless.core.entity.SideEffect.RunMode import endless.core.entity.{Effector, EntityNameProvider, Sharding, SideEffect} import endless.core.event.EventApplier import endless.core.interpret.{EntityT, SideEffectInterpreter} @@ -58,12 +59,11 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ ) .receiveSignal { case (state, RecoveryCompleted) => - dispatcher.unsafeRunAndForget { + dispatcher.unsafeRunSync( Logger[F] - .info( - show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) - } + .info(show"Recovery of ${nameProvider()} entity ${context.entityId} completed") + ) + handleSideEffect(state, SideEffect.Trigger.AfterRecovery) case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( Logger[F].warn( @@ -77,13 +77,22 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ private def handleSideEffect( state: Option[S], trigger: SideEffect.Trigger - )(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = { - for { + )(implicit + sideEffect: SideEffect[F, S, Alg], + entity: Alg[F], + passivator: EntityPassivator[F], + dispatcher: Dispatcher[F] + ): Unit = { + val effect = for { effector <- Effector[F, S, Alg](entity, state) _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () + dispatcher.unsafeRunSync(sideEffect.runModeFor(trigger, state)) match { + case RunMode.Sync => dispatcher.unsafeRunSync(effect) + case RunMode.Async => dispatcher.unsafeRunAndForget(effect) + } } private def handleEvent(state: Option[S], event: E)(implicit dispatcher: Dispatcher[F]) = @@ -113,12 +122,8 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ case Right((events, reply)) if events.nonEmpty => Effect .persist(events.toList) - .thenRun( - (state: Option[ - S - ]) => // run the effector asynchronously, as it can describe long-running processes - dispatcher - .unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterPersistence)) + .thenRun((state: Option[S]) => + handleSideEffect(state, SideEffect.Trigger.AfterPersistence) ) .thenReply(command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) @@ -127,10 +132,7 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ case Right((_, reply)) => Effect .none[E, Option[S]] - .thenRun((state: Option[S]) => - dispatcher - .unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterRead)) - ) + .thenRun((state: Option[S]) => handleSideEffect(state, SideEffect.Trigger.AfterRead)) .thenReply[Reply](command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) } diff --git a/core/src/main/scala/endless/core/entity/Deployer.scala b/core/src/main/scala/endless/core/entity/Deployer.scala index e3aa8532..1ebcba97 100644 --- a/core/src/main/scala/endless/core/entity/Deployer.scala +++ b/core/src/main/scala/endless/core/entity/Deployer.scala @@ -43,8 +43,9 @@ trait Deployer { * the message is decoded and run with the provided `behavior` interpreter: this typically * involves reading the entity state (e.g. for validation), and writing events (which can * lead to a new version of the state via the `eventApplier` function) - * - after events are written, a possible side-effect is triggered: this can be asynchronous - * (i.e. the function doesn't wait for completion of the side-effect to return) + * - after events are written, a possible side-effect is triggered: this is by default + * asynchronous (i.e. the function doesn't wait for completion of the side-effect to return) + * but can be made synchronous by overriding the `runModeFor` method in the `SideEffect` * - the function finally returns to the caller with the result of the operation described by * the entity algebra (reply value, typically encoded over the wire in a distributed * deployment) diff --git a/core/src/main/scala/endless/core/entity/DurableDeployer.scala b/core/src/main/scala/endless/core/entity/DurableDeployer.scala index 8284a675..f262139d 100644 --- a/core/src/main/scala/endless/core/entity/DurableDeployer.scala +++ b/core/src/main/scala/endless/core/entity/DurableDeployer.scala @@ -47,7 +47,8 @@ trait DurableDeployer { * involves reading the entity state (e.g. for validation), and writing events (which can * lead to a new version of the state via the `eventApplier` function) * - after events were written, a possible side-effect is triggered: this can be asynchronous - * (i.e. the function doesn't wait for completion of the side-effect to return) + * (i.e. the function doesn't wait for completion of the side-effect to return) but can be + * made synchronous by overriding the `runModeFor` method in the `SideEffect` * - the function finally returns to the caller with the result of the operation described by * the entity algebra (reply value, typically encoded over the wire in a distributed * deployment) @@ -60,7 +61,7 @@ trait DurableDeployer { * algebras respectively (both higher-kinded type constructors). * * Since the behavior described above involves concurrent handling of repository interactions and - * asynchronous side-effecting, we expect `Async` from `F`. + * possible asynchronous side-effecting, we expect `Async` from `F`. * * `EntityIDCodec` is used to encode/decode entity IDs to/from strings. * diff --git a/core/src/main/scala/endless/core/entity/SideEffect.scala b/core/src/main/scala/endless/core/entity/SideEffect.scala index 50ad4ae9..23a1f636 100644 --- a/core/src/main/scala/endless/core/entity/SideEffect.scala +++ b/core/src/main/scala/endless/core/entity/SideEffect.scala @@ -1,10 +1,13 @@ package endless.core.entity +import cats.Applicative +import cats.kernel.Eq import cats.syntax.eq.* /** `SideEffect[F, S, Alg]` represents a side-effect applied in context `F`. The side-effect is - * triggered just after events persistence if any, or after some reads for a read-only command. Its - * is interpreted with `Async` in order to allow for asynchronous processes. The passed `Effector` - * can be used to access entity state and algebra and to control passivation. + * triggered just after events persistence if any, or after some reads for a read-only command. The + * mode of interpretation is determined by the `runModeFor` method, which defaults to `Async` but + * can be overridden. The passed `Effector` can be used to access entity state and algebra and to + * control passivation. * @tparam F * effect type * @tparam S @@ -14,9 +17,15 @@ import cats.syntax.eq.* */ trait SideEffect[F[_], S, Alg[_[_]]] { def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit] + + def runModeFor(trigger: SideEffect.Trigger, state: Option[S])(implicit + applicative: Applicative[F] + ): F[SideEffect.RunMode] = Applicative[F].pure(SideEffect.RunMode.Async) } object SideEffect { + def unit[F[_]: Applicative, S, Alg[_[_]]]: SideEffect[F, S, Alg] = + (_: Trigger, _: Effector[F, S, Alg]) => Applicative[F].unit /** Trigger for the invocation of a side-effect: this allows for differentiated behavior depending * on the context in which the side-effect is triggered. @@ -39,6 +48,30 @@ object SideEffect { /** Triggered just after recovery */ case object AfterRecovery extends Trigger - implicit val eqTrigger: cats.Eq[Trigger] = cats.Eq.fromUniversalEquals + implicit val eqTrigger: Eq[Trigger] = Eq.fromUniversalEquals + } + + /** Run mode for a side-effect: `Async` (default value) means that the side-effect is triggered in + * "fire & forget" mode, while `Sync` means it is run to completion before any other command is + * processed by the entity. + */ + sealed trait RunMode + object RunMode { + + /** Run to completion before any other command is processed by the entity. + * + * @note + * This mode should in most cases not be used for long-running side-effects, as it can hurt + * availability of the entity for command processing. + */ + case object Sync extends RunMode + + /** Run in "fire & forget" mode. + * + * @note + * This mode requires careful consideration of the side-effect's concurrency and idempotency, + * as there is no limit on the number of invocations running simultaneously at any one time. + */ + case object Async extends RunMode } } diff --git a/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala b/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala index f449cd00..afd80a97 100644 --- a/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala +++ b/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala @@ -56,5 +56,5 @@ object SideEffectInterpreter { */ def unit[F[_]: Applicative, S, Alg[_[_]], RepositoryAlg[_[_]]] : SideEffectInterpreter[F, S, Alg, RepositoryAlg] = - lift((_, _) => (_, _) => Applicative[F].unit) + lift((_, _) => SideEffect.unit[F, S, Alg]) } diff --git a/documentation/src/main/paradox/side-effect.md b/documentation/src/main/paradox/side-effect.md index 12e5c06b..4f42e4b7 100644 --- a/documentation/src/main/paradox/side-effect.md +++ b/documentation/src/main/paradox/side-effect.md @@ -3,6 +3,8 @@ ```scala trait SideEffect[F[_], S, Alg[_[_]]] { def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit] + + def runModeFor(trigger: SideEffect.Trigger, state: Option[S]): F[SideEffect.RunMode] } ``` @@ -14,7 +16,9 @@ trait SideEffect[F[_], S, Alg[_[_]]] { It represents a side-effect that is triggered according to `trigger` either after event persistence, command handling (for a read-only behavior invocation) or recovery. Side-effects are typically asynchronous operations such as kafka writes, outgoing REST requests, and [entity passivation](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#passivation) (flushing out of memory). -In the runtime, the resulting `F[Unit]` is interpreted with `Async` in *run & forget* mode so that command reply is not delayed by any lengthy side-effect. The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation (for an example, see @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingSideEffect.scala) +In the runtime, the resulting `F[Unit]` is interpreted according to the `RunMode` setting indicated by a preliminary call to `runModeFor`. With `Async` mode, it triggers in *run & forget* mode so that command reply is not delayed by any lengthy side-effect. With `Async` mode, it triggers in *run & forget* mode so that any lengthy side-effect does not delay command reply. With `Sync` mode, the side-effect runs to completion before the entity processes the next command: this can simplify the side-effect logic as it precludes accounting for concurrency, but can hurt the system's responsiveness. The default implementation of `runModeFor` sets it to `Async`. + +The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation (for an example, see @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingSideEffect.scala) @@@ note Defining a side-effect is entirely optional, pass-in `SideEffectInterpreter.unit` in @scaladoc[deployRepository](endless.runtime.akka.deploy.Deployer) if there are no side-effects to describe. diff --git a/example/src/main/scala/endless/example/logic/BookingSideEffect.scala b/example/src/main/scala/endless/example/logic/BookingSideEffect.scala index 58a96c73..13ddd05c 100644 --- a/example/src/main/scala/endless/example/logic/BookingSideEffect.scala +++ b/example/src/main/scala/endless/example/logic/BookingSideEffect.scala @@ -18,6 +18,7 @@ import scala.concurrent.duration.* class BookingSideEffect[F[_]: Logger: Monad]()(implicit availabilityAlg: AvailabilityAlg[F] ) extends SideEffect[F, Booking, BookingAlg] { + def apply(trigger: Trigger, effector: Effector[F, Booking, BookingAlg]): F[Unit] = { import effector.* diff --git a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala index bbe94036..bb3068a8 100644 --- a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala +++ b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala @@ -13,6 +13,7 @@ import cats.syntax.flatMap.* import cats.syntax.functor.* import cats.syntax.show.* import endless.core.entity.* +import endless.core.entity.SideEffect.RunMode import endless.core.interpret.DurableEntityT.{DurableEntityT, State} import endless.core.interpret.* import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec} @@ -56,11 +57,11 @@ private[deploy] class DurableShardedEntityDeployer[F[_]: Async: Logger, S, ID: E ) .receiveSignal { case (state, RecoveryCompleted) => - dispatcher.unsafeRunAndForget( - Logger[F].info( - show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) + dispatcher.unsafeRunSync( + Logger[F] + .info(show"Recovery of ${nameProvider()} entity ${context.entityId} completed") ) + handleSideEffect(state, SideEffect.Trigger.AfterRecovery) case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( Logger[F].warn( @@ -71,17 +72,25 @@ private[deploy] class DurableShardedEntityDeployer[F[_]: Async: Logger, S, ID: E ) } - private def handleSideEffect(state: Option[S], trigger: SideEffect.Trigger)(implicit + private def handleSideEffect( + state: Option[S], + trigger: SideEffect.Trigger + )(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], - passivator: EntityPassivator[F] - ) = { - for { + passivator: EntityPassivator[F], + dispatcher: Dispatcher[F] + ): Unit = { + val effect = for { effector <- Effector[F, S, Alg](entity, state) _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () + dispatcher.unsafeRunSync(sideEffect.runModeFor(trigger, state)) match { + case RunMode.Sync => dispatcher.unsafeRunSync(effect) + case RunMode.Async => dispatcher.unsafeRunAndForget(effect) + } } private def handleCommand(state: Option[S], command: Command)(implicit @@ -107,18 +116,14 @@ private[deploy] class DurableShardedEntityDeployer[F[_]: Async: Logger, S, ID: E case State.Updated(state) => Effect.persist(Option(state)) }) .thenRun((state: Option[S]) => - // run the effector asynchronously, as it can describe long-running processes - dispatcher - .unsafeRunAndForget( - handleSideEffect( - state, - outcome match { - case State.None => SideEffect.Trigger.AfterRead - case State.Existing(_) => SideEffect.Trigger.AfterRead - case State.Updated(_) => SideEffect.Trigger.AfterPersistence - } - ) - ) + handleSideEffect( + state, + outcome match { + case State.None => SideEffect.Trigger.AfterRead + case State.Existing(_) => SideEffect.Trigger.AfterRead + case State.Updated(_) => SideEffect.Trigger.AfterPersistence + } + ) ) .thenReply(command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) diff --git a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala index 14c7b369..f29acbe2 100644 --- a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala +++ b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala @@ -11,6 +11,7 @@ import cats.syntax.applicative.* import cats.syntax.flatMap.* import cats.syntax.functor.* import cats.syntax.show.* +import endless.core.entity.SideEffect.RunMode import endless.core.entity.{Effector, EntityNameProvider, Sharding, SideEffect} import endless.core.event.EventApplier import endless.core.interpret.{EntityT, SideEffectInterpreter} @@ -58,11 +59,12 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ ) .receiveSignal { case (state, RecoveryCompleted) => - dispatcher.unsafeRunAndForget( + dispatcher.unsafeRunSync( Logger[F].info( show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) + ) ) + handleSideEffect(state, SideEffect.Trigger.AfterRecovery) case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( Logger[F].warn( @@ -76,14 +78,19 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ private def handleSideEffect(state: Option[S], trigger: SideEffect.Trigger)(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], - passivator: EntityPassivator[F] - ) = { - for { + passivator: EntityPassivator[F], + dispatcher: Dispatcher[F] + ): Unit = { + val effect = for { effector <- Effector[F, S, Alg](entity, state) _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () + dispatcher.unsafeRunSync(sideEffect.runModeFor(trigger, state)) match { + case RunMode.Sync => dispatcher.unsafeRunSync(effect) + case RunMode.Async => dispatcher.unsafeRunAndForget(effect) + } } private def handleEvent(state: Option[S], event: E)(implicit dispatcher: Dispatcher[F]) = @@ -114,10 +121,7 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ Effect .persist(events.toList) .thenRun((state: Option[S]) => - // run the effector asynchronously, as it can describe long-running processes - dispatcher.unsafeRunAndForget( - handleSideEffect(state, SideEffect.Trigger.AfterPersistence) - ) + handleSideEffect(state, SideEffect.Trigger.AfterPersistence) ) .thenReply(command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) @@ -126,10 +130,7 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ case Right((_, reply)) => Effect .none[E, Option[S]] - .thenRun((state: Option[S]) => - dispatcher - .unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterRead)) - ) + .thenRun((state: Option[S]) => handleSideEffect(state, SideEffect.Trigger.AfterRead)) .thenReply[Reply](command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) } From eb66be21bacd9c617687d880a69859a273fad661 Mon Sep 17 00:00:00 2001 From: Jonas Chapuis Date: Thu, 5 Sep 2024 11:02:35 +0200 Subject: [PATCH 4/6] Couple of adaptations to latest scala versions --- .../test/scala/endless/core/interpret/DurableEntityTSuite.scala | 2 +- core/src/test/scala/endless/core/interpret/EntityTSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/endless/core/interpret/DurableEntityTSuite.scala b/core/src/test/scala/endless/core/interpret/DurableEntityTSuite.scala index 6d40e4ac..12b0eb1c 100644 --- a/core/src/test/scala/endless/core/interpret/DurableEntityTSuite.scala +++ b/core/src/test/scala/endless/core/interpret/DurableEntityTSuite.scala @@ -46,7 +46,7 @@ class DurableEntityTSuite extends DisciplineSuite { F: Arbitrary[F[A]] ): Arbitrary[DurableEntityT[F, SampleState, A]] = Arbitrary( F.arbitrary.map( - DurableEntityT.stateWriter(sampleState)(Applicative[F]) >> DurableEntityT.liftF(_) + DurableEntityT.stateWriter(sampleState)(using Applicative[F]) >> DurableEntityT.liftF(_) ) ) diff --git a/core/src/test/scala/endless/core/interpret/EntityTSuite.scala b/core/src/test/scala/endless/core/interpret/EntityTSuite.scala index 5568db64..367ffcc2 100644 --- a/core/src/test/scala/endless/core/interpret/EntityTSuite.scala +++ b/core/src/test/scala/endless/core/interpret/EntityTSuite.scala @@ -54,7 +54,7 @@ class EntityTSuite extends DisciplineSuite { checkAll( "EntityT.FunctorLaws for direct map def", FunctorTests[EntityT[ListWrapper, State, Event, *]]( - new Functor[EntityT[ListWrapper, State, Event, *]] { + using new Functor[EntityT[ListWrapper, State, Event, *]] { override def map[A, B](fa: EntityT[ListWrapper, State, Event, A])( f: A => B ): EntityT[ListWrapper, State, Event, B] = fa.map(f) From 3e6b602264f98de8f461b8fde4e8dfa8574c2c27 Mon Sep 17 00:00:00 2001 From: Jonas Chapuis Date: Thu, 5 Sep 2024 22:38:48 +0200 Subject: [PATCH 5/6] Update scala 3 to 3.4.3 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 08cb3cae..9e4dfcad 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import Dependencies.* import sbtversionpolicy.Compatibility.None val scala213 = "2.13.14" -val scala3 = "3.4.2" +val scala3 = "3.4.3" val commonSettings = Seq( wartremoverExcluded += sourceManaged.value, From 5dc505d0c5e10467ff5ff73ad45d459d46a58914 Mon Sep 17 00:00:00 2001 From: Jonas Chapuis Date: Mon, 9 Sep 2024 08:30:49 +0200 Subject: [PATCH 6/6] Remove unnecessary dependency to log4cats-slf4j --- build.sbt | 4 ++-- project/Dependencies.scala | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index 9e4dfcad..1730bd75 100644 --- a/build.sbt +++ b/build.sbt @@ -88,7 +88,7 @@ lazy val pekkoRuntime = (project in file("pekko-runtime")) .settings(commonSettings *) .dependsOn(core) .settings( - libraryDependencies ++= catsEffectStd ++ pekkoProvided ++ log4cats ++ scalapbCustomizations ++ (mUnit ++ logback ++ log4catsSlf4j :+ pekkoTypedTestkit % pekkoVersion) + libraryDependencies ++= catsEffectStd ++ pekkoProvided ++ log4cats ++ scalapbCustomizations ++ (mUnit ++ logback :+ pekkoTypedTestkit % pekkoVersion) .map(_ % Test) ) .settings( @@ -136,7 +136,7 @@ lazy val example = (project in file("example")) .settings(commonSettings *) .dependsOn(core, akkaRuntime, pekkoRuntime, circeHelpers, protobufHelpers) .settings( - libraryDependencies ++= catsEffect ++ http4s ++ blaze ++ akka ++ pekko ++ scalapbCustomizations ++ akkaTest ++ pekkoTest ++ logback ++ log4catsSlf4j ++ (mUnit ++ catsEffectMUnit ++ scalacheckEffect ++ log4catsTesting) + libraryDependencies ++= catsEffect ++ http4s ++ blaze ++ akka ++ pekko ++ scalapbCustomizations ++ akkaTest ++ pekkoTest ++ logback ++ (mUnit ++ catsEffectMUnit ++ scalacheckEffect ++ log4catsTesting) .map(_ % Test) ) .settings(name := "endless-example", run / fork := true, publish / skip := true) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e09a967c..31d16b45 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -79,9 +79,6 @@ object Dependencies { lazy val log4catsVersion = "2.7.0" lazy val log4cats = Seq("org.typelevel" %% "log4cats-core" % log4catsVersion) - lazy val log4catsSlf4j = Seq( - "org.typelevel" %% "log4cats-slf4j" % log4catsVersion - ) lazy val log4catsTesting = Seq("org.typelevel" %% "log4cats-testing" % log4catsVersion) lazy val mUnitVersion = "1.0.1"