diff --git a/akka-distributed-cluster-docs/src/main/paradox/feature-summary.md b/akka-distributed-cluster-docs/src/main/paradox/feature-summary.md
index 5fc475444..e7010eb02 100644
--- a/akka-distributed-cluster-docs/src/main/paradox/feature-summary.md
+++ b/akka-distributed-cluster-docs/src/main/paradox/feature-summary.md
@@ -59,6 +59,7 @@ Replicated Event Sourcing gives:
* redundancy to tolerate failures in one location and still be operational
* serve requests from a location near the user to provide better responsiveness
+* allow updates to an entity from several locations
* balance the load over many servers
The replicas of the entities are running in separate Akka Clusters for the reasons described in
diff --git a/akka-edge-docs/src/main/paradox/feature-summary.md b/akka-edge-docs/src/main/paradox/feature-summary.md
index a097bc76b..f8b1b0ee7 100644
--- a/akka-edge-docs/src/main/paradox/feature-summary.md
+++ b/akka-edge-docs/src/main/paradox/feature-summary.md
@@ -1,6 +1,9 @@
# Feature Summary
-The main feature of Akka Edge is Projections over gRPC - asynchronous brokerless service-to-service communication.
+Akka Edge has two main features:
+
+1. Projections over gRPC - asynchronous brokerless service-to-service communication
+1. Replicated Event Sourcing over gRPC - active-active entities
## Projections over gRPC
@@ -111,16 +114,39 @@ H2 database should not be used when the service is an Akka Cluster with more tha
* @extref[Reference documentation of Akka Projection gRPC](akka-projection:grpc.html)
* @extref[Reference documentation of Akka Projection gRPC with producer push](akka-projection:grpc-producer-push.html)
-## Replicated Event Sourcing is not for Edge
+## Replicated Event Sourcing over gRPC
+
+You would use Replicated Event Sourcing over gRPC for entities that can be updated in more than one geographical
+location, such as edge Point-of-Presence (PoP) and different cloud regions. This makes it possible to implement
+patterns such as active-active and hot standby.
+
+![Diagram showing services using Replicated Event Sourcing over gRPC between cloud and edge services](images/edge-res.svg)
+
+Replicated Event Sourcing gives:
+
+* redundancy to tolerate failures in one location and still be operational
+* serve requests from a location near the user to provide better responsiveness
+* allow updates to an entity from several locations
+* balance the load over many servers
+
+The replicas of the entities are running in separate Akka Clusters in the cloud and edge.
+A reliable event replication transport over gRPC is used between the Akka Clusters. The replica entities belong
+to the same logical Microservice, i.e. same [Bounded Context](https://martinfowler.com/bliki/BoundedContext.html)
+in Domain-Driven Design (DDD) terminology.
-@extref[Replicated Event Sourcing over gRPC](akka-distributed-cluster:feature-summary.html#replicated-event-sourcing-over-grpc)
-is a useful feature in Akka Distributed Cluster, but it is not recommended for edge use cases. The reasons why it is currently
-not supported for Akka Edge are:
+Note that the connection is established from the edge service. For this you need to setup @extref[Akka Replicated Event Sourcing gRPC with edge topology](akka-projection:grpc-replicated-event-sourcing-transport.html#edge-topology).
-* It requires gRPC connectivity in both directions between the replicas.
-* The overhead of CRDT metadata may become too large when there are many 100s of replicas, or if the replicas dynamically change over time.
+Filters can be used to define that a subset of the entities should be replicated to certain locations.
+The filters can be changed at runtime.
+
+@@@ note
+Events are stored in a database for each replica. There is no direct database access between a replica and
+the database of another replica, which means different databases, and even different database products, can
+be used for the replicas. For example, Postgres in the Cloud and H2 at the edge.
+@@@
+
+### Learn more
-That said, if you can overcome these restrictions it can be a good fit also for edge use cases. You might have
-a network topology that allows establishing connections in both directions (e.g. VPN solution) and you might not have
-that many edge services. The latter can also be mitigated by strict filters so that not all entities are replicated
-everywhere.
+* FIXME link to guide
+* @extref[Reference documentation of Akka Replicated Event Sourcing](akka:typed/replicated-eventsourcing.html)
+* @extref[Reference documentation of Akka Replicated Event Sourcing over gRPC](akka-projection:grpc-replicated-event-sourcing-transport.html)
diff --git a/akka-edge-docs/src/main/paradox/images/edge-res.drawio b/akka-edge-docs/src/main/paradox/images/edge-res.drawio
new file mode 100644
index 000000000..69dd2e17e
--- /dev/null
+++ b/akka-edge-docs/src/main/paradox/images/edge-res.drawio
@@ -0,0 +1,352 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka-edge-docs/src/main/paradox/images/edge-res.svg b/akka-edge-docs/src/main/paradox/images/edge-res.svg
new file mode 100644
index 000000000..a7f845953
--- /dev/null
+++ b/akka-edge-docs/src/main/paradox/images/edge-res.svg
@@ -0,0 +1,3 @@
+
+
+
\ No newline at end of file
diff --git a/akka-edge-docs/src/main/paradox/overview.md b/akka-edge-docs/src/main/paradox/overview.md
index d62f450a6..81723f416 100644
--- a/akka-edge-docs/src/main/paradox/overview.md
+++ b/akka-edge-docs/src/main/paradox/overview.md
@@ -28,6 +28,8 @@ active-active entities or one-way event replication as provided by @extref[Akka
The edge services connect to the cloud services and can use event replication in both directions to communicate with
cloud services. An edge service can be a single node or form a small Akka Cluster.
+The cloud and edge services may use active-active entities or one-way event replication between cloud and edge.
+
The edge service can be fully autonomous and continue working when there are network disruptions, or if the
edge service chooses to not always be connected. It will catch up on pending events when the communication is
established again.
diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala
index 1575efb58..ae9cc98b7 100644
--- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala
+++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala
@@ -199,7 +199,6 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
10.seconds,
8,
R2dbcReplication())
- .withEdgeReplication(true)
}
selfReplicaId match {
diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala
index c7b0fa06e..b17a882b1 100644
--- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala
+++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala
@@ -4,143 +4,214 @@
package akka.projection.grpc.replication
-import akka.actor.testkit.typed.scaladsl.ActorTestKit
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
+import scala.concurrent.duration._
+
+import akka.actor.testkit.typed.scaladsl.LogCapturing
+import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.projection.grpc.replication.javadsl.{ ReplicationProjectionProvider => JReplicationProjectionProvider }
+import akka.projection.grpc.replication.javadsl.{ ReplicationSettings => JReplicationSettings }
import akka.projection.grpc.replication.scaladsl.ReplicationProjectionProvider
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
-import akka.projection.grpc.replication.javadsl.{ ReplicationSettings => JReplicationSettings }
-import akka.projection.grpc.replication.javadsl.{ ReplicationProjectionProvider => JReplicationProjectionProvider }
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object ReplicationSettingsSpec {
+ val config = ConfigFactory
+ .parseString("""
+ my-replicated-entity {
+ # which of the replicas this node belongs to, should be the same
+ # across the nodes of each replica Akka cluster.
+ self-replica-id = dca
+
+ # Pick it up from an environment variable to re-use the same config
+ # without changes across replicas
+ self-replica-id = ${?SELF_REPLICA}
+
+ # max number of parallel in-flight (sent over sharding) entity updates
+ # per consumer/projection
+ parallel-updates = 8
+
+ # Fail the replication stream (and restart with backoff) if completing
+ # the write of a replicated event reaching the cluster takes more time
+ # than this.
+ entity-event-replication-timeout = 10s
+
+ replicas: [
+ {
+ # Unique identifier of the replica/datacenter, is stored in the events
+ # and cannot be changed after events have been persisted.
+ replica-id = "dca"
+
+ # Number of replication streams/projections to start to consume events
+ # from this replica
+ number-of-consumers = 4
+
+ # Akka gRPC client config block for how to reach this replica
+ # from the other replicas, note that binding the server/publishing
+ # endpoint of each replica is done separately, in code.
+ grpc.client {
+ host = "dca.example.com"
+ port = 8443
+ use-tls = true
+ }
+ },
+ {
+ replica-id = "dcb"
+ number-of-consumers = 4
+ # Optional - only run replication stream consumers for events from the
+ # remote replica on nodes with this role
+ consumers-on-cluster-role = dcb-consumer
+ grpc.client {
+ host = "dcb.example.com"
+ port = 8444
+ }
+ },
+ {
+ replica-id = "dcc"
+ number-of-consumers = 4
+ grpc.client {
+ host = "dcc.example.com"
+ port = 8445
+ }
+ }
+ ]
+ }
+
+ // #config-replicated-shopping-cart
+ # Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart`
+ # is the same as the ShoppingCart entity type name.
+ replicated-shopping-cart {
+ # which of the replicas this node belongs to, should be the same
+ # across the nodes of each replica Akka cluster.
+ self-replica-id = us-east-1
+
+ # Pick it up from an environment variable to re-use the same config
+ # without changes across replicas
+ self-replica-id = ${?SELF_REPLICA}
+
+ # max number of parallel in-flight (sent over sharding) entity updates
+ # per consumer/projection
+ parallel-updates = 8
+
+ # Fail the replication stream (and restart with backoff) if completing
+ # the write of a replicated event reaching the cluster takes more time
+ # than this.
+ entity-event-replication-timeout = 10s
+
+ replicas: [
+ {
+ # Unique identifier of the replica/datacenter, is stored in the events
+ # and cannot be changed after events have been persisted.
+ replica-id = "us-east-1"
+
+ # Number of replication streams/projections to start to consume events
+ # from this replica
+ number-of-consumers = 4
+
+ # Akka gRPC client config block for how to reach this replica
+ # from the other replicas, note that binding the server/publishing
+ # endpoint of each replica is done separately, in code.
+ grpc.client {
+ host = "k8s-shopping-604179632a-148180922.us-east-2.elb.amazonaws.com"
+ host = ${?US_EAST_1_GRPC_HOST}
+ port = 443
+ port = ${?US_EAST_1_GRPC_PORT}
+ use-tls = true
+ }
+ },
+ {
+ replica-id = "eu-west-1"
+ number-of-consumers = 4
+ # Optional - only run replication stream consumers for events from the
+ # remote replica on nodes with this role
+ consumers-on-cluster-role = replication-consumer
+ grpc.client {
+ host = "k8s-shopping-19708e1324-24617530ddc6d2cb.elb.eu-west-1.amazonaws.com"
+ host = ${?EU_WEST_1_GRPC_HOST}
+ port = 443
+ port = ${?EU_WEST_1_GRPC_PORT}
+ }
+ }
+ ]
+ }
+ // #config-replicated-shopping-cart
-import scala.concurrent.duration.DurationInt
+ """)
+ .resolve()
+}
-class ReplicationSettingsSpec extends AnyWordSpec with Matchers {
+class ReplicationSettingsSpec
+ extends ScalaTestWithActorTestKit(ReplicationSettingsSpec.config)
+ with AnyWordSpecLike
+ with Matchers
+ with LogCapturing {
trait MyCommand
"The ReplicationSettings" should {
- "Parse from config" in {
- implicit val system: ActorSystem[Unit] = ActorSystem[Unit](
- Behaviors.empty[Unit],
- "parse-test",
- ConfigFactory.parseString("""
- // #config
- my-replicated-entity {
- # which of the replicas this node belongs to, should be the same
- # across the nodes of each replica Akka cluster.
- self-replica-id = dca
-
- # Pick it up from an environment variable to re-use the same config
- # without changes across replicas
- self-replica-id = ${?SELF_REPLICA}
-
- # max number of parallel in-flight (sent over sharding) entity updates
- # per consumer/projection
- parallel-updates = 8
-
- # Fail the replication stream (and restart with backoff) if completing
- # the write of a replicated event reaching the cluster takes more time
- # than this.
- entity-event-replication-timeout = 10s
-
- replicas: [
- {
- # Unique identifier of the replica/datacenter, is stored in the events
- # and cannot be changed after events have been persisted.
- replica-id = "dca"
-
- # Number of replication streams/projections to start to consume events
- # from this replica
- number-of-consumers = 4
-
- # Akka gRPC client config block for how to reach this replica
- # from the other replicas, note that binding the server/publishing
- # endpoint of each replica is done separately, in code.
- grpc.client {
- host = "dca.example.com"
- port = 8443
- use-tls = true
- }
- },
- {
- replica-id = "dcb"
- number-of-consumers = 4
- # Optional - only run replication stream consumers for events from the
- # remote replica on nodes with this role
- consumers-on-cluster-role = dcb-consumer
- grpc.client {
- host = "dcb.example.com"
- port = 8444
- }
- },
- {
- replica-id = "dcc"
- number-of-consumers = 4
- grpc.client {
- host = "dcc.example.com"
- port = 8445
- }
- }
- ]
- }
- // #config
- """).resolve())
-
- try {
- val settings = ReplicationSettings[MyCommand](
+ "Parse from config with scaladsl" in {
+ val settings = ReplicationSettings[MyCommand](
+ "my-replicated-entity",
+ // never actually used, just passed along
+ null: ReplicationProjectionProvider)
+ settings.streamId should ===("my-replicated-entity")
+ settings.entityEventReplicationTimeout should ===(10.seconds)
+ settings.selfReplicaId.id should ===("dca")
+ settings.otherReplicas.map(_.replicaId.id) should ===(Set("dcb", "dcc"))
+ settings.otherReplicas.forall(_.numberOfConsumers === 4) should ===(true)
+ settings.parallelUpdates should ===(8)
+
+ val replicaB = settings.otherReplicas.find(_.replicaId.id == "dcb").get
+ replicaB.grpcClientSettings.defaultPort should ===(8444)
+ replicaB.grpcClientSettings.serviceName should ===("dcb.example.com")
+ replicaB.consumersOnClusterRole should ===(Some("dcb-consumer"))
+ }
+
+ "Parse from config with javadsl" in {
+ val settings = ReplicationSettings[MyCommand](
+ "my-replicated-entity",
+ // never actually used, just passed along
+ null: ReplicationProjectionProvider)
+ val javaSettings = JReplicationSettings
+ .create(
+ classOf[MyCommand],
"my-replicated-entity",
// never actually used, just passed along
- null: ReplicationProjectionProvider)
- settings.streamId should ===("my-replicated-entity")
- settings.entityEventReplicationTimeout should ===(10.seconds)
- settings.selfReplicaId.id should ===("dca")
- settings.otherReplicas.map(_.replicaId.id) should ===(Set("dcb", "dcc"))
- settings.otherReplicas.forall(_.numberOfConsumers === 4) should ===(true)
- settings.parallelUpdates should ===(8)
-
- val replicaB = settings.otherReplicas.find(_.replicaId.id == "dcb").get
- replicaB.grpcClientSettings.defaultPort should ===(8444)
- replicaB.grpcClientSettings.serviceName should ===("dcb.example.com")
- replicaB.consumersOnClusterRole should ===(Some("dcb-consumer"))
-
- // And Java DSL
- val javaSettings = JReplicationSettings
- .create(
- classOf[MyCommand],
- "my-replicated-entity",
- // never actually used, just passed along
- null: JReplicationProjectionProvider,
- system)
- .withEdgeReplication(true)
-
- val converted = javaSettings.toScala
- converted.selfReplicaId should ===(settings.selfReplicaId)
- converted.streamId should ===(settings.streamId)
- converted.acceptEdgeReplication should ===(true)
-
- converted.otherReplicas.foreach { replica =>
- val scalaReplica = settings.otherReplicas.find(_.replicaId == replica.replicaId).get
- replica.consumersOnClusterRole should ===(scalaReplica.consumersOnClusterRole)
- replica.numberOfConsumers should ===(scalaReplica.numberOfConsumers)
- // no equals on GrpcClientSettings
- replica.grpcClientSettings.serviceName === (scalaReplica.grpcClientSettings.serviceName)
- replica.grpcClientSettings.defaultPort === (scalaReplica.grpcClientSettings.defaultPort)
- replica.grpcClientSettings.useTls === (scalaReplica.grpcClientSettings.useTls)
- }
+ null: JReplicationProjectionProvider,
+ system)
+ .withEdgeReplication(true)
+
+ val converted = javaSettings.toScala
+ converted.selfReplicaId should ===(settings.selfReplicaId)
+ converted.streamId should ===(settings.streamId)
+ converted.acceptEdgeReplication should ===(true)
+
+ converted.otherReplicas.foreach { replica =>
+ val scalaReplica = settings.otherReplicas.find(_.replicaId == replica.replicaId).get
+ replica.consumersOnClusterRole should ===(scalaReplica.consumersOnClusterRole)
+ replica.numberOfConsumers should ===(scalaReplica.numberOfConsumers)
+ // no equals on GrpcClientSettings
+ replica.grpcClientSettings.serviceName === (scalaReplica.grpcClientSettings.serviceName)
+ replica.grpcClientSettings.defaultPort === (scalaReplica.grpcClientSettings.defaultPort)
+ replica.grpcClientSettings.useTls === (scalaReplica.grpcClientSettings.useTls)
+ }
- converted.entityEventReplicationTimeout should ===(settings.entityEventReplicationTimeout)
- converted.entityTypeKey === (settings.entityTypeKey)
- converted.eventProducerInterceptor === (settings.eventProducerInterceptor)
- converted.projectionProvider === (settings.projectionProvider)
- converted.parallelUpdates === (settings.parallelUpdates)
+ converted.entityEventReplicationTimeout should ===(settings.entityEventReplicationTimeout)
+ converted.entityTypeKey === (settings.entityTypeKey)
+ converted.eventProducerInterceptor === (settings.eventProducerInterceptor)
+ converted.projectionProvider === (settings.projectionProvider)
+ converted.parallelUpdates === (settings.parallelUpdates)
+ }
- } finally {
- ActorTestKit.shutdown(system)
- }
+ "Parse from doc config" in {
+ val docSettings = ReplicationSettings[MyCommand](
+ "replicated-shopping-cart",
+ // never actually used, just passed along
+ null: ReplicationProjectionProvider)
+ docSettings.streamId should ===("replicated-shopping-cart")
}
+
}
}
diff --git a/akka-projection-grpc/src/main/resources/reference.conf b/akka-projection-grpc/src/main/resources/reference.conf
index ebfd627fd..30b296691 100644
--- a/akka-projection-grpc/src/main/resources/reference.conf
+++ b/akka-projection-grpc/src/main/resources/reference.conf
@@ -57,6 +57,9 @@ akka.projection.grpc {
replication {
+ # Allow edge replicas to connect and replicate updates
+ accept-edge-replication = off
+
# Replicated event sourcing from edge sends each event over sharding, in case that delivery
# fails or times out, retry this number of times, with an increasing backoff conntrolled by
# the min and max backoff settings
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala
index 4b329005c..8f2a5fac4 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala
@@ -147,7 +147,7 @@ object ReplicationSettings {
parallelUpdates = config.getInt("parallel-updates"),
projectionProvider = replicationProjectionProvider,
eventProducerInterceptor = None,
- acceptEdgeReplication = false,
+ acceptEdgeReplication = replicationConfig.getBoolean("accept-edge-replication"),
configureEntity = identity,
producerFilter = _ => true,
initialConsumerFilter = Vector.empty,
diff --git a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
index 54c48e119..4833bdc4c 100644
--- a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
+++ b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
@@ -116,10 +116,10 @@ accept an entity name, a @apidoc[ReplicationProjectionProvider] and an actor sys
is expected to have a top level entry with the entity name containing this structure:
Scala
-: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config }
+: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config-replicated-shopping-cart }
Java
-: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config }
+: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config-replicated-shopping-cart }
The entries in the block refer to the local replica while `replicas` is a list of all replicas, including the node itself,
with details about how to reach the replicas across the network.
@@ -130,7 +130,12 @@ and other connection options as when using Akka gRPC directly. For more details
It is also possible to set up @apidoc[akka.projection.grpc.replication.*.ReplicationSettings] through APIs only and not rely
on the configuration file at all.
-### Binding the publisher
+### Fully connected topology
+
+In a network topology where each replica cluster can connect to each other replica cluster the configuration should
+list all replicas and gRPC server must be started in each replica.
+
+#### Binding the publisher
Binding the publisher is a manual step to allow arbitrary customization of the Akka HTTP server and combining the endpoint
with other HTTP and gRPC routes.
@@ -172,7 +177,41 @@ Scala
Java
: @@snip [ShoppingCartServer.java](/samples/grpc/shopping-cart-service-java/src/main/resources/grpc.conf) { #http2 }
-### Serialization of events
+### Edge topology
+
+In some use cases it is not possible to use a @ref[fully connected topology](#fully-connected-topology), for example because of firewalls or NAT in front of each producer. The consumer may also not know about all producers up front.
+
+This is typical when using @extref:[Replicated Event Sourcing at the edge](akka-edge:feature-summary.html#replicated-event-sourcing-over-grpc).
+where the connection can only be established from the edge service to the cloud service.
+
+For this purpose, Akka Replicated Event Sourcing gRPC has a mode where the replication streams for both consuming
+and producing events are initiated by one side. In this way a star topology can be defined, and it's possible
+to combine with replicas that are fully connected.
+
+You would still define how to connect to other replicas as described above, but it's only needed on the edge side, and
+it would typically only define one or a few cloud replicas that it will connect to. A gRPC server is not needed on the
+edge side, because there are no incoming connections.
+
+On the edge side you start with `Replication.grpcEdgeReplication`.
+
+Scala
+: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-edge }
+
+Java
+: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-edge }
+
+On the cloud side you would start with `Replication.grpcReplication` as described above, but with the addition
+`withEdgeReplication(true)` in the @apidoc[ReplicationSettings] or enable `akka.projection.grpc.replication.accept-edge-replication`
+configuration.
+
+Scala
+: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-allow-edge }
+
+Java
+: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-allow-edge }
+
+
+## Serialization of events
The events are serialized for being passed over the wire using the same Akka serializer as configured for serializing
the events for storage.
@@ -192,7 +231,7 @@ By default, events from all Replicated Event Sourced entities are replicated.
The same kind of filters as described in @ref:[Akka Projection gRPC Filters](grpc.md#filters) can be used for
Replicated Event Sourcing.
-The producer defined filter:
+The producer filter is defined with `withProducerFilter` or `withProducerFilterTopicExpression` in @apidoc[ReplicationSettings]:
Scala
: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-producerFilter }
@@ -200,18 +239,16 @@ Scala
Java
: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-producerFilter }
-Consumer defined filters are updated as described in @ref:[Akka Projection gRPC Consumer defined filter](grpc.md#consumer-defined-filter)
+The initial consumer filter is defined with `withInitialConsumerFilter` in @apidoc[ReplicationSettings].
+Consumer defined filters can be updated in runtime as described in @ref:[Akka Projection gRPC Consumer defined filter](grpc.md#consumer-defined-filter)
One thing to note is that `streamId` is always the same as the `entityType` when using Replicated Event Sourcing.
The entity id based filter criteria must include the replica id as suffix to the entity id, with `|` separator.
-Replicated Event Sourcing is bidirectional replication, and therefore you would typically have to define the same
-filters on both sides. That is not handled automatically.
-
## Sample projects
-Source code and build files for complete sample projects can be found in the @extref:[Akka Distributed Cluster Guide](akka-distributed-cluster:guide/3-active-active.html).
+Source code and build files for complete sample projects can be found in the @extref:[Akka Distributed Cluster Guide](akka-distributed-cluster:guide/3-active-active.html) and @extref:[Akka Edge Guide](akka-edge:guide.html).
## Security
diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java
index b8af8cd96..7ad153f34 100644
--- a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java
+++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java
@@ -10,6 +10,7 @@
import akka.persistence.query.typed.EventEnvelope;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
+import akka.projection.grpc.replication.javadsl.EdgeReplication;
import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors;
import akka.projection.grpc.replication.javadsl.Replication;
import akka.projection.grpc.replication.javadsl.ReplicationSettings;
@@ -47,6 +48,8 @@ public final class ShoppingCart
extends EventSourcedBehaviorWithEnforcedReplies<
ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
+ public static final String ENTITY_TYPE = "replicated-shopping-cart";
+
static final String SMALL_QUANTITY_TAG = "small";
static final String MEDIUM_QUANTITY_TAG = "medium";
static final String LARGE_QUANTITY_TAG = "large";
@@ -308,7 +311,7 @@ public static Replication init(ActorSystem> system) {
ReplicationSettings replicationSettings =
ReplicationSettings.create(
Command.class,
- "replicated-shopping-cart",
+ ShoppingCart.ENTITY_TYPE,
R2dbcReplication.create(system),
system);
return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system);
@@ -327,18 +330,17 @@ public static Behavior create(
// Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter.
// #init-producerFilter
public static Replication initWithProducerFilter(ActorSystem> system) {
+ Predicate> producerFilter =
+ envelope -> envelope.getTags().contains(VIP_CUSTOMER_TAG);
ReplicationSettings replicationSettings =
ReplicationSettings.create(
Command.class,
- "replicated-shopping-cart",
+ ShoppingCart.ENTITY_TYPE,
R2dbcReplication.create(system),
- system);
+ system)
+ .withProducerFilter(producerFilter);
- Predicate> producerFilter = envelope -> {
- return envelope.getTags().contains(VIP_CUSTOMER_TAG);
- };
-
- return Replication.grpcReplication(replicationSettings, producerFilter, ShoppingCart::createWithProducerFilter, system);
+ return Replication.grpcReplication(replicationSettings, ShoppingCart::createWithProducerFilter, system);
}
public static Behavior createWithProducerFilter(
@@ -354,6 +356,31 @@ public static Behavior createWithProducerFilter(
}
// #init-producerFilter
+ // #init-allow-edge
+ public static Replication initAllowEdge(ActorSystem> system) {
+ ReplicationSettings replicationSettings =
+ ReplicationSettings.create(
+ Command.class,
+ ShoppingCart.ENTITY_TYPE,
+ R2dbcReplication.create(system),
+ system)
+ .withEdgeReplication(true);
+ return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system);
+ }
+ // #init-allow-edge
+
+ // #init-edge
+ public static EdgeReplication initEdge(ActorSystem> system) {
+ ReplicationSettings replicationSettings =
+ ReplicationSettings.create(
+ Command.class,
+ ShoppingCart.ENTITY_TYPE,
+ R2dbcReplication.create(system),
+ system);
+ return Replication.grpcEdgeReplication(replicationSettings, ShoppingCart::create, system);
+ }
+ // #init-edge
+
private final ActorContext context;
private final ReplicationContext replicationContext;
private final boolean isLeader;
diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf
index 125a34756..806175308 100644
--- a/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf
+++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf
@@ -4,6 +4,8 @@ akka.projection.grpc {
}
}
+# Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart`
+# is the same as the ShoppingCart.ENTITY_TYPE.
replicated-shopping-cart {
self-replica-id = replica1
self-replica-id = ${?SELF_REPLICA_ID}
diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf
index 125a34756..b9813995a 100644
--- a/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf
+++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf
@@ -4,6 +4,8 @@ akka.projection.grpc {
}
}
+# Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart`
+# is the same as the ShoppingCart.EntityType.
replicated-shopping-cart {
self-replica-id = replica1
self-replica-id = ${?SELF_REPLICA_ID}
diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala
index 6f5cccc26..42bc4cc9f 100644
--- a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala
+++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala
@@ -20,6 +20,7 @@ import akka.persistence.typed.scaladsl.ReplyEffect
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors
import akka.projection.grpc.replication.scaladsl.Replication
+import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.projection.r2dbc.scaladsl.R2dbcReplication
import akka.serialization.jackson.CborSerializable
@@ -196,12 +197,13 @@ object ShoppingCart {
// the replica they were created (but can be marked VIP at any point in time before being closed)
// #init-producerFilter
def initWithProducerFilter(implicit system: ActorSystem[_]): Replication[Command] = {
- val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
val producerFilter: EventEnvelope[Event] => Boolean = { envelope =>
envelope.tags.contains(VipCustomerTag)
}
+ val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
+ .withProducerFilter(producerFilter)
- Replication.grpcReplication(replicationSettings, producerFilter)(ShoppingCart.applyWithProducerFilter)
+ Replication.grpcReplication(replicationSettings)(ShoppingCart.applyWithProducerFilter)
}
def applyWithProducerFilter(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = {
@@ -213,6 +215,21 @@ object ShoppingCart {
}
// #init-producerFilter
+ // #init-allow-edge
+ def initAllowEdge(implicit system: ActorSystem[_]): EdgeReplication[Command] = {
+ val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
+ .withEdgeReplication(true)
+ Replication.grpcEdgeReplication(replicationSettings)(ShoppingCart.apply)
+ }
+ // #init-allow-edge
+
+ // #init-edge
+ def initEdge(implicit system: ActorSystem[_]): EdgeReplication[Command] = {
+ val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
+ Replication.grpcEdgeReplication(replicationSettings)(ShoppingCart.apply)
+ }
+ // #init-edge
+
}
class ShoppingCart(