Skip to content

Commit

Permalink
Add trigger information to SideEffect to allow for differentiated b…
Browse files Browse the repository at this point in the history
…ehavior
  • Loading branch information
Jonas Chapuis authored and jchapuis committed Dec 18, 2023
1 parent 71e3582 commit eaa96e5
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private[deploy] class DurableShardedRepositoryDeployer[F[
dispatcher.unsafeRunAndForget(
Logger[F].info(
show"Recovery of ${nameProvider()} entity ${context.entityId} completed"
) >> handleSideEffect(state)
) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery)
)
case (_, RecoveryFailed(failure)) =>
dispatcher.unsafeRunSync(
Expand All @@ -74,11 +74,12 @@ private[deploy] class DurableShardedRepositoryDeployer[F[
}

private def handleSideEffect(
state: Option[S]
state: Option[S],
trigger: SideEffect.Trigger
)(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = {
for {
effector <- Effector[F, S, Alg](entity, state)
_ <- sideEffect.apply(effector)
_ <- sideEffect.apply(trigger, effector)
passivationState <- effector.passivationState
_ <- passivator.apply(passivationState)
} yield ()
Expand All @@ -100,15 +101,24 @@ private[deploy] class DurableShardedRepositoryDeployer[F[
case Some(value) => DurableEntityT.State.Existing(value)
case None => DurableEntityT.State.None
})
.flatMap { case (state, reply) =>
(state match {
.flatMap { case (outcome, reply) =>
(outcome match {
case State.None => Effect.none
case State.Existing(_) => Effect.none
case State.Updated(state) => Effect.persist(Option(state))
})
.thenRun((state: Option[S]) =>
// run the effector asynchronously, as it can describe long-running processes
dispatcher.unsafeRunAndForget(handleSideEffect(state))
dispatcher.unsafeRunAndForget(
handleSideEffect(
state,
outcome match {
case State.None => SideEffect.Trigger.AfterRead
case State.Existing(_) => SideEffect.Trigger.AfterRead
case State.Updated(_) => SideEffect.Trigger.AfterPersistence
}
)
)
)
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[
Logger[F]
.info(
show"Recovery of ${nameProvider()} entity ${context.entityId} completed"
) >> handleSideEffect(state)
) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery)
}
case (_, RecoveryFailed(failure)) =>
dispatcher.unsafeRunSync(
Expand All @@ -75,11 +75,12 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[
}

private def handleSideEffect(
state: Option[S]
state: Option[S],
trigger: SideEffect.Trigger
)(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = {
for {
effector <- Effector[F, S, Alg](entity, state)
_ <- sideEffect.apply(effector)
_ <- sideEffect.apply(trigger, effector)
passivationState <- effector.passivationState
_ <- passivator.apply(passivationState)
} yield ()
Expand Down Expand Up @@ -116,17 +117,23 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[
(state: Option[
S
]) => // run the effector asynchronously, as it can describe long-running processes
dispatcher.unsafeRunAndForget(handleSideEffect(state))
dispatcher
.unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterPersistence))
)
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
case Right((_, reply)) =>
Effect
.reply[Reply, E, Option[S]](command.replyTo)(
Reply(incomingCommand.replyEncoder.encode(reply))
.none[E, Option[S]]
.thenRun((state: Option[S]) =>
dispatcher
.unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterRead))
)
.thenReply[Reply](command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
}
dispatcher.unsafeRunSync(effect)
Expand Down
36 changes: 32 additions & 4 deletions core/src/main/scala/endless/core/entity/SideEffect.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package endless.core.entity
import cats.syntax.eq.*

/** `SideEffect[F, S, Alg]` represents a side-effect applied in context `F`. The side-effect is
* triggered just after events persistence, and is interpreted with `Async` in order to allow for
* asynchronous processes. The passed `Effector` can be used to access entity state and algebra and
* to control passivation.
* triggered just after events persistence if any, or after some reads for a read-only command. Its
* is interpreted with `Async` in order to allow for asynchronous processes. The passed `Effector`
* can be used to access entity state and algebra and to control passivation.
* @tparam F
* effect type
* @tparam S
Expand All @@ -12,5 +13,32 @@ package endless.core.entity
* entity algebra
*/
trait SideEffect[F[_], S, Alg[_[_]]] {
def apply(effector: Effector[F, S, Alg]): F[Unit]
def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit]
}

object SideEffect {

/** Trigger for the invocation of a side-effect: this allows for differentiated behavior depending
* on the context in which the side-effect is triggered.
*/
sealed trait Trigger {
def isAfterPersistence: Boolean = this === Trigger.AfterPersistence
def isAfterRead: Boolean = this === Trigger.AfterRead
def isAfterRecovery: Boolean = this === Trigger.AfterRecovery
}
object Trigger {

/** Triggered just after events or state persistence */
case object AfterPersistence extends Trigger

/** Triggered just after processing a read-only command (no events were written, the state
* hasn't changed)
*/
case object AfterRead extends Trigger

/** Triggered just after recovery */
case object AfterRecovery extends Trigger

implicit val eqTrigger: cats.Eq[Trigger] = cats.Eq.fromUniversalEquals
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ object SideEffectInterpreter {
* unit side-effect interpreter in context `F`
*/
def unit[F[_]: Applicative, S, Alg[_[_]], RepositoryAlg[_[_]]]
: SideEffectInterpreter[F, S, Alg, RepositoryAlg] = lift((_, _) => _ => Applicative[F].unit)
: SideEffectInterpreter[F, S, Alg, RepositoryAlg] =
lift((_, _) => (_, _) => Applicative[F].unit)
}
12 changes: 1 addition & 11 deletions documentation/src/main/paradox/effector.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@ trait Self[F[_], Alg[_[_]]] {
trait Effector[F[_], S] extends StateReader[F, S] with Passivator[F] with Self[F]
```

@scaladoc[Effector](endless.core.entity.Effector) is a typeclass used to describe side effects occurring **after** event persistence and entity recovery.

Side-effects are typically asynchronous operations such as kafka writes, outgoing REST requests, and [entity passivation](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#passivation) (flushing out of memory). `Effector` is used in a `Effector => F[Unit]` function provided upon entity deployment (e.g. @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingEffector.scala)). In the provided runtimes, the resulting `F[Unit]` is executed in *run & forget* mode so that command reply is not delayed by any lengthy side-effect (`Self` can be used to notify success or failure of asynchronous operations back to the entity).

@@@ warning
In the provided Akka/Pekko runtimes, read-only commands (commands that do not generate events) do not trigger side-effects, which is sound practice.
@@@

@@@ note
Defining an effector is entirely optional, pass-in `EffectorInterpreter.unit[F, S, Alg, RepositoryAlg]` in @scaladoc[deployRepository](endless.runtime.akka.deploy.Deployer) if there are no side-effects to describe.
@@@
@scaladoc[Effector](endless.core.entity.Effector) is a typeclass used to describe side effects in the context of a @ref:[SideEffect](side-effect.md) definition.

## State-derived side-effects
@scaladoc[StateReader](endless.core.entity.StateReader) allows reading the updated entity state after event persistence or recovery.
Expand Down
10 changes: 8 additions & 2 deletions documentation/src/main/paradox/side-effect.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

```scala
trait SideEffect[F[_], S, Alg[_[_]]] {
def apply(effector: Effector[F, S, Alg]): F[Unit]
def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit]
}
```

Expand All @@ -12,4 +12,10 @@ trait SideEffect[F[_], S, Alg[_[_]]] {
- `Alg[_[_]]`: entity algebra, allowing "back-interaction" with the entity itself (e.g. for at least once process definition, see note in @ref:[Effector](effector.md))
- `S`: entity state, e.g. @github[Booking](/example/src/main/scala/endless/example/data/Booking.scala)

It represents a side-effect that is triggered just after events persistence, and interpreted with `Async` to allow for asynchronicity. The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation.
It represents a side-effect that is triggered according to `trigger` either after event persistence, command handling (for a read-only behavior invocation) or recovery. Side-effects are typically asynchronous operations such as kafka writes, outgoing REST requests, and [entity passivation](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#passivation) (flushing out of memory).

In the runtime, the resulting `F[Unit]` is interpreted with `Async` in *run & forget* mode so that command reply is not delayed by any lengthy side-effect. The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation (for an example, see @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingSideEffect.scala)

@@@ note
Defining a side-effect is entirely optional, pass-in `SideEffectInterpreter.unit` in @scaladoc[deployRepository](endless.runtime.akka.deploy.Deployer) if there are no side-effects to describe.
@@@
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object AkkaApp extends Bookings with Vehicles with Availabilities {
deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg](
RepositoryInterpreter.lift(ShardedVehicles(_)),
DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)),
(_, _) => VehicleSideEffect()
SideEffectInterpreter.lift((_, _) => new VehicleSideEffect())
)
)
.flatMap { case (bookingDeployment, vehicleDeployment) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object PekkoApp extends Bookings with Vehicles with Availabilities {
deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg](
RepositoryInterpreter.lift(ShardedVehicles(_)),
DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)),
(_, _) => VehicleSideEffect()
SideEffectInterpreter.lift { case (_, _) => new VehicleSideEffect() }
)
)
.flatMap { case (bookingDeployment, vehicleDeployment) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.show.*
import cats.{Applicative, Monad}
import endless.core.entity.SideEffect.Trigger
import endless.core.entity.{Effector, SideEffect}
import endless.example.algebra.{AvailabilityAlg, BookingAlg}
import endless.example.data.Booking
Expand All @@ -17,12 +18,12 @@ import scala.concurrent.duration.*
class BookingSideEffect[F[_]: Logger: Monad]()(implicit
availabilityAlg: AvailabilityAlg[F]
) extends SideEffect[F, Booking, BookingAlg] {
def apply(effector: Effector[F, Booking, BookingAlg]): F[Unit] = {
def apply(trigger: Trigger, effector: Effector[F, Booking, BookingAlg]): F[Unit] = {
import effector.*

val availabilityProcess: Booking => F[Unit] = booking =>
booking.status match {
case Status.Pending =>
(booking.status, trigger) match {
case (Status.Pending, Trigger.AfterRecovery | Trigger.AfterPersistence) =>
(availabilityAlg.isCapacityAvailable(
booking.time,
booking.passengerCount
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
package endless.example.logic

import cats.Applicative
import cats.effect.{Concurrent, Ref}
import cats.{Applicative, Monad}
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import endless.core.entity.SideEffect.Trigger
import endless.core.entity.{Effector, SideEffect}
import endless.example.algebra.VehicleAlg
import endless.example.data.Vehicle

import scala.concurrent.duration.*

class VehicleSideEffect[F[_]: Concurrent](justRecoveredRef: Ref[F, Boolean])
extends SideEffect[F, Vehicle, VehicleAlg] {
def apply(effector: Effector[F, Vehicle, VehicleAlg]): F[Unit] = {
class VehicleSideEffect[F[_]: Monad] extends SideEffect[F, Vehicle, VehicleAlg] {
def apply(trigger: Trigger, effector: Effector[F, Vehicle, VehicleAlg]): F[Unit] = {
lazy val aggressivePassivation = effector.enablePassivation(1.second)
for {
justRecovered <- justRecoveredRef.getAndUpdate(_ => false)
_ <- Applicative[F].whenA(justRecovered)(effector.self.incrementRecoveryCount)
_ <- Applicative[F].whenA(trigger.isAfterRecovery)(effector.self.incrementRecoveryCount)
_ <- aggressivePassivation
} yield ()
}
}

object VehicleSideEffect {
def apply[F[_]: Concurrent](): F[SideEffect[F, Vehicle, VehicleAlg]] =
Ref[F].of(true).map(new VehicleSideEffect[F](_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.syntax.show.*
import endless.\/
import endless.core.entity.Effector
import endless.core.entity.Effector.PassivationState
import endless.core.entity.SideEffect.Trigger
import endless.example.algebra.{AvailabilityAlg, BookingAlg}
import endless.example.data.{Booking, LatLon}
import org.scalacheck.effect.PropF.forAllF
Expand All @@ -22,41 +23,41 @@ class BookingSideEffectSuite
implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
implicit private def availabilityAlg: AvailabilityAlg[IO] = (_: Instant, _: Int) => IO(true)

test("some state logs") {
test("some state generates logs after persistence") {
forAllF { (booking: Booking) =>
val acceptedBooking = booking.copy(status = Booking.Status.Accepted)
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(acceptedBooking)
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector)
_ <- assertIO(logger.logged.map(_.map(_.message).last), show"State is now $acceptedBooking")
} yield ()
}
}

test("some state passivates after one hour") {
forAllF { (booking: Booking) =>
forAllF { (booking: Booking, trigger: Trigger) =>
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(booking.copy(status = Booking.Status.Accepted))
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(trigger, effector)
_ <- assertIO(effector.passivationState, Effector.PassivationState.After(1.hour))
} yield ()
}
}

test("passivates immediately when cancelled") {
forAllF { (booking: Booking) =>
forAllF { (booking: Booking, trigger: Trigger) =>
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(booking.copy(status = Booking.Status.Cancelled))
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(trigger, effector)
_ <- assertIO(effector.passivationState, PassivationState.After(Duration.Zero))
} yield ()
}
Expand All @@ -76,7 +77,7 @@ class BookingSideEffectSuite
},
Some(booking.copy(status = Booking.Status.Pending))
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector)
} yield ()
}
}
Expand Down
4 changes: 4 additions & 0 deletions example/src/test/scala/endless/example/logic/Generators.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package endless.example.logic

import endless.core.entity.SideEffect.Trigger
import endless.example.algebra.BookingAlg.{
BookingAlreadyExists,
BookingUnknown,
Expand All @@ -13,6 +14,8 @@ import org.scalacheck.{Arbitrary, Gen}
import java.time.Instant

trait Generators {
implicit val triggerGen: Gen[Trigger] =
Gen.oneOf(Trigger.AfterPersistence, Trigger.AfterRecovery, Trigger.AfterRead)
implicit val latLonGen: Gen[LatLon] = for {
latitude <- Gen.double
longitude <- Gen.double
Expand Down Expand Up @@ -42,4 +45,5 @@ trait Generators {
)
implicit val arbBookingUnknown: Arbitrary[BookingUnknown.type] = Arbitrary(bookingUnknownGen)
implicit val arbCancelError: Arbitrary[CancelError] = Arbitrary(cancelErrorGen)
implicit val arbTrigger: Arbitrary[Trigger] = Arbitrary(triggerGen)
}
Loading

0 comments on commit eaa96e5

Please sign in to comment.