Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trigger information to SideEffect #293

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private[deploy] class DurableShardedRepositoryDeployer[F[
dispatcher.unsafeRunAndForget(
Logger[F].info(
show"Recovery of ${nameProvider()} entity ${context.entityId} completed"
) >> handleSideEffect(state)
) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery)
)
case (_, RecoveryFailed(failure)) =>
dispatcher.unsafeRunSync(
Expand All @@ -74,11 +74,12 @@ private[deploy] class DurableShardedRepositoryDeployer[F[
}

private def handleSideEffect(
state: Option[S]
state: Option[S],
trigger: SideEffect.Trigger
)(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = {
for {
effector <- Effector[F, S, Alg](entity, state)
_ <- sideEffect.apply(effector)
_ <- sideEffect.apply(trigger, effector)
passivationState <- effector.passivationState
_ <- passivator.apply(passivationState)
} yield ()
Expand All @@ -100,15 +101,24 @@ private[deploy] class DurableShardedRepositoryDeployer[F[
case Some(value) => DurableEntityT.State.Existing(value)
case None => DurableEntityT.State.None
})
.flatMap { case (state, reply) =>
(state match {
.flatMap { case (outcome, reply) =>
(outcome match {
case State.None => Effect.none
case State.Existing(_) => Effect.none
case State.Updated(state) => Effect.persist(Option(state))
})
.thenRun((state: Option[S]) =>
// run the effector asynchronously, as it can describe long-running processes
dispatcher.unsafeRunAndForget(handleSideEffect(state))
dispatcher.unsafeRunAndForget(
handleSideEffect(
state,
outcome match {
case State.None => SideEffect.Trigger.AfterRead
case State.Existing(_) => SideEffect.Trigger.AfterRead
case State.Updated(_) => SideEffect.Trigger.AfterPersistence
}
)
)
)
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[
Logger[F]
.info(
show"Recovery of ${nameProvider()} entity ${context.entityId} completed"
) >> handleSideEffect(state)
) >> handleSideEffect(state, SideEffect.Trigger.AfterRecovery)
}
case (_, RecoveryFailed(failure)) =>
dispatcher.unsafeRunSync(
Expand All @@ -75,11 +75,12 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[
}

private def handleSideEffect(
state: Option[S]
state: Option[S],
trigger: SideEffect.Trigger
)(implicit sideEffect: SideEffect[F, S, Alg], entity: Alg[F], passivator: EntityPassivator[F]) = {
for {
effector <- Effector[F, S, Alg](entity, state)
_ <- sideEffect.apply(effector)
_ <- sideEffect.apply(trigger, effector)
passivationState <- effector.passivationState
_ <- passivator.apply(passivationState)
} yield ()
Expand Down Expand Up @@ -116,17 +117,23 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[
(state: Option[
S
]) => // run the effector asynchronously, as it can describe long-running processes
dispatcher.unsafeRunAndForget(handleSideEffect(state))
dispatcher
.unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterPersistence))
)
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
case Right((_, reply)) =>
Effect
.reply[Reply, E, Option[S]](command.replyTo)(
Reply(incomingCommand.replyEncoder.encode(reply))
.none[E, Option[S]]
.thenRun((state: Option[S]) =>
dispatcher
.unsafeRunAndForget(handleSideEffect(state, SideEffect.Trigger.AfterRead))
)
.thenReply[Reply](command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
}
dispatcher.unsafeRunSync(effect)
Expand Down
36 changes: 32 additions & 4 deletions core/src/main/scala/endless/core/entity/SideEffect.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package endless.core.entity
import cats.syntax.eq.*

/** `SideEffect[F, S, Alg]` represents a side-effect applied in context `F`. The side-effect is
* triggered just after events persistence, and is interpreted with `Async` in order to allow for
* asynchronous processes. The passed `Effector` can be used to access entity state and algebra and
* to control passivation.
* triggered just after events persistence if any, or after some reads for a read-only command. Its
* is interpreted with `Async` in order to allow for asynchronous processes. The passed `Effector`
* can be used to access entity state and algebra and to control passivation.
* @tparam F
* effect type
* @tparam S
Expand All @@ -12,5 +13,32 @@
* entity algebra
*/
trait SideEffect[F[_], S, Alg[_[_]]] {
def apply(effector: Effector[F, S, Alg]): F[Unit]
def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit]
}

object SideEffect {

/** Trigger for the invocation of a side-effect: this allows for differentiated behavior depending
* on the context in which the side-effect is triggered.
*/
sealed trait Trigger {
def isAfterPersistence: Boolean = this === Trigger.AfterPersistence
def isAfterRead: Boolean = this === Trigger.AfterRead

Check warning on line 26 in core/src/main/scala/endless/core/entity/SideEffect.scala

View check run for this annotation

Codecov / codecov/patch

core/src/main/scala/endless/core/entity/SideEffect.scala#L25-L26

Added lines #L25 - L26 were not covered by tests
def isAfterRecovery: Boolean = this === Trigger.AfterRecovery
}
object Trigger {

/** Triggered just after events or state persistence */
case object AfterPersistence extends Trigger

/** Triggered just after processing a read-only command (no events were written, the state
* hasn't changed)
*/
case object AfterRead extends Trigger

/** Triggered just after recovery */
case object AfterRecovery extends Trigger

implicit val eqTrigger: cats.Eq[Trigger] = cats.Eq.fromUniversalEquals
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@
* unit side-effect interpreter in context `F`
*/
def unit[F[_]: Applicative, S, Alg[_[_]], RepositoryAlg[_[_]]]
: SideEffectInterpreter[F, S, Alg, RepositoryAlg] = lift((_, _) => _ => Applicative[F].unit)
: SideEffectInterpreter[F, S, Alg, RepositoryAlg] =
lift((_, _) => (_, _) => Applicative[F].unit)

Check warning on line 59 in core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala

View check run for this annotation

Codecov / codecov/patch

core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala#L59

Added line #L59 was not covered by tests
}
12 changes: 1 addition & 11 deletions documentation/src/main/paradox/effector.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@ trait Self[F[_], Alg[_[_]]] {
trait Effector[F[_], S] extends StateReader[F, S] with Passivator[F] with Self[F]
```

@scaladoc[Effector](endless.core.entity.Effector) is a typeclass used to describe side effects occurring **after** event persistence and entity recovery.

Side-effects are typically asynchronous operations such as kafka writes, outgoing REST requests, and [entity passivation](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#passivation) (flushing out of memory). `Effector` is used in a `Effector => F[Unit]` function provided upon entity deployment (e.g. @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingEffector.scala)). In the provided runtimes, the resulting `F[Unit]` is executed in *run & forget* mode so that command reply is not delayed by any lengthy side-effect (`Self` can be used to notify success or failure of asynchronous operations back to the entity).

@@@ warning
In the provided Akka/Pekko runtimes, read-only commands (commands that do not generate events) do not trigger side-effects, which is sound practice.
@@@

@@@ note
Defining an effector is entirely optional, pass-in `EffectorInterpreter.unit[F, S, Alg, RepositoryAlg]` in @scaladoc[deployRepository](endless.runtime.akka.deploy.Deployer) if there are no side-effects to describe.
@@@
@scaladoc[Effector](endless.core.entity.Effector) is a typeclass used to describe side effects in the context of a @ref:[SideEffect](side-effect.md) definition.

## State-derived side-effects
@scaladoc[StateReader](endless.core.entity.StateReader) allows reading the updated entity state after event persistence or recovery.
Expand Down
10 changes: 8 additions & 2 deletions documentation/src/main/paradox/side-effect.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

```scala
trait SideEffect[F[_], S, Alg[_[_]]] {
def apply(effector: Effector[F, S, Alg]): F[Unit]
def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit]
}
```

Expand All @@ -12,4 +12,10 @@ trait SideEffect[F[_], S, Alg[_[_]]] {
- `Alg[_[_]]`: entity algebra, allowing "back-interaction" with the entity itself (e.g. for at least once process definition, see note in @ref:[Effector](effector.md))
- `S`: entity state, e.g. @github[Booking](/example/src/main/scala/endless/example/data/Booking.scala)

It represents a side-effect that is triggered just after events persistence, and interpreted with `Async` to allow for asynchronicity. The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation.
It represents a side-effect that is triggered according to `trigger` either after event persistence, command handling (for a read-only behavior invocation) or recovery. Side-effects are typically asynchronous operations such as kafka writes, outgoing REST requests, and [entity passivation](https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#passivation) (flushing out of memory).

In the runtime, the resulting `F[Unit]` is interpreted with `Async` in *run & forget* mode so that command reply is not delayed by any lengthy side-effect. The passed @ref:[Effector](effector.md) instance can be used to access entity state, chain further interactions with the entity itself and to control passivation (for an example, see @github[BookingEffector](/example/src/main/scala/endless/example/logic/BookingSideEffect.scala)

@@@ note
Defining a side-effect is entirely optional, pass-in `SideEffectInterpreter.unit` in @scaladoc[deployRepository](endless.runtime.akka.deploy.Deployer) if there are no side-effects to describe.
@@@
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object AkkaApp extends Bookings with Vehicles with Availabilities {
deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg](
RepositoryInterpreter.lift(ShardedVehicles(_)),
DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)),
(_, _) => VehicleSideEffect()
SideEffectInterpreter.lift((_, _) => new VehicleSideEffect())
)
)
.flatMap { case (bookingDeployment, vehicleDeployment) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object PekkoApp extends Bookings with Vehicles with Availabilities {
deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg](
RepositoryInterpreter.lift(ShardedVehicles(_)),
DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)),
(_, _) => VehicleSideEffect()
SideEffectInterpreter.lift { case (_, _) => new VehicleSideEffect() }
)
)
.flatMap { case (bookingDeployment, vehicleDeployment) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.show.*
import cats.{Applicative, Monad}
import endless.core.entity.SideEffect.Trigger
import endless.core.entity.{Effector, SideEffect}
import endless.example.algebra.{AvailabilityAlg, BookingAlg}
import endless.example.data.Booking
Expand All @@ -17,12 +18,12 @@ import scala.concurrent.duration.*
class BookingSideEffect[F[_]: Logger: Monad]()(implicit
availabilityAlg: AvailabilityAlg[F]
) extends SideEffect[F, Booking, BookingAlg] {
def apply(effector: Effector[F, Booking, BookingAlg]): F[Unit] = {
def apply(trigger: Trigger, effector: Effector[F, Booking, BookingAlg]): F[Unit] = {
import effector.*

val availabilityProcess: Booking => F[Unit] = booking =>
booking.status match {
case Status.Pending =>
(booking.status, trigger) match {
case (Status.Pending, Trigger.AfterRecovery | Trigger.AfterPersistence) =>
(availabilityAlg.isCapacityAvailable(
booking.time,
booking.passengerCount
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
package endless.example.logic

import cats.Applicative
import cats.effect.{Concurrent, Ref}
import cats.{Applicative, Monad}
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import endless.core.entity.SideEffect.Trigger
import endless.core.entity.{Effector, SideEffect}
import endless.example.algebra.VehicleAlg
import endless.example.data.Vehicle

import scala.concurrent.duration.*

class VehicleSideEffect[F[_]: Concurrent](justRecoveredRef: Ref[F, Boolean])
extends SideEffect[F, Vehicle, VehicleAlg] {
def apply(effector: Effector[F, Vehicle, VehicleAlg]): F[Unit] = {
class VehicleSideEffect[F[_]: Monad] extends SideEffect[F, Vehicle, VehicleAlg] {
def apply(trigger: Trigger, effector: Effector[F, Vehicle, VehicleAlg]): F[Unit] = {
lazy val aggressivePassivation = effector.enablePassivation(1.second)
for {
justRecovered <- justRecoveredRef.getAndUpdate(_ => false)
_ <- Applicative[F].whenA(justRecovered)(effector.self.incrementRecoveryCount)
_ <- Applicative[F].whenA(trigger.isAfterRecovery)(effector.self.incrementRecoveryCount)
_ <- aggressivePassivation
} yield ()
}
}

object VehicleSideEffect {
def apply[F[_]: Concurrent](): F[SideEffect[F, Vehicle, VehicleAlg]] =
Ref[F].of(true).map(new VehicleSideEffect[F](_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.syntax.show.*
import endless.\/
import endless.core.entity.Effector
import endless.core.entity.Effector.PassivationState
import endless.core.entity.SideEffect.Trigger
import endless.example.algebra.{AvailabilityAlg, BookingAlg}
import endless.example.data.{Booking, LatLon}
import org.scalacheck.effect.PropF.forAllF
Expand All @@ -22,41 +23,41 @@ class BookingSideEffectSuite
implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
implicit private def availabilityAlg: AvailabilityAlg[IO] = (_: Instant, _: Int) => IO(true)

test("some state logs") {
test("some state generates logs after persistence") {
forAllF { (booking: Booking) =>
val acceptedBooking = booking.copy(status = Booking.Status.Accepted)
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(acceptedBooking)
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector)
_ <- assertIO(logger.logged.map(_.map(_.message).last), show"State is now $acceptedBooking")
} yield ()
}
}

test("some state passivates after one hour") {
forAllF { (booking: Booking) =>
forAllF { (booking: Booking, trigger: Trigger) =>
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(booking.copy(status = Booking.Status.Accepted))
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(trigger, effector)
_ <- assertIO(effector.passivationState, Effector.PassivationState.After(1.hour))
} yield ()
}
}

test("passivates immediately when cancelled") {
forAllF { (booking: Booking) =>
forAllF { (booking: Booking, trigger: Trigger) =>
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(booking.copy(status = Booking.Status.Cancelled))
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(trigger, effector)
_ <- assertIO(effector.passivationState, PassivationState.After(Duration.Zero))
} yield ()
}
Expand All @@ -76,7 +77,7 @@ class BookingSideEffectSuite
},
Some(booking.copy(status = Booking.Status.Pending))
)
_ <- new BookingSideEffect().apply(effector)
_ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector)
} yield ()
}
}
Expand Down
4 changes: 4 additions & 0 deletions example/src/test/scala/endless/example/logic/Generators.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package endless.example.logic

import endless.core.entity.SideEffect.Trigger
import endless.example.algebra.BookingAlg.{
BookingAlreadyExists,
BookingUnknown,
Expand All @@ -13,6 +14,8 @@ import org.scalacheck.{Arbitrary, Gen}
import java.time.Instant

trait Generators {
implicit val triggerGen: Gen[Trigger] =
Gen.oneOf(Trigger.AfterPersistence, Trigger.AfterRecovery, Trigger.AfterRead)
implicit val latLonGen: Gen[LatLon] = for {
latitude <- Gen.double
longitude <- Gen.double
Expand Down Expand Up @@ -42,4 +45,5 @@ trait Generators {
)
implicit val arbBookingUnknown: Arbitrary[BookingUnknown.type] = Arbitrary(bookingUnknownGen)
implicit val arbCancelError: Arbitrary[CancelError] = Arbitrary(cancelErrorGen)
implicit val arbTrigger: Arbitrary[Trigger] = Arbitrary(triggerGen)
}
Loading
Loading