Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: endless4s/endless
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.14.0
Choose a base ref
...
head repository: endless4s/endless
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref

Commits on Dec 7, 2021

  1. Add discord channel

    Jonas Chapuis authored and jchapuis committed Dec 7, 2021
    Copy the full SHA
    04899c3 View commit details

Commits on Dec 11, 2021

  1. Update sbt-protoc to 1.0.5

    scala-steward authored and jchapuis committed Dec 11, 2021
    Copy the full SHA
    d108a70 View commit details
  2. Update sbt to 1.5.6

    scala-steward authored and jchapuis committed Dec 11, 2021
    Copy the full SHA
    00ec6a6 View commit details
  3. Copy the full SHA
    d7fe199 View commit details

Commits on Dec 12, 2021

  1. Avoid triggering side-effects for read-only commands

    Jonas Chapuis authored and jchapuis committed Dec 12, 2021
    Copy the full SHA
    81f93f5 View commit details

Commits on Dec 16, 2021

  1. Update sbt to 1.5.7

    scala-steward authored and jchapuis committed Dec 16, 2021
    Copy the full SHA
    ef6ce39 View commit details
  2. Update logback-classic to 1.2.8

    scala-steward authored and jchapuis committed Dec 16, 2021
    Copy the full SHA
    ba06f65 View commit details
  3. Add direct map def on EntityT

    Jonas Chapuis authored and jchapuis committed Dec 16, 2021
    Copy the full SHA
    b76aa76 View commit details
  4. Set policy to binary-compatible

    Jonas Chapuis authored and jchapuis committed Dec 16, 2021
    Copy the full SHA
    cb2e6ca View commit details

Commits on Dec 17, 2021

  1. Update logback-classic to 1.2.9

    scala-steward authored and jchapuis committed Dec 17, 2021
    Copy the full SHA
    fbc24de View commit details

Commits on Dec 24, 2021

  1. Update logback-classic to 1.2.10

    scala-steward authored and jchapuis committed Dec 24, 2021
    Copy the full SHA
    07b60e6 View commit details
  2. Update sbt to 1.5.8

    scala-steward authored and jchapuis committed Dec 24, 2021
    Copy the full SHA
    79d60bd View commit details
  3. Update scalafmt-core to 3.2.2

    scala-steward authored and jchapuis committed Dec 24, 2021
    Copy the full SHA
    f54f4a8 View commit details
  4. Copy the full SHA
    d65b572 View commit details

Commits on Dec 27, 2021

  1. Update sbt to 1.6.0

    scala-steward authored and jchapuis committed Dec 27, 2021
    Copy the full SHA
    c25787a View commit details
  2. Update scalafmt-core to 3.3.0

    scala-steward authored and jchapuis committed Dec 27, 2021
    Copy the full SHA
    0c70593 View commit details

Commits on Dec 29, 2021

  1. Update sbt to 1.6.1

    scala-steward authored and jchapuis committed Dec 29, 2021
    Copy the full SHA
    147582a View commit details

Commits on Dec 30, 2021

  1. Copy the full SHA
    75f5291 View commit details

Commits on Jan 1, 2022

  1. Copy the full SHA
    9672894 View commit details

Commits on Jan 2, 2022

  1. Update scalafmt-core to 3.3.1

    scala-steward authored and jchapuis committed Jan 2, 2022
    Copy the full SHA
    c8485a1 View commit details

Commits on Jan 7, 2022

  1. Update sbt-protoc to 1.0.6

    scala-steward authored and jchapuis committed Jan 7, 2022
    Copy the full SHA
    51b9406 View commit details

Commits on Jan 9, 2022

  1. Copy the full SHA
    554ae8d View commit details

Commits on Jan 19, 2022

  1. Copy the full SHA
    d74ad27 View commit details
  2. Copy the full SHA
    f30de4d View commit details

Commits on Jan 20, 2022

  1. Update scalafmt-core to 3.3.2

    scala-steward authored and jchapuis committed Jan 20, 2022
    Copy the full SHA
    f927a2b View commit details
  2. Reformat with scalafmt 3.3.2

    scala-steward authored and jchapuis committed Jan 20, 2022
    Copy the full SHA
    f50a2dc View commit details

Commits on Jan 26, 2022

  1. Copy the full SHA
    5756838 View commit details

Commits on Jan 31, 2022

  1. Copy the full SHA
    d4526b0 View commit details
  2. Add customizeEntity hook in Deployer

    Jonas Chapuis authored and jchapuis committed Jan 31, 2022
    Copy the full SHA
    956254f View commit details
  3. Set compatibility intention to None

    Jonas Chapuis authored and jchapuis committed Jan 31, 2022
    Copy the full SHA
    5b3b72c View commit details
  4. Copy the full SHA
    bcf96a6 View commit details
  5. Update sbt-scoverage to 1.9.3

    scala-steward authored and jchapuis committed Jan 31, 2022
    Copy the full SHA
    32130d4 View commit details
  6. Update scalafmt-core to 3.3.3

    scala-steward authored and jchapuis committed Jan 31, 2022
    Copy the full SHA
    1c6bb23 View commit details
  7. Copy the full SHA
    ddf872e View commit details

Commits on Feb 1, 2022

  1. Update sbt to 1.6.2

    scala-steward authored and jchapuis committed Feb 1, 2022
    Copy the full SHA
    df65af4 View commit details
  2. Update scalafmt-core to 3.4.0

    scala-steward authored and jchapuis committed Feb 1, 2022
    Copy the full SHA
    4745392 View commit details

Commits on Feb 6, 2022

  1. Copy the full SHA
    84c70e9 View commit details
  2. Update scalafmt-core to 3.4.2

    scala-steward authored and jchapuis committed Feb 6, 2022
    Copy the full SHA
    bb68541 View commit details

Commits on Feb 12, 2022

  1. Update scalafmt-core to 3.4.3

    scala-steward authored and jchapuis committed Feb 12, 2022
    Copy the full SHA
    b8df8c6 View commit details
  2. Copy the full SHA
    127fd0c View commit details

Commits on Feb 18, 2022

  1. Set compatibility intention to BinaryAndSourceCompatible

    Jonas Chapuis committed Feb 18, 2022
    Copy the full SHA
    43f940d View commit details
  2. Remove manual release workflow trigger

    Jonas Chapuis committed Feb 18, 2022
    Copy the full SHA
    05ff211 View commit details

Commits on Feb 28, 2022

  1. Copy the full SHA
    7cd84d4 View commit details

Commits on Mar 14, 2022

  1. Copy the full SHA
    9884ecd View commit details
  2. Update logback-classic to 1.2.11

    scala-steward authored and jchapuis committed Mar 14, 2022
    Copy the full SHA
    f5a65bf View commit details
  3. Copy the full SHA
    d85221d View commit details

Commits on Mar 20, 2022

  1. Copy the full SHA
    97ffef3 View commit details
  2. Copy the full SHA
    e44b8c8 View commit details

Commits on Mar 25, 2022

  1. Copy the full SHA
    ef15527 View commit details

Commits on Apr 18, 2022

  1. Copy the full SHA
    9e0cde4 View commit details
Showing with 5,945 additions and 1,844 deletions.
  1. +5 −0 .git-blame-ignore-revs
  2. +7 −3 .github/workflows/ci.yml
  3. +17 −8 .github/workflows/release.yml
  4. +1 −1 .scalafmt.conf
  5. +7 −0 CODE_OF_CONDUCT.md
  6. +6 −1 README.md
  7. +5 −0 {runtime → akka-runtime}/src/main/protobuf/command.proto
  8. +1 −0 {runtime → akka-runtime}/src/main/resources/reference.conf
  9. +51 −0 akka-runtime/src/main/scala/endless/runtime/akka/EntityPassivator.scala
  10. +11 −15 ...ommandRouter.scala → akka-runtime/src/main/scala/endless/runtime/akka/ShardingCommandSender.scala
  11. 0 {runtime → akka-runtime}/src/main/scala/endless/runtime/akka/data/Command.scala
  12. 0 {runtime → akka-runtime}/src/main/scala/endless/runtime/akka/data/Reply.scala
  13. +132 −0 akka-runtime/src/main/scala/endless/runtime/akka/deploy/AkkaCluster.scala
  14. +77 −0 akka-runtime/src/main/scala/endless/runtime/akka/deploy/AkkaDeployer.scala
  15. +76 −0 akka-runtime/src/main/scala/endless/runtime/akka/deploy/AkkaDurableDeployer.scala
  16. +137 −0 ...untime/src/main/scala/endless/runtime/akka/deploy/internal/DurableShardedRepositoryDeployer.scala
  17. +147 −0 ...e/src/main/scala/endless/runtime/akka/deploy/internal/EventSourcedShardedRepositoryDeployer.scala
  18. +53 −0 akka-runtime/src/main/scala/endless/runtime/akka/deploy/internal/ShardedRepositoryDeployer.scala
  19. +68 −0 akka-runtime/src/main/scala/endless/runtime/akka/protobuf/ScalaPbSerializer.scala
  20. +1 −1 {runtime → akka-runtime}/src/main/scala/endless/runtime/akka/serializer/CommandSerializer.scala
  21. 0 {runtime → akka-runtime}/src/main/scala/endless/runtime/akka/serializer/ReplySerializer.scala
  22. +7 −0 akka-runtime/src/main/scala/endless/runtime/akka/syntax/package.scala
  23. +18 −0 akka-runtime/src/test/protobuf/dummy.proto
  24. +62 −0 akka-runtime/src/test/scala/endless/runtime/akka/protobuf/ScalaPbSerializerSuite.scala
  25. +103 −30 build.sbt
  26. +6 −6 circe/src/main/scala/endless/circe/CirceCommandProtocol.scala
  27. +1 −1 circe/src/main/scala/endless/circe/CirceDecoder.scala
  28. +1 −1 circe/src/main/scala/endless/circe/CirceIncomingCommand.scala
  29. +0 −5 circe/src/main/scala/endless/circe/CirceOutgoingCommand.scala
  30. +0 −65 circe/src/main/scala/endless/circe/serializer/CirceSerializer.scala
  31. +18 −14 circe/src/test/scala/endless/circe/CirceCommandProtocolSuite.scala
  32. +6 −0 codecov.yml
  33. +1 −2 core/src/main/scala/endless/core/data/EventsFolder.scala
  34. +105 −0 core/src/main/scala/endless/core/entity/Deployer.scala
  35. +106 −0 core/src/main/scala/endless/core/entity/DurableDeployer.scala
  36. +24 −0 core/src/main/scala/endless/core/entity/DurableEntity.scala
  37. +30 −1 core/src/main/scala/endless/core/entity/Effector.scala
  38. +8 −97 core/src/main/scala/endless/core/entity/Entity.scala
  39. +6 −0 core/src/main/scala/endless/core/entity/Passivator.scala
  40. +0 −21 core/src/main/scala/endless/core/entity/Repository.scala
  41. +1 −1 core/src/main/scala/endless/core/entity/Self.scala
  42. +31 −0 core/src/main/scala/endless/core/entity/Sharding.scala
  43. +77 −0 core/src/main/scala/endless/core/entity/SideEffect.scala
  44. +1 −1 core/src/main/scala/endless/core/entity/StateReader.scala
  45. +128 −0 core/src/main/scala/endless/core/entity/StateReaderHelpers.scala
  46. +36 −0 core/src/main/scala/endless/core/entity/StateWriter.scala
  47. +0 −2 core/src/main/scala/endless/core/event/EventApplier.scala
  48. +43 −0 core/src/main/scala/endless/core/interpret/BehaviorInterpreter.scala
  49. +42 −0 core/src/main/scala/endless/core/interpret/DurableBehaviorInterpreter.scala
  50. +91 −0 core/src/main/scala/endless/core/interpret/DurableEntityT.scala
  51. +0 −84 core/src/main/scala/endless/core/interpret/EffectorT.scala
  52. +0 −8 core/src/main/scala/endless/core/interpret/EntityLift.scala
  53. +2 −3 core/src/main/scala/endless/core/interpret/EntityRunFunctions.scala
  54. +33 −12 core/src/main/scala/endless/core/interpret/EntityT.scala
  55. +37 −28 core/src/main/scala/endless/core/interpret/EntityTLiftInstance.scala
  56. +0 −16 core/src/main/scala/endless/core/interpret/LoggerLiftingHelper.scala
  57. +45 −0 core/src/main/scala/endless/core/interpret/RepositoryInterpreter.scala
  58. +0 −60 core/src/main/scala/endless/core/interpret/RepositoryT.scala
  59. +60 −0 core/src/main/scala/endless/core/interpret/SideEffectInterpreter.scala
  60. +32 −9 core/src/main/scala/endless/core/protocol/CommandProtocol.scala
  61. +0 −20 core/src/main/scala/endless/core/protocol/CommandRouter.scala
  62. +49 −0 core/src/main/scala/endless/core/protocol/CommandSender.scala
  63. +67 −25 core/src/test/scala/endless/core/entity/EffectorSuite.scala
  64. +13 −17 core/src/test/scala/endless/core/entity/{EntitySuite.scala → StateReaderHelpersSuite.scala}
  65. +174 −0 core/src/test/scala/endless/core/interpret/DurableEntityTSuite.scala
  66. +0 −95 core/src/test/scala/endless/core/interpret/EffectorTSuite.scala
  67. +34 −19 core/src/test/scala/endless/core/interpret/EntityTSuite.scala
  68. +6 −2 documentation/src/main/paradox/abstractions.md
  69. +2 −4 documentation/src/main/paradox/applier.md
  70. +40 −0 documentation/src/main/paradox/deployer.md
  71. +29 −0 documentation/src/main/paradox/durable-entity.md
  72. +13 −20 documentation/src/main/paradox/effector.md
  73. +9 −4 documentation/src/main/paradox/entity.md
  74. +37 −19 documentation/src/main/paradox/example.md
  75. +3 −3 documentation/src/main/paradox/getting-started.md
  76. +9 −4 documentation/src/main/paradox/index.md
  77. +4 −4 documentation/src/main/paradox/inspiration.md
  78. +1 −1 documentation/src/main/paradox/name.md
  79. +13 −8 documentation/src/main/paradox/nutshell.md
  80. +8 −5 documentation/src/main/paradox/protocol.md
  81. +4 −1 documentation/src/main/paradox/reference.md
  82. +0 −9 documentation/src/main/paradox/repository.md
  83. +0 −17 documentation/src/main/paradox/router.md
  84. +23 −18 documentation/src/main/paradox/runtime.md
  85. +13 −0 documentation/src/main/paradox/sender.md
  86. BIN documentation/src/main/paradox/sequences/BookingRepository.png
  87. +25 −0 documentation/src/main/paradox/sequences/BookingRepository.puml
  88. BIN documentation/src/main/paradox/sequences/PlaceBookingClient.png
  89. +30 −0 documentation/src/main/paradox/sequences/PlaceBookingClient.puml
  90. BIN documentation/src/main/paradox/sequences/PlaceBookingServer.png
  91. +35 −0 documentation/src/main/paradox/sequences/PlaceBookingServer.puml
  92. +9 −0 documentation/src/main/paradox/sharding.md
  93. +25 −0 documentation/src/main/paradox/side-effect.md
  94. +6 −0 documentation/src/main/paradox/transactions.md
  95. +43 −0 example/src/main/protobuf/booking/commands.proto
  96. +47 −0 example/src/main/protobuf/booking/events.proto
  97. +35 −0 example/src/main/protobuf/booking/models.proto
  98. +65 −0 example/src/main/protobuf/booking/replies.proto
  99. +9 −0 example/src/main/protobuf/package.proto
  100. +28 −0 example/src/main/protobuf/vehicle/commands.proto
  101. +12 −0 example/src/main/protobuf/vehicle/models.proto
  102. +19 −0 example/src/main/protobuf/vehicle/replies.proto
  103. +11 −0 example/src/main/protobuf/vehicle/state.proto
  104. +44 −0 example/src/main/resources/akka.conf
  105. +2 −42 example/src/main/resources/application.conf
  106. +2 −2 example/src/main/resources/logback.xml
  107. +44 −0 example/src/main/resources/pekko.conf
  108. +0 −156 example/src/main/scala/endless/example/ExampleApp.scala
  109. +0 −23 example/src/main/scala/endless/example/Main.scala
  110. +47 −0 example/src/main/scala/endless/example/adapter/BookingEventAdapter.scala
  111. +24 −0 example/src/main/scala/endless/example/adapter/VehicleStateAdapter.scala
  112. +2 −5 example/src/main/scala/endless/example/algebra/BookingAlg.scala
  113. +1 −1 example/src/main/scala/endless/example/algebra/{BookingRepositoryAlg.scala → BookingsAlg.scala}
  114. +14 −0 example/src/main/scala/endless/example/algebra/VehicleAlg.scala
  115. +9 −0 example/src/main/scala/endless/example/algebra/VehiclesAlg.scala
  116. +168 −0 example/src/main/scala/endless/example/app/HttpServer.scala
  117. +128 −0 example/src/main/scala/endless/example/app/akka/AkkaApp.scala
  118. +7 −0 example/src/main/scala/endless/example/app/akka/Main.scala
  119. +24 −0 example/src/main/scala/endless/example/app/impl/Availabilities.scala
  120. +18 −0 example/src/main/scala/endless/example/app/impl/Bookings.scala
  121. +16 −0 example/src/main/scala/endless/example/app/impl/Vehicles.scala
  122. +7 −0 example/src/main/scala/endless/example/app/pekko/Main.scala
  123. +131 −0 example/src/main/scala/endless/example/app/pekko/PekkoApp.scala
  124. +5 −8 example/src/main/scala/endless/example/data/Booking.scala
  125. +4 −4 example/src/main/scala/endless/example/data/BookingEvent.scala
  126. +13 −0 example/src/main/scala/endless/example/data/LatLon.scala
  127. +10 −0 example/src/main/scala/endless/example/data/Speed.scala
  128. +16 −0 example/src/main/scala/endless/example/data/Vehicle.scala
  129. +18 −13 example/src/main/scala/endless/example/logic/{BookingEntity.scala → BookingEntityBehavior.scala}
  130. +2 −2 example/src/main/scala/endless/example/logic/BookingEventApplier.scala
  131. +0 −14 example/src/main/scala/endless/example/logic/BookingRepository.scala
  132. +19 −18 example/src/main/scala/endless/example/logic/{BookingEffector.scala → BookingSideEffect.scala}
  133. +13 −0 example/src/main/scala/endless/example/logic/ShardedBookings.scala
  134. +11 −0 example/src/main/scala/endless/example/logic/ShardedVehicles.scala
  135. +43 −0 example/src/main/scala/endless/example/logic/VehicleEntityBehavior.scala
  136. +21 −0 example/src/main/scala/endless/example/logic/VehicleSideEffect.scala
  137. +0 −24 example/src/main/scala/endless/example/protocol/BookingCommand.scala
  138. +301 −49 example/src/main/scala/endless/example/protocol/BookingCommandProtocol.scala
  139. +108 −0 example/src/main/scala/endless/example/protocol/VehicleCommandProtocol.scala
  140. +0 −13 example/src/main/scala/endless/example/serializer/BookingEventSerializer.scala
  141. +12 −0 example/src/test/resources/logback-test.xml
  142. +10 −0 example/src/test/scala/endless/example/AkkaExampleAppSuite.scala
  143. +97 −49 example/src/test/scala/endless/example/ExampleAppSuite.scala
  144. +10 −0 example/src/test/scala/endless/example/PekkoExampleAppSuite.scala
  145. +0 −113 example/src/test/scala/endless/example/logic/BookingEffectorSuite.scala
  146. +15 −16 ...rc/test/scala/endless/example/logic/{BookingEntitySuite.scala → BookingEntityBehaviorSuite.scala}
  147. +13 −15 example/src/test/scala/endless/example/logic/BookingEventApplierSuite.scala
  148. +111 −0 example/src/test/scala/endless/example/logic/BookingSideEffectSuite.scala
  149. +10 −3 example/src/test/scala/endless/example/logic/Generators.scala
  150. +104 −0 example/src/test/scala/endless/example/logic/VehicleEntityBehaviorSuite.scala
  151. +68 −0 example/src/test/scala/endless/example/logic/VehicleSideEffectSuite.scala
  152. +80 −73 example/src/test/scala/endless/example/protocol/BookingCommandProtocolSuite.scala
  153. BIN logo/with-label/transparent/1x/logo.png
  154. +15 −0 pekko-runtime/src/main/protobuf/command.proto
  155. +20 −0 pekko-runtime/src/main/resources/reference.conf
  156. +51 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/EntityPassivator.scala
  157. +70 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/ShardingCommandSender.scala
  158. +16 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/data/Command.scala
  159. +8 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/data/Reply.scala
  160. +128 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/PekkoCluster.scala
  161. +81 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/PekkoDeployer.scala
  162. +81 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/PekkoDurableDeployer.scala
  163. +135 −0 ...o-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/DurableShardedEntityDeployer.scala
  164. +145 −0 ...time/src/main/scala/endless/runtime/pekko/deploy/internal/EventSourcedShardedEntityDeployer.scala
  165. +62 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/deploy/internal/ShardedRepositoryDeployer.scala
  166. +68 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/protobuf/ScalaPbSerializer.scala
  167. +58 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/serializer/CommandSerializer.scala
  168. +41 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/serializer/ReplySerializer.scala
  169. +7 −0 pekko-runtime/src/main/scala/endless/runtime/pekko/syntax/package.scala
  170. +17 −0 pekko-runtime/src/test/protobuf/dummy.proto
  171. +61 −0 pekko-runtime/src/test/scala/endless/runtime/pekko/protobuf/ScalaPbSerializerSuite.scala
  172. +55 −26 project/Dependencies.scala
  173. +1 −1 project/build.properties
  174. +13 −8 project/plugins.sbt
  175. +1 −0 project/project/build.properties
  176. +2 −2 project/scalapb.sbt
  177. +12 −16 protobuf/src/main/scala/endless/protobuf/ProtobufCommandProtocol.scala
  178. +13 −0 protobuf/src/main/scala/endless/protobuf/ProtobufIncomingCommand.scala
  179. +13 −0 protobuf/src/main/scala/endless/protobuf/ProtobufOutgoingCommand.scala
  180. +5 −0 protobuf/src/test/protobuf/dummy.proto
  181. +19 −15 protobuf/src/test/scala/endless/protobuf/ProtobufCommandProtocolSuite.scala
  182. +0 −258 runtime/src/main/scala/endless/runtime/akka/Deployer.scala
  183. +0 −47 runtime/src/main/scala/endless/runtime/akka/EntityPassivator.scala
  184. +0 −7 runtime/src/main/scala/endless/runtime/akka/syntax/package.scala
  185. +6 −6 scodec/src/main/scala/endless/scodec/ScodecCommandProtocol.scala
  186. +0 −5 scodec/src/main/scala/endless/scodec/ScodecOutgoingCommand.scala
  187. +26 −19 scodec/src/test/scala/endless/scodec/ScodecCommandProtocolSuite.scala
5 changes: 5 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Scala Steward: Reformat with scalafmt 3.6.0
96d4826c1b5792169ccd92c4e2b38ad450964320

# Scala Steward: Reformat with scalafmt 3.7.2
0918b199fb29938d1d5bb81e175c37a28edf5780
10 changes: 7 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
name: CI
on:
pull_request:
merge_group:
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2.3.4
- uses: actions/checkout@v2
with:
fetch-depth: 0
- uses: olafurpg/setup-scala@v13
- run: sbt compile coverage test coverageReport coverageAggregate versionPolicyCheck documentation/makeSite
- uses: coursier/setup-action@v1.3.3
with:
jvm: temurin:17
apps: sbt
- run: sbt scalafmtCheckAll compile coverage +test coverageReport coverageAggregate versionPolicyCheck documentation/makeSite
- uses: codecov/codecov-action@v2.1.0
25 changes: 17 additions & 8 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
name: Release
on:
create:
tags:
push:
tags:
- 'v*'
workflow_dispatch:

jobs:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2.3.4
- uses: olafurpg/setup-scala@v13
- uses: coursier/setup-action@v1.3.3
with:
jvm: temurin:17
apps: sbt
- name: Test and compute coverage
run: sbt coverage test coverageReport coverageAggregate
run: sbt scalafmtCheckAll coverage +test coverageReport coverageAggregate
- name: Codecov
uses: codecov/codecov-action@v2.1.0

publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2.3.4
- uses: actions/checkout@v2
with:
fetch-depth: 0
- uses: olafurpg/setup-scala@v13
- uses: coursier/setup-action@v1.3.3
with:
jvm: temurin:17
apps: sbt
- run: sbt versionCheck ci-release
env:
PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
@@ -35,7 +41,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2.3.4
- uses: olafurpg/setup-scala@v13
- uses: coursier/setup-action@v1.3.3
with:
jvm: temurin:17
apps: sbt
- name: Generate website
run: sbt documentation/makeSite
- uses: JamesIves/github-pages-deploy-action@4.1.5
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version = 3.2.1
version = 3.8.3
runner.dialect = scala213
maxColumn = 100
7 changes: 7 additions & 0 deletions CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Code of Conduct
We are committed to providing a friendly, safe and welcoming environment for all, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, nationality, or other such characteristics.

Everyone is expected to follow the Scala Code of Conduct when discussing the project on the available communication channels. If you are being harassed, please contact us immediately so that we can support you.

## Moderation
Any questions, concerns, or moderation requests please contact a member of the project.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -4,7 +4,12 @@
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.github.endless4s/endless-core_2.13/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.github.endless4s/endless-core_2.13)
[![codecov](https://codecov.io/gh/endless4s/endless/branch/master/graph/badge.svg?token=aH9vOhLxVS)](https://codecov.io/gh/endless4s/endless)
[![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAA4AAAAQCAMAAAARSr4IAAAAVFBMVEUAAACHjojlOy5NWlrKzcYRKjGFjIbp293YycuLa3pYY2LSqql4f3pCUFTgSjNodYRmcXUsPD/NTTbjRS+2jomhgnzNc223cGvZS0HaSD0XLjbaSjElhIr+AAAAAXRSTlMAQObYZgAAAHlJREFUCNdNyosOwyAIhWHAQS1Vt7a77/3fcxxdmv0xwmckutAR1nkm4ggbyEcg/wWmlGLDAA3oL50xi6fk5ffZ3E2E3QfZDCcCN2YtbEWZt+Drc6u6rlqv7Uk0LdKqqr5rk2UCRXOk0vmQKGfc94nOJyQjouF9H/wCc9gECEYfONoAAAAASUVORK5CYII=)](https://scala-steward.org)
[![Discord](https://badgen.net/badge/icon/discord?icon=discord&label)](https://discord.gg/RceQCetW)
[![Typelevel Affiliate Project](https://img.shields.io/badge/typelevel-affiliate%20project-FFB4B5.svg)](https://typelevel.org/projects/affiliate/)
<a href="https://typelevel.org/cats/"><img src="https://typelevel.org/cats/img/cats-badge.svg" height="40px" align="right" alt="Cats friendly" /></a>

*endless* is a Scala library to describe event sourced entities using tagless-final algebras, running with built-in implementations for Akka.
*endless* is a Scala library to describe event sourced entities using tagless-final algebras, running with built-in implementations for Akka and Pekko.

Head to the [documentation](https://endless4s.github.io/index.html) to learn more.

For a more story-driven background explanation make sure to check out the [blog article](https://jonas-chapuis.medium.com/functional-event-sourcing-with-akka-and-cats-7c075939fbdc).
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
syntax = "proto2";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
scala3_sources: true
};

package endless.runtime.akka.serializer.proto;
option optimize_for = SPEED;
Original file line number Diff line number Diff line change
@@ -16,4 +16,5 @@ akka {
"endless.runtime.akka.data.Reply" = akka-persistence-tagless-reply
}
}
coordinated-shutdown.exit-jvm = on // ensure the JVM exits when the cluster decides to remove the node after a SBR decision
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package endless.runtime.akka

import akka.actor.Cancellable
import akka.actor.typed.scaladsl.ActorContext
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext}
import cats.Applicative
import cats.effect.kernel.{Ref, Sync}
import cats.syntax.eq.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import endless.core.entity.Effector.PassivationState

import scala.concurrent.duration.{Duration, FiniteDuration}

private[akka] class EntityPassivator[F[_]: Sync](upcomingPassivation: Ref[F, Option[Cancellable]])(
implicit
entityContext: EntityContext[?],
actorContext: ActorContext[?]
) {
private lazy val passivateMessage = ClusterSharding.Passivate(actorContext.self)
private lazy val passivate = Sync[F].delay(entityContext.shard.tell(passivateMessage))
private lazy val disablePassivation =
upcomingPassivation.modify(maybeCancellable => {
(None, maybeCancellable.foreach(_.cancel()))
})

def apply(passivationState: PassivationState): F[Unit] = passivationState match {
case PassivationState.After(duration) => enablePassivation(duration)
case PassivationState.Disabled => disablePassivation
case PassivationState.Unchanged => Applicative[F].unit
}

private def enablePassivation(after: FiniteDuration) =
if (after === Duration.Zero) passivate else schedulePassivation(after)

private def schedulePassivation(after: FiniteDuration) =
disablePassivation >> upcomingPassivation.set(
Some(
actorContext.scheduleOnce(after, entityContext.shard, passivateMessage)
)
)

}

object EntityPassivator {
def apply[F[_]: Sync](implicit
entityContext: EntityContext[?],
actorContext: ActorContext[?]
): F[EntityPassivator[F]] =
Ref.of[F, Option[Cancellable]](Option.empty[Cancellable]).map(new EntityPassivator(_))
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
package endless.runtime.akka

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityTypeKey}
import akka.util.Timeout
import cats.effect.kernel.Async
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.show._
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.show.*
import cats.~>
import endless.core.entity.EntityNameProvider
import endless.core.protocol.{CommandRouter, EntityIDEncoder, OutgoingCommand}
import endless.core.protocol.{CommandSender, EntityIDEncoder, OutgoingCommand}
import endless.runtime.akka.data.{Command, Reply}
import org.typelevel.log4cats.Logger

/** Implementation of [[CommandRouter]] for Akka cluster sharding
/** Implementation of [[CommandSender]] for Akka cluster sharding
*
* Retrieves the entity ref and asks the command, then decodes the reply and lifts it into `F`
*
* @param sharding
* Akka cluster sharding extension
* @param actorSystem
* actor system
* @param askTimeout
* Akka ask timeout
* @param idEncoder
@@ -31,15 +29,14 @@ import org.typelevel.log4cats.Logger
* @tparam ID
* entity ID
*/
private[akka] final class ShardingCommandRouter[F[_]: Logger, ID](implicit
private[akka] final class ShardingCommandSender[F[_]: Logger, ID](implicit
sharding: ClusterSharding,
actorSystem: ActorSystem[_],
askTimeout: Timeout,
idEncoder: EntityIDEncoder[ID],
nameProvider: EntityNameProvider[ID],
F: Async[F]
) extends CommandRouter[F, ID] {
def routerForID(id: ID): OutgoingCommand[*] ~> F =
) extends CommandSender[F, ID] {
def senderForID(id: ID): OutgoingCommand[*] ~> F =
new (OutgoingCommand[*] ~> F) {
def apply[A](fa: OutgoingCommand[A]): F[A] = {
val encodedID = idEncoder.encode(id)
@@ -62,13 +59,12 @@ private[akka] final class ShardingCommandRouter[F[_]: Logger, ID](implicit
}
}

object ShardingCommandRouter {
object ShardingCommandSender {
implicit def apply[F[_]: Logger, ID](implicit
sharding: ClusterSharding,
actorSystem: ActorSystem[_],
askTimeout: Timeout,
idEncoder: EntityIDEncoder[ID],
nameProvider: EntityNameProvider[ID],
F: Async[F]
): CommandRouter[F, ID] = new ShardingCommandRouter
): CommandSender[F, ID] = new ShardingCommandSender
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package endless.runtime.akka.deploy

import akka.Done
import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.cluster.{Cluster, MemberStatus}
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import cats.effect.kernel.implicits.*
import cats.effect.kernel.{Async, Deferred, Resource, Sync}
import cats.effect.std.Dispatcher
import cats.implicits.catsSyntaxApplicativeError
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import org.typelevel.log4cats.Logger

import scala.concurrent.TimeoutException
import scala.concurrent.duration.{Duration, DurationInt}

/** Actor system and cluster sharding extension as well as dispatcher tied to its resource scope.
* @param system
* actor system
* @param cluster
* cluster extension
* @param sharding
* cluster sharding extension
* @param dispatcher
* effects dispatcher tied to the cluster resource scope
*/
final case class AkkaCluster[F[_]: Async](
system: ActorSystem[?],
dispatcher: Dispatcher[F],
cluster: Cluster,
sharding: ClusterSharding
) {

/** Returns true if the cluster member is up. Can be used for readiness checks.
*/
def isMemberUp: F[Boolean] = Sync[F].delay(cluster.selfMember.status match {
case MemberStatus.Up => true
case _ => false
})
}

object AkkaCluster {

/** Create a resource that manages the lifetime of an Akka actor system with cluster sharding
* extension. The actor system is created when the resource is acquired and shutdown when the
* resource is released.
*
* @see
* Some elements borrowed from
* https://alexn.org/blog/2023/04/17/integrating-akka-with-cats-effect-3/
*
* @param createActorSystem
* Actor system creator. It is recommended to use the IO execution context
* (`IO.executionContext`) for the actor system, as it supports Akka operation and it's simpler
* to have a single application execution context
*
* @param catsEffectReleaseTimeout
* Maximum amount of time Akka coordinated shutdown is allowed to wait for cats-effect to
* finish, typically when Akka initiates shutdown following a SBR decision. This value should
* not be higher than the actual timeout for `before-service-unbind` phase of Akka coordinated
* shutdown. See <a href="https://doc.akka.io/docs/akka/current/coordinated-shutdown.html">Akka
* coordinated shutdown documentation</a> to learn how to configure the timeouts of individual
* phases. Default (5 seconds) is the same as the default-phase-timeout of Akka coordinated
* shutdown.
* @param akkaReleaseTimeout
* Maximum amount of time to wait for the actor system to terminate during resource release (5
* seconds by default).
*/
def managedResource[F[_]: Async: Logger](
createActorSystem: => ActorSystem[?],
catsEffectReleaseTimeout: Duration = 5.seconds,
akkaReleaseTimeout: Duration = 5.seconds
): Resource[F, AkkaCluster[F]] =
Dispatcher
.parallel(await = true)
.flatMap(dispatcher =>
Resource[F, AkkaCluster[F]](for {
cluster <- createCluster(createActorSystem, dispatcher)
awaitCatsTermination <- Deferred[F, Unit]
_ <- Sync[F].delay {
CoordinatedShutdown(cluster.system)
.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "ce-resources-release") { () =>
dispatcher.unsafeToFuture(
awaitCatsTermination.get
.timeout(catsEffectReleaseTimeout)
.recoverWith { case ex: TimeoutException =>
Logger[F].error(ex)(
"Timed out during cluster shutdown while waiting for cats-effect resources release"
)
}
.as(Done)
)
}
}
} yield {
val release = for {
_ <- awaitCatsTermination.complete(())
_ <- Logger[F].info("Leaving Akka actor cluster")
_ <- Async[F]
.fromFuture(
Async[F].blocking(
CoordinatedShutdown(cluster.system)
.run(CoordinatedShutdown.ActorSystemTerminateReason)
)
)
.void
.timeoutAndForget(akkaReleaseTimeout)
.handleErrorWith(
Logger[F].error(_)(
"Timed out during cluster shutdown while waiting for actor system to terminate"
)
)
_ <- Logger[F].info("Akka cluster exited and actor system shutdown complete")
} yield ()
(cluster, release)
})
)

private def createCluster[F[_]: Async: Logger](
createActorSystem: => ActorSystem[?],
dispatcher: Dispatcher[F]
) = for {
system <- Sync[F].delay(createActorSystem)
_ <- Logger[F].info(s"Created actor system ${system.name}")
cluster <- Sync[F].delay(Cluster(system))
_ <- Logger[F].info(s"Created cluster extension for actor system ${system.name}")
sharding <- Sync[F].delay(ClusterSharding(system))
_ <- Logger[F].info(s"Created cluster sharding extension for actor system ${system.name}")
} yield AkkaCluster(system, dispatcher, cluster, sharding)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package endless.runtime.akka.deploy

import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext}
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.util.Timeout
import cats.effect.kernel.{Async, Resource}
import endless.core.entity.*
import endless.core.event.EventApplier
import endless.core.interpret.*
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.data.*
import AkkaDeployer.*
import endless.runtime.akka.deploy.internal.EventSourcedShardedRepositoryDeployer
import org.typelevel.log4cats.Logger
import endless.core.entity.Deployer
import endless.runtime.akka.ShardingCommandSender

trait AkkaDeployer extends Deployer {
type DeploymentParameters[F[_], _, S, E] = AkkaDeploymentParameters[F, S, E]
type Deployment[F[_], RepositoryAlg[_[_]]] = DeployedAkkaRepository[F, RepositoryAlg]

override def deployRepository[F[_]: Async, ID: EntityIDCodec, S, E, Alg[_[_]], RepositoryAlg[_[
_
]]](
repository: RepositoryInterpreter[F, ID, Alg, RepositoryAlg],
behavior: BehaviorInterpreter[F, S, E, Alg],
sideEffect: SideEffectInterpreter[F, S, Alg, RepositoryAlg]
)(implicit
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[ID, Alg],
eventApplier: EventApplier[S, E],
parameters: AkkaDeploymentParameters[F, S, E]
): Resource[F, DeployedAkkaRepository[F, RepositoryAlg]] = {
import parameters.*
implicit val sharding: ClusterSharding = akkaCluster.sharding
implicit val sender: CommandSender[F, ID] = ShardingCommandSender[F, ID]
for {
interpretedEntityAlg <- Resource.eval(behavior(EntityT.instance))
deployment <- new EventSourcedShardedRepositoryDeployer(
interpretedEntityAlg,
sideEffect,
parameters.customizeBehavior
).deployShardedRepository(repository, parameters.customizeEntity)
} yield {
val (repository, shardRegionActor) = deployment
DeployedAkkaRepository[F, RepositoryAlg](repository, shardRegionActor)
}
}

}

object AkkaDeployer {
final case class DeployedAkkaRepository[F[_], RepositoryAlg[_[_]]](
repository: RepositoryAlg[F],
shardRegionActor: ActorRef[ShardingEnvelope[Command]]
)

final case class AkkaDeploymentParameters[F[_], S, E](
customizeBehavior: (
EntityContext[Command],
EventSourcedBehavior[Command, E, Option[S]]
) => Behavior[Command] =
(_: EntityContext[Command], behavior: EventSourcedBehavior[Command, E, Option[S]]) =>
behavior,
customizeEntity: akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[
Command
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]] =
identity
)(implicit
val logger: Logger[F],
val akkaCluster: AkkaCluster[F],
val askTimeout: Timeout
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package endless.runtime.akka.deploy

import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext}
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.util.Timeout
import cats.effect.kernel.{Async, Resource}
import endless.core.entity.*
import endless.core.interpret.*
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.ShardingCommandSender
import endless.runtime.akka.data.*
import endless.runtime.akka.deploy.AkkaDurableDeployer.{
AkkaDurableDeploymentParameters,
DeployedAkkaDurableRepository
}
import endless.runtime.akka.deploy.internal.DurableShardedRepositoryDeployer
import org.typelevel.log4cats.Logger

trait AkkaDurableDeployer extends DurableDeployer {
type DurableDeploymentParameters[F[_], _, S] = AkkaDurableDeploymentParameters[F, S]
type DurableDeployment[F[_], RepositoryAlg[_[_]]] =
DeployedAkkaDurableRepository[F, RepositoryAlg]

override def deployDurableRepository[F[_]: Async, ID: EntityIDCodec, S, Alg[_[_]], RepositoryAlg[
_[_]
]](
repository: RepositoryInterpreter[F, ID, Alg, RepositoryAlg],
behavior: DurableBehaviorInterpreter[F, S, Alg],
sideEffect: SideEffectInterpreter[F, S, Alg, RepositoryAlg]
)(implicit
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[ID, Alg],
parameters: AkkaDurableDeploymentParameters[F, S]
): Resource[F, DeployedAkkaDurableRepository[F, RepositoryAlg]] = {
import parameters.*
implicit val sharding: ClusterSharding = akkaCluster.sharding
implicit val sender: CommandSender[F, ID] = ShardingCommandSender[F, ID]
for {
interpretedEntityAlg <- Resource.eval(behavior(DurableEntityT.instance))
deployment <- new DurableShardedRepositoryDeployer(
interpretedEntityAlg,
sideEffect,
parameters.customizeBehavior
).deployShardedRepository(repository, parameters.customizeEntity)
} yield {
val (repository, shardRegionActor) = deployment
DeployedAkkaDurableRepository[F, RepositoryAlg](repository, shardRegionActor)
}
}

}

object AkkaDurableDeployer {
final case class DeployedAkkaDurableRepository[F[_], RepositoryAlg[_[_]]](
repository: RepositoryAlg[F],
shardRegionActor: ActorRef[ShardingEnvelope[Command]]
)

final case class AkkaDurableDeploymentParameters[F[_], S](
customizeBehavior: (
EntityContext[Command],
DurableStateBehavior[Command, Option[S]]
) => Behavior[Command] =
(_: EntityContext[Command], behavior: DurableStateBehavior[Command, Option[S]]) => behavior,
customizeEntity: akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[
Command
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]] =
identity
)(implicit
val logger: Logger[F],
val akkaCluster: AkkaCluster[F],
val askTimeout: Timeout
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package endless.runtime.akka.deploy.internal

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.cluster.sharding.typed.scaladsl.EntityContext
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.scaladsl.{DurableStateBehavior, Effect}
import akka.persistence.typed.state.{RecoveryCompleted, RecoveryFailed}
import cats.effect.kernel.Async
import cats.effect.std.Dispatcher
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.show.*
import endless.core.entity.*
import endless.core.entity.SideEffect.RunMode
import endless.core.interpret.DurableEntityT.{DurableEntityT, State}
import endless.core.interpret.*
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.EntityPassivator
import endless.runtime.akka.data.*
import org.typelevel.log4cats.Logger

private[deploy] class DurableShardedRepositoryDeployer[F[
_
]: Async: Logger, S, ID: EntityIDCodec, Alg[_[
_
]], RepositoryAlg[_[_]]](
interpretedEntityAlg: Alg[DurableEntityT[F, S, *]],
sideEffectInterpreter: SideEffectInterpreter[F, S, Alg, RepositoryAlg],
customizeBehavior: (
EntityContext[Command],
DurableStateBehavior[Command, Option[S]]
) => Behavior[Command]
)(implicit
val nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[ID, Alg],
commandSender: CommandSender[F, ID]
) extends ShardedRepositoryDeployer[F, RepositoryAlg, Alg, ID] {

protected override def createBehaviorFor(repositoryAlg: RepositoryAlg[F])(implicit
dispatcher: Dispatcher[F],
actor: ActorContext[Command],
context: EntityContext[Command]
): Behavior[Command] = {
implicit val passivator: EntityPassivator[F] = dispatcher.unsafeRunSync(EntityPassivator[F])
implicit val repository: RepositoryAlg[F] = repositoryAlg
implicit val entity: Alg[F] =
Sharding[F, ID, Alg].entityFor(implicitly[EntityIDCodec[ID]].decode(context.entityId))
implicit val sideEffect: SideEffect[F, S, Alg] =
dispatcher.unsafeRunSync(sideEffectInterpreter(repository, entity))
customizeBehavior(
context,
DurableStateBehavior
.withEnforcedReplies[Command, Option[S]](
PersistenceId(entityTypeKey.name, context.entityId),
Option.empty[S],
commandHandler = handleCommand
)
.receiveSignal {
case (state, RecoveryCompleted) =>
dispatcher.unsafeRunSync(
Logger[F]
.info(show"Recovery of ${nameProvider()} entity ${context.entityId} completed")
)
handleSideEffect(state, SideEffect.Trigger.AfterRecovery)
case (_, RecoveryFailed(failure)) =>
dispatcher.unsafeRunSync(
Logger[F].warn(
show"Recovery of ${nameProvider()} entity ${context.entityId} failed with error ${failure.getMessage}"
)
)
}
)
}

private def handleSideEffect(
state: Option[S],
trigger: SideEffect.Trigger
)(implicit
sideEffect: SideEffect[F, S, Alg],
entity: Alg[F],
passivator: EntityPassivator[F],
dispatcher: Dispatcher[F]
): Unit = {
val effect = for {
effector <- Effector[F, S, Alg](entity, state)
_ <- sideEffect.apply(trigger, effector)
passivationState <- effector.passivationState
_ <- passivator.apply(passivationState)
} yield ()
dispatcher.unsafeRunSync(sideEffect.runModeFor(trigger, state)) match {
case RunMode.Sync => dispatcher.unsafeRunSync(effect)
case RunMode.Async => dispatcher.unsafeRunAndForget(effect)
}
}

private def handleCommand(state: Option[S], command: Command)(implicit
entity: Alg[F],
dispatcher: Dispatcher[F],
passivator: EntityPassivator[F],
sideEffect: SideEffect[F, S, Alg]
) = {
val incomingCommand =
commandProtocol.server[DurableEntityT[F, S, *]].decode(command.payload)
val effect = Logger[F].debug(
show"Handling command for ${nameProvider()} entity ${command.id}"
) >> incomingCommand
.runWith(interpretedEntityAlg)
.run(state match {
case Some(value) => DurableEntityT.State.Existing(value)
case None => DurableEntityT.State.None
})
.flatMap { case (outcome, reply) =>
(outcome match {
case State.None => Effect.none
case State.Existing(_) => Effect.none
case State.Updated(state) => Effect.persist(Option(state))
})
.thenRun((state: Option[S]) =>
handleSideEffect(
state,
outcome match {
case State.None => SideEffect.Trigger.AfterRead
case State.Existing(_) => SideEffect.Trigger.AfterRead
case State.Updated(_) => SideEffect.Trigger.AfterPersistence
}
)
)
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
}
dispatcher.unsafeRunSync(effect)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package endless.runtime.akka.deploy.internal

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.cluster.sharding.typed.scaladsl.EntityContext
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import akka.persistence.typed.{PersistenceId, RecoveryCompleted, RecoveryFailed}
import cats.effect.kernel.Async
import cats.effect.std.Dispatcher
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.show.*
import endless.core.entity.SideEffect.RunMode
import endless.core.entity.{Effector, EntityNameProvider, Sharding, SideEffect}
import endless.core.event.EventApplier
import endless.core.interpret.{EntityT, SideEffectInterpreter}
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.EntityPassivator
import endless.runtime.akka.data.{Command, Reply}
import endless.runtime.akka.deploy.internal.EventSourcedShardedRepositoryDeployer.*
import org.typelevel.log4cats.Logger

private[deploy] class EventSourcedShardedRepositoryDeployer[F[
_
]: Async: Logger, S, E, ID: EntityIDCodec, Alg[_[_]], RepositoryAlg[_[_]]](
interpretedEntityAlg: Alg[EntityT[F, S, E, *]],
sideEffectInterpreter: SideEffectInterpreter[F, S, Alg, RepositoryAlg],
customizeBehavior: (
EntityContext[Command],
EventSourcedBehavior[Command, E, Option[S]]
) => Behavior[Command]
)(implicit
val nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[ID, Alg],
commandSender: CommandSender[F, ID],
eventApplier: EventApplier[S, E]
) extends ShardedRepositoryDeployer[F, RepositoryAlg, Alg, ID] {

protected override def createBehaviorFor(repositoryAlg: RepositoryAlg[F])(implicit
dispatcher: Dispatcher[F],
actor: ActorContext[Command],
context: EntityContext[Command]
): Behavior[Command] = {
implicit val passivator: EntityPassivator[F] = dispatcher.unsafeRunSync(EntityPassivator[F])
implicit val entity: Alg[F] =
Sharding[F, ID, Alg].entityFor(implicitly[EntityIDCodec[ID]].decode(context.entityId))
implicit val repository: RepositoryAlg[F] = repositoryAlg
implicit val sideEffect: SideEffect[F, S, Alg] =
dispatcher.unsafeRunSync(sideEffectInterpreter(repository, entity))
customizeBehavior(
context,
EventSourcedBehavior
.withEnforcedReplies[Command, E, Option[S]](
PersistenceId(entityTypeKey.name, context.entityId),
Option.empty[S],
commandHandler = handleCommand,
eventHandler = handleEvent
)
.receiveSignal {
case (state, RecoveryCompleted) =>
dispatcher.unsafeRunSync(
Logger[F]
.info(show"Recovery of ${nameProvider()} entity ${context.entityId} completed")
)
handleSideEffect(state, SideEffect.Trigger.AfterRecovery)
case (_, RecoveryFailed(failure)) =>
dispatcher.unsafeRunSync(
Logger[F].warn(
show"Recovery of ${nameProvider()} entity ${context.entityId} failed with error ${failure.getMessage}"
)
)
}
)
}

private def handleSideEffect(
state: Option[S],
trigger: SideEffect.Trigger
)(implicit
sideEffect: SideEffect[F, S, Alg],
entity: Alg[F],
passivator: EntityPassivator[F],
dispatcher: Dispatcher[F]
): Unit = {
val effect = for {
effector <- Effector[F, S, Alg](entity, state)
_ <- sideEffect.apply(trigger, effector)
passivationState <- effector.passivationState
_ <- passivator.apply(passivationState)
} yield ()
dispatcher.unsafeRunSync(sideEffect.runModeFor(trigger, state)) match {
case RunMode.Sync => dispatcher.unsafeRunSync(effect)
case RunMode.Async => dispatcher.unsafeRunAndForget(effect)
}
}

private def handleEvent(state: Option[S], event: E)(implicit dispatcher: Dispatcher[F]) =
eventApplier.apply(state, event) match {
case Left(error) =>
dispatcher.unsafeRunSync(Logger[F].warn(error))
throw new EventApplierException(error)
case Right(newState) => newState
}

private def handleCommand(state: Option[S], command: Command)(implicit
entity: Alg[F],
dispatcher: Dispatcher[F],
passivator: EntityPassivator[F],
sideEffect: SideEffect[F, S, Alg]
) = {
val incomingCommand =
commandProtocol.server[EntityT[F, S, E, *]].decode(command.payload)
val effect = Logger[F].debug(
show"Handling command for ${nameProvider()} entity ${command.id}"
) >> incomingCommand
.runWith(interpretedEntityAlg)
.run(state)
.flatMap {
case Left(error) =>
Logger[F].warn(error) >> Effect.unhandled[E, Option[S]].thenNoReply().pure[F]
case Right((events, reply)) if events.nonEmpty =>
Effect
.persist(events.toList)
.thenRun((state: Option[S]) =>
handleSideEffect(state, SideEffect.Trigger.AfterPersistence)
)
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
case Right((_, reply)) =>
Effect
.none[E, Option[S]]
.thenRun((state: Option[S]) => handleSideEffect(state, SideEffect.Trigger.AfterRead))
.thenReply[Reply](command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
}
dispatcher.unsafeRunSync(effect)
}
}

private[deploy] object EventSourcedShardedRepositoryDeployer {
final class EventApplierException(error: String) extends RuntimeException(error)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package endless.runtime.akka.deploy.internal

import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, EntityTypeKey}
import akka.util.Timeout
import cats.effect.kernel.{Async, Resource}
import cats.effect.std.Dispatcher
import endless.core.entity.*
import endless.core.interpret.RepositoryInterpreter
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDEncoder}
import endless.runtime.akka.ShardingCommandSender
import endless.runtime.akka.data.*
import endless.runtime.akka.deploy.AkkaCluster
import org.typelevel.log4cats.Logger

trait ShardedRepositoryDeployer[F[_], RepositoryAlg[_[_]], Alg[_[_]], ID] {
implicit def nameProvider: EntityNameProvider[ID]
protected lazy val entityTypeKey: EntityTypeKey[Command] = EntityTypeKey[Command](nameProvider())

def deployShardedRepository(
repositoryInterpreter: RepositoryInterpreter[F, ID, Alg, RepositoryAlg],
customizeEntity: akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[
Command
]] => akka.cluster.sharding.typed.scaladsl.Entity[Command, ShardingEnvelope[Command]]
)(implicit
akkaCluster: AkkaCluster[F],
entityIDEncoder: EntityIDEncoder[ID],
async: Async[F],
logger: Logger[F],
askTimeout: Timeout,
commandProtocol: CommandProtocol[ID, Alg]
): Resource[F, (RepositoryAlg[F], ActorRef[ShardingEnvelope[Command]])] = {
implicit val clusterSharding: ClusterSharding = akkaCluster.sharding
implicit val commandSender: CommandSender[F, ID] = ShardingCommandSender.apply
val repository = Sharding.apply[F, ID, Alg]
repositoryInterpreter(repository).map(repository => {
implicit val dispatcher: Dispatcher[F] = akkaCluster.dispatcher
val akkaEntity =
akka.cluster.sharding.typed.scaladsl.Entity(entityTypeKey) { implicit context =>
Behaviors.setup { implicit actor => createBehaviorFor(repository) }
}
(repository, akkaCluster.sharding.init(customizeEntity(akkaEntity)))
})
}

protected def createBehaviorFor(repository: RepositoryAlg[F])(implicit
dispatcher: Dispatcher[F],
actor: ActorContext[Command],
context: EntityContext[Command]
): Behavior[Command]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package endless.runtime.akka.protobuf

import akka.actor.ExtendedActorSystem
import akka.serialization.BaseSerializer
import scalapb.GeneratedMessageCompanion

import java.util.concurrent.atomic.AtomicReference

/*
Akka serializer making use of scalapb-generated classes for protobuf serialization
Inspired by https://gist.github.com/thesamet/5d0349b40d3dc92859a1a2eafba448d5
*/
@SuppressWarnings(
Array(
"org.wartremover.warts.AsInstanceOf",
"org.wartremover.warts.Equals",
"org.wartremover.warts.Null"
)
)
class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
private val classToCompanionMapRef =
new AtomicReference[Map[Class[?], GeneratedMessageCompanion[?]]](Map.empty)

override def toBinary(o: AnyRef): Array[Byte] = o match {
case e: scalapb.GeneratedMessage => e.toByteArray
case _ => throw new IllegalArgumentException("Need a subclass of scalapb.GeneratedMessage")
}

override def includeManifest: Boolean = true

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[?]]): AnyRef =
manifest match {
case Some(clazz) =>
// noinspection ScalaStyle
@scala.annotation.tailrec
def messageCompanion(
companion: GeneratedMessageCompanion[?] = null
): GeneratedMessageCompanion[?] = {
val classToCompanion = classToCompanionMapRef.get()
classToCompanion.get(clazz) match {
case Some(cachedCompanion) => cachedCompanion
case None =>
val uncachedCompanion =
if (companion eq null)
Class
.forName(clazz.getName + "$", true, clazz.getClassLoader)
.getField("MODULE$")
.get(())
.asInstanceOf[GeneratedMessageCompanion[?]]
else companion
if (
classToCompanionMapRef.compareAndSet(
classToCompanion,
classToCompanion.updated(clazz, uncachedCompanion)
)
)
uncachedCompanion
else
messageCompanion(uncachedCompanion)
}
}
messageCompanion().parseFrom(bytes).asInstanceOf[AnyRef]
case _ =>
throw new IllegalArgumentException(
"Need a ScalaPB companion class to be able to deserialize."
)
}
}
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ package endless.runtime.akka.serializer
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.serialization.{BaseSerializer, SerializerWithStringManifest}
import cats.syntax.show._
import cats.syntax.show.*
import com.google.protobuf.ByteString
import endless.runtime.akka.data.Command
import endless.runtime.akka.serializer.CommandSerializer.ManifestKey
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package endless.runtime.akka

import endless.runtime.akka.deploy.{AkkaDeployer, AkkaDurableDeployer}

package object syntax {
object deploy extends AkkaDeployer with AkkaDurableDeployer
}
18 changes: 18 additions & 0 deletions akka-runtime/src/test/protobuf/dummy.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

import "scalapb/scalapb.proto";

option (scalapb.options) = {
scala3_sources: true
};

package endless.protobuf.test.proto;

message DummyCommand {
string x = 1;
int32 y = 2;
}

message DummyReply {
bool ok = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package endless.runtime.akka.protobuf

import akka.actor.ActorSystem
import akka.serialization.{Serialization, SerializationExtension}
import com.typesafe.config.ConfigFactory
import endless.protobuf.test.proto.dummy.DummyCommand

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

class ScalaPbSerializerSuite extends munit.FunSuite {
val akkaSerialization = FunFixture[Serialization](
setup = { _ =>
val system = ActorSystem(
"ScalaPbSerializerSpec",
ConfigFactory.parseString(ScalaPbSerializerSuite.serializationConfig)
)
SerializationExtension(system)
},
teardown = { fixture => Await.result(fixture.system.terminate(), 30.seconds) }
)

akkaSerialization.test(
"be found by akka serialization extension when asked for it for test type"
) { serialization =>
val clasz = serialization.serializerFor(classOf[DummyCommand]).getClass
assert(clasz == classOf[ScalaPbSerializer])
}

akkaSerialization.test("serialize and deserialize event correctly") { serialization =>
val dummy = DummyCommand("dummy")
val serialized = serialization.serialize(dummy)
assert(serialized.isSuccess)
val deserialized = serialization.deserialize(serialized.get, classOf[DummyCommand])
assert(deserialized.isSuccess)
assertEquals(deserialized.get, dummy)
}
}

object ScalaPbSerializerSuite {
val serializationConfig: String =
"""
|akka {
| actor {
| provider = local
|
| serializers {
| scalapb = "endless.runtime.akka.protobuf.ScalaPbSerializer"
| }
|
| serialization-bindings {
| "scalapb.GeneratedMessage" = scalapb
| }
|
| serialization-identifiers {
| "endless.runtime.akka.protobuf.ScalaPbSerializer" = 1111
| }
| }
| coordinated-shutdown.exit-jvm = off
|}
""".stripMargin
}
133 changes: 103 additions & 30 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
import Dependencies._
import Dependencies.*
import sbtversionpolicy.Compatibility.None

val scala213 = "2.13.15"
val scala3 = "3.5.1"

val commonSettings = Seq(
scalacOptions ++= Seq("-Xfatal-warnings"),
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full),
wartremoverExcluded += sourceManaged.value,
Compile / compile / wartremoverErrors ++= Warts
.allBut(Wart.Any, Wart.Nothing, Wart.ImplicitParameter, Wart.Throw, Wart.DefaultArguments),
coverageExcludedPackages := "<empty>;endless.test.*"
Compile / compile / wartremoverErrors ++= Warts.allBut(
Wart.Any,
Wart.Nothing,
Wart.ImplicitParameter,
Wart.Throw,
Wart.DefaultArguments
),
scalaVersion := scala213,
crossScalaVersions := Seq(scala213, scala3),
libraryDependencies ++= (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, _)) =>
Seq(compilerPlugin("org.typelevel" % "kind-projector" % "0.13.3" cross CrossVersion.full))
case _ => Nil
}),
Compile / scalacOptions ++= Seq("-Xfatal-warnings"),
Compile / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((3, _)) => Seq("-Xkind-projector:underscores")
case Some((2, _)) =>
Seq("-Xsource:3", "-P:kind-projector:underscore-placeholders", "-Xlint:unused")
case _ => Nil
})
)

inThisBuild(
@@ -26,57 +46,84 @@ inThisBuild(
sonatypeProjectHosting := Some(
xerial.sbt.Sonatype.GitHubHosting("endless4s", "endless", "me@jonaschapuis.com")
),
scalaVersion := "2.13.7",
Global / onChangedBuildSource := ReloadOnSourceChanges,
PB.protocVersion := "3.17.3", // works on Apple Silicon,
versionPolicyIntention := Compatibility.None,
versionScheme := Some("early-semver"),
versionPolicyIgnoredInternalDependencyVersions := Some(
"^\\d+\\.\\d+\\.\\d+\\+\\d+".r
) // Support for versions generated by sbt-dynver
), // Support for versions generated by sbt-dynver
coverageExcludedPackages := "<empty>;endless.test.*"
)
)

lazy val core = (project in file("core"))
.settings(commonSettings: _*)
.settings(commonSettings *)
.settings(
libraryDependencies ++= cats ++ catsTagless ++ log4cats ++ (catsLaws ++ catsTestkit ++ mUnit)
.map(_ % Test)
libraryDependencies ++= cats ++ catsEffectKernel ++ log4cats ++
(catsLaws ++ catsEffectLaws ++ catsEffect ++ catsTestkit ++ catsEffectTestKit ++ mUnit ++ catsEffectMUnit ++ scalacheckEffect ++ kittens)
.map(_ % Test)
)
.settings(name := "endless-core")

lazy val runtime = (project in file("runtime"))
lazy val akkaRuntime = (project in file("akka-runtime"))
.settings(commonSettings *)
.dependsOn(core)
.settings(commonSettings: _*)
.settings(libraryDependencies ++= catsEffectStd ++ akkaProvided ++ log4cats)
.settings(
libraryDependencies ++= catsEffectStd ++ akkaProvided ++ log4cats ++ scalapbCustomizations ++ (mUnit :+ akkaTypedTestkit % akkaVersion)
.map(_ % Test)
)
.settings(
Project.inConfig(Test)(sbtprotoc.ProtocPlugin.protobufConfigSettings),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
),
Test / PB.targets := Seq(
scalapb.gen() -> (Test / sourceManaged).value / "scalapb"
)
)
.settings(name := "endless-runtime-akka")

lazy val pekkoRuntime = (project in file("pekko-runtime"))
.settings(commonSettings *)
.dependsOn(core)
.settings(
libraryDependencies ++= catsEffectStd ++ pekkoProvided ++ log4cats ++ scalapbCustomizations ++ (mUnit ++ logback :+ pekkoTypedTestkit % pekkoVersion)
.map(_ % Test)
)
.settings(
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
),
Test / PB.targets := Seq(
scalapb.gen() -> (Test / sourceManaged).value / "scalapb"
)
)
.settings(name := "endless-runtime-pekko")

lazy val circeHelpers = (project in file("circe"))
.settings(commonSettings *)
.dependsOn(core)
.settings(commonSettings: _*)
.settings(
libraryDependencies ++= (circe :+ (akkaActorTyped % akkaVersion)) ++ mUnit.map(_ % Test)
libraryDependencies ++= circe ++ mUnit.map(_ % Test)
)
.settings(name := "endless-circe-helpers")

lazy val scodecHelpers = (project in file("scodec"))
.settings(commonSettings *)
.settings(scalaVersion := scala3, crossScalaVersions := Nil) // only scala3 for scodec
.dependsOn(core)
.settings(commonSettings: _*)
.settings(libraryDependencies ++= scodecCore ++ mUnit.map(_ % Test))
.settings(name := "endless-scodec-helpers")

lazy val protobufHelpers = (project in file("protobuf"))
.settings(commonSettings *)
.dependsOn(core)
.settings(commonSettings: _*)
.settings(libraryDependencies ++= mUnit.map(_ % Test))
.settings(
libraryDependencies ++= scalapbCustomizations ++ mUnit.map(_ % Test)
)
.settings(name := "endless-protobuf-helpers")
.settings(
Project.inConfig(Test)(sbtprotoc.ProtocPlugin.protobufConfigSettings),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
),
@@ -86,37 +133,61 @@ lazy val protobufHelpers = (project in file("protobuf"))
)

lazy val example = (project in file("example"))
.dependsOn(core, runtime, circeHelpers)
.settings(commonSettings: _*)
.settings(commonSettings *)
.dependsOn(core, akkaRuntime, pekkoRuntime, circeHelpers, protobufHelpers)
.settings(
libraryDependencies ++= catsEffect ++ http4s ++ akka ++ akkaTest ++ logback ++ log4catsSlf4j ++ (mUnit ++ catsEffectMUnit ++ scalacheckEffect ++ log4catsTesting)
libraryDependencies ++= catsEffect ++ http4s ++ blaze ++ akka ++ pekko ++ scalapbCustomizations ++ akkaTest ++ pekkoTest ++ logback ++ (mUnit ++ catsEffectMUnit ++ scalacheckEffect ++ log4catsTesting)
.map(_ % Test)
)
.settings(name := "endless-example", run / fork := true, publish / skip := true)
.settings(
Project.inConfig(Test)(sbtprotoc.ProtocPlugin.protobufConfigSettings),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
),
Test / PB.targets := Seq(
scalapb.gen() -> (Test / sourceManaged).value / "scalapb"
)
)

// Generate API documentation per module, as documented in https://www.scala-sbt.org/sbt-site/api-documentation.html#scaladoc-from-multiple-projects

// Create one configuration per module
val Core = config("core")
val Protobuf = config("protobuf")
val Circe = config("circe")
val Runtime = config("runtime")
val Scodec = config("scodec")
val AkkaRuntime = config("akka-runtime")
val PekkoRuntime = config("pekko-runtime")

// For each module define its package prefix and path in documentation API
val scaladocSiteProjects = List(
core -> (Core, "endless", "core"),
protobufHelpers -> (Protobuf, "endless.protobuf", "protobuf"),
scodecHelpers -> (Scodec, "endless.scodec", "scodec"),
circeHelpers -> (Circe, "endless.circe", "circe"),
runtime -> (Runtime, "endless.runtime", "runtime")
akkaRuntime -> (AkkaRuntime, "endless.runtime.akka", "akka-runtime"),
pekkoRuntime -> (PekkoRuntime, "endless.runtime.pekko", "pekko-runtime")
)

lazy val documentation = (project in file("documentation"))
.enablePlugins(ParadoxMaterialThemePlugin, ParadoxPlugin, ParadoxSitePlugin, SiteScaladocPlugin)
.enablePlugins(
ParadoxMaterialThemePlugin,
SitePreviewPlugin,
ParadoxPlugin,
ParadoxSitePlugin,
SiteScaladocPlugin
)
.settings(
paradoxProperties ++= (
scaladocSiteProjects.map { case (_, (_, pkg, path)) =>
s"scaladoc.${pkg}.base_url" -> s"api/${path}"
}.toMap
),
paradoxProperties += ("akka.min.version" -> akkaVersion),
paradoxProperties ++= List(
("akka.min.version" -> akkaVersion),
("pekko.min.version" -> pekkoVersion)
).toMap,
scaladocSiteProjects.flatMap { case (project, (conf, _, path)) =>
SiteScaladocPlugin.scaladocSettings(
conf,
@@ -146,9 +217,11 @@ lazy val documentation = (project in file("documentation"))

lazy val root = project
.in(file("."))
.aggregate(core, runtime, circeHelpers, scodecHelpers, protobufHelpers, example)
.aggregate(core, akkaRuntime, pekkoRuntime, circeHelpers, scodecHelpers, protobufHelpers, example)
.dependsOn(example)
.settings(Compile / mainClass := (example / Compile / mainClass).value)
.settings(commonSettings: _*)
.settings(commonSettings *)
.settings(crossScalaVersions := Nil)
.settings(publish / skip := true)
.settings(name := "endless")

Compile / mainClass := Some("endless.example.app.pekko.Main")
12 changes: 6 additions & 6 deletions circe/src/main/scala/endless/circe/CirceCommandProtocol.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package endless.circe

import endless.core.protocol.{CommandProtocol, IncomingCommand, OutgoingCommand}
import endless.core.protocol.{CommandProtocol, CommandSender, IncomingCommand}

trait CirceCommandProtocol[Alg[_[_]]] extends CommandProtocol[Alg] {
protected def outgoingCommand[C: io.circe.Encoder, R: io.circe.Decoder](
command: C
): OutgoingCommand[R] = CirceOutgoingCommand(command)
trait CirceCommandProtocol[ID, Alg[_[_]]] extends CommandProtocol[ID, Alg] {
protected def sendCommand[F[_], C: io.circe.Encoder, R: io.circe.Decoder](id: ID, command: C)(
implicit sender: CommandSender[F, ID]
): F[R] = CommandProtocol.sendCommand(id, new CirceOutgoingCommand[C, R](command))

protected def incomingCommand[F[_], R: io.circe.Encoder](
protected def handleCommand[F[_], R: io.circe.Encoder](
run: Alg[F] => F[R]
): IncomingCommand[F, Alg] = CirceIncomingCommand(run)
}
2 changes: 1 addition & 1 deletion circe/src/main/scala/endless/circe/CirceDecoder.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package endless.circe

import cats.syntax.either._
import cats.syntax.either.*
import endless.circe.CirceDecoder.{DecodingException, ParsingException}
import endless.core.protocol.Decoder
import io.circe.parser.parse
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package endless.circe

import endless.circe.CirceEncoder._
import endless.circe.CirceEncoder.*
import endless.core.protocol.IncomingCommand
import io.circe.Encoder

5 changes: 0 additions & 5 deletions circe/src/main/scala/endless/circe/CirceOutgoingCommand.scala
Original file line number Diff line number Diff line change
@@ -8,8 +8,3 @@ final class CirceOutgoingCommand[C, +R: io.circe.Decoder](command: C)(implicit
def payload: Array[Byte] = CirceEncoder[C].encode(command)
def replyDecoder: Decoder[R] = CirceDecoder[R]
}

object CirceOutgoingCommand {
def apply[C: io.circe.Encoder, R: io.circe.Decoder](command: C): CirceOutgoingCommand[C, R] =
new CirceOutgoingCommand(command)
}

This file was deleted.

32 changes: 18 additions & 14 deletions circe/src/test/scala/endless/circe/CirceCommandProtocolSuite.scala
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
package endless.circe
import cats.Id
import cats.syntax.functor._
import endless.core.protocol.{Decoder, IncomingCommand, OutgoingCommand}
import io.circe.generic.auto._
import endless.core.protocol.{CommandSender, Decoder, IncomingCommand}
import io.circe.generic.auto.*
import org.scalacheck.Prop.forAll

class CirceCommandProtocolSuite extends munit.ScalaCheckSuite {
test("circe command protocol") {
forAll { (int: Int, str: String, reply: Boolean) =>
val outgoingCommand = protocol.client.dummy(int, str)
val incomingCommand = protocol.server[Id].decode(outgoingCommand.payload)
val encodedReply = incomingCommand
.runWith((_: Int, _: String) => reply)
.map(incomingCommand.replyEncoder.encode)
assertEquals(outgoingCommand.replyDecoder.decode(encodedReply), reply)
forAll { (int: Int, str: String, reply: Boolean, id: ID) =>
implicit val sender: CommandSender[Id, ID] = localCommandSenderWith(reply)
val actual = dummyProtocol.clientFor(id).dummy(int, str)
assertEquals(actual, reply)
}
}

val protocol = new CirceCommandProtocol[DummyAlg] {
val dummyProtocol = new CirceCommandProtocol[ID, DummyAlg] {
def server[F[_]]: Decoder[IncomingCommand[F, DummyAlg]] =
CirceDecoder[DummyCommand].map { case DummyCommand(x, y) =>
incomingCommand[F, Boolean](_.dummy(x, y))
handleCommand[F, Boolean](_.dummy(x, y))
}

def client: DummyAlg[OutgoingCommand[*]] = (x: Int, y: String) =>
outgoingCommand[DummyCommand, Boolean](DummyCommand(x, y))
def clientFor[F[_]](id: ID)(implicit sender: CommandSender[F, ID]): DummyAlg[F] =
(x: Int, y: String) => sendCommand[F, DummyCommand, Boolean](id, DummyCommand(x, y))
}

def localCommandSenderWith(reply: Boolean): CommandSender[Id, ID] = CommandSender.local(
dummyProtocol,
new DummyAlg[Id] {
def dummy(x: Int, y: String): Id[Boolean] = reply
}
)

trait DummyAlg[F[_]] {
def dummy(x: Int, y: String): F[Boolean]
}
case class DummyCommand(x: Int, y: String)
type ID = String
}
6 changes: 6 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
comment:
layout: "reach, diff, files"
behavior: default
require_changes: false
require_base: false
require_head: true
3 changes: 1 addition & 2 deletions core/src/main/scala/endless/core/data/EventsFolder.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package endless.core.data

import cats.Foldable
import cats.syntax.foldable._
import cats.syntax.foldable.*
import endless.\/
import endless.core.event.EventApplier

@@ -16,7 +16,6 @@ import endless.core.event.EventApplier
* event
*/
final case class EventsFolder[S, E](state: Option[S], applier: EventApplier[S, E]) {
@SuppressWarnings(Array("org.wartremover.warts.OptionPartial"))
def applyOnFoldable[G[_]: Foldable](foldable: G[E]): String \/ Option[S] =
foldable.foldM(state)(applier.apply)
}
105 changes: 105 additions & 0 deletions core/src/main/scala/endless/core/entity/Deployer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package endless.core.entity

import cats.effect.kernel.{Async, Resource}
import endless.core.event.EventApplier
import endless.core.interpret.{SideEffectInterpreter, BehaviorInterpreter, RepositoryInterpreter}
import endless.core.protocol.{CommandProtocol, EntityIDCodec}

/** `Deployer` deploys event-sourced entity repositories by assembling required interpreters and
* components.
*/
trait Deployer {

/** Platform-specific deployment parameters: the final type is to be specified in implementations
* @tparam F
* effect type
* @tparam ID
* entity ID
* @tparam S
* entity state
* @tparam E
* entity event
*/
type DeploymentParameters[F[_], ID, S, E]

/** Handle on a deployed repository: the final type is to be specified in implementations
* @tparam F
* effect type
* @tparam RepositoryAlg
* repository algebra
*/
type Deployment[F[_], RepositoryAlg[_[_]]]

/** Deploys an event-sourced entity repository in context `F`, returning an instance of
* implementation-specific `Deployment` typed with the Repository algebra, wrapped in a resource
* (since deployments typically require finalization).
*
* Repository operation is defined by the interpreted repository, behavior and side-effect
* algebras, following a strictly defined sequence:
* - the interpreted repository is used to create a handle on the entity with the specified ID
* implementing the entity algebra, so that the caller can interact with it
* - when a function of the entity algebra is invoked, this invocation is serialized using the
* `commandProtocol` and sent over the wire thanks to `CommandSender`. On the receiving node,
* the message is decoded and run with the provided `behavior` interpreter: this typically
* involves reading the entity state (e.g. for validation), and writing events (which can
* lead to a new version of the state via the `eventApplier` function)
* - after events are written, a possible side-effect is triggered: this is by default
* asynchronous (i.e. the function doesn't wait for completion of the side-effect to return)
* but can be made synchronous by overriding the `runModeFor` method in the `SideEffect`
* - the function finally returns to the caller with the result of the operation described by
* the entity algebra (reply value, typically encoded over the wire in a distributed
* deployment)
*
* This interaction pattern occurs with "actor-like" semantics: all calls on the entity are
* processed in sequence.
*
* The function is parameterized with the context `F` and the various involved types: `S` for
* entity state, `E` for events, `ID` for entity ID and `Alg` & `RepositoryAlg` for entity and
* repository algebras respectively (both higher-kinded type constructors).
*
* Since the behavior described above involves concurrent handling of repository interactions and
* asynchronous side-effecting, we expect `Async` from `F`.
*
* `EntityIDCodec` is used to encode/decode entity IDs to/from strings.
*
* @param repository
* interpreter for the repository algebra (used to "materialize" the repository)
* @param behavior
* interpreter for the behavior algebra (used to "materialize" the behavior)
* @param sideEffect
* interpreter for the side-effect algebra (used to "materialize" the side-effect)
* @param nameProvider
* provides a name for the entity (in other words, the "type of entity", e.g. "booking")
* @param commandProtocol
* protocol-centric definition of entity algebra: defines a wire encoding for interactions with
* remote entities
* @param eventApplier
* defines how events are applied to the entity state
* @param parameters
* platform-specific deployment parameters
* @tparam F
* effect type
* @tparam ID
* entity ID
* @tparam S
* entity state
* @tparam E
* event type
* @tparam Alg
* entity algebra
* @tparam RepositoryAlg
* repository algebra
* @return
* a resource encapsulating access to the deployed repository algebra
*/
def deployRepository[F[_]: Async, ID: EntityIDCodec, S, E, Alg[_[_]], RepositoryAlg[_[_]]](
repository: RepositoryInterpreter[F, ID, Alg, RepositoryAlg],
behavior: BehaviorInterpreter[F, S, E, Alg],
sideEffect: SideEffectInterpreter[F, S, Alg, RepositoryAlg]
)(implicit
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[ID, Alg],
eventApplier: EventApplier[S, E],
parameters: DeploymentParameters[F, ID, S, E]
): Resource[F, Deployment[F, RepositoryAlg]]
}
106 changes: 106 additions & 0 deletions core/src/main/scala/endless/core/entity/DurableDeployer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package endless.core.entity

import cats.effect.kernel.{Async, Resource}
import endless.core.interpret.{
DurableBehaviorInterpreter,
SideEffectInterpreter,
RepositoryInterpreter
}
import endless.core.protocol.{CommandProtocol, EntityIDCodec}

/** `DurableDeployer` deploys durable entity repositories by assembling the required interpreters
* and components.
*/
trait DurableDeployer {

/** Platform-specific deployment parameters: the final type is to be specified in implementations
*
* @tparam F
* effect type
* @tparam ID
* entity ID
* @tparam S
* entity state
*/
type DurableDeploymentParameters[F[_], ID, S]

/** Handle on a deployed repository: the final type is to be specified in implementations
*
* @tparam F
* effect type
* @tparam RepositoryAlg
* repository algebra
*/
type DurableDeployment[F[_], RepositoryAlg[_[_]]]

/** Deploys a durable entity repository in context `F`, returning an instance of
* implementation-specific `Deployment` typed with the Repository algebra, wrapped in a resource
* (since deployments typically require finalization).
*
* Repository operation uses the the interpreted repository, behavior and side-effect algebras,
* following a strictly defined sequence:
* - the interpreted repository is used to create a handle on the entity with the specified ID
* implementing the entity algebra, so that the caller can interact with it
* - when a function of the entity algebra is invoked, this invocation is serialized using the
* `commandProtocol` and sent over the wire thanks to `CommandSender`. On the receiving node,
* the message is decoded and run with the provided `behavior` interpreter: this typically
* involves reading the entity state (e.g. for validation), and writing events (which can
* lead to a new version of the state via the `eventApplier` function)
* - after events were written, a possible side-effect is triggered: this can be asynchronous
* (i.e. the function doesn't wait for completion of the side-effect to return) but can be
* made synchronous by overriding the `runModeFor` method in the `SideEffect`
* - the function finally returns to the caller with the result of the operation described by
* the entity algebra (reply value, typically encoded over the wire in a distributed
* deployment)
*
* This interaction pattern occurs with "actor-like" semantics: all calls on the entity are
* processed in sequence.
*
* The function is parameterized with the context `F` and the various involved types: `S` for
* entity state, `ID` for entity ID and `Alg` & `RepositoryAlg` for entity and repository
* algebras respectively (both higher-kinded type constructors).
*
* Since the behavior described above involves concurrent handling of repository interactions and
* possible asynchronous side-effecting, we expect `Async` from `F`.
*
* `EntityIDCodec` is used to encode/decode entity IDs to/from strings.
*
* @param repository
* interpreter for the repository algebra (used to "materialize" the repository)
* @param behavior
* interpreter for the behavior algebra (used to "materialize" the behavior)
* @param sideEffect
* interpreter for the side-effect algebra (used to "materialize" the side-effect)
* @param nameProvider
* provides a name for the entity (in other words, the "type of entity", e.g. "booking")
* @param commandProtocol
* protocol-centric definition of entity algebra: defines a wire encoding for interactions with
* remote entities
* @param parameters
* platform-specific deployment parameters
* @tparam F
* effect type
* @tparam ID
* entity ID
* @tparam S
* entity state
* @tparam Alg
* entity algebra
* @tparam RepositoryAlg
* repository algebra
* @return
* a resource encapsulating access to the deployed repository algebra
*/
def deployDurableRepository[F[_]: Async, ID: EntityIDCodec, S, Alg[_[_]], RepositoryAlg[
_[_]
]](
repository: RepositoryInterpreter[F, ID, Alg, RepositoryAlg],
behavior: DurableBehaviorInterpreter[F, S, Alg],
sideEffect: SideEffectInterpreter[F, S, Alg, RepositoryAlg]
)(implicit
nameProvider: EntityNameProvider[ID],
commandProtocol: CommandProtocol[ID, Alg],
parameters: DurableDeploymentParameters[F, ID, S]
): Resource[F, DurableDeployment[F, RepositoryAlg]]

}
24 changes: 24 additions & 0 deletions core/src/main/scala/endless/core/entity/DurableEntity.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package endless.core.entity

/** `DurableEntity[F, S]` is the ability to read and write an entity state of type `S` together with
* the ability to compose such dependent effectful functions.
*
* These dual reader/writer abilities are what is needed to describe command handler behavior. This
* model enables a stateful entity to store the full state after processing each command instead of
* using event sourcing. When interpreting code involving `DurableEntity`, the final resulting
* value in the monadic chain is typically understood as the reply, and any written state is
* persisted behind the scenes by the runtime. Read always provides the state as it was last
* written.
*
* @tparam F
* context
* @tparam S
* state
*
* @see
* `Entity` for the event-sourcing equivalent
*/
trait DurableEntity[F[_], S]
extends StateReader[F, S]
with StateReaderHelpers[F, S]
with StateWriter[F, S]
31 changes: 30 additions & 1 deletion core/src/main/scala/endless/core/entity/Effector.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package endless.core.entity
import cats.syntax.flatMap._
import cats.effect.kernel.{Concurrent, Ref}
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.{Applicative, Monad}

import scala.concurrent.duration.FiniteDuration

/** `Effector` represents the ability to read the state of the entity, perform a possible
* passivation side-effect and further interact with the entity itself via its algebra.
* @tparam F
@@ -13,3 +18,27 @@ trait Effector[F[_], S, Alg[_[_]]] extends StateReader[F, S] with Passivator[F]
def ifKnown(f: S => F[Unit])(implicit monad: Monad[F]): F[Unit] =
read >>= (_.fold(Applicative[F].unit)(f))
}

object Effector {
def apply[F[_]: Concurrent, S, Alg[_[_]]](
entityAlg: Alg[F],
state: Option[S]
): F[Effector[F, S, Alg]] =
for {
passivationStateRef <- Ref.of[F, PassivationState](PassivationState.Unchanged)
} yield new Effector[F, S, Alg] {
def read: F[Option[S]] = state.pure[F]
def enablePassivation(after: FiniteDuration): F[Unit] =
passivationStateRef.set(PassivationState.After(after))
def disablePassivation: F[Unit] = passivationStateRef.set(PassivationState.Disabled)
def passivationState: F[PassivationState] = passivationStateRef.get
def self: Alg[F] = entityAlg
}

sealed trait PassivationState
object PassivationState {
final case class After(duration: FiniteDuration) extends PassivationState
object Disabled extends PassivationState
object Unchanged extends PassivationState
}
}
105 changes: 8 additions & 97 deletions core/src/main/scala/endless/core/entity/Entity.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package endless.core.entity

import cats.Monad
import cats.data.EitherT
import cats.syntax.either._
import endless.\/
import endless.core.event.EventWriter

/** `Entity[F, S, E]` is the ability to read an event-sourced entity state of type `S` and append
* events of type `E` affecting this state, together with the ability to compose such dependent
* effectful functions.
* events of type `E` affecting this state.
*
* These dual reader/writer abilities are what is needed to describe command handler behavior. When
* interpreting code involving `Entity`, the final resulting value in the monadic chain is
@@ -22,95 +17,11 @@ import endless.core.event.EventWriter
* state
* @tparam E
* event
*
* @see
* `DurableEntity` for the equivalent without event-sourcing
*/
trait Entity[F[_], S, E] extends StateReader[F, S] with EventWriter[F, E] with Monad[F] {

/** Convenience function which applies `fa` on the state if entity exists and wraps this in a
* `Right`, otherwise returns a `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnown[Error, A](fa: S => A)(ifUnknown: => Error): F[Error \/ A] =
ifKnownF[Error, A](s => pure(fa(s)))(ifUnknown)

/** Convenience function which applies `fa` on the state if entity exists and wraps this in a
* `Right`, otherwise returns a `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnownF[Error, A](fa: S => F[A])(ifUnknown: => Error): F[Error \/ A] =
ifKnownFE[Error, A](s => map(fa(s))(_.asRight))(ifUnknown)

/** Convenience function which applies `fa` on the state if entity exists, otherwise returns a
* `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnownFE[Error, A](fa: S => F[Error \/ A])(ifUnknown: => Error): F[Error \/ A] =
flatMap(read) {
case Some(state) => fa(state)
case None => pure(ifUnknown.asLeft)
}

/** Convenience function which applies `fa` on the state if entity exists and unwraps EitherT
* value, otherwise returns a `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnownT[Error, A](fa: S => EitherT[F, Error, A])(ifUnknown: => Error): F[Error \/ A] =
ifKnownFE(s => fa(s).value)(ifUnknown)

/** Convenience function which returns a in a `Right` if entity doesn't yet exist, otherwise calls
* `ifKnown` with the state and wraps this in a `Left`.
* @param fa
* success value when entity doesn't exist yet
* @param ifKnown
* function to compute left value in case of existing entity
*/
def ifUnknown[Error, A](a: => A)(ifKnown: S => Error): F[Error \/ A] =
ifUnknownF[Error, A](pure(a))(ifKnown)

/** Convenience function which invokes `fa` if entity doesn't yet exist and wraps this in a
* `Right`, otherwise calls `ifKnown` with the state and wraps this in a `Left`.
* @param fa
* success value when entity doesn't exist yet
* @param ifKnown
* function to compute left value in case of existing entity
*/
def ifUnknownF[Error, A](fa: => F[A])(ifKnown: S => Error): F[Error \/ A] =
ifUnknownFE[Error, A](map(fa)(_.asRight))(ifKnown)

/** Convenience function which invokes `fa` if entity doesn't yet exist and wraps this in a
* `Right`, otherwise calls `ifKnown` with the state and wraps this in a `Left`.
* @param fa
* success value when entity doesn't exist yet
* @param ifKnown
* function to compute left value in case of existing entity
*/
def ifUnknownFE[Error, A](fa: => F[Error \/ A])(ifKnown: S => Error): F[Error \/ A] =
flatMap(read) {
case None =>
fa
case Some(state) =>
pure(ifKnown(state).asLeft)
}

/** Convenience function which applies `fa` on the state if entity exists and unwraps EitherT
* value, otherwise returns a `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifUnknownT[Error, A](fa: => EitherT[F, Error, A])(ifUnknown: S => Error): F[Error \/ A] =
ifUnknownFE(fa.value)(ifUnknown)

}
trait Entity[F[_], S, E]
extends StateReader[F, S]
with StateReaderHelpers[F, S]
with EventWriter[F, E]
6 changes: 6 additions & 0 deletions core/src/main/scala/endless/core/entity/Passivator.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package endless.core.entity

import endless.core.entity.Effector.PassivationState

import scala.concurrent.duration.{Duration, FiniteDuration}

/** `Passivator` represents the ability to "passivate" an entity, i.e. flush out an entity from the
@@ -20,4 +22,8 @@ trait Passivator[F[_]] {
* message)
*/
def disablePassivation: F[Unit]

/** Get the current passivation state
*/
def passivationState: F[PassivationState]
}
21 changes: 0 additions & 21 deletions core/src/main/scala/endless/core/entity/Repository.scala

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/endless/core/entity/Self.scala
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ package endless.core.entity
* entity algebra
*/
trait Self[F[_], Alg[_[_]]] {
def self: F[Alg[F]]
def self: Alg[F]
}

object Self {
31 changes: 31 additions & 0 deletions core/src/main/scala/endless/core/entity/Sharding.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package endless.core.entity

import endless.core.protocol.{CommandProtocol, CommandSender}

/** `Sharding` represents the ability to access a specific entity in the sharded cluster, via a
* handle of the entity algebra
*
* @tparam F
* context
* @tparam ID
* id
* @tparam Alg
* entity command handling algebra
*/
trait Sharding[F[_], ID, Alg[_[_]]] {

/** Returns an instance of entity algebra `Alg` pointing to the entity with the specified ID
* @param id
* entity ID
* @return
* instance of `Alg` allowing to interact with the entity (issue commands)
*/
def entityFor(id: ID): Alg[F]
}

object Sharding {
implicit def apply[F[_], ID, Alg[_[_]]](implicit
commandProtocol: CommandProtocol[ID, Alg],
commandSender: CommandSender[F, ID]
): Sharding[F, ID, Alg] = (id: ID) => commandProtocol.clientFor(id)
}
77 changes: 77 additions & 0 deletions core/src/main/scala/endless/core/entity/SideEffect.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package endless.core.entity
import cats.Applicative
import cats.kernel.Eq
import cats.syntax.eq.*

/** `SideEffect[F, S, Alg]` represents a side-effect applied in context `F`. The side-effect is
* triggered just after events persistence if any, or after some reads for a read-only command. The
* mode of interpretation is determined by the `runModeFor` method, which defaults to `Async` but
* can be overridden. The passed `Effector` can be used to access entity state and algebra and to
* control passivation.
* @tparam F
* effect type
* @tparam S
* entity state
* @tparam Alg
* entity algebra
*/
trait SideEffect[F[_], S, Alg[_[_]]] {
def apply(trigger: SideEffect.Trigger, effector: Effector[F, S, Alg]): F[Unit]

def runModeFor(trigger: SideEffect.Trigger, state: Option[S])(implicit
applicative: Applicative[F]
): F[SideEffect.RunMode] = Applicative[F].pure(SideEffect.RunMode.Async)
}

object SideEffect {
def unit[F[_]: Applicative, S, Alg[_[_]]]: SideEffect[F, S, Alg] =
(_: Trigger, _: Effector[F, S, Alg]) => Applicative[F].unit

/** Trigger for the invocation of a side-effect: this allows for differentiated behavior depending
* on the context in which the side-effect is triggered.
*/
sealed trait Trigger {
def isAfterPersistence: Boolean = this === Trigger.AfterPersistence
def isAfterRead: Boolean = this === Trigger.AfterRead
def isAfterRecovery: Boolean = this === Trigger.AfterRecovery
}
object Trigger {

/** Triggered just after events or state persistence */
case object AfterPersistence extends Trigger

/** Triggered just after processing a read-only command (no events were written, the state
* hasn't changed)
*/
case object AfterRead extends Trigger

/** Triggered just after recovery */
case object AfterRecovery extends Trigger

implicit val eqTrigger: Eq[Trigger] = Eq.fromUniversalEquals
}

/** Run mode for a side-effect: `Async` (default value) means that the side-effect is triggered in
* "fire & forget" mode, while `Sync` means it is run to completion before any other command is
* processed by the entity.
*/
sealed trait RunMode
object RunMode {

/** Run to completion before any other command is processed by the entity.
*
* @note
* This mode should in most cases not be used for long-running side-effects, as it can hurt
* availability of the entity for command processing.
*/
case object Sync extends RunMode

/** Run in "fire & forget" mode.
*
* @note
* This mode requires careful consideration of the side-effect's concurrency and idempotency,
* as there is no limit on the number of invocations running simultaneously at any one time.
*/
case object Async extends RunMode
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/endless/core/entity/StateReader.scala
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ package endless.core.entity
*/
trait StateReader[F[_], S] {

/** Read the state from the environment, returns None if the entity doesn't yet exist
/** Read the entity state, returns None if the entity doesn't yet exist
* @return
* optional state in `F` context
*/
128 changes: 128 additions & 0 deletions core/src/main/scala/endless/core/entity/StateReaderHelpers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package endless.core.entity

import cats.Monad
import cats.data.EitherT
import cats.syntax.applicative.*
import cats.syntax.either.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import endless.\/

/** Set of convenience functions augmenting `StateReader` (that assume a `Monad` instance exists for
* `F`)
* @tparam F
* context
* @tparam S
* state
*/
trait StateReaderHelpers[F[_], S] extends StateReader[F, S] {
implicit def monad: Monad[F]

/** Convenience function which applies `fa` on the state if entity exists and wraps this in a
* `Right`, otherwise returns a `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnown[Error, A](fa: S => A)(ifUnknown: => Error): F[Error \/ A] =
ifKnownF[Error, A](s => fa(s).pure)(ifUnknown)

/** Convenience function which applies `fa` on the state if entity exists and wraps this in a
* `Right`, otherwise returns a `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnownF[Error, A](fa: S => F[A])(ifUnknown: => Error): F[Error \/ A] =
ifKnownFE[Error, A](s => fa(s).map(_.asRight))(ifUnknown)

/** Convenience function which applies `fa` on the state if entity exists, otherwise returns a
* `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnownFE[Error, A](fa: S => F[Error \/ A])(ifUnknown: => Error): F[Error \/ A] =
ifKnownElse(fa)(ifUnknown.asLeft[A].pure)

/** Convenience function which applies `fa` on the state if entity exists and unwraps EitherT
* value, otherwise returns a `Left` with the provided error value.
* @param fa
* function to apply on state
* @param ifUnknown
* left value in case of missing entity
*/
def ifKnownT[Error, A](fa: S => EitherT[F, Error, A])(ifUnknown: => Error): F[Error \/ A] =
ifKnownFE(s => fa(s).value)(ifUnknown)

/** Convenience function which applies `fa` on the state if entity exists, otherwise returns a
* default value
*
* @param fa
* function to apply on state
* @param ifUnknownF
* value in case of missing entity in `F` context
*/
def ifKnownElse[A](fa: S => F[A])(ifUnknownF: => F[A]): F[A] =
read.flatMap {
case Some(state) => fa(state)
case None => ifUnknownF
}

/** Convenience function which returns a in a `Right` if entity doesn't yet exist, otherwise calls
* `ifKnown` with the state and wraps this in a `Left`.
* @param fa
* success value when entity doesn't exist yet
* @param ifKnown
* function to compute left value in case of existing entity
*/
def ifUnknown[Error, A](a: => A)(ifKnown: S => Error): F[Error \/ A] =
ifUnknownF[Error, A](a.pure)(ifKnown)

/** Convenience function which invokes `fa` if entity doesn't yet exist and wraps this in a
* `Right`, otherwise calls `ifKnown` with the state and wraps this in a `Left`.
* @param fa
* success value when entity doesn't exist yet
* @param ifKnown
* function to compute left value in case of existing entity
*/
def ifUnknownF[Error, A](fa: => F[A])(ifKnown: S => Error): F[Error \/ A] =
ifUnknownFE[Error, A](fa.map(_.asRight))(ifKnown)

/** Convenience function which invokes `fa` if entity doesn't yet exist and wraps this in a
* `Right`, otherwise calls `ifKnown` with the state and wraps this in a `Left`.
* @param fa
* success value when entity doesn't exist yet
* @param ifKnown
* function to compute left value in case of existing entity
*/
def ifUnknownFE[Error, A](fa: => F[Error \/ A])(ifKnown: S => Error): F[Error \/ A] =
ifUnknownElse(fa)(state => ifKnown(state).asLeft[A].pure)

/** Convenience function which invokes `fa` if entity doesn't yet exist, otherwise calls ,
* `ifKnown` with the the state and wraps this in a `Left`.
* @param fa
* value wrapped in `EitherT` when entity doesn't exist yet
* @param ifKnown
* function to compute left value in case of existing entity
*/
def ifUnknownT[Error, A](fa: => EitherT[F, Error, A])(ifKnown: S => Error): F[Error \/ A] =
ifUnknownFE(fa.value)(ifKnown)

/** Convenience function which returns a value `fa` in `F` context, otherwise calls `ifKnown` with
* the state
*
* @param fa
* value in case of missing entity in `F` context
* @param ifKnown
* function to apply on state
*/
def ifUnknownElse[A](fa: => F[A])(ifKnown: S => F[A]): F[A] =
read.flatMap {
case Some(state) => ifKnown(state)
case None => fa
}
}
36 changes: 36 additions & 0 deletions core/src/main/scala/endless/core/entity/StateWriter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package endless.core.entity

/** `StateWriter[F, S]` is the ability to write a value of type `S`, where that value is
* semantically understood as the new state of the entity.
*
* @tparam F
* context
* @tparam S
* state
*/
trait StateWriter[F[_], S] {

/** Write the entity state
* @param s
* entity state
* @return
* unit in `F` context
*/
def write(s: S): F[Unit]

/** Modify the entity state with the given function
* @param f
* state modifier
* @return
* unit in `F` context
*/
def modify(f: S => S): F[Unit]

/** Modify the entity state with the given function expressed in `F` context
* @param f
* state modifier in `F` context
* @return
* unit in `F` context
*/
def modifyF(f: S => F[S]): F[Unit]
}
2 changes: 0 additions & 2 deletions core/src/main/scala/endless/core/event/EventApplier.scala
Original file line number Diff line number Diff line change
@@ -5,8 +5,6 @@ import endless.\/
/** Function that defines transition of the state given an event (or invalid event for the given
* state).
*
* @note
* returning `None` allows ignoring irrelevant events before entity is created
* @tparam S
* state
* @tparam E
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package endless.core.interpret

import cats.Applicative
import endless.core.entity.Entity

/** Interprets an algebra `Alg` expressed using `Entity` in context `F` with `EntityT`
*
* @tparam F
* effect type
* @tparam S
* state
* @tparam E
* event
* @tparam Alg
* entity algebra
* @return
* interpreted entity algebra in context `F`
*/
trait BehaviorInterpreter[F[_], S, E, Alg[_[_]]] {
def apply(entity: Entity[EntityT[F, S, E, *], S, E]): F[Alg[EntityT[F, S, E, *]]]
}

object BehaviorInterpreter {

/** Lifts a pure interpreter into an `EntityInterpreter` (which is expressed in context `F`)
* @param pureInterpreter
* pure interpreter (i.e. it doesn't require any effect to create the algebra instance)
* @tparam F
* effect type
* @tparam S
* state
* @tparam E
* event
* @tparam Alg
* entity algebra
* @return
* entity algebra interpreter in context `F`
*/
def lift[F[_]: Applicative, S, E, Alg[_[_]]](
pureInterpreter: Entity[EntityT[F, S, E, *], S, E] => Alg[EntityT[F, S, E, *]]
): BehaviorInterpreter[F, S, E, Alg] = (entity: Entity[EntityT[F, S, E, *], S, E]) =>
Applicative[F].pure(pureInterpreter(entity))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package endless.core.interpret

import cats.Applicative
import endless.core.entity.DurableEntity
import endless.core.interpret.DurableEntityT.DurableEntityT

/** Interprets an algebra `Alg` expressed using `DurableEntity` in context `F` with `DurableEntityT`
*
* @tparam F
* effect type
* @tparam S
* state
* @tparam Alg
* entity algebra
* @return
* interpreted entity algebra in context `F`
*/
trait DurableBehaviorInterpreter[F[_], S, Alg[_[_]]] {
def apply(entity: DurableEntity[DurableEntityT[F, S, *], S]): F[Alg[DurableEntityT[F, S, *]]]
}

object DurableBehaviorInterpreter {

/** Lifts a pure interpreter into an `DurableEntityInterpreter` (which is expressed in context
* `F`)
* @param pureInterpreter
* pure interpreter (i.e. it doesn't require any effect to create the algebra instance)
* @tparam F
* effect type
* @tparam S
* state
* @tparam Alg
* entity algebra
* @return
* entity algebra interpreter in context `F`
*/
def lift[F[_]: Applicative, S, Alg[_[_]]](
pureInterpreter: DurableEntity[DurableEntityT[F, S, *], S] => Alg[DurableEntityT[F, S, *]]
): DurableBehaviorInterpreter[F, S, Alg] =
(entity: DurableEntity[DurableEntityT[F, S, *], S]) =>
Applicative[F].pure(pureInterpreter(entity))
}
91 changes: 91 additions & 0 deletions core/src/main/scala/endless/core/interpret/DurableEntityT.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package endless.core.interpret

import cats.data.{IndexedStateT, StateT}
import cats.effect.kernel.Clock
import cats.{Applicative, Functor, Monad, ~>}
import endless.core.entity.DurableEntity
import org.typelevel.log4cats.Logger

import scala.concurrent.duration.FiniteDuration

object DurableEntityT {
sealed trait State[+S]
object State {
case object None extends State[Nothing]
final case class Existing[S](state: S) extends State[S]
final case class Updated[S](state: S) extends State[S]
}

/** `DurableEntityT[F, S, A]` is a type alias for StateT monad transformer from cats. `State` is
* the state of the entity, which can be get (exposed as `read`) and set (exposed as `write`)
* @tparam F
* context
* @tparam S
* entity state
* @tparam A
* value
*/
type DurableEntityT[F[_], S, A] = StateT[F, State[S], A]

def liftF[F[_]: Applicative, S, A](fa: F[A]): DurableEntityT[F, S, A] = StateT.liftF(fa)

implicit def liftK[F[_]: Applicative, S]: F ~> DurableEntityT[F, S, *] = StateT.liftK

def unit[F[_]: Applicative, S]: DurableEntityT[F, S, Unit] = liftF(Applicative[F].unit)

def stateReader[F[_]: Applicative, S]: DurableEntityT[F, S, Option[S]] =
StateT.get[F, State[S]].map {
case State.None => None
case State.Existing(state) => Some(state)
case State.Updated(state) => Some(state)
}

def stateWriter[F[_]: Applicative, S](state: S): DurableEntityT[F, S, Unit] =
StateT.set(State.Updated(state))

def stateModifier[F[_]: Applicative, S](f: S => S): DurableEntityT[F, S, Unit] =
StateT.modify {
case State.None => State.None
case State.Existing(state) => State.Updated(f(state))
case State.Updated(state) => State.Updated(f(state))
}

def stateModifierF[F[_]: Applicative, S](f: S => F[S]): DurableEntityT[F, S, Unit] =
StateT.modifyF {
case State.None => Applicative[F].pure(State.None)
case State.Existing(state) => Functor[F].map(f(state))(State.Updated(_))
case State.Updated(state) => Functor[F].map(f(state))(State.Updated(_))
}

/** Given that a monad instance can be found for F, this provides an DurableEntityT transformer
* instance for it. This is used by `deployDurableEntity`: the `createEntity` creator for entity
* algebra can thus be injected with an instance of `DurableEntity[F[_]]` interpreted with
* DurableEntityT[F, S, *]
*/
implicit def instance[F[_]: Monad, S]: DurableEntity[DurableEntityT[F, S, *], S] =
new DurableEntity[DurableEntityT[F, S, *], S] {
def read: DurableEntityT[F, S, Option[S]] = stateReader
def write(s: S): DurableEntityT[F, S, Unit] = stateWriter(s)
def modify(f: S => S): DurableEntityT[F, S, Unit] = stateModifier(f)
def modifyF(f: S => DurableEntityT[F, S, S]): DurableEntityT[F, S, Unit] =
stateModifierF(state => f(state).runA(State.Updated(state)))

implicit lazy val monad: Monad[DurableEntityT[F, S, *]] =
IndexedStateT.catsDataMonadForIndexedStateT
}

implicit def clockForDurableEntityT[F[_]: Applicative: Clock, S](implicit
A0: Applicative[DurableEntityT[F, S, *]]
): Clock[DurableEntityT[F, S, *]] =
new Clock[DurableEntityT[F, S, *]] {
def applicative: Applicative[DurableEntityT[F, S, *]] = A0

def monotonic: DurableEntityT[F, S, FiniteDuration] = liftF(Clock[F].monotonic)

def realTime: DurableEntityT[F, S, FiniteDuration] = liftF(Clock[F].realTime)
}

implicit def loggerForDurableEntityT[F[_]: Applicative, S](implicit
logger: Logger[F]
): Logger[DurableEntityT[F, S, *]] = logger.mapK(liftK[F, S])
}
84 changes: 0 additions & 84 deletions core/src/main/scala/endless/core/interpret/EffectorT.scala

This file was deleted.

8 changes: 0 additions & 8 deletions core/src/main/scala/endless/core/interpret/EntityLift.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package endless.core.interpret

import cats.conversions.all._
import cats.data.{Chain, NonEmptyChain}
import cats.syntax.applicative._
import cats.syntax.either._
import cats.syntax.applicative.*
import cats.syntax.either.*
import cats.{Applicative, Monad}
import endless.core.data.{EventsFolder, Folded}

45 changes: 33 additions & 12 deletions core/src/main/scala/endless/core/interpret/EntityT.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package endless.core.interpret

import cats.conversions.all._
import cats.data.{Chain, NonEmptyChain}
import cats.syntax.applicative._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.effect.kernel.Clock
import cats.syntax.applicative.*
import cats.syntax.either.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.{Applicative, Functor, Monad, ~>}
import endless.core.data.{EventsFolder, Folded}
import endless.core.entity.Entity
import endless.core.event.EventApplier
import org.typelevel.log4cats.Logger

import scala.concurrent.duration.FiniteDuration

/** `EntityT[F, S, E, A]`` is data type implementing the `Entity[F, S, E]` state reader and event
* writer abilities. It is a monad transformer used as an interpreter for functional chains
@@ -48,9 +52,12 @@ final class EntityT[F[_], S, E, A](
case Left(reason) => reason.asLeft.pure
}
})

def map[B](f: A => B)(implicit monad: Monad[F]): EntityT[F, S, E, B] =
flatMap(a => EntityT.purr(f(a)))
}

object EntityT extends EntityRunFunctions with LoggerLiftingHelper {
object EntityT extends EntityRunFunctions {
def writer[F[_]: Applicative, S, E](newEvents: NonEmptyChain[E]): EntityT[F, S, E, Unit] =
new EntityT((_, existing) => write(newEvents)(existing))

@@ -61,17 +68,31 @@ object EntityT extends EntityRunFunctions with LoggerLiftingHelper {
def liftF[F[_]: Functor, S, E, A](fa: F[A]): EntityT[F, S, E, A] =
new EntityT((_, events) => fa.map(a => (events, a).asRight))

implicit def liftK[F[_]: Functor, S, E, A]: F ~> EntityT[F, S, E, *] =
implicit def liftK[F[_]: Functor, S, E]: F ~> EntityT[F, S, E, *] =
new (F ~> EntityT[F, S, E, *]) {
def apply[B](fa: F[B]): EntityT[F, S, E, B] = liftF(fa)
}

def reader[F[_]: Monad, S, E]: EntityT[F, S, E, Option[S]] = new EntityT(read[F, S, E])

implicit def instance[F[_], S, E](implicit
monad0: Monad[F]
): EntityLift[EntityT[F, S, E, *], F, S, E] =
new EntityTLiftInstance[F, S, E] {
override protected implicit def monad: Monad[F] = monad0
/** Given that a monad instance can be found for F, this provides an EntityT transformer instance
* for it. This is used by `deployEntity`: the `createEntity` creator for entity algebra can thus
* be injected with an instance of `Entity[F[_]]` interpreted with EntityT[F, S, E, *]
*/
implicit def instance[F[_]: Monad, S, E]
: Entity[EntityT[F, S, E, *], S, E] & Monad[EntityT[F, S, E, *]] =
new EntityTLiftInstance[F, S, E]

implicit def clockForEntityT[F[_]: Functor: Clock, S, E](implicit
A0: Applicative[EntityT[F, S, E, *]]
): Clock[EntityT[F, S, E, *]] =
new Clock[EntityT[F, S, E, *]] {
def applicative: Applicative[EntityT[F, S, E, *]] = A0
def monotonic: EntityT[F, S, E, FiniteDuration] = liftF(Clock[F].monotonic)
def realTime: EntityT[F, S, E, FiniteDuration] = liftF(Clock[F].realTime)
}

implicit def loggerForEntityT[F[_]: Functor, S, E](implicit
logger: Logger[F]
): Logger[EntityT[F, S, E, *]] = logger.mapK(liftK[F, S, E])
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,49 @@
package endless.core.interpret

import cats.Monad
import cats.conversions.all._
import cats.data.NonEmptyChain
import cats.syntax.applicative._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.applicative.*
import cats.syntax.either.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import endless.core.entity.Entity

trait EntityTLiftInstance[F[_], S, E] extends EntityLift[EntityT[F, S, E, *], F, S, E] {
protected implicit def monad: Monad[F]
override def read: EntityT[F, S, E, Option[S]] = EntityT.reader[F, S, E]
class EntityTLiftInstance[F[_], S, E](implicit fMonad: Monad[F])
extends Entity[EntityT[F, S, E, *], S, E]
with Monad[EntityT[F, S, E, *]] {
implicit lazy val monad: Monad[EntityT[F, S, E, *]] = new Monad[EntityT[F, S, E, *]] {
def pure[A](x: A): EntityT[F, S, E, A] = EntityT.purr(x)

def flatMap[A, B](fa: EntityT[F, S, E, A])(f: A => EntityT[F, S, E, B]): EntityT[F, S, E, B] =
fa.flatMap(f)

def pure[A](a: A): EntityT[F, S, E, A] = EntityT.purr(a)
def tailRecM[A, B](a: A)(f: A => EntityT[F, S, E, Either[A, B]]): EntityT[F, S, E, B] =
new EntityT[F, S, E, B]((folder, events) =>
fMonad.tailRecM((events, a)) { case (events, a) =>
f(a).runAcc(folder, events).flatMap {
case Right((nextEvents, Left(nextA))) =>
f(nextA).runAcc(folder, nextEvents).map {
case Right((nextNextEvents, Left(a))) =>
(nextNextEvents, a).asLeft // we keep digging
case Right((nextNextEvents, Right(b))) => (nextNextEvents, b).asRight.asRight
case Left(invalidFoldReason) => invalidFoldReason.asLeft.asRight
}
case Right((nextEvents, Right(b))) => (nextEvents, b).asRight.asRight.pure[F]
case Left(invalidFoldReason) => invalidFoldReason.asLeft.asRight.pure[F]
}
}
)
}

override def read: EntityT[F, S, E, Option[S]] = EntityT.reader[F, S, E]
override def write(event: E, other: E*): EntityT[F, S, E, Unit] =
EntityT.writer(NonEmptyChain(event, other: _*))
EntityT.writer(NonEmptyChain(event, other *))

override def flatMap[A, B](fa: EntityT[F, S, E, A])(
f: A => EntityT[F, S, E, B]
): EntityT[F, S, E, B] = fa.flatMap(f)
def pure[A](x: A): EntityT[F, S, E, A] = monad.pure(x)

def tailRecM[A, B](a: A)(f: A => EntityT[F, S, E, Either[A, B]]): EntityT[F, S, E, B] =
new EntityT[F, S, E, B]((folder, events) =>
monad.tailRecM((events, a)) { case (events, a) =>
f(a).runAcc(folder, events).flatMap {
case Right((nextEvents, Left(nextA))) =>
f(nextA).runAcc(folder, nextEvents).map {
case Right((nextNextEvents, Left(a))) => (nextNextEvents, a).asLeft // we keep digging
case Right((nextNextEvents, Right(b))) => (nextNextEvents, b).asRight.asRight
case Left(invalidFoldReason) => invalidFoldReason.asLeft.asRight
}
case Right((nextEvents, Right(b))) => (nextEvents, b).asRight.asRight.pure
case Left(invalidFoldReason) => invalidFoldReason.asLeft.asRight.pure
}
}
)
def flatMap[A, B](fa: EntityT[F, S, E, A])(f: A => EntityT[F, S, E, B]): EntityT[F, S, E, B] =
monad.flatMap(fa)(f)

def tailRecM[A, B](a: A)(f: A => EntityT[F, S, E, Either[A, B]]): EntityT[F, S, E, B] =
monad.tailRecM(a)(f)
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package endless.core.interpret

import cats.Applicative
import cats.effect.kernel.Resource
import endless.core.entity.Sharding

/** Interpret an algebra `RepositoryAlg` expressed using `Sharding` in context `F`, materializing
* the distributed repository
*
* @tparam F
* effect type
* @tparam ID
* entity ID
* @tparam Alg
* entity algebra
* @tparam RepositoryAlg
* repository algebra
* @return
* interpreted repository algebra in context `F`
*/
trait RepositoryInterpreter[F[_], ID, Alg[_[_]], RepositoryAlg[_[_]]] {
def apply(sharding: Sharding[F, ID, Alg]): Resource[F, RepositoryAlg[F]]
}

object RepositoryInterpreter {

/** Lifts a pure interpreter into an `RepositoryInterpreter` (which is expressed in context `F`)
* @param pureInterpreter
* pure interpreter (i.e. it doesn't require any effect to create the algebra instance)
* @tparam F
* effect type
* @tparam ID
* entity ID
* @tparam Alg
* entity algebra
* @tparam RepositoryAlg
* repository algebra
* @return
* repository algebra interpreter in context `F`
*/
def lift[F[_]: Applicative, ID, Alg[_[_]], RepositoryAlg[_[_]]](
pureInterpreter: Sharding[F, ID, Alg] => RepositoryAlg[F]
): RepositoryInterpreter[F, ID, Alg, RepositoryAlg] = (sharding: Sharding[F, ID, Alg]) =>
Resource.pure(pureInterpreter(sharding))
}
60 changes: 0 additions & 60 deletions core/src/main/scala/endless/core/interpret/RepositoryT.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package endless.core.interpret

import cats.Applicative
import endless.core.entity.SideEffect

/** Interprets a function `F[Unit]` describing a side-effect using `Effector` in context `F`
*
* @tparam F
* effect type
* @tparam S
* state
* @tparam Alg
* entity algebra
* @tparam RepositoryAlg
* repository algebra
* @return
* interpreted side-effect function in context `F`
*/
trait SideEffectInterpreter[F[_], S, Alg[_[_]], RepositoryAlg[_[_]]] {
def apply(repository: RepositoryAlg[F], alg: Alg[F]): F[SideEffect[F, S, Alg]]
}

object SideEffectInterpreter {

/** Lifts a pure interpreter into an `SideEffectInterpreter` (which is expressed in context `F`)
* @param pureSideEffect
* pure side-effect (i.e. it doesn't require any effect to create the side-effect instance)
* @tparam F
* effect type
* @tparam S
* state
* @tparam Alg
* entity algebra
* @tparam RepositoryAlg
* repository algebra
* @return
* side-effect interpreter in context `F`
*/
def lift[F[_]: Applicative, S, Alg[_[_]], RepositoryAlg[_[_]]](
pureSideEffect: (RepositoryAlg[F], Alg[F]) => SideEffect[F, S, Alg]
): SideEffectInterpreter[F, S, Alg, RepositoryAlg] =
(repository: RepositoryAlg[F], alg: Alg[F]) =>
Applicative[F].pure(pureSideEffect(repository, alg))

/** Creates a no-op side-effect interpreter, for repositories that don't require any side-effect
* @tparam F
* effect type
* @tparam S
* state
* @tparam Alg
* entity algebra
* @tparam RepositoryAlg
* repository algebra
* @return
* unit side-effect interpreter in context `F`
*/
def unit[F[_]: Applicative, S, Alg[_[_]], RepositoryAlg[_[_]]]
: SideEffectInterpreter[F, S, Alg, RepositoryAlg] =
lift((_, _) => SideEffect.unit[F, S, Alg])
}
41 changes: 32 additions & 9 deletions core/src/main/scala/endless/core/protocol/CommandProtocol.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,47 @@
package endless.core.protocol

/** `CommandProtocol` represents the serialization aspects of the entity.
/** `CommandProtocol` represents the transport aspects of the entity cluster.
*
* `client` provides an interpretation of the algebra with `OutgoingCommand[*]` for issuing
* commands for each defined function in the algebra, and `server` is a decoder for an instance of
* `IncomingCommand[F, Alg]` which represents an incoming command that can be directly run and
* contains its own reply encoder as well.
* `client` provides an implementation of the algebra which sends commands for each defined
* function in the algebra and decodes the reply, and `server` is a decoder for instances of
* `IncomingCommand[F, Alg]` that represent incoming commands that can be run and contain their own
* reply encoder.
* @tparam Alg
* the entity algebra
*/
trait CommandProtocol[Alg[_[_]]] {
trait CommandProtocol[ID, Alg[_[_]]] {

/** Decoder for `IncomingCommand[F, Alg]` which can run the command and encode the reply
* @tparam F
* context
*/
def server[F[_]]: Decoder[IncomingCommand[F, Alg]]

/** Instance of the entity algebra interpreted with `OutgoingCommand[*]` which contains the binary
* payload and can decode the command reply
/** Returns an instance of entity algebra that translates calls into commands, sends them via the
* `CommandSender` instance in implicit scope, and decodes the reply (implements an RPC-like
* client).
*/
def client: Alg[OutgoingCommand[*]]
def clientFor[F[_]](id: ID)(implicit sender: CommandSender[F, ID]): Alg[F]
}

object CommandProtocol {

/** Helper function that sends a command to the entity with the specified ID using the sender in
* implicit scope.
*
* @param id
* entity ID
* @param command
* command
* @param sender
* command sender
* @tparam F
* context
* @tparam R
* reply type
* @return
*/
def sendCommand[F[_], ID, R](id: ID, command: OutgoingCommand[R])(implicit
sender: CommandSender[F, ID]
): F[R] = sender.senderForID(id)(command)
}
20 changes: 0 additions & 20 deletions core/src/main/scala/endless/core/protocol/CommandRouter.scala

This file was deleted.

49 changes: 49 additions & 0 deletions core/src/main/scala/endless/core/protocol/CommandSender.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package endless.core.protocol

import cats.{Functor, ~>}
import cats.syntax.functor.*

/** `CommandSender[F, ID]` provides a natural transformation to deliver an outgoing command to where
* the entity resides and decode the reply as a simple value in the `F` context.
* @tparam F
* context
* @tparam ID
* entity ID
*/
trait CommandSender[F[_], ID] {

/** Returns a natural transformation to deliver an outgoing command to the entity with this
* particular ID.
* @param id
* entity ID
*/
def senderForID(id: ID): OutgoingCommand[*] ~> F
}

object CommandSender {

/** Local command sender, for testing purposes
* @param protocol
* command protocol, used for its server component
* @param alg
* entity algebra instance
* @tparam F
* context
* @tparam ID
* entity ID
* @tparam Alg
* entity algebra
* @return
* a command sender that runs the command locally
*/
def local[F[_]: Functor, ID, Alg[_[_]]](
protocol: CommandProtocol[ID, Alg],
alg: Alg[F]
): CommandSender[F, ID] = (_: ID) =>
new (OutgoingCommand ~> F) {
def apply[A](fa: OutgoingCommand[A]): F[A] = {
val incoming = protocol.server[F].decode(fa.payload)
incoming.runWith(alg).map(incoming.replyEncoder.encode).map(fa.replyDecoder.decode)
}
}
}
92 changes: 67 additions & 25 deletions core/src/test/scala/endless/core/entity/EffectorSuite.scala
Original file line number Diff line number Diff line change
@@ -1,39 +1,81 @@
package endless.core.entity

import cats.Id
import org.scalacheck.Prop.forAll

import scala.concurrent.duration.FiniteDuration

class EffectorSuite extends munit.ScalaCheckSuite {
property("ifKnown with state") {
forAll { (state: State) =>
var sideEffected = false
val effector = new TestEffector(Some(state))
effector.ifKnown(_ => sideEffected = true)
assert(sideEffected)
import cats.effect.IO
import cats.effect.kernel.Ref
import munit.ScalaCheckEffectSuite
import org.scalacheck.effect.PropF.*
import scala.concurrent.duration.*

class EffectorSuite extends munit.CatsEffectSuite with ScalaCheckEffectSuite {
test("ifKnown with state") {
forAllF { (state: State) =>
for {
sideEffected <- Ref.of[IO, Boolean](false)
effector <- Effector.apply(fooIO, Some(state))
_ <- effector.ifKnown(_ => sideEffected.set(true))
_ <- assertIO(sideEffected.get, true)
} yield ()
}
}

property("ifKnown without state") {
var sideEffected = false
val effector = new TestEffector(None)
effector.ifKnown(_ => sideEffected = true)
assert(!sideEffected)
test("ifKnown without state") {
for {
sideEffected <- Ref.of[IO, Boolean](false)
effector <- Effector.apply(fooIO, Option.empty[State])
_ <- effector.ifKnown(_ => sideEffected.set(true))
_ <- assertIO(sideEffected.get, false)
} yield ()
}

test("read allows to read state") {
forAllF { (state: State) =>
for {
effector <- Effector.apply(fooIO, Some(state))
_ <- assertIO(effector.read, Some(state))
} yield ()
}
}

test("enablePassivation sets passivation state") {
for {
effector <- Effector.apply(fooIO, Option.empty[State])
_ <- effector.enablePassivation(1.second)
_ <- assertIO(effector.passivationState, Effector.PassivationState.After(1.second))
} yield ()

}

test("disablePassivation sets passivation state to disabled") {
for {
effector <- Effector.apply(fooIO, Option.empty[State])
_ <- effector.disablePassivation
_ <- assertIO(effector.passivationState, Effector.PassivationState.Disabled)
} yield ()

}

test("read does not affect passivation state") {
for {
effector <- Effector.apply(fooIO, Option.empty[State])
_ <- effector.read
_ <- assertIO(effector.passivationState, Effector.PassivationState.Unchanged)
} yield ()

}

test("self allows to call entity algebra") {
for {
effector <- Effector.apply(fooIO, Option.empty[State])
_ <- effector.self.foo
} yield ()
}

type State = Int
type Event = String
trait Alg[F[_]] {
def foo: F[Unit]
}

class TestEffector(state: Option[State]) extends Effector[Id, State, Alg] {
def self: Id[Alg[Id]] = new Alg[Id] {
def foo: Id[Unit] = ()
}
def enablePassivation(after: FiniteDuration): Id[Unit] = ()
def disablePassivation: Id[Unit] = ()
def read: Id[Option[State]] = state
val fooIO = new Alg[IO] {
def foo: IO[Unit] = IO(())
}
}
Loading