Skip to content

Commit

Permalink
test: Some more RES edge tests (#1078)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Dec 4, 2023
1 parent 871a817 commit 751de86
Show file tree
Hide file tree
Showing 3 changed files with 575 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import akka.Done
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand All @@ -31,6 +32,7 @@ import akka.projection.grpc.consumer.ConsumerFilter.UpdateFilter
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication.scaladsl.Replica
import akka.projection.grpc.replication.scaladsl.Replication
import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.projection.r2dbc.R2dbcProjectionSettings
import akka.projection.r2dbc.scaladsl.R2dbcReplication
Expand Down Expand Up @@ -159,23 +161,59 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
}

def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = {
val otherReplicas = selfReplicaId match {
case CloudReplicaA => allReplicas.filterNot(_.replicaId == CloudReplicaA)
case CloudReplicaB => allReplicas.filterNot(_.replicaId == CloudReplicaB)
case EdgeReplicaC => allReplicas.filter(_.replicaId == CloudReplicaA)
case EdgeReplicaD => allReplicas.filter(_.replicaId == CloudReplicaA)
case other => throw new IllegalArgumentException(other.id)
def replicationSettings(otherReplicas: Set[Replica]) = {
ReplicationSettings[LWWHelloWorld.Command](
LWWHelloWorld.EntityType.name,
selfReplicaId,
EventProducerSettings(replicaSystem),
otherReplicas,
10.seconds,
8,
R2dbcReplication())
.withEdgeReplication(true)
}

val settings = ReplicationSettings[LWWHelloWorld.Command](
LWWHelloWorld.EntityType.name,
selfReplicaId,
EventProducerSettings(replicaSystem),
otherReplicas,
10.seconds,
8,
R2dbcReplication())
Replication.grpcReplication(settings)(LWWHelloWorld.apply)(replicaSystem)
selfReplicaId match {
case CloudReplicaA =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaB)
Replication.grpcReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case CloudReplicaB =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA)
Replication.grpcReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case other =>
throw new IllegalArgumentException(other.id)
}
}

def startEdgeReplica(
replicaSystem: ActorSystem[_],
selfReplicaId: ReplicaId): EdgeReplication[LWWHelloWorld.Command] = {
def replicationSettings(otherReplicas: Set[Replica]) = {
ReplicationSettings[LWWHelloWorld.Command](
LWWHelloWorld.EntityType.name,
selfReplicaId,
EventProducerSettings(replicaSystem),
otherReplicas,
10.seconds,
8,
R2dbcReplication())
.withEdgeReplication(true)
}

selfReplicaId match {
case EdgeReplicaC =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA)
Replication.grpcEdgeReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case EdgeReplicaD =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA)
Replication.grpcEdgeReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case other =>
throw new IllegalArgumentException(other.id)
}
}

def assertGreeting(entityId: String, expected: String): Unit = {
Expand Down Expand Up @@ -217,19 +255,41 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
replica.replicaId,
system.name,
replica.grpcClientSettings.defaultPort)
val started = startReplica(system, replica.replicaId)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
.map(_ => replica.replicaId -> started)
if (replica.replicaId == CloudReplicaA || replica.replicaId == CloudReplicaB) {

val started = startReplica(system, replica.replicaId)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
.map(_ => Done)
} else {
startEdgeReplica(system, replica.replicaId)
Future.successful(Done)
}
})

replicasStarted.futureValue
logger.info("All three replication/producer services bound")
logger.info("All replication/producer services bound")
}

"replicate directly" in {
val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A1", _))
.futureValue
assertGreeting(entityId, "Hello from A1")

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A2", _))
.futureValue
assertGreeting(entityId, "Hello from A2")
}

"replicate indirectly" in {
Expand All @@ -238,9 +298,43 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
// Edge replicas are only connected to CloudReplicaA
ClusterSharding(systemPerDc(CloudReplicaB))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from B", _))
.ask(LWWHelloWorld.SetGreeting("Hello from B1", _))
.futureValue
assertGreeting(entityId, "Hello from B1")

ClusterSharding(systemPerDc(CloudReplicaB))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from B2", _))
.futureValue
assertGreeting(entityId, "Hello from B2")
}

"replicate both directions" in {
val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A1", _))
.futureValue
assertGreeting(entityId, "Hello from A1")

ClusterSharding(systemPerDc(EdgeReplicaC))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from C1", _))
.futureValue
assertGreeting(entityId, "Hello from C1")

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A2", _))
.futureValue
assertGreeting(entityId, "Hello from A2")

ClusterSharding(systemPerDc(EdgeReplicaC))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from C2", _))
.futureValue
assertGreeting(entityId, "Hello from B")
assertGreeting(entityId, "Hello from C2")
}

"replicate writes from one dc to the other DCs" in {
Expand Down Expand Up @@ -290,7 +384,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
})
}
})
.futureValue // all three updated in roughly parallel
.futureValue // all updated in roughly parallel

// All 3 should eventually arrive at the same value
testKit
Expand Down Expand Up @@ -384,7 +478,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
}

protected override def afterAll(): Unit = {
logger.info("Shutting down all three DCs")
logger.info("Shutting down all DCs")
systems.foreach(_.terminate()) // speed up termination by terminating all at the once
// and then make sure they are completely shutdown
systems.foreach { system =>
Expand Down
Loading

0 comments on commit 751de86

Please sign in to comment.