From 4feeec73a1317a1c46e4bdc821293a48e8547897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 20 Nov 2023 17:48:02 +0100 Subject: [PATCH] WIP: a first test in place (not passing) --- .../src/it/resources/db/default-init.sql | 14 + .../PushReplicationIntegrationSpec.scala | 358 ++++++++++++++++++ .../EventPusherConsumerServiceImpl.scala | 12 +- .../internal/ReplicationImpl.scala | 3 +- 4 files changed, 385 insertions(+), 2 deletions(-) create mode 100644 akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/PushReplicationIntegrationSpec.scala diff --git a/akka-projection-grpc-tests/src/it/resources/db/default-init.sql b/akka-projection-grpc-tests/src/it/resources/db/default-init.sql index 2351d3f77..f717fdf82 100644 --- a/akka-projection-grpc-tests/src/it/resources/db/default-init.sql +++ b/akka-projection-grpc-tests/src/it/resources/db/default-init.sql @@ -136,4 +136,18 @@ CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCC ( -- the consumer lag is timestamp_consumed - timestamp_offset timestamp_consumed timestamp with time zone NOT NULL, PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) +); + +CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_EdgeA ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + slice INT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + -- timestamp_offset is the db_timestamp of the original event + timestamp_offset timestamp with time zone NOT NULL, + -- timestamp_consumed is when the offset was stored + -- the consumer lag is timestamp_consumed - timestamp_offset + timestamp_consumed timestamp with time zone NOT NULL, + PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) ); \ No newline at end of file diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/PushReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/PushReplicationIntegrationSpec.scala new file mode 100644 index 000000000..1944a4974 --- /dev/null +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/PushReplicationIntegrationSpec.scala @@ -0,0 +1,358 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.MemberStatus +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.Http +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.crdt.LwwTime +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.replication +import akka.projection.grpc.replication.scaladsl.Replica +import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.R2dbcProjectionSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.testkit.SocketUtil +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +object PushReplicationIntegrationSpec { + + private def config(dc: ReplicaId): Config = + ConfigFactory.parseString(s""" + akka.actor.provider = cluster + akka.actor { + serialization-bindings { + "${classOf[replication.PushReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json + } + } + akka.http.server.preview.enable-http2 = on + akka.persistence.r2dbc { + query { + refresh-interval = 500 millis + # reducing this to have quicker test, triggers backtracking earlier + backtracking.behind-current-time = 3 seconds + } + } + akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } + } + akka.projection.r2dbc.offset-store { + timestamp-offset-table = "akka_projection_timestamp_offset_store_${dc.id}" + } + akka.remote.artery.canonical.host = "127.0.0.1" + akka.remote.artery.canonical.port = 0 + akka.actor.testkit.typed { + filter-leeway = 10s + system-shutdown-default = 30s + } + """) + + private val DCA = ReplicaId("DCA") + private val DCB = ReplicaId("DCB") + private val EdgeReplicaA = ReplicaId("EdgeA") + + object LWWHelloWorld { + + sealed trait Command + + case class Get(replyTo: ActorRef[String]) extends Command + + case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command + + sealed trait Event + + case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event + + object State { + val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId(""))) + } + + case class State(greeting: String, timestamp: LwwTime) + + def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) = + replicatedBehaviors.setup { replicationContext => + EventSourcedBehavior[Command, Event, State]( + replicationContext.persistenceId, + State.initial, { + case (State(greeting, _), Get(replyTo)) => + replyTo ! greeting + Effect.none + case (state, SetGreeting(greeting, replyTo)) => + Effect + .persist( + GreetingChanged( + greeting, + state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) + .thenRun((_: State) => replyTo ! Done) + }, { + case (currentState, GreetingChanged(newGreeting, newTimestamp)) => + if (newTimestamp.isAfter(currentState.timestamp)) + State(newGreeting, newTimestamp) + else currentState + }) + } + } +} + +class PushReplicationIntegrationSpec(testContainerConf: TestContainerConf) + extends ScalaTestWithActorTestKit( + akka.actor + .ActorSystem( + "ReplicationIntegrationSpecA", + PushReplicationIntegrationSpec + .config(PushReplicationIntegrationSpec.DCA) + .withFallback(testContainerConf.config)) + .toTyped) + with AnyWordSpecLike + with TestDbLifecycle + with BeforeAndAfterAll + with LogCapturing { + import PushReplicationIntegrationSpec._ + implicit val ec: ExecutionContext = system.executionContext + + def this() = this(new TestContainerConf) + + private val logger = LoggerFactory.getLogger(classOf[PushReplicationIntegrationSpec]) + override def typedSystem: ActorSystem[_] = testKit.system + + private val systems = Seq[ActorSystem[_]]( + typedSystem, + akka.actor + .ActorSystem( + "ReplicationIntegrationSpecB", + PushReplicationIntegrationSpec + .config(PushReplicationIntegrationSpec.DCB) + .withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "ReplicationIntegrationSpecEdgeA", + PushReplicationIntegrationSpec + .config(PushReplicationIntegrationSpec.EdgeReplicaA) + .withFallback(testContainerConf.config)) + .toTyped) + + private val grpcPorts = SocketUtil.temporaryServerAddresses(2, "127.0.0.1").map(_.getPort) + def grpcClientSettings(index: Int) = + GrpcClientSettings.connectToServiceAt("127.0.0.1", grpcPorts(index)).withTls(false) + private val replicaA = Replica(DCA, 2, grpcClientSettings(0)) + private val replicaB = Replica(DCB, 2, grpcClientSettings(1)) + private val allCloudReplicas = Set(replicaA, replicaB) + + /* + private val _ = Replica( + EdgeReplicaA, + 2, + // Note: there is no way to actively connect to this replica, instead the GrpcClientSettings would be how _it_ connects + // (to DCA in this case). The normal replicas does not have the Replica in their lists of all replicas + replicaA.grpcClientSettings) + */ + + private val testKitsPerDc = + Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), EdgeReplicaA -> ActorTestKit(systems(2))) + private val systemPerDc = Map(DCA -> system, DCB -> systems(1), EdgeReplicaA -> systems(2)) + private var replicationA: Replication[LWWHelloWorld.Command] = _ + private var replicationB: Replication[LWWHelloWorld.Command] = _ + private var edgeReplicationA: EdgeReplication[LWWHelloWorld.Command] = _ + private val entityIdOne = "one" + + override protected def beforeAll(): Unit = { + super.beforeAll() + // We can share the journal to save a bit of work, because the persistence id contains + // the dc so is unique (this is ofc completely synthetic, the whole point of replication + // over grpc is to replicate between different dcs/regions with completely separate databases). + // The offset tables need to be separate though to not get conflicts on projection names + systemPerDc.values.foreach { system => + val r2dbcProjectionSettings = R2dbcProjectionSettings(system) + Await.result( + r2dbcExecutor.updateOne("beforeAll delete")( + _.createStatement(s"delete from ${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")), + 10.seconds) + } + } + + val EntityTypeName = "hello-edge-world" + + def startReplica(replicaSystem: ActorSystem[_], replica: Replica): Future[Replication[LWWHelloWorld.Command]] = { + logger + .infoN( + "Starting replica [{}], system [{}] on port [{}]", + replica.replicaId, + replicaSystem.name, + replica.grpcClientSettings.defaultPort) + + val grpcPort = replica.grpcClientSettings.defaultPort + val settings = ReplicationSettings[LWWHelloWorld.Command]( + EntityTypeName, + replica.replicaId, + EventProducerSettings(replicaSystem), + allCloudReplicas, + 10.seconds, + 8, + R2dbcReplication()).withEdgeReplication(true) + val started = + Replication.grpcReplication(settings)(PushReplicationIntegrationSpec.LWWHelloWorld.apply)(replicaSystem) + + // start producer server + Http(system) + .newServerAt("127.0.0.1", grpcPort) + .bind(started.createSingleServiceHandler()) + .map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext) + .map(_ => started) + } + + def startEdgeReplica( + replicaSystem: ActorSystem[_], + selfReplicaId: ReplicaId, + connectTo: Replica): EdgeReplication[LWWHelloWorld.Command] = { + val settings = ReplicationSettings[LWWHelloWorld.Command]( + EntityTypeName, + selfReplicaId, + EventProducerSettings(replicaSystem), + Set(connectTo), + 10.seconds, + 8, + R2dbcReplication()).withEdgeReplication(true) + Replication.grpcEdgeReplication(settings)(PushReplicationIntegrationSpec.LWWHelloWorld.apply)(replicaSystem) + } + + "Replication over gRPC" should { + "form three one node clusters" in { + testKitsPerDc.values.foreach { testKit => + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + } + + "start two cloud replicas and one edge replica" in { + replicationA = startReplica(systemPerDc(DCA), replicaA).futureValue + replicationB = startReplica(systemPerDc(DCB), replicaB).futureValue + edgeReplicationA = startEdgeReplica(systemPerDc(EdgeReplicaA), EdgeReplicaA, replicaA) + logger.info("All three replication/producer services bound") + } + + "replicate writes directly from cloud to edge" in { + logger.infoN("Updating greeting for [{}] from dc [{}]", entityIdOne, DCA) + replicationA + .entityRefFactory(entityIdOne) + .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${DCA.id}", _)) + .futureValue + + val edgeEntityRef = edgeReplicationA.entityRefFactory(entityIdOne) + val probe = testKit.createTestProbe() + probe.awaitAssert({ + edgeEntityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${DCA.id}") + }, 10.seconds) + + // and also B ofc (unrelated to edge replication but for good measure) + val dcBEntityRef = replicationB.entityRefFactory(entityIdOne) + probe.awaitAssert({ + dcBEntityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${DCA.id}") + }, 10.seconds) + } + + "replicate writes from edge node to cloud" in { + logger.infoN("Updating greeting for [{}] from dc [{}]", entityIdOne, edgeReplicationA) + edgeReplicationA + .entityRefFactory(entityIdOne) + .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${EdgeReplicaA.id}", _)) + .futureValue + + val probe = testKit.createTestProbe() + // should reach the direct replica + val dcAEntityRef = replicationA.entityRefFactory(entityIdOne) + probe.awaitAssert({ + dcAEntityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${EdgeReplicaA.id}") + }, 10.seconds) + + // then indirectly replica B + // FIXME not working yet + pendingUntilFixed { + val dcBEntityRef = replicationB.entityRefFactory(entityIdOne) + probe.awaitAssert({ + dcBEntityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${DCB.id}") + }, 10.seconds) + } + + } + + // FIXME not working yet + "replicate writes from one DCB to DCA and then the edge node" in pendingUntilFixed { + logger.infoN("Updating greeting for [{}] from dc [{}]", entityIdOne, DCB) + replicationB + .entityRefFactory(entityIdOne) + .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${DCB.id}", _)) + .futureValue + + // should reach the other replica + val dcAEntityRef = replicationA.entityRefFactory(entityIdOne) + val probe = testKit.createTestProbe() + probe.awaitAssert({ + dcAEntityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${DCB.id}") + }, 10.seconds) + + // then edge + val edgeEntityRef = edgeReplicationA.entityRefFactory(entityIdOne) + probe.awaitAssert({ + edgeEntityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${DCB.id}") + }, 10.seconds) + + } + } + + protected override def afterAll(): Unit = { + logger.info("Shutting down all three DCs") + systems.foreach(_.terminate()) // speed up termination by terminating all at the once + // and then make sure they are completely shutdown + systems.foreach { system => + ActorTestKit.shutdown(system) + } + super.afterAll() + } +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index e4005b268..827618de7 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -46,6 +46,9 @@ import scala.concurrent.duration.DurationInt */ @InternalApi private[akka] object EventPusherConsumerServiceImpl { + + private val log = LoggerFactory.getLogger(getClass) + // See akka.persistence.r2dbc.internal.EnvelopeOrigin, but we don't have a dependency // to akka-persistence-r2dbc here def fromSnapshot(env: EventEnvelope[_]): Boolean = @@ -92,6 +95,7 @@ private[akka] object EventPusherConsumerServiceImpl { def applyForRES(replicationSettings: Set[ReplicationSettings[_]], preferProtobuf: ProtoAnySerialization.Prefer)( implicit system: ActorSystem[_]): EventPusherConsumerServiceImpl = { implicit val timeout: Timeout = 3.seconds // FIXME config + implicit val ec: ExecutionContext = system.executionContext val sharding = ClusterSharding(system) val ( @@ -105,7 +109,7 @@ private[akka] object EventPusherConsumerServiceImpl { // send event to entity in this replica val replicationId = ReplicationId.fromString(envelope.persistenceId) val destinationReplicaId = replicationId.withReplica(replicationSetting.selfReplicaId) - sharding + val askResult = sharding .entityRefFor(replicationSetting.entityTypeKey, destinationReplicaId.persistenceId.entityId) .asInstanceOf[EntityRef[PublishedEvent]] .ask[Done](replyTo => @@ -118,6 +122,12 @@ private[akka] object EventPusherConsumerServiceImpl { replicatedEventMetadata.originReplica, replicatedEventMetadata.version)), Some(replyTo))) + askResult.failed.foreach( + error => + log.warn( + s"Failing replication stream from [${replicatedEventMetadata.originReplica.id}], event pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]", + error)) + askResult case unexpected => throw new IllegalArgumentException( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index f6a655c4f..0bc88de1a 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -116,6 +116,7 @@ private[akka] object ReplicationImpl { val handler = EventProducer.grpcServiceHandler(Set(eps), settings.eventProducerInterceptor) if (settings.acceptEdgeReplication) { // Fold in edge push gRPC consumer service if enabled + log.info("Edge replication enabled for Replicated Entity [{}]", settings.entityTypeKey.name) val pushConsumer = EventConsumerServicePowerApiHandler.partial( EventPusherConsumerServiceImpl.applyForRES( Set(settings), @@ -271,7 +272,7 @@ private[akka] object ReplicationImpl { /* val grpcQuerySettings = { val s = GrpcQuerySettings(settings.streamId) // FIXME additional request metadata for auth and such - // remoteReplica.additionalQueryRequestMetadata.fold(s)(s.withAdditionalRequestMetadata) + remoteReplica.additionalQueryRequestMetadata.fold(s)(s.withAdditionalRequestMetadata) s } */ // FIXME do we need to know ingress replica id or is just being able to connect (host/port) enough?