Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Some more RES edge tests #1078

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import akka.Done
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand All @@ -31,6 +32,7 @@ import akka.projection.grpc.consumer.ConsumerFilter.UpdateFilter
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication.scaladsl.Replica
import akka.projection.grpc.replication.scaladsl.Replication
import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.projection.r2dbc.R2dbcProjectionSettings
import akka.projection.r2dbc.scaladsl.R2dbcReplication
Expand Down Expand Up @@ -159,23 +161,59 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
}

def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = {
val otherReplicas = selfReplicaId match {
case CloudReplicaA => allReplicas.filterNot(_.replicaId == CloudReplicaA)
case CloudReplicaB => allReplicas.filterNot(_.replicaId == CloudReplicaB)
case EdgeReplicaC => allReplicas.filter(_.replicaId == CloudReplicaA)
case EdgeReplicaD => allReplicas.filter(_.replicaId == CloudReplicaA)
case other => throw new IllegalArgumentException(other.id)
def replicationSettings(otherReplicas: Set[Replica]) = {
ReplicationSettings[LWWHelloWorld.Command](
LWWHelloWorld.EntityType.name,
selfReplicaId,
EventProducerSettings(replicaSystem),
otherReplicas,
10.seconds,
8,
R2dbcReplication())
.withEdgeReplication(true)
}

val settings = ReplicationSettings[LWWHelloWorld.Command](
LWWHelloWorld.EntityType.name,
selfReplicaId,
EventProducerSettings(replicaSystem),
otherReplicas,
10.seconds,
8,
R2dbcReplication())
Replication.grpcReplication(settings)(LWWHelloWorld.apply)(replicaSystem)
selfReplicaId match {
case CloudReplicaA =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaB)
Replication.grpcReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case CloudReplicaB =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA)
Replication.grpcReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case other =>
throw new IllegalArgumentException(other.id)
}
}

def startEdgeReplica(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this test to use grpcEdgeReplication

replicaSystem: ActorSystem[_],
selfReplicaId: ReplicaId): EdgeReplication[LWWHelloWorld.Command] = {
def replicationSettings(otherReplicas: Set[Replica]) = {
ReplicationSettings[LWWHelloWorld.Command](
LWWHelloWorld.EntityType.name,
selfReplicaId,
EventProducerSettings(replicaSystem),
otherReplicas,
10.seconds,
8,
R2dbcReplication())
.withEdgeReplication(true)
}

selfReplicaId match {
case EdgeReplicaC =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA)
Replication.grpcEdgeReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case EdgeReplicaD =>
val otherReplicas = allReplicas.filter(_.replicaId == CloudReplicaA)
Replication.grpcEdgeReplication(replicationSettings(otherReplicas))(LWWHelloWorld.apply)(replicaSystem)

case other =>
throw new IllegalArgumentException(other.id)
}
}

def assertGreeting(entityId: String, expected: String): Unit = {
Expand Down Expand Up @@ -217,19 +255,41 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
replica.replicaId,
system.name,
replica.grpcClientSettings.defaultPort)
val started = startReplica(system, replica.replicaId)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
.map(_ => replica.replicaId -> started)
if (replica.replicaId == CloudReplicaA || replica.replicaId == CloudReplicaB) {

val started = startReplica(system, replica.replicaId)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
.map(_ => Done)
} else {
startEdgeReplica(system, replica.replicaId)
Future.successful(Done)
}
})

replicasStarted.futureValue
logger.info("All three replication/producer services bound")
logger.info("All replication/producer services bound")
}

"replicate directly" in {
val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A1", _))
.futureValue
assertGreeting(entityId, "Hello from A1")

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A2", _))
.futureValue
assertGreeting(entityId, "Hello from A2")
}

"replicate indirectly" in {
Expand All @@ -238,9 +298,43 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
// Edge replicas are only connected to CloudReplicaA
ClusterSharding(systemPerDc(CloudReplicaB))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from B", _))
.ask(LWWHelloWorld.SetGreeting("Hello from B1", _))
.futureValue
assertGreeting(entityId, "Hello from B1")

ClusterSharding(systemPerDc(CloudReplicaB))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from B2", _))
.futureValue
assertGreeting(entityId, "Hello from B2")
}

"replicate both directions" in {
val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A1", _))
.futureValue
assertGreeting(entityId, "Hello from A1")

ClusterSharding(systemPerDc(EdgeReplicaC))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from C1", _))
.futureValue
assertGreeting(entityId, "Hello from C1")

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from A2", _))
.futureValue
assertGreeting(entityId, "Hello from A2")

ClusterSharding(systemPerDc(EdgeReplicaC))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from C2", _))
.futureValue
assertGreeting(entityId, "Hello from B")
assertGreeting(entityId, "Hello from C2")
}

"replicate writes from one dc to the other DCs" in {
Expand Down Expand Up @@ -290,7 +384,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
})
}
})
.futureValue // all three updated in roughly parallel
.futureValue // all updated in roughly parallel

// All 3 should eventually arrive at the same value
testKit
Expand Down Expand Up @@ -384,7 +478,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
}

protected override def afterAll(): Unit = {
logger.info("Shutting down all three DCs")
logger.info("Shutting down all DCs")
systems.foreach(_.terminate()) // speed up termination by terminating all at the once
// and then make sure they are completely shutdown
systems.foreach { system =>
Expand Down
Loading