diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index 354c561d0..2f0656e71 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -34,6 +34,7 @@ import akka.projection.ProjectionBehavior import akka.projection.ProjectionContext import akka.projection.ProjectionId import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.grpc.consumer.ConsumerFilter import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal import akka.projection.grpc.producer.scaladsl.EventProducer @@ -76,12 +77,15 @@ private[akka] object ReplicationImpl { */ def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], - producerFilter: EventEnvelope[Event] => Boolean, replicatedEntity: ReplicatedEntity[Command])(implicit system: ActorSystem[_]): ReplicationImpl[Command] = { require( system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.isInstanceOf[ClusterActorRefProvider], "Replicated Event Sourcing over gRPC only possible together with Akka cluster (akka.actor.provider = cluster)") + if (settings.initialConsumerFilter.nonEmpty) { + ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter) + } + // set up a publisher val onlyLocalOriginTransformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper(envelope => envelope.eventMetadata match { @@ -99,7 +103,7 @@ private[akka] object ReplicationImpl { settings.streamId, onlyLocalOriginTransformer, settings.eventProducerSettings, - producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean]) + settings.producerFilter) val sharding = ClusterSharding(system) sharding.init(replicatedEntity.entity) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala index 6e10463d8..c64da74c3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala @@ -4,6 +4,9 @@ package akka.projection.grpc.replication.javadsl +import java.util.concurrent.CompletionStage +import java.util.function.Predicate + import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.annotation.ApiMayChange @@ -16,6 +19,7 @@ import akka.cluster.sharding.typed.javadsl.EntityTypeKey import akka.http.javadsl.model.HttpRequest import akka.http.javadsl.model.HttpResponse import akka.japi.function.{ Function => JFunction } +import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicationId import akka.persistence.typed.internal.ReplicationContextImpl import akka.persistence.typed.javadsl.ReplicationContext @@ -23,11 +27,6 @@ import akka.persistence.typed.scaladsl.ReplicatedEventSourcing import akka.projection.grpc.producer.javadsl.EventProducer import akka.projection.grpc.producer.javadsl.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl -import java.util.concurrent.CompletionStage -import java.util.function.Predicate - -import akka.persistence.query.typed.EventEnvelope -import akka.projection.grpc.internal.TopicMatcher /** * Created using [[Replication.grpcReplication]], which starts sharding with the entity and @@ -79,25 +78,6 @@ object Replication { settings: ReplicationSettings[Command], replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { - val trueProducerFilter = new Predicate[EventEnvelope[Event]] { - override def test(env: EventEnvelope[Event]): Boolean = true - } - grpcReplication[Command, Event, State](settings, trueProducerFilter, replicatedBehaviorFactory, system) - } - - /** - * Called to bootstrap the entity on each cluster node in each of the replicas. - * - * Filter events matching the `producerFilter` predicate, for example based on tags. - * - * Important: Note that this does not publish the endpoint, additional steps are needed! - */ - def grpcReplication[Command, Event, State]( - settings: ReplicationSettings[Command], - producerFilter: Predicate[EventEnvelope[Event]], - replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], - system: ActorSystem[_]): Replication[Command] = { - val scalaReplicationSettings = settings.toScala val replicatedEntity = @@ -123,13 +103,8 @@ object Replication { })) .toScala) - val scalaProducerFilter: EventEnvelope[Event] => Boolean = producerFilter.test - val scalaRESOG = - ReplicationImpl.grpcReplication[Command, Event, State]( - scalaReplicationSettings, - scalaProducerFilter, - replicatedEntity)(system) + ReplicationImpl.grpcReplication[Command, Event, State](scalaReplicationSettings, replicatedEntity)(system) val jEventProducerSource = new EventProducerSource( scalaRESOG.eventProducerService.entityType, scalaRESOG.eventProducerService.streamId, @@ -152,6 +127,24 @@ object Replication { } } + /** + * Called to bootstrap the entity on each cluster node in each of the replicas. + * + * Filter events matching the `producerFilter` predicate, for example based on tags. + * + * Important: Note that this does not publish the endpoint, additional steps are needed! + */ + @Deprecated + @deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1") + def grpcReplication[Command, Event, State]( + settings: ReplicationSettings[Command], + producerFilter: Predicate[EventEnvelope[Event]], + replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], + system: ActorSystem[_]): Replication[Command] = { + grpcReplication(settings.withProducerFilter(producerFilter), replicatedBehaviorFactory, system) + + } + /** * Called to bootstrap the entity on each cluster node in each of the replicas. * @@ -160,18 +153,14 @@ object Replication { * * Important: Note that this does not publish the endpoint, additional steps are needed! */ + @Deprecated + @deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1") def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], topicExpression: String, replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { - val topicMatcher = TopicMatcher(topicExpression) - grpcReplication( - settings, - (env: EventEnvelope[Event]) => topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix), - replicatedBehaviorFactory, - system) - + grpcReplication(settings.withProducerFilterTopicExpression(topicExpression), replicatedBehaviorFactory, system) } } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala index a14d991dc..0db1d5e1b 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala @@ -4,6 +4,9 @@ package akka.projection.grpc.replication.javadsl +import java.util.function.{ Function => JFunction } +import java.util.{ List => JList } + import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit @@ -19,10 +22,15 @@ import akka.projection.grpc.replication.internal.ReplicaImpl import akka.projection.grpc.replication.scaladsl.{ ReplicationSettings => SReplicationSettings } import akka.util.JavaDurationConverters.JavaDurationOps import com.typesafe.config.Config - import java.time.Duration +import java.util.Collections import java.util.Optional +import java.util.function.Predicate import java.util.{ Set => JSet } + +import akka.persistence.query.typed.EventEnvelope +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.internal.TopicMatcher import akka.util.ccompat.JavaConverters._ import akka.projection.grpc.replication.internal.ReplicationProjectionProviderAdapter @@ -68,7 +76,9 @@ object ReplicationSettings { replicationProjectionProvider, Optional.empty(), identity, - indirectReplication = false) + indirectReplication = false, + _ => true, + Collections.emptyList) } /** @@ -119,7 +129,9 @@ object ReplicationSettings { replicationProjectionProvider = replicationProjectionProvider, Optional.empty(), identity, - indirectReplication = false) + indirectReplication = false, + _ => true, + Collections.emptyList) } } @@ -138,10 +150,12 @@ final class ReplicationSettings[Command] private ( val parallelUpdates: Int, val replicationProjectionProvider: ReplicationProjectionProvider, val eventProducerInterceptor: Optional[EventProducerInterceptor], - val configureEntity: java.util.function.Function[ + val configureEntity: JFunction[ Entity[Command, ShardingEnvelope[Command]], Entity[Command, ShardingEnvelope[Command]]], - val indirectReplication: Boolean) { + val indirectReplication: Boolean, + val producerFilter: Predicate[EventEnvelope[Any]], + val initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]) { def withSelfReplicaId(selfReplicaId: ReplicaId): ReplicationSettings[Command] = copy(selfReplicaId = selfReplicaId) @@ -184,9 +198,8 @@ final class ReplicationSettings[Command] private ( * Allows for changing the settings of the replicated entity, such as stop message, passivation strategy etc. */ def configureEntity( - configure: java.util.function.Function[ - Entity[Command, ShardingEnvelope[Command]], - Entity[Command, ShardingEnvelope[Command]]]): ReplicationSettings[Command] = + configure: JFunction[Entity[Command, ShardingEnvelope[Command]], Entity[Command, ShardingEnvelope[Command]]]) + : ReplicationSettings[Command] = copy(configureEntity = configure) /** @@ -197,6 +210,30 @@ final class ReplicationSettings[Command] private ( def withIndirectReplication(enabled: Boolean): ReplicationSettings[Command] = copy(indirectReplication = enabled) + /** + * Filter events matching the `producerFilter` predicate, for example based on tags. + */ + def withProducerFilter[Event](producerFilter: Predicate[EventEnvelope[Event]]): ReplicationSettings[Command] = + copy(producerFilter = producerFilter.asInstanceOf[Predicate[EventEnvelope[Any]]]) + + /** + * Filter events matching the topic expression according to MQTT specification, including wildcards. + * The topic of an event is defined by a tag with certain prefix, see `topic-tag-prefix` configuration. + */ + def withProducerFilterTopicExpression(topicExpression: String): ReplicationSettings[Command] = { + val topicMatcher = TopicMatcher(topicExpression) + withProducerFilter[Any](env => topicMatcher.matches(env, eventProducerSettings.topicTagPrefix)) + } + + /** + * Set the initial consumer filter to use for events. Should only be used for static, up front consumer filters. + * Combining this with updating consumer filters directly means that the filters may be reset to these + * filters. + */ + def withInitialConsumerFilter( + initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]): ReplicationSettings[Command] = + copy(initialConsumerFilter = initialConsumerFilter) + private def copy( selfReplicaId: ReplicaId = selfReplicaId, entityTypeKey: EntityTypeKey[Command] = entityTypeKey, @@ -207,10 +244,13 @@ final class ReplicationSettings[Command] private ( parallelUpdates: Int = parallelUpdates, projectionProvider: ReplicationProjectionProvider = replicationProjectionProvider, eventProducerInterceptor: Optional[EventProducerInterceptor] = eventProducerInterceptor, - configureEntity: java.util.function.Function[ + configureEntity: JFunction[ Entity[Command, ShardingEnvelope[Command]], Entity[Command, ShardingEnvelope[Command]]] = configureEntity, - indirectReplication: Boolean = indirectReplication): ReplicationSettings[Command] = + indirectReplication: Boolean = indirectReplication, + producerFilter: Predicate[EventEnvelope[Any]] = producerFilter, + initialConsumerFilter: JList[ConsumerFilter.FilterCriteria] = initialConsumerFilter) + : ReplicationSettings[Command] = new ReplicationSettings[Command]( selfReplicaId, entityTypeKey, @@ -222,7 +262,9 @@ final class ReplicationSettings[Command] private ( projectionProvider, eventProducerInterceptor, configureEntity, - indirectReplication) + indirectReplication, + producerFilter, + initialConsumerFilter) override def toString = s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, ${otherReplicas.asScala.mkString(", ")})" @@ -241,5 +283,7 @@ final class ReplicationSettings[Command] private ( parallelUpdates = parallelUpdates, replicationProjectionProvider = ReplicationProjectionProviderAdapter.toScala(replicationProjectionProvider)) .withIndirectReplication(indirectReplication) + .withProducerFilter(producerFilter.test) + .withInitialConsumerFilter(initialConsumerFilter.asScala.toVector) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala index eef641984..b14880e7f 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala @@ -4,6 +4,8 @@ package akka.projection.grpc.replication.scaladsl +import scala.concurrent.Future + import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.annotation.ApiMayChange @@ -14,14 +16,11 @@ import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse +import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicationId import akka.persistence.typed.scaladsl.ReplicatedEventSourcing import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl -import scala.concurrent.Future - -import akka.persistence.query.typed.EventEnvelope -import akka.projection.grpc.internal.TopicMatcher /** * Created using [[Replication.grpcReplication]], which starts sharding with the entity and @@ -70,23 +69,8 @@ object Replication { * Important: Note that this does not publish the endpoint, additional steps are needed! */ def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command])( - replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( - implicit system: ActorSystem[_]): Replication[Command] = - grpcReplication[Command, Event, State](settings, (_: EventEnvelope[Event]) => true)(replicatedBehaviorFactory) - - /** - * Called to bootstrap the entity on each cluster node in each of the replicas. - * - * Filter events matching the `producerFilter` predicate, for example based on tags. - * - * Important: Note that this does not publish the endpoint, additional steps are needed! - */ - def grpcReplication[Command, Event, State]( - settings: ReplicationSettings[Command], - producerFilter: EventEnvelope[Event] => Boolean)( replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( implicit system: ActorSystem[_]): Replication[Command] = { - val replicatedEntity = ReplicatedEntity( settings.selfReplicaId, @@ -100,7 +84,23 @@ object Replication { } })) - ReplicationImpl.grpcReplication[Command, Event, State](settings, producerFilter, replicatedEntity) + ReplicationImpl.grpcReplication[Command, Event, State](settings, replicatedEntity) + } + + /** + * Called to bootstrap the entity on each cluster node in each of the replicas. + * + * Filter events matching the `producerFilter` predicate, for example based on tags. + * + * Important: Note that this does not publish the endpoint, additional steps are needed! + */ + @deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1") + def grpcReplication[Command, Event, State]( + settings: ReplicationSettings[Command], + producerFilter: EventEnvelope[Event] => Boolean)( + replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( + implicit system: ActorSystem[_]): Replication[Command] = { + grpcReplication(settings.withProducerFilter(producerFilter))(replicatedBehaviorFactory) } /** @@ -111,14 +111,11 @@ object Replication { * * Important: Note that this does not publish the endpoint, additional steps are needed! */ + @deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1") def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command], topicExpression: String)( replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( implicit system: ActorSystem[_]): Replication[Command] = { - val topicMatcher = TopicMatcher(topicExpression) - grpcReplication( - settings, - (env: EventEnvelope[Event]) => - topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix))(replicatedBehaviorFactory) + grpcReplication(settings.withProducerFilterTopicExpression(topicExpression))(replicatedBehaviorFactory) } } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala index 5c6c403f0..dc338c8f9 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala @@ -4,6 +4,10 @@ package akka.projection.grpc.replication.scaladsl +import scala.collection.immutable +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.annotation.InternalApi @@ -11,7 +15,10 @@ import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.grpc.GrpcClientSettings +import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicaId +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.internal.TopicMatcher import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor import akka.projection.grpc.replication.internal.ReplicaImpl @@ -20,9 +27,6 @@ import akka.util.JavaDurationConverters._ import akka.util.ccompat.JavaConverters._ import com.typesafe.config.Config -import scala.concurrent.duration.FiniteDuration -import scala.reflect.ClassTag - @ApiMayChange object ReplicationSettings { @@ -82,7 +86,9 @@ object ReplicationSettings { projectionProvider = replicationProjectionProvider, None, identity, - indirectReplication = false) + indirectReplication = false, + producerFilter = _ => true, + initialConsumerFilter = Vector.empty) } /** @@ -135,7 +141,9 @@ object ReplicationSettings { projectionProvider = replicationProjectionProvider, None, identity, - indirectReplication) + indirectReplication, + producerFilter = _ => true, + initialConsumerFilter = Vector.empty) } } @@ -155,7 +163,9 @@ final class ReplicationSettings[Command] private ( val projectionProvider: ReplicationProjectionProvider, val eventProducerInterceptor: Option[EventProducerInterceptor], val configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]], - val indirectReplication: Boolean) { + val indirectReplication: Boolean, + val producerFilter: EventEnvelope[Any] => Boolean, + val initialConsumerFilter: immutable.Seq[ConsumerFilter.FilterCriteria]) { require( !otherReplicas.exists(_.replicaId == selfReplicaId), @@ -218,6 +228,30 @@ final class ReplicationSettings[Command] private ( def withIndirectReplication(enabled: Boolean): ReplicationSettings[Command] = copy(indirectReplication = enabled) + /** + * Filter events matching the `producerFilter` predicate, for example based on tags. + */ + def withProducerFilter[Event](producerFilter: EventEnvelope[Event] => Boolean): ReplicationSettings[Command] = + copy(producerFilter = producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean]) + + /** + * Filter events matching the topic expression according to MQTT specification, including wildcards. + * The topic of an event is defined by a tag with certain prefix, see `topic-tag-prefix` configuration. + */ + def withProducerFilterTopicExpression(topicExpression: String): ReplicationSettings[Command] = { + val topicMatcher = TopicMatcher(topicExpression) + withProducerFilter[Any](env => topicMatcher.matches(env, eventProducerSettings.topicTagPrefix)) + } + + /** + * Set the initial consumer filter to use for events. Should only be used for static, up front consumer filters. + * Combining this with updating consumer filters directly means that the filters may be reset to these + * filters. + */ + def withInitialConsumerFilter( + initialConsumerFilter: immutable.Seq[ConsumerFilter.FilterCriteria]): ReplicationSettings[Command] = + copy(initialConsumerFilter = initialConsumerFilter) + private def copy( selfReplicaId: ReplicaId = selfReplicaId, entityTypeKey: EntityTypeKey[Command] = entityTypeKey, @@ -230,7 +264,9 @@ final class ReplicationSettings[Command] private ( producerInterceptor: Option[EventProducerInterceptor] = eventProducerInterceptor, configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]] = configureEntity, - indirectReplication: Boolean = indirectReplication) = + indirectReplication: Boolean = indirectReplication, + producerFilter: EventEnvelope[Any] => Boolean = producerFilter, + initialConsumerFilter: immutable.Seq[ConsumerFilter.FilterCriteria] = initialConsumerFilter) = new ReplicationSettings[Command]( selfReplicaId, entityTypeKey, @@ -242,7 +278,9 @@ final class ReplicationSettings[Command] private ( projectionProvider, producerInterceptor, configureEntity, - indirectReplication) + indirectReplication, + producerFilter, + initialConsumerFilter) override def toString = s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, $otherReplicas)"