diff --git a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala index c8fc9867..0b72e720 100644 --- a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala +++ b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala @@ -61,7 +61,7 @@ private[deploy] class DurableShardedRepositoryDeployer[F[ dispatcher.unsafeRunAndForget( Logger[F].info( show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state) + ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) ) case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( @@ -74,11 +74,12 @@ private[deploy] class DurableShardedRepositoryDeployer[F[ } private def handleSideEffect( - state: Option[S] + state: Option[S], + trigger: SideEffect.Trigger )(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = { for { effector <- Effector[F, S, Alg](entity, state) - _ <- sideEffect.apply(effector) + _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () @@ -100,15 +101,24 @@ private[deploy] class DurableShardedRepositoryDeployer[F[ case Some(value) => DurableEntityT.State.Existing(value) case None => DurableEntityT.State.None }) - .flatMap { case (state, reply) => - (state match { + .flatMap { case (outcome, reply) => + (outcome match { case State.None => Effect.none case State.Existing(_) => Effect.none case State.Updated(state) => Effect.persist(Option(state)) }) .thenRun((state: Option[S]) => // run the effector asynchronously, as it can describe long-running processes - dispatcher.unsafeRunAndForget(handleSideEffect(state)) + dispatcher.unsafeRunAndForget( + handleSideEffect( + state, + outcome match { + case State.None => SideEffect.Trigger.AfterRead + case State.Existing(_) => SideEffect.Trigger.AfterRead + case State.Updated(_) => SideEffect.Trigger.AfterPersistence + } + ) + ) ) .thenReply(command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) diff --git a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala index 8f20a9cd..6ca8e99c 100644 --- a/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala +++ b/akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala @@ -62,7 +62,7 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ Logger[F] .info( show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state) + ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) } case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( @@ -75,11 +75,12 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ } private def handleSideEffect( - state: Option[S] + state: Option[S], + trigger: SideEffect.Trigger )(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = { for { effector <- Effector[F, S, Alg](entity, state) - _ <- sideEffect.apply(effector) + _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () @@ -116,7 +117,8 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ (state: Option[ S ]) => // run the effector asynchronously, as it can describe long-running processes - dispatcher.unsafeRunAndForget(handleSideEffect(state)) + dispatcher + .unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterPersistence)) ) .thenReply(command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) @@ -124,9 +126,14 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[ .pure[F] case Right((_, reply)) => Effect - .reply[Reply, E, Option[S]](command.replyTo)( - Reply(incomingCommand.replyEncoder.encode(reply)) + .none[E, Option[S]] + .thenRun((state: Option[S]) => + dispatcher + .unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterRead)) ) + .thenReply[Reply](command.replyTo) { (_: Option[S]) => + Reply(incomingCommand.replyEncoder.encode(reply)) + } .pure[F] } dispatcher.unsafeRunSync(effect) diff --git a/core/src/main/scala/endless/core/entity/SideEffect.scala b/core/src/main/scala/endless/core/entity/SideEffect.scala index 117801a6..50ad4ae9 100644 --- a/core/src/main/scala/endless/core/entity/SideEffect.scala +++ b/core/src/main/scala/endless/core/entity/SideEffect.scala @@ -1,9 +1,10 @@ package endless.core.entity +import cats.syntax.eq.* /** `SideEffect[F, S, Alg]` represents a side-effect applied in context `F`. The side-effect is - * triggered just after events persistence, and is interpreted with `Async` in order to allow for - * asynchronous processes. The passed `Effector` can be used to access entity state and algebra and - * to control passivation. + * triggered just after events persistence if any, or after some reads for a read-only command. Its + * is interpreted with `Async` in order to allow for asynchronous processes. The passed `Effector` + * can be used to access entity state and algebra and to control passivation. * @tparam F * effect type * @tparam S @@ -12,5 +13,32 @@ package endless.core.entity * entity algebra */ trait SideEffect[F[_], S, Alg[_[_]]] { - def apply(effector: Effector[F, S, Alg]): F[Unit] + def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit] +} + +object SideEffect { + + /** Trigger for the invocation of a side-effect: this allows for differentiated behavior depending + * on the context in which the side-effect is triggered. + */ + sealed trait Trigger { + def isAfterPersistence: Boolean = this === Trigger.AfterPersistence + def isAfterRead: Boolean = this === Trigger.AfterRead + def isAfterRecovery: Boolean = this === Trigger.AfterRecovery + } + object Trigger { + + /** Triggered just after events or state persistence */ + case object AfterPersistence extends Trigger + + /** Triggered just after processing a read-only command (no events were written, the state + * hasn't changed) + */ + case object AfterRead extends Trigger + + /** Triggered just after recovery */ + case object AfterRecovery extends Trigger + + implicit val eqTrigger: cats.Eq[Trigger] = cats.Eq.fromUniversalEquals + } } diff --git a/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala b/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala index da8df763..f449cd00 100644 --- a/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala +++ b/core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala @@ -55,5 +55,6 @@ object SideEffectInterpreter { * unit side-effect interpreter in context `F` */ def unit[F[_]: Applicative, S, Alg[_[_]], RepositoryAlg[_[_]]] - : SideEffectInterpreter[F, S, Alg, RepositoryAlg] = lift((_, _) => _ => Applicative[F].unit) + : SideEffectInterpreter[F, S, Alg, RepositoryAlg] = + lift((_, _) => (_, _) => Applicative[F].unit) } diff --git a/documentation/src/main/paradox/effector.md b/documentation/src/main/paradox/effector.md index bb354bb6..02bf5b5f 100644 --- a/documentation/src/main/paradox/effector.md +++ b/documentation/src/main/paradox/effector.md @@ -14,17 +14,7 @@ trait Self[F[_], Alg[_[_]]] { trait Effector[F[_], S] extends StateReader[F, S] with Passivator[F] with Self[F] ``` -@scaladoc[Effector](endless.core.entity.Effector) is a typeclass used to describe side effects occurring **after** event persistence and entity recovery. - -Side-effects are typically asynchronous operations such as kafka writes, outgoing REST requests, and [entity passivation](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#passivation) (flushing out of memory). `Effector` is used in a `Effector => F[Unit]` function provided upon entity deployment (e.g. @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingEffector.scala)). In the provided runtimes, the resulting `F[Unit]` is executed in *run & forget* mode so that command reply is not delayed by any lengthy side-effect (`Self` can be used to notify success or failure of asynchronous operations back to the entity). - -@@@ warning -In the provided Akka/Pekko runtimes, read-only commands (commands that do not generate events) do not trigger side-effects, which is sound practice. -@@@ - -@@@ note -Defining an effector is entirely optional, pass-in `EffectorInterpreter.unit[F, S, Alg, RepositoryAlg]` in @scaladoc[deployRepository](endless.runtime.akka.deploy.Deployer) if there are no side-effects to describe. -@@@ +@scaladoc[Effector](endless.core.entity.Effector) is a typeclass used to describe side effects in the context of a @ref:[SideEffect](side-effect.md) definition. ## State-derived side-effects @scaladoc[StateReader](endless.core.entity.StateReader) allows reading the updated entity state after event persistence or recovery. diff --git a/documentation/src/main/paradox/side-effect.md b/documentation/src/main/paradox/side-effect.md index a9fb504c..12e5c06b 100644 --- a/documentation/src/main/paradox/side-effect.md +++ b/documentation/src/main/paradox/side-effect.md @@ -2,7 +2,7 @@ ```scala trait SideEffect[F[_], S, Alg[_[_]]] { - def apply(effector: Effector[F, S, Alg]): F[Unit] + def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit] } ``` @@ -12,4 +12,10 @@ trait SideEffect[F[_], S, Alg[_[_]]] { - `Alg[_[_]]`: entity algebra, allowing "back-interaction" with the entity itself (e.g. for at least once process definition, see note in @ref:[Effector](effector.md)) - `S`: entity state, e.g. @github[Booking](/example/src/main/scala/endless/example/data/Booking.scala) -It represents a side-effect that is triggered just after events persistence, and interpreted with `Async` to allow for asynchronicity. The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation. \ No newline at end of file +It represents a side-effect that is triggered according to `trigger` either after event persistence, command handling (for a read-only behavior invocation) or recovery. Side-effects are typically asynchronous operations such as kafka writes, outgoing REST requests, and [entity passivation](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#passivation) (flushing out of memory). + +In the runtime, the resulting `F[Unit]` is interpreted with `Async` in *run & forget* mode so that command reply is not delayed by any lengthy side-effect. The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation (for an example, see @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingSideEffect.scala) + +@@@ note +Defining a side-effect is entirely optional, pass-in `SideEffectInterpreter.unit` in @scaladoc[deployRepository](endless.runtime.akka.deploy.Deployer) if there are no side-effects to describe. +@@@ diff --git a/example/src/main/scala/endless/example/app/akka/AkkaApp.scala b/example/src/main/scala/endless/example/app/akka/AkkaApp.scala index 4aa91a0f..62c30ea4 100644 --- a/example/src/main/scala/endless/example/app/akka/AkkaApp.scala +++ b/example/src/main/scala/endless/example/app/akka/AkkaApp.scala @@ -117,7 +117,7 @@ object AkkaApp extends Bookings with Vehicles with Availabilities { deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg]( RepositoryInterpreter.lift(ShardedVehicles(_)), DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)), - (_, _) => VehicleSideEffect() + SideEffectInterpreter.lift((_, _) => new VehicleSideEffect()) ) ) .flatMap { case (bookingDeployment, vehicleDeployment) => diff --git a/example/src/main/scala/endless/example/app/pekko/PekkoApp.scala b/example/src/main/scala/endless/example/app/pekko/PekkoApp.scala index dff270a8..ca667c9e 100644 --- a/example/src/main/scala/endless/example/app/pekko/PekkoApp.scala +++ b/example/src/main/scala/endless/example/app/pekko/PekkoApp.scala @@ -120,7 +120,7 @@ object PekkoApp extends Bookings with Vehicles with Availabilities { deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg]( RepositoryInterpreter.lift(ShardedVehicles(_)), DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)), - (_, _) => VehicleSideEffect() + SideEffectInterpreter.lift { case (_, _) => new VehicleSideEffect() } ) ) .flatMap { case (bookingDeployment, vehicleDeployment) => diff --git a/example/src/main/scala/endless/example/logic/BookingSideEffect.scala b/example/src/main/scala/endless/example/logic/BookingSideEffect.scala index fbd55138..58a96c73 100644 --- a/example/src/main/scala/endless/example/logic/BookingSideEffect.scala +++ b/example/src/main/scala/endless/example/logic/BookingSideEffect.scala @@ -5,6 +5,7 @@ import cats.syntax.flatMap.* import cats.syntax.functor.* import cats.syntax.show.* import cats.{Applicative, Monad} +import endless.core.entity.SideEffect.Trigger import endless.core.entity.{Effector, SideEffect} import endless.example.algebra.{AvailabilityAlg, BookingAlg} import endless.example.data.Booking @@ -17,12 +18,12 @@ import scala.concurrent.duration.* class BookingSideEffect[F[_]: Logger: Monad]()(implicit availabilityAlg: AvailabilityAlg[F] ) extends SideEffect[F, Booking, BookingAlg] { - def apply(effector: Effector[F, Booking, BookingAlg]): F[Unit] = { + def apply(trigger: Trigger, effector: Effector[F, Booking, BookingAlg]): F[Unit] = { import effector.* val availabilityProcess: Booking => F[Unit] = booking => - booking.status match { - case Status.Pending => + (booking.status, trigger) match { + case (Status.Pending, Trigger.AfterRecovery | Trigger.AfterPersistence) => (availabilityAlg.isCapacityAvailable( booking.time, booking.passengerCount diff --git a/example/src/main/scala/endless/example/logic/VehicleSideEffect.scala b/example/src/main/scala/endless/example/logic/VehicleSideEffect.scala index f7cf2713..e22fdb88 100644 --- a/example/src/main/scala/endless/example/logic/VehicleSideEffect.scala +++ b/example/src/main/scala/endless/example/logic/VehicleSideEffect.scala @@ -1,28 +1,21 @@ package endless.example.logic -import cats.Applicative -import cats.effect.{Concurrent, Ref} +import cats.{Applicative, Monad} import cats.syntax.flatMap.* import cats.syntax.functor.* +import endless.core.entity.SideEffect.Trigger import endless.core.entity.{Effector, SideEffect} import endless.example.algebra.VehicleAlg import endless.example.data.Vehicle import scala.concurrent.duration.* -class VehicleSideEffect[F[_]: Concurrent](justRecoveredRef: Ref[F, Boolean]) - extends SideEffect[F, Vehicle, VehicleAlg] { - def apply(effector: Effector[F, Vehicle, VehicleAlg]): F[Unit] = { +class VehicleSideEffect[F[_]: Monad] extends SideEffect[F, Vehicle, VehicleAlg] { + def apply(trigger: Trigger, effector: Effector[F, Vehicle, VehicleAlg]): F[Unit] = { lazy val aggressivePassivation = effector.enablePassivation(1.second) for { - justRecovered <- justRecoveredRef.getAndUpdate(_ => false) - _ <- Applicative[F].whenA(justRecovered)(effector.self.incrementRecoveryCount) + _ <- Applicative[F].whenA(trigger.isAfterRecovery)(effector.self.incrementRecoveryCount) _ <- aggressivePassivation } yield () } } - -object VehicleSideEffect { - def apply[F[_]: Concurrent](): F[SideEffect[F, Vehicle, VehicleAlg]] = - Ref[F].of(true).map(new VehicleSideEffect[F](_)) -} diff --git a/example/src/test/scala/endless/example/logic/BookingSideEffectSuite.scala b/example/src/test/scala/endless/example/logic/BookingSideEffectSuite.scala index 61f958e9..57bb524a 100644 --- a/example/src/test/scala/endless/example/logic/BookingSideEffectSuite.scala +++ b/example/src/test/scala/endless/example/logic/BookingSideEffectSuite.scala @@ -6,6 +6,7 @@ import cats.syntax.show.* import endless.\/ import endless.core.entity.Effector import endless.core.entity.Effector.PassivationState +import endless.core.entity.SideEffect.Trigger import endless.example.algebra.{AvailabilityAlg, BookingAlg} import endless.example.data.{Booking, LatLon} import org.scalacheck.effect.PropF.forAllF @@ -22,7 +23,7 @@ class BookingSideEffectSuite implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]() implicit private def availabilityAlg: AvailabilityAlg[IO] = (_: Instant, _: Int) => IO(true) - test("some state logs") { + test("some state generates logs after persistence") { forAllF { (booking: Booking) => val acceptedBooking = booking.copy(status = Booking.Status.Accepted) for { @@ -30,33 +31,33 @@ class BookingSideEffectSuite new SelfEntity {}, Some(acceptedBooking) ) - _ <- new BookingSideEffect().apply(effector) + _ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector) _ <- assertIO(logger.logged.map(_.map(_.message).last), show"State is now $acceptedBooking") } yield () } } test("some state passivates after one hour") { - forAllF { (booking: Booking) => + forAllF { (booking: Booking, trigger: Trigger) => for { effector <- Effector.apply[IO, Booking, BookingAlg]( new SelfEntity {}, Some(booking.copy(status = Booking.Status.Accepted)) ) - _ <- new BookingSideEffect().apply(effector) + _ <- new BookingSideEffect().apply(trigger, effector) _ <- assertIO(effector.passivationState, Effector.PassivationState.After(1.hour)) } yield () } } test("passivates immediately when cancelled") { - forAllF { (booking: Booking) => + forAllF { (booking: Booking, trigger: Trigger) => for { effector <- Effector.apply[IO, Booking, BookingAlg]( new SelfEntity {}, Some(booking.copy(status = Booking.Status.Cancelled)) ) - _ <- new BookingSideEffect().apply(effector) + _ <- new BookingSideEffect().apply(trigger, effector) _ <- assertIO(effector.passivationState, PassivationState.After(Duration.Zero)) } yield () } @@ -76,7 +77,7 @@ class BookingSideEffectSuite }, Some(booking.copy(status = Booking.Status.Pending)) ) - _ <- new BookingSideEffect().apply(effector) + _ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector) } yield () } } diff --git a/example/src/test/scala/endless/example/logic/Generators.scala b/example/src/test/scala/endless/example/logic/Generators.scala index 0fa0976b..089f3bad 100644 --- a/example/src/test/scala/endless/example/logic/Generators.scala +++ b/example/src/test/scala/endless/example/logic/Generators.scala @@ -1,5 +1,6 @@ package endless.example.logic +import endless.core.entity.SideEffect.Trigger import endless.example.algebra.BookingAlg.{ BookingAlreadyExists, BookingUnknown, @@ -13,6 +14,8 @@ import org.scalacheck.{Arbitrary, Gen} import java.time.Instant trait Generators { + implicit val triggerGen: Gen[Trigger] = + Gen.oneOf(Trigger.AfterPersistence, Trigger.AfterRecovery, Trigger.AfterRead) implicit val latLonGen: Gen[LatLon] = for { latitude <- Gen.double longitude <- Gen.double @@ -42,4 +45,5 @@ trait Generators { ) implicit val arbBookingUnknown: Arbitrary[BookingUnknown.type] = Arbitrary(bookingUnknownGen) implicit val arbCancelError: Arbitrary[CancelError] = Arbitrary(cancelErrorGen) + implicit val arbTrigger: Arbitrary[Trigger] = Arbitrary(triggerGen) } diff --git a/example/src/test/scala/endless/example/logic/VehicleSideEffectSuite.scala b/example/src/test/scala/endless/example/logic/VehicleSideEffectSuite.scala index d0bc5aa1..6dd191e2 100644 --- a/example/src/test/scala/endless/example/logic/VehicleSideEffectSuite.scala +++ b/example/src/test/scala/endless/example/logic/VehicleSideEffectSuite.scala @@ -2,8 +2,10 @@ package endless.example.logic import cats.effect.IO import endless.core.entity.Effector +import endless.core.entity.SideEffect.Trigger import endless.example.algebra.VehicleAlg import endless.example.data.{LatLon, Speed, Vehicle} + import scala.concurrent.duration.* class VehicleSideEffectSuite extends munit.CatsEffectSuite { @@ -17,21 +19,25 @@ class VehicleSideEffectSuite extends munit.CatsEffectSuite { }, None ) - sideEffect <- VehicleSideEffect[IO]() - _ <- sideEffect.apply(effector) + sideEffect = new VehicleSideEffect[IO] + _ <- sideEffect.apply(Trigger.AfterRecovery, effector) _ <- assertIO(recoveryCountRef.get, 1) } yield () } test("does not increment recovery count otherwise") { for { + recoveryCountRef <- IO.ref(0) effector <- Effector.apply[IO, Vehicle, VehicleAlg]( - new SelfEntity {}, + new SelfEntity { + override def incrementRecoveryCount: IO[Unit] = recoveryCountRef.update(_ + 1) + }, None ) - justRecoveredRef <- IO.ref(false) - sideEffect <- IO.pure(new VehicleSideEffect[IO](justRecoveredRef)) - _ <- sideEffect.apply(effector) + sideEffect = new VehicleSideEffect[IO] + _ <- sideEffect.apply(Trigger.AfterRead, effector) + _ <- sideEffect.apply(Trigger.AfterPersistence, effector) + _ <- assertIO(recoveryCountRef.get, 0) } yield () } @@ -43,8 +49,8 @@ class VehicleSideEffectSuite extends munit.CatsEffectSuite { }, None ) - sideEffect <- VehicleSideEffect[IO]() - _ <- sideEffect.apply(effector) + sideEffect = new VehicleSideEffect[IO] + _ <- sideEffect.apply(Trigger.AfterRead, effector) _ <- assertIO(effector.passivationState, Effector.PassivationState.After(1.second)) } yield () } diff --git a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala index fa137e54..bbe94036 100644 --- a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala +++ b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala @@ -59,7 +59,7 @@ private[deploy] class DurableShardedEntityDeployer[F[_]: Async: Logger, S, ID: E dispatcher.unsafeRunAndForget( Logger[F].info( show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state) + ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) ) case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( @@ -71,12 +71,14 @@ private[deploy] class DurableShardedEntityDeployer[F[_]: Async: Logger, S, ID: E ) } - private def handleSideEffect( - state: Option[S] - )(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = { + private def handleSideEffect(state: Option[S], trigger: SideEffect.Trigger)(implicit + sideEffect: SideEffect[F, S, Alg], + entity: Alg[F], + passivator: EntityPassivator[F] + ) = { for { effector <- Effector[F, S, Alg](entity, state) - _ <- sideEffect.apply(effector) + _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () @@ -98,15 +100,25 @@ private[deploy] class DurableShardedEntityDeployer[F[_]: Async: Logger, S, ID: E case Some(value) => DurableEntityT.State.Existing(value) case None => DurableEntityT.State.None }) - .flatMap { case (state, reply) => - (state match { + .flatMap { case (outcome, reply) => + (outcome match { case State.None => Effect.none case State.Existing(_) => Effect.none case State.Updated(state) => Effect.persist(Option(state)) }) .thenRun((state: Option[S]) => // run the effector asynchronously, as it can describe long-running processes - dispatcher.unsafeRunAndForget(handleSideEffect(state)) + dispatcher + .unsafeRunAndForget( + handleSideEffect( + state, + outcome match { + case State.None => SideEffect.Trigger.AfterRead + case State.Existing(_) => SideEffect.Trigger.AfterRead + case State.Updated(_) => SideEffect.Trigger.AfterPersistence + } + ) + ) ) .thenReply(command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) diff --git a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala index a38c3936..14c7b369 100644 --- a/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala +++ b/pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala @@ -61,7 +61,7 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ dispatcher.unsafeRunAndForget( Logger[F].info( show"Recovery of ${nameProvider()} entity ${context.entityId} completed" - ) >> handleSideEffect(state) + ) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery) ) case (_, RecoveryFailed(failure)) => dispatcher.unsafeRunSync( @@ -73,12 +73,14 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ ) } - private def handleSideEffect( - state: Option[S] - )(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = { + private def handleSideEffect(state: Option[S], trigger: SideEffect.Trigger)(implicit + sideEffect: SideEffect[F, S, Alg], + entity: Alg[F], + passivator: EntityPassivator[F] + ) = { for { effector <- Effector[F, S, Alg](entity, state) - _ <- sideEffect.apply(effector) + _ <- sideEffect.apply(trigger, effector) passivationState <- effector.passivationState _ <- passivator.apply(passivationState) } yield () @@ -113,7 +115,9 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ .persist(events.toList) .thenRun((state: Option[S]) => // run the effector asynchronously, as it can describe long-running processes - dispatcher.unsafeRunAndForget(handleSideEffect(state)) + dispatcher.unsafeRunAndForget( + handleSideEffect(state, SideEffect.Trigger.AfterPersistence) + ) ) .thenReply(command.replyTo) { (_: Option[S]) => Reply(incomingCommand.replyEncoder.encode(reply)) @@ -121,9 +125,14 @@ private[deploy] class EventSourcedShardedEntityDeployer[F[ .pure[F] case Right((_, reply)) => Effect - .reply[Reply, E, Option[S]](command.replyTo)( - Reply(incomingCommand.replyEncoder.encode(reply)) + .none[E, Option[S]] + .thenRun((state: Option[S]) => + dispatcher + .unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterRead)) ) + .thenReply[Reply](command.replyTo) { (_: Option[S]) => + Reply(incomingCommand.replyEncoder.encode(reply)) + } .pure[F] } dispatcher.unsafeRunSync(effect)