Skip to content

Commit

Permalink
edge replication in sample, and better doc config
Browse files Browse the repository at this point in the history
* ReplicationSettings withProducerFilter
* grpcEdgeReplication
* rewrite ReplicationSettingsSpec to use ScalaTestWithActorTestKit
  • Loading branch information
patriknw committed Dec 5, 2023
1 parent e8c8f43 commit ba7ac8f
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,143 +4,214 @@

package akka.projection.grpc.replication

import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.projection.grpc.replication.javadsl.{ ReplicationProjectionProvider => JReplicationProjectionProvider }
import akka.projection.grpc.replication.javadsl.{ ReplicationSettings => JReplicationSettings }
import akka.projection.grpc.replication.scaladsl.ReplicationProjectionProvider
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.projection.grpc.replication.javadsl.{ ReplicationSettings => JReplicationSettings }
import akka.projection.grpc.replication.javadsl.{ ReplicationProjectionProvider => JReplicationProjectionProvider }
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.wordspec.AnyWordSpecLike

object ReplicationSettingsSpec {
val config = ConfigFactory
.parseString("""
my-replicated-entity {
# which of the replicas this node belongs to, should be the same
# across the nodes of each replica Akka cluster.
self-replica-id = dca
# Pick it up from an environment variable to re-use the same config
# without changes across replicas
self-replica-id = ${?SELF_REPLICA}
# max number of parallel in-flight (sent over sharding) entity updates
# per consumer/projection
parallel-updates = 8
# Fail the replication stream (and restart with backoff) if completing
# the write of a replicated event reaching the cluster takes more time
# than this.
entity-event-replication-timeout = 10s
replicas: [
{
# Unique identifier of the replica/datacenter, is stored in the events
# and cannot be changed after events have been persisted.
replica-id = "dca"
# Number of replication streams/projections to start to consume events
# from this replica
number-of-consumers = 4
# Akka gRPC client config block for how to reach this replica
# from the other replicas, note that binding the server/publishing
# endpoint of each replica is done separately, in code.
grpc.client {
host = "dca.example.com"
port = 8443
use-tls = true
}
},
{
replica-id = "dcb"
number-of-consumers = 4
# Optional - only run replication stream consumers for events from the
# remote replica on nodes with this role
consumers-on-cluster-role = dcb-consumer
grpc.client {
host = "dcb.example.com"
port = 8444
}
},
{
replica-id = "dcc"
number-of-consumers = 4
grpc.client {
host = "dcc.example.com"
port = 8445
}
}
]
}
// #config-replicated-shopping-cart
# Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart`
# is the same as the ShoppingCart entity type name.
replicated-shopping-cart {
# which of the replicas this node belongs to, should be the same
# across the nodes of each replica Akka cluster.
self-replica-id = us-east-1
# Pick it up from an environment variable to re-use the same config
# without changes across replicas
self-replica-id = ${?SELF_REPLICA}
# max number of parallel in-flight (sent over sharding) entity updates
# per consumer/projection
parallel-updates = 8
# Fail the replication stream (and restart with backoff) if completing
# the write of a replicated event reaching the cluster takes more time
# than this.
entity-event-replication-timeout = 10s
replicas: [
{
# Unique identifier of the replica/datacenter, is stored in the events
# and cannot be changed after events have been persisted.
replica-id = "us-east-1"
# Number of replication streams/projections to start to consume events
# from this replica
number-of-consumers = 4
# Akka gRPC client config block for how to reach this replica
# from the other replicas, note that binding the server/publishing
# endpoint of each replica is done separately, in code.
grpc.client {
host = "k8s-shopping-604179632a-148180922.us-east-2.elb.amazonaws.com"
host = ${?US_EAST_1_GRPC_HOST}
port = 443
port = ${?US_EAST_1_GRPC_PORT}
use-tls = true
}
},
{
replica-id = "eu-west-1"
number-of-consumers = 4
# Optional - only run replication stream consumers for events from the
# remote replica on nodes with this role
consumers-on-cluster-role = replication-consumer
grpc.client {
host = "k8s-shopping-19708e1324-24617530ddc6d2cb.elb.eu-west-1.amazonaws.com"
host = ${?EU_WEST_1_GRPC_HOST}
port = 443
port = ${?EU_WEST_1_GRPC_PORT}
}
}
]
}
// #config-replicated-shopping-cart
import scala.concurrent.duration.DurationInt
""")
.resolve()
}

class ReplicationSettingsSpec extends AnyWordSpec with Matchers {
class ReplicationSettingsSpec
extends ScalaTestWithActorTestKit(ReplicationSettingsSpec.config)
with AnyWordSpecLike
with Matchers
with LogCapturing {

trait MyCommand

"The ReplicationSettings" should {
"Parse from config" in {
implicit val system: ActorSystem[Unit] = ActorSystem[Unit](
Behaviors.empty[Unit],
"parse-test",
ConfigFactory.parseString("""
// #config
my-replicated-entity {
# which of the replicas this node belongs to, should be the same
# across the nodes of each replica Akka cluster.
self-replica-id = dca
# Pick it up from an environment variable to re-use the same config
# without changes across replicas
self-replica-id = ${?SELF_REPLICA}
# max number of parallel in-flight (sent over sharding) entity updates
# per consumer/projection
parallel-updates = 8
# Fail the replication stream (and restart with backoff) if completing
# the write of a replicated event reaching the cluster takes more time
# than this.
entity-event-replication-timeout = 10s
replicas: [
{
# Unique identifier of the replica/datacenter, is stored in the events
# and cannot be changed after events have been persisted.
replica-id = "dca"
# Number of replication streams/projections to start to consume events
# from this replica
number-of-consumers = 4
# Akka gRPC client config block for how to reach this replica
# from the other replicas, note that binding the server/publishing
# endpoint of each replica is done separately, in code.
grpc.client {
host = "dca.example.com"
port = 8443
use-tls = true
}
},
{
replica-id = "dcb"
number-of-consumers = 4
# Optional - only run replication stream consumers for events from the
# remote replica on nodes with this role
consumers-on-cluster-role = dcb-consumer
grpc.client {
host = "dcb.example.com"
port = 8444
}
},
{
replica-id = "dcc"
number-of-consumers = 4
grpc.client {
host = "dcc.example.com"
port = 8445
}
}
]
}
// #config
""").resolve())

try {
val settings = ReplicationSettings[MyCommand](
"Parse from config with scaladsl" in {
val settings = ReplicationSettings[MyCommand](
"my-replicated-entity",
// never actually used, just passed along
null: ReplicationProjectionProvider)
settings.streamId should ===("my-replicated-entity")
settings.entityEventReplicationTimeout should ===(10.seconds)
settings.selfReplicaId.id should ===("dca")
settings.otherReplicas.map(_.replicaId.id) should ===(Set("dcb", "dcc"))
settings.otherReplicas.forall(_.numberOfConsumers === 4) should ===(true)
settings.parallelUpdates should ===(8)

val replicaB = settings.otherReplicas.find(_.replicaId.id == "dcb").get
replicaB.grpcClientSettings.defaultPort should ===(8444)
replicaB.grpcClientSettings.serviceName should ===("dcb.example.com")
replicaB.consumersOnClusterRole should ===(Some("dcb-consumer"))
}

"Parse from config with javadsl" in {
val settings = ReplicationSettings[MyCommand](
"my-replicated-entity",
// never actually used, just passed along
null: ReplicationProjectionProvider)
val javaSettings = JReplicationSettings
.create(
classOf[MyCommand],
"my-replicated-entity",
// never actually used, just passed along
null: ReplicationProjectionProvider)
settings.streamId should ===("my-replicated-entity")
settings.entityEventReplicationTimeout should ===(10.seconds)
settings.selfReplicaId.id should ===("dca")
settings.otherReplicas.map(_.replicaId.id) should ===(Set("dcb", "dcc"))
settings.otherReplicas.forall(_.numberOfConsumers === 4) should ===(true)
settings.parallelUpdates should ===(8)

val replicaB = settings.otherReplicas.find(_.replicaId.id == "dcb").get
replicaB.grpcClientSettings.defaultPort should ===(8444)
replicaB.grpcClientSettings.serviceName should ===("dcb.example.com")
replicaB.consumersOnClusterRole should ===(Some("dcb-consumer"))

// And Java DSL
val javaSettings = JReplicationSettings
.create(
classOf[MyCommand],
"my-replicated-entity",
// never actually used, just passed along
null: JReplicationProjectionProvider,
system)
.withEdgeReplication(true)

val converted = javaSettings.toScala
converted.selfReplicaId should ===(settings.selfReplicaId)
converted.streamId should ===(settings.streamId)
converted.acceptEdgeReplication should ===(true)

converted.otherReplicas.foreach { replica =>
val scalaReplica = settings.otherReplicas.find(_.replicaId == replica.replicaId).get
replica.consumersOnClusterRole should ===(scalaReplica.consumersOnClusterRole)
replica.numberOfConsumers should ===(scalaReplica.numberOfConsumers)
// no equals on GrpcClientSettings
replica.grpcClientSettings.serviceName === (scalaReplica.grpcClientSettings.serviceName)
replica.grpcClientSettings.defaultPort === (scalaReplica.grpcClientSettings.defaultPort)
replica.grpcClientSettings.useTls === (scalaReplica.grpcClientSettings.useTls)
}
null: JReplicationProjectionProvider,
system)
.withEdgeReplication(true)

val converted = javaSettings.toScala
converted.selfReplicaId should ===(settings.selfReplicaId)
converted.streamId should ===(settings.streamId)
converted.acceptEdgeReplication should ===(true)

converted.otherReplicas.foreach { replica =>
val scalaReplica = settings.otherReplicas.find(_.replicaId == replica.replicaId).get
replica.consumersOnClusterRole should ===(scalaReplica.consumersOnClusterRole)
replica.numberOfConsumers should ===(scalaReplica.numberOfConsumers)
// no equals on GrpcClientSettings
replica.grpcClientSettings.serviceName === (scalaReplica.grpcClientSettings.serviceName)
replica.grpcClientSettings.defaultPort === (scalaReplica.grpcClientSettings.defaultPort)
replica.grpcClientSettings.useTls === (scalaReplica.grpcClientSettings.useTls)
}

converted.entityEventReplicationTimeout should ===(settings.entityEventReplicationTimeout)
converted.entityTypeKey === (settings.entityTypeKey)
converted.eventProducerInterceptor === (settings.eventProducerInterceptor)
converted.projectionProvider === (settings.projectionProvider)
converted.parallelUpdates === (settings.parallelUpdates)
converted.entityEventReplicationTimeout should ===(settings.entityEventReplicationTimeout)
converted.entityTypeKey === (settings.entityTypeKey)
converted.eventProducerInterceptor === (settings.eventProducerInterceptor)
converted.projectionProvider === (settings.projectionProvider)
converted.parallelUpdates === (settings.parallelUpdates)
}

} finally {
ActorTestKit.shutdown(system)
}
"Parse from doc config" in {
val docSettings = ReplicationSettings[MyCommand](
"replicated-shopping-cart",
// never actually used, just passed along
null: ReplicationProjectionProvider)
docSettings.streamId should ===("replicated-shopping-cart")
}

}

}
17 changes: 15 additions & 2 deletions docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ accept an entity name, a @apidoc[ReplicationProjectionProvider] and an actor sys
is expected to have a top level entry with the entity name containing this structure:

Scala
: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config }
: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config-replicated-shopping-cart }

Java
: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config }
: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config-replicated-shopping-cart }

The entries in the block refer to the local replica while `replicas` is a list of all replicas, including the node itself,
with details about how to reach the replicas across the network.
Expand Down Expand Up @@ -194,10 +194,23 @@ edge side, because there are no incoming connections.

On the edge side you start with `Replication.grpcEdgeReplication`.

Scala
: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-edge }

Java
: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-edge }

On the cloud side you would start with `Replication.grpcReplication` as described above, but with the addition
`withEdgeReplication(true)` in the @apidoc[ReplicationSettings] or enable `akka.projection.grpc.replication.accept-edge-replication`
configuration.

Scala
: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-allow-edge }

Java
: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-allow-edge }


## Serialization of events

The events are serialized for being passed over the wire using the same Akka serializer as configured for serializing
Expand Down
Loading

0 comments on commit ba7ac8f

Please sign in to comment.