Skip to content

Commit

Permalink
feat: RES initial consumer filter
Browse files Browse the repository at this point in the history
* and define filters via ReplicationSettings
* deprecate grpcReplication with producer filter params,
  via settings instead
  • Loading branch information
patriknw committed Nov 21, 2023
1 parent a01f8e8 commit 97cfa8d
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import akka.projection.ProjectionBehavior
import akka.projection.ProjectionContext
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.grpc.internal.ProtoAnySerialization
Expand Down Expand Up @@ -75,19 +76,22 @@ private[akka] object ReplicationImpl {
*/
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: EventEnvelope[Event] => Boolean,
replicatedEntity: ReplicatedEntity[Command])(implicit system: ActorSystem[_]): ReplicationImpl[Command] = {
require(
system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.isInstanceOf[ClusterActorRefProvider],
"Replicated Event Sourcing over gRPC only possible together with Akka cluster (akka.actor.provider = cluster)")

if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter)
}

// set up a publisher
val eps = EventProducerSource(
settings.entityTypeKey.name,
settings.streamId,
Transformation.identity,
settings.eventProducerSettings,
producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean])
settings.producerFilter)
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))

val sharding = ClusterSharding(system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package akka.projection.grpc.replication.javadsl

import java.util.concurrent.CompletionStage
import java.util.function.Predicate

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
Expand All @@ -16,18 +19,14 @@ import akka.cluster.sharding.typed.javadsl.EntityTypeKey
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.japi.function.{ Function => JFunction }
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.persistence.typed.javadsl.ReplicationContext
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.projection.grpc.producer.javadsl.EventProducer
import akka.projection.grpc.producer.javadsl.EventProducerSource
import akka.projection.grpc.replication.internal.ReplicationImpl
import java.util.concurrent.CompletionStage
import java.util.function.Predicate

import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.internal.TopicMatcher

/**
* Created using [[Replication.grpcReplication]], which starts sharding with the entity and
Expand Down Expand Up @@ -79,25 +78,6 @@ object Replication {
settings: ReplicationSettings[Command],
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {
val trueProducerFilter = new Predicate[EventEnvelope[Event]] {
override def test(env: EventEnvelope[Event]): Boolean = true
}
grpcReplication[Command, Event, State](settings, trueProducerFilter, replicatedBehaviorFactory, system)
}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: Predicate[EventEnvelope[Event]],
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {

val scalaReplicationSettings = settings.toScala

val replicatedEntity =
Expand All @@ -123,13 +103,8 @@ object Replication {
}))
.toScala)

val scalaProducerFilter: EventEnvelope[Event] => Boolean = producerFilter.test

val scalaRESOG =
ReplicationImpl.grpcReplication[Command, Event, State](
scalaReplicationSettings,
scalaProducerFilter,
replicatedEntity)(system)
ReplicationImpl.grpcReplication[Command, Event, State](scalaReplicationSettings, replicatedEntity)(system)
val jEventProducerSource = new EventProducerSource(
scalaRESOG.eventProducerService.entityType,
scalaRESOG.eventProducerService.streamId,
Expand All @@ -152,6 +127,24 @@ object Replication {
}
}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@Deprecated
@deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1")
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: Predicate[EventEnvelope[Event]],
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {
grpcReplication(settings.withProducerFilter(producerFilter), replicatedBehaviorFactory, system)

}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
Expand All @@ -160,18 +153,14 @@ object Replication {
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@Deprecated
@deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1")
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
topicExpression: String,
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {
val topicMatcher = TopicMatcher(topicExpression)
grpcReplication(
settings,
(env: EventEnvelope[Event]) => topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix),
replicatedBehaviorFactory,
system)

grpcReplication(settings.withProducerFilterTopicExpression(topicExpression), replicatedBehaviorFactory, system)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package akka.projection.grpc.replication.javadsl

import java.util.function.{ Function => JFunction }
import java.util.{ List => JList }

import akka.actor.typed.ActorSystem
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
Expand All @@ -19,10 +22,15 @@ import akka.projection.grpc.replication.internal.ReplicaImpl
import akka.projection.grpc.replication.scaladsl.{ ReplicationSettings => SReplicationSettings }
import akka.util.JavaDurationConverters.JavaDurationOps
import com.typesafe.config.Config

import java.time.Duration
import java.util.Collections
import java.util.Optional
import java.util.function.Predicate
import java.util.{ Set => JSet }

import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.internal.TopicMatcher
import akka.util.ccompat.JavaConverters._
import akka.projection.grpc.replication.internal.ReplicationProjectionProviderAdapter

Expand Down Expand Up @@ -67,7 +75,9 @@ object ReplicationSettings {
parallelUpdates,
replicationProjectionProvider,
Optional.empty(),
identity)
identity,
_ => true,
Collections.emptyList)
}

/**
Expand Down Expand Up @@ -117,7 +127,9 @@ object ReplicationSettings {
parallelUpdates = config.getInt("parallel-updates"),
replicationProjectionProvider = replicationProjectionProvider,
Optional.empty(),
identity)
identity,
_ => true,
Collections.emptyList)
}
}

Expand All @@ -136,9 +148,11 @@ final class ReplicationSettings[Command] private (
val parallelUpdates: Int,
val replicationProjectionProvider: ReplicationProjectionProvider,
val eventProducerInterceptor: Optional[EventProducerInterceptor],
val configureEntity: java.util.function.Function[
val configureEntity: JFunction[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]]) {
Entity[Command, ShardingEnvelope[Command]]],
val producerFilter: Predicate[EventEnvelope[Any]],
val initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]) {

def withSelfReplicaId(selfReplicaId: ReplicaId): ReplicationSettings[Command] =
copy(selfReplicaId = selfReplicaId)
Expand Down Expand Up @@ -181,11 +195,34 @@ final class ReplicationSettings[Command] private (
* Allows for changing the settings of the replicated entity, such as stop message, passivation strategy etc.
*/
def configureEntity(
configure: java.util.function.Function[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]]): ReplicationSettings[Command] =
configure: JFunction[Entity[Command, ShardingEnvelope[Command]], Entity[Command, ShardingEnvelope[Command]]])
: ReplicationSettings[Command] =
copy(configureEntity = configure)

/**
* Filter events matching the `producerFilter` predicate, for example based on tags.
*/
def withProducerFilter[Event](producerFilter: Predicate[EventEnvelope[Event]]): ReplicationSettings[Command] =
copy(producerFilter = producerFilter.asInstanceOf[Predicate[EventEnvelope[Any]]])

/**
* Filter events matching the topic expression according to MQTT specification, including wildcards.
* The topic of an event is defined by a tag with certain prefix, see `topic-tag-prefix` configuration.
*/
def withProducerFilterTopicExpression(topicExpression: String): ReplicationSettings[Command] = {
val topicMatcher = TopicMatcher(topicExpression)
withProducerFilter[Any](env => topicMatcher.matches(env, eventProducerSettings.topicTagPrefix))
}

/**
* Set the initial consumer filter to use for events. Should only be used for static, up front consumer filters.
* Combining this with updating consumer filters directly means that the filters may be reset to these
* filters.
*/
def withInitialConsumerFilter(
initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]): ReplicationSettings[Command] =
copy(initialConsumerFilter = initialConsumerFilter)

private def copy(
selfReplicaId: ReplicaId = selfReplicaId,
entityTypeKey: EntityTypeKey[Command] = entityTypeKey,
Expand All @@ -196,9 +233,12 @@ final class ReplicationSettings[Command] private (
parallelUpdates: Int = parallelUpdates,
projectionProvider: ReplicationProjectionProvider = replicationProjectionProvider,
eventProducerInterceptor: Optional[EventProducerInterceptor] = eventProducerInterceptor,
configureEntity: java.util.function.Function[
configureEntity: JFunction[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]] = configureEntity): ReplicationSettings[Command] =
Entity[Command, ShardingEnvelope[Command]]] = configureEntity,
producerFilter: Predicate[EventEnvelope[Any]] = producerFilter,
initialConsumerFilter: JList[ConsumerFilter.FilterCriteria] = initialConsumerFilter)
: ReplicationSettings[Command] =
new ReplicationSettings[Command](
selfReplicaId,
entityTypeKey,
Expand All @@ -209,7 +249,9 @@ final class ReplicationSettings[Command] private (
parallelUpdates,
projectionProvider,
eventProducerInterceptor,
configureEntity)
configureEntity,
producerFilter,
initialConsumerFilter)

override def toString =
s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, ${otherReplicas.asScala.mkString(", ")})"
Expand All @@ -227,5 +269,7 @@ final class ReplicationSettings[Command] private (
entityEventReplicationTimeout = entityEventReplicationTimeout.asScala,
parallelUpdates = parallelUpdates,
replicationProjectionProvider = ReplicationProjectionProviderAdapter.toScala(replicationProjectionProvider))
.withProducerFilter(producerFilter.test)
.withInitialConsumerFilter(initialConsumerFilter.asScala.toVector)

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.projection.grpc.replication.scaladsl

import scala.concurrent.Future

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
Expand All @@ -14,14 +16,11 @@ import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.replication.internal.ReplicationImpl
import scala.concurrent.Future

import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.internal.TopicMatcher

/**
* Created using [[Replication.grpcReplication]], which starts sharding with the entity and
Expand Down Expand Up @@ -70,23 +69,8 @@ object Replication {
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command])(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] =
grpcReplication[Command, Event, State](settings, (_: EventEnvelope[Event]) => true)(replicatedBehaviorFactory)

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: EventEnvelope[Event] => Boolean)(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] = {

val replicatedEntity =
ReplicatedEntity(
settings.selfReplicaId,
Expand All @@ -100,7 +84,23 @@ object Replication {
}
}))

ReplicationImpl.grpcReplication[Command, Event, State](settings, producerFilter, replicatedEntity)
ReplicationImpl.grpcReplication[Command, Event, State](settings, replicatedEntity)
}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1")
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: EventEnvelope[Event] => Boolean)(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] = {
grpcReplication(settings.withProducerFilter(producerFilter))(replicatedBehaviorFactory)
}

/**
Expand All @@ -111,14 +111,11 @@ object Replication {
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1")
def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command], topicExpression: String)(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] = {
val topicMatcher = TopicMatcher(topicExpression)
grpcReplication(
settings,
(env: EventEnvelope[Event]) =>
topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix))(replicatedBehaviorFactory)
grpcReplication(settings.withProducerFilterTopicExpression(topicExpression))(replicatedBehaviorFactory)
}

}
Loading

0 comments on commit 97cfa8d

Please sign in to comment.