From 55b02ccb70a8fb143f6aeb4da5841a2fb6f606d3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 21 Nov 2023 14:35:38 +0100 Subject: [PATCH] test of edge consumer filter --- .../src/it/resources/db/default-init.sql | 16 +- ...a => EdgeReplicationIntegrationSpec.scala} | 155 +++++++++++++++--- .../ReplicationIntegrationSpec.scala | 40 +++-- 3 files changed, 172 insertions(+), 39 deletions(-) rename akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/{IndirectReplicationIntegrationSpec.scala => EdgeReplicationIntegrationSpec.scala} (63%) diff --git a/akka-projection-grpc-tests/src/it/resources/db/default-init.sql b/akka-projection-grpc-tests/src/it/resources/db/default-init.sql index 2351d3f77..38564f87e 100644 --- a/akka-projection-grpc-tests/src/it/resources/db/default-init.sql +++ b/akka-projection-grpc-tests/src/it/resources/db/default-init.sql @@ -136,4 +136,18 @@ CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCC ( -- the consumer lag is timestamp_consumed - timestamp_offset timestamp_consumed timestamp with time zone NOT NULL, PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) -); \ No newline at end of file +); + +CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCD ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + slice INT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + -- timestamp_offset is the db_timestamp of the original event + timestamp_offset timestamp with time zone NOT NULL, + -- timestamp_consumed is when the offset was stored + -- the consumer lag is timestamp_consumed - timestamp_offset + timestamp_consumed timestamp with time zone NOT NULL, + PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) +); diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/IndirectReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala similarity index 63% rename from akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/IndirectReplicationIntegrationSpec.scala rename to akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala index 7f89bd046..5912127b7 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/IndirectReplicationIntegrationSpec.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala @@ -23,7 +23,11 @@ import akka.grpc.GrpcClientSettings import akka.http.scaladsl.Http import akka.persistence.typed.ReplicaId import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestData import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.consumer.ConsumerFilter.IncludeTags +import akka.projection.grpc.consumer.ConsumerFilter.UpdateFilter import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.replication.scaladsl.Replica import akka.projection.grpc.replication.scaladsl.Replication @@ -37,7 +41,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory -object IndirectReplicationIntegrationSpec { +object EdgeReplicationIntegrationSpec { private def config(dc: ReplicaId): Config = ConfigFactory.parseString(s""" @@ -71,57 +75,72 @@ object IndirectReplicationIntegrationSpec { } """) - private val DCA = ReplicaId("DCA") - private val DCB = ReplicaId("DCB") - private val DCC = ReplicaId("DCC") + private val CloudReplicaA = ReplicaId("DCA") + private val CloudReplicaB = ReplicaId("DCB") + private val EdgeReplicaC = ReplicaId("DCC") + private val EdgeReplicaD = ReplicaId("DCD") } -class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf) +class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf) extends ScalaTestWithActorTestKit( akka.actor .ActorSystem( - "IndirectReplicationIntegrationSpecA", - IndirectReplicationIntegrationSpec - .config(IndirectReplicationIntegrationSpec.DCA) + "EdgeReplicationIntegrationSpecA", + EdgeReplicationIntegrationSpec + .config(EdgeReplicationIntegrationSpec.CloudReplicaA) .withFallback(testContainerConf.config)) .toTyped) with AnyWordSpecLike with TestDbLifecycle with BeforeAndAfterAll - with LogCapturing { - import IndirectReplicationIntegrationSpec._ + with LogCapturing + with TestData { + import EdgeReplicationIntegrationSpec._ import ReplicationIntegrationSpec.LWWHelloWorld implicit val ec: ExecutionContext = system.executionContext def this() = this(new TestContainerConf) - private val logger = LoggerFactory.getLogger(classOf[IndirectReplicationIntegrationSpec]) + private val logger = LoggerFactory.getLogger(classOf[EdgeReplicationIntegrationSpec]) override def typedSystem: ActorSystem[_] = testKit.system private val systems = Seq[ActorSystem[_]]( typedSystem, akka.actor .ActorSystem( - "IndirectReplicationIntegrationSpecB", - IndirectReplicationIntegrationSpec.config(DCB).withFallback(testContainerConf.config)) + "EdgeReplicationIntegrationSpecB", + EdgeReplicationIntegrationSpec.config(CloudReplicaB).withFallback(testContainerConf.config)) .toTyped, akka.actor .ActorSystem( - "IndirectReplicationIntegrationSpecC", - IndirectReplicationIntegrationSpec.config(DCC).withFallback(testContainerConf.config)) + "EdgeReplicationIntegrationSpecC", + EdgeReplicationIntegrationSpec.config(EdgeReplicaC).withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationSpecD", + EdgeReplicationIntegrationSpec.config(EdgeReplicaD).withFallback(testContainerConf.config)) .toTyped) private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort) - private val allDcsAndPorts = Seq(DCA, DCB, DCC).zip(grpcPorts) + private val allDcsAndPorts = Seq(CloudReplicaA, CloudReplicaB, EdgeReplicaC, EdgeReplicaD).zip(grpcPorts) private val allReplicas = allDcsAndPorts.map { case (id, port) => Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false)) }.toSet - private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2))) - private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> systems(2)) - private val entityIds = Set("one", "two", "three") + private val testKitsPerDc = Map( + CloudReplicaA -> testKit, + CloudReplicaB -> ActorTestKit(systems(1)), + EdgeReplicaC -> ActorTestKit(systems(2)), + EdgeReplicaD -> ActorTestKit(systems(3))) + private val systemPerDc = + Map(CloudReplicaA -> system, CloudReplicaB -> systems(1), EdgeReplicaC -> systems(2), EdgeReplicaD -> systems(3)) + private val entityIds = Set( + nextPid(LWWHelloWorld.EntityType.name).entityId, + nextPid(LWWHelloWorld.EntityType.name).entityId, + nextPid(LWWHelloWorld.EntityType.name).entityId) override protected def beforeAll(): Unit = { super.beforeAll() @@ -141,10 +160,11 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf) def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { val otherReplicas = selfReplicaId match { - case DCA => allReplicas.filter(r => r.replicaId == DCB || r.replicaId == DCC) - case DCB => allReplicas.filter(_.replicaId == DCA) - case DCC => allReplicas.filter(_.replicaId == DCA) - case other => throw new IllegalArgumentException(other.id) + case CloudReplicaA => allReplicas.filterNot(_.replicaId == CloudReplicaA) + case CloudReplicaB => allReplicas.filterNot(_.replicaId == CloudReplicaB) + case EdgeReplicaC => allReplicas.filter(_.replicaId == CloudReplicaA) + case EdgeReplicaD => allReplicas.filter(_.replicaId == CloudReplicaA) + case other => throw new IllegalArgumentException(other.id) } val settings = ReplicationSettings[LWWHelloWorld.Command]( @@ -177,7 +197,7 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf) } "Replication over gRPC" should { - "form three one node clusters" in { + "form one node clusters" in { testKitsPerDc.values.foreach { testKit => val cluster = Cluster(testKit.system) cluster.manager ! Join(cluster.selfMember.address) @@ -187,7 +207,7 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf) } } - "start three replicas" in { + "start replicas" in { val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map { case (replica, index) => val system = systems(index) @@ -212,7 +232,18 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf) logger.info("All three replication/producer services bound") } - "replicate writes from one dc to the other two" in { + "replicate indirectly" in { + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + + // Edge replicas are only connected to CloudReplicaA + ClusterSharding(systemPerDc(CloudReplicaB)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from B", _)) + .futureValue + assertGreeting(entityId, "Hello from B") + } + + "replicate writes from one dc to the other DCs" in { systemPerDc.keys.foreach { dc => withClue(s"from ${dc.id}") { Future @@ -284,6 +315,78 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf) } } + "use consumer filter on tag" in { + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello All", _)) + .futureValue + + ConsumerFilter(systemPerDc(EdgeReplicaC)).ref ! UpdateFilter( + LWWHelloWorld.EntityType.name, + List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-C")))) + ConsumerFilter(systemPerDc(EdgeReplicaD)).ref ! UpdateFilter( + LWWHelloWorld.EntityType.name, + List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-D")))) + + // let the filter propagate to producer + Thread.sleep(1000) + + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetTag("tag-C", _)) + .futureValue + + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello C", _)) + .futureValue + + eventually { + ClusterSharding(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello C" + } + + // but not updated in D + ClusterSharding(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello All" + + // change tag + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetTag("tag-D", _)) + .futureValue + + // previous greeting should be replicated + eventually { + ClusterSharding(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello C" + } + + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello D", _)) + .futureValue + eventually { + ClusterSharding(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello D" + } + + // but not updated in C + ClusterSharding(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello C" + } + protected override def afterAll(): Unit = { logger.info("Shutting down all three DCs") systems.foreach(_.terminate()) // speed up termination by terminating all at the once diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala index e1da530f2..5a652bd47 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala @@ -87,27 +87,27 @@ object ReplicationIntegrationSpec { val EntityType: EntityTypeKey[Command] = EntityTypeKey[Command]("hello-world") sealed trait Command - - case class Get(replyTo: ActorRef[String]) extends Command - - case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command + final case class Get(replyTo: ActorRef[String]) extends Command + final case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command + final case class SetTag(tag: String, replyTo: ActorRef[Done]) extends Command sealed trait Event - - case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event + final case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event + final case class TagChanged(tag: String, timestamp: LwwTime) extends Event object State { - val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId(""))) + val initial = + State("Hello world", LwwTime(Long.MinValue, ReplicaId("")), "", LwwTime(Long.MinValue, ReplicaId(""))) } - case class State(greeting: String, timestamp: LwwTime) + case class State(greeting: String, greetingTimestamp: LwwTime, tag: String, tagTimestamp: LwwTime) def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) = replicatedBehaviors.setup { replicationContext => EventSourcedBehavior[Command, Event, State]( replicationContext.persistenceId, State.initial, { - case (State(greeting, _), Get(replyTo)) => + case (State(greeting, _, _, _), Get(replyTo)) => replyTo ! greeting Effect.none case (state, SetGreeting(greeting, replyTo)) => @@ -115,14 +115,30 @@ object ReplicationIntegrationSpec { .persist( GreetingChanged( greeting, - state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) + state.greetingTimestamp + .increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) + .thenRun((_: State) => replyTo ! Done) + case (state, SetTag(tag, replyTo)) => + Effect + .persist( + TagChanged( + tag, + state.greetingTimestamp + .increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) .thenRun((_: State) => replyTo ! Done) }, { case (currentState, GreetingChanged(newGreeting, newTimestamp)) => - if (newTimestamp.isAfter(currentState.timestamp)) - State(newGreeting, newTimestamp) + if (newTimestamp.isAfter(currentState.greetingTimestamp)) + currentState.copy(newGreeting, newTimestamp) + else currentState + case (currentState, TagChanged(newTag, newTimestamp)) => + if (newTimestamp.isAfter(currentState.tagTimestamp)) + currentState.copy(tag = newTag, tagTimestamp = newTimestamp) else currentState }) + .withTaggerForState { + case (state, _) => if (state.tag == "") Set.empty else Set(state.tag) + } } } }