Skip to content

Commit

Permalink
test of edge consumer filter
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 21, 2023
1 parent 97cfa8d commit 55b02cc
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 39 deletions.
16 changes: 15 additions & 1 deletion akka-projection-grpc-tests/src/it/resources/db/default-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,18 @@ CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCC (
-- the consumer lag is timestamp_consumed - timestamp_offset
timestamp_consumed timestamp with time zone NOT NULL,
PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);
);

CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCD (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
slice INT NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
seq_nr BIGINT NOT NULL,
-- timestamp_offset is the db_timestamp of the original event
timestamp_offset timestamp with time zone NOT NULL,
-- timestamp_consumed is when the offset was stored
-- the consumer lag is timestamp_consumed - timestamp_offset
timestamp_consumed timestamp with time zone NOT NULL,
PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import akka.grpc.GrpcClientSettings
import akka.http.scaladsl.Http
import akka.persistence.typed.ReplicaId
import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestData
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.ConsumerFilter.IncludeTags
import akka.projection.grpc.consumer.ConsumerFilter.UpdateFilter
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication.scaladsl.Replica
import akka.projection.grpc.replication.scaladsl.Replication
Expand All @@ -37,7 +41,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

object IndirectReplicationIntegrationSpec {
object EdgeReplicationIntegrationSpec {

private def config(dc: ReplicaId): Config =
ConfigFactory.parseString(s"""
Expand Down Expand Up @@ -71,57 +75,72 @@ object IndirectReplicationIntegrationSpec {
}
""")

private val DCA = ReplicaId("DCA")
private val DCB = ReplicaId("DCB")
private val DCC = ReplicaId("DCC")
private val CloudReplicaA = ReplicaId("DCA")
private val CloudReplicaB = ReplicaId("DCB")
private val EdgeReplicaC = ReplicaId("DCC")
private val EdgeReplicaD = ReplicaId("DCD")

}

class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)
class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
extends ScalaTestWithActorTestKit(
akka.actor
.ActorSystem(
"IndirectReplicationIntegrationSpecA",
IndirectReplicationIntegrationSpec
.config(IndirectReplicationIntegrationSpec.DCA)
"EdgeReplicationIntegrationSpecA",
EdgeReplicationIntegrationSpec
.config(EdgeReplicationIntegrationSpec.CloudReplicaA)
.withFallback(testContainerConf.config))
.toTyped)
with AnyWordSpecLike
with TestDbLifecycle
with BeforeAndAfterAll
with LogCapturing {
import IndirectReplicationIntegrationSpec._
with LogCapturing
with TestData {
import EdgeReplicationIntegrationSpec._
import ReplicationIntegrationSpec.LWWHelloWorld
implicit val ec: ExecutionContext = system.executionContext

def this() = this(new TestContainerConf)

private val logger = LoggerFactory.getLogger(classOf[IndirectReplicationIntegrationSpec])
private val logger = LoggerFactory.getLogger(classOf[EdgeReplicationIntegrationSpec])
override def typedSystem: ActorSystem[_] = testKit.system

private val systems = Seq[ActorSystem[_]](
typedSystem,
akka.actor
.ActorSystem(
"IndirectReplicationIntegrationSpecB",
IndirectReplicationIntegrationSpec.config(DCB).withFallback(testContainerConf.config))
"EdgeReplicationIntegrationSpecB",
EdgeReplicationIntegrationSpec.config(CloudReplicaB).withFallback(testContainerConf.config))
.toTyped,
akka.actor
.ActorSystem(
"IndirectReplicationIntegrationSpecC",
IndirectReplicationIntegrationSpec.config(DCC).withFallback(testContainerConf.config))
"EdgeReplicationIntegrationSpecC",
EdgeReplicationIntegrationSpec.config(EdgeReplicaC).withFallback(testContainerConf.config))
.toTyped,
akka.actor
.ActorSystem(
"EdgeReplicationIntegrationSpecD",
EdgeReplicationIntegrationSpec.config(EdgeReplicaD).withFallback(testContainerConf.config))
.toTyped)

private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort)
private val allDcsAndPorts = Seq(DCA, DCB, DCC).zip(grpcPorts)
private val allDcsAndPorts = Seq(CloudReplicaA, CloudReplicaB, EdgeReplicaC, EdgeReplicaD).zip(grpcPorts)
private val allReplicas = allDcsAndPorts.map {
case (id, port) =>
Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false))
}.toSet

private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2)))
private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> systems(2))
private val entityIds = Set("one", "two", "three")
private val testKitsPerDc = Map(
CloudReplicaA -> testKit,
CloudReplicaB -> ActorTestKit(systems(1)),
EdgeReplicaC -> ActorTestKit(systems(2)),
EdgeReplicaD -> ActorTestKit(systems(3)))
private val systemPerDc =
Map(CloudReplicaA -> system, CloudReplicaB -> systems(1), EdgeReplicaC -> systems(2), EdgeReplicaD -> systems(3))
private val entityIds = Set(
nextPid(LWWHelloWorld.EntityType.name).entityId,
nextPid(LWWHelloWorld.EntityType.name).entityId,
nextPid(LWWHelloWorld.EntityType.name).entityId)

override protected def beforeAll(): Unit = {
super.beforeAll()
Expand All @@ -141,10 +160,11 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)

def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = {
val otherReplicas = selfReplicaId match {
case DCA => allReplicas.filter(r => r.replicaId == DCB || r.replicaId == DCC)
case DCB => allReplicas.filter(_.replicaId == DCA)
case DCC => allReplicas.filter(_.replicaId == DCA)
case other => throw new IllegalArgumentException(other.id)
case CloudReplicaA => allReplicas.filterNot(_.replicaId == CloudReplicaA)
case CloudReplicaB => allReplicas.filterNot(_.replicaId == CloudReplicaB)
case EdgeReplicaC => allReplicas.filter(_.replicaId == CloudReplicaA)
case EdgeReplicaD => allReplicas.filter(_.replicaId == CloudReplicaA)
case other => throw new IllegalArgumentException(other.id)
}

val settings = ReplicationSettings[LWWHelloWorld.Command](
Expand Down Expand Up @@ -177,7 +197,7 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)
}

"Replication over gRPC" should {
"form three one node clusters" in {
"form one node clusters" in {
testKitsPerDc.values.foreach { testKit =>
val cluster = Cluster(testKit.system)
cluster.manager ! Join(cluster.selfMember.address)
Expand All @@ -187,7 +207,7 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)
}
}

"start three replicas" in {
"start replicas" in {
val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map {
case (replica, index) =>
val system = systems(index)
Expand All @@ -212,7 +232,18 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)
logger.info("All three replication/producer services bound")
}

"replicate writes from one dc to the other two" in {
"replicate indirectly" in {
val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId

// Edge replicas are only connected to CloudReplicaA
ClusterSharding(systemPerDc(CloudReplicaB))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello from B", _))
.futureValue
assertGreeting(entityId, "Hello from B")
}

"replicate writes from one dc to the other DCs" in {
systemPerDc.keys.foreach { dc =>
withClue(s"from ${dc.id}") {
Future
Expand Down Expand Up @@ -284,6 +315,78 @@ class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)
}
}

"use consumer filter on tag" in {
val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId
ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello All", _))
.futureValue

ConsumerFilter(systemPerDc(EdgeReplicaC)).ref ! UpdateFilter(
LWWHelloWorld.EntityType.name,
List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-C"))))
ConsumerFilter(systemPerDc(EdgeReplicaD)).ref ! UpdateFilter(
LWWHelloWorld.EntityType.name,
List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-D"))))

// let the filter propagate to producer
Thread.sleep(1000)

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetTag("tag-C", _))
.futureValue

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello C", _))
.futureValue

eventually {
ClusterSharding(systemPerDc(EdgeReplicaC))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.Get(_))
.futureValue shouldBe "Hello C"
}

// but not updated in D
ClusterSharding(systemPerDc(EdgeReplicaD))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.Get(_))
.futureValue shouldBe "Hello All"

// change tag
ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetTag("tag-D", _))
.futureValue

// previous greeting should be replicated
eventually {
ClusterSharding(systemPerDc(EdgeReplicaD))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.Get(_))
.futureValue shouldBe "Hello C"
}

ClusterSharding(systemPerDc(CloudReplicaA))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting("Hello D", _))
.futureValue
eventually {
ClusterSharding(systemPerDc(EdgeReplicaD))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.Get(_))
.futureValue shouldBe "Hello D"
}

// but not updated in C
ClusterSharding(systemPerDc(EdgeReplicaC))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.Get(_))
.futureValue shouldBe "Hello C"
}

protected override def afterAll(): Unit = {
logger.info("Shutting down all three DCs")
systems.foreach(_.terminate()) // speed up termination by terminating all at the once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,42 +87,58 @@ object ReplicationIntegrationSpec {
val EntityType: EntityTypeKey[Command] = EntityTypeKey[Command]("hello-world")

sealed trait Command

case class Get(replyTo: ActorRef[String]) extends Command

case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command
final case class Get(replyTo: ActorRef[String]) extends Command
final case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command
final case class SetTag(tag: String, replyTo: ActorRef[Done]) extends Command

sealed trait Event

case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event
final case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event
final case class TagChanged(tag: String, timestamp: LwwTime) extends Event

object State {
val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId("")))
val initial =
State("Hello world", LwwTime(Long.MinValue, ReplicaId("")), "", LwwTime(Long.MinValue, ReplicaId("")))
}

case class State(greeting: String, timestamp: LwwTime)
case class State(greeting: String, greetingTimestamp: LwwTime, tag: String, tagTimestamp: LwwTime)

def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) =
replicatedBehaviors.setup { replicationContext =>
EventSourcedBehavior[Command, Event, State](
replicationContext.persistenceId,
State.initial, {
case (State(greeting, _), Get(replyTo)) =>
case (State(greeting, _, _, _), Get(replyTo)) =>
replyTo ! greeting
Effect.none
case (state, SetGreeting(greeting, replyTo)) =>
Effect
.persist(
GreetingChanged(
greeting,
state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)))
state.greetingTimestamp
.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)))
.thenRun((_: State) => replyTo ! Done)
case (state, SetTag(tag, replyTo)) =>
Effect
.persist(
TagChanged(
tag,
state.greetingTimestamp
.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)))
.thenRun((_: State) => replyTo ! Done)
}, {
case (currentState, GreetingChanged(newGreeting, newTimestamp)) =>
if (newTimestamp.isAfter(currentState.timestamp))
State(newGreeting, newTimestamp)
if (newTimestamp.isAfter(currentState.greetingTimestamp))
currentState.copy(newGreeting, newTimestamp)
else currentState
case (currentState, TagChanged(newTag, newTimestamp)) =>
if (newTimestamp.isAfter(currentState.tagTimestamp))
currentState.copy(tag = newTag, tagTimestamp = newTimestamp)
else currentState
})
.withTaggerForState {
case (state, _) => if (state.tag == "") Set.empty else Set(state.tag)
}
}
}
}
Expand Down

0 comments on commit 55b02cc

Please sign in to comment.