Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RES initial consumer filter #1072

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import akka.projection.ProjectionBehavior
import akka.projection.ProjectionContext
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.grpc.producer.scaladsl.EventProducer
Expand Down Expand Up @@ -76,12 +77,15 @@ private[akka] object ReplicationImpl {
*/
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: EventEnvelope[Event] => Boolean,
replicatedEntity: ReplicatedEntity[Command])(implicit system: ActorSystem[_]): ReplicationImpl[Command] = {
require(
system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.isInstanceOf[ClusterActorRefProvider],
"Replicated Event Sourcing over gRPC only possible together with Akka cluster (akka.actor.provider = cluster)")

if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter)
}

// set up a publisher
val onlyLocalOriginTransformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper(envelope =>
envelope.eventMetadata match {
Expand All @@ -99,7 +103,7 @@ private[akka] object ReplicationImpl {
settings.streamId,
onlyLocalOriginTransformer,
settings.eventProducerSettings,
producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean])
settings.producerFilter)

val sharding = ClusterSharding(system)
sharding.init(replicatedEntity.entity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package akka.projection.grpc.replication.javadsl

import java.util.concurrent.CompletionStage
import java.util.function.Predicate

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
Expand All @@ -16,18 +19,14 @@ import akka.cluster.sharding.typed.javadsl.EntityTypeKey
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.japi.function.{ Function => JFunction }
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.persistence.typed.javadsl.ReplicationContext
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.projection.grpc.producer.javadsl.EventProducer
import akka.projection.grpc.producer.javadsl.EventProducerSource
import akka.projection.grpc.replication.internal.ReplicationImpl
import java.util.concurrent.CompletionStage
import java.util.function.Predicate

import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.internal.TopicMatcher

/**
* Created using [[Replication.grpcReplication]], which starts sharding with the entity and
Expand Down Expand Up @@ -79,25 +78,6 @@ object Replication {
settings: ReplicationSettings[Command],
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {
val trueProducerFilter = new Predicate[EventEnvelope[Event]] {
override def test(env: EventEnvelope[Event]): Boolean = true
}
grpcReplication[Command, Event, State](settings, trueProducerFilter, replicatedBehaviorFactory, system)
}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: Predicate[EventEnvelope[Event]],
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {

val scalaReplicationSettings = settings.toScala

val replicatedEntity =
Expand All @@ -123,13 +103,8 @@ object Replication {
}))
.toScala)

val scalaProducerFilter: EventEnvelope[Event] => Boolean = producerFilter.test

val scalaRESOG =
ReplicationImpl.grpcReplication[Command, Event, State](
scalaReplicationSettings,
scalaProducerFilter,
replicatedEntity)(system)
ReplicationImpl.grpcReplication[Command, Event, State](scalaReplicationSettings, replicatedEntity)(system)
val jEventProducerSource = new EventProducerSource(
scalaRESOG.eventProducerService.entityType,
scalaRESOG.eventProducerService.streamId,
Expand All @@ -152,6 +127,24 @@ object Replication {
}
}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@Deprecated
@deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1")
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: Predicate[EventEnvelope[Event]],
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {
grpcReplication(settings.withProducerFilter(producerFilter), replicatedBehaviorFactory, system)

}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
Expand All @@ -160,18 +153,14 @@ object Replication {
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@Deprecated
@deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1")
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
topicExpression: String,
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {
val topicMatcher = TopicMatcher(topicExpression)
grpcReplication(
settings,
(env: EventEnvelope[Event]) => topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix),
replicatedBehaviorFactory,
system)

grpcReplication(settings.withProducerFilterTopicExpression(topicExpression), replicatedBehaviorFactory, system)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package akka.projection.grpc.replication.javadsl

import java.util.function.{ Function => JFunction }
import java.util.{ List => JList }

import akka.actor.typed.ActorSystem
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
Expand All @@ -19,10 +22,15 @@ import akka.projection.grpc.replication.internal.ReplicaImpl
import akka.projection.grpc.replication.scaladsl.{ ReplicationSettings => SReplicationSettings }
import akka.util.JavaDurationConverters.JavaDurationOps
import com.typesafe.config.Config

import java.time.Duration
import java.util.Collections
import java.util.Optional
import java.util.function.Predicate
import java.util.{ Set => JSet }

import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.internal.TopicMatcher
import akka.util.ccompat.JavaConverters._
import akka.projection.grpc.replication.internal.ReplicationProjectionProviderAdapter

Expand Down Expand Up @@ -68,7 +76,9 @@ object ReplicationSettings {
replicationProjectionProvider,
Optional.empty(),
identity,
indirectReplication = false)
indirectReplication = false,
_ => true,
Collections.emptyList)
}

/**
Expand Down Expand Up @@ -119,7 +129,9 @@ object ReplicationSettings {
replicationProjectionProvider = replicationProjectionProvider,
Optional.empty(),
identity,
indirectReplication = false)
indirectReplication = false,
_ => true,
Collections.emptyList)
}
}

Expand All @@ -138,10 +150,12 @@ final class ReplicationSettings[Command] private (
val parallelUpdates: Int,
val replicationProjectionProvider: ReplicationProjectionProvider,
val eventProducerInterceptor: Optional[EventProducerInterceptor],
val configureEntity: java.util.function.Function[
val configureEntity: JFunction[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]],
val indirectReplication: Boolean) {
val indirectReplication: Boolean,
val producerFilter: Predicate[EventEnvelope[Any]],
val initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]) {

def withSelfReplicaId(selfReplicaId: ReplicaId): ReplicationSettings[Command] =
copy(selfReplicaId = selfReplicaId)
Expand Down Expand Up @@ -184,9 +198,8 @@ final class ReplicationSettings[Command] private (
* Allows for changing the settings of the replicated entity, such as stop message, passivation strategy etc.
*/
def configureEntity(
configure: java.util.function.Function[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]]): ReplicationSettings[Command] =
configure: JFunction[Entity[Command, ShardingEnvelope[Command]], Entity[Command, ShardingEnvelope[Command]]])
: ReplicationSettings[Command] =
copy(configureEntity = configure)

/**
Expand All @@ -197,6 +210,30 @@ final class ReplicationSettings[Command] private (
def withIndirectReplication(enabled: Boolean): ReplicationSettings[Command] =
copy(indirectReplication = enabled)

/**
* Filter events matching the `producerFilter` predicate, for example based on tags.
*/
def withProducerFilter[Event](producerFilter: Predicate[EventEnvelope[Event]]): ReplicationSettings[Command] =
copy(producerFilter = producerFilter.asInstanceOf[Predicate[EventEnvelope[Any]]])

/**
* Filter events matching the topic expression according to MQTT specification, including wildcards.
* The topic of an event is defined by a tag with certain prefix, see `topic-tag-prefix` configuration.
*/
def withProducerFilterTopicExpression(topicExpression: String): ReplicationSettings[Command] = {
val topicMatcher = TopicMatcher(topicExpression)
withProducerFilter[Any](env => topicMatcher.matches(env, eventProducerSettings.topicTagPrefix))
}

/**
* Set the initial consumer filter to use for events. Should only be used for static, up front consumer filters.
* Combining this with updating consumer filters directly means that the filters may be reset to these
* filters.
*/
def withInitialConsumerFilter(
initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]): ReplicationSettings[Command] =
copy(initialConsumerFilter = initialConsumerFilter)

private def copy(
selfReplicaId: ReplicaId = selfReplicaId,
entityTypeKey: EntityTypeKey[Command] = entityTypeKey,
Expand All @@ -207,10 +244,13 @@ final class ReplicationSettings[Command] private (
parallelUpdates: Int = parallelUpdates,
projectionProvider: ReplicationProjectionProvider = replicationProjectionProvider,
eventProducerInterceptor: Optional[EventProducerInterceptor] = eventProducerInterceptor,
configureEntity: java.util.function.Function[
configureEntity: JFunction[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]] = configureEntity,
indirectReplication: Boolean = indirectReplication): ReplicationSettings[Command] =
indirectReplication: Boolean = indirectReplication,
producerFilter: Predicate[EventEnvelope[Any]] = producerFilter,
initialConsumerFilter: JList[ConsumerFilter.FilterCriteria] = initialConsumerFilter)
: ReplicationSettings[Command] =
new ReplicationSettings[Command](
selfReplicaId,
entityTypeKey,
Expand All @@ -222,7 +262,9 @@ final class ReplicationSettings[Command] private (
projectionProvider,
eventProducerInterceptor,
configureEntity,
indirectReplication)
indirectReplication,
producerFilter,
initialConsumerFilter)

override def toString =
s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, ${otherReplicas.asScala.mkString(", ")})"
Expand All @@ -241,5 +283,7 @@ final class ReplicationSettings[Command] private (
parallelUpdates = parallelUpdates,
replicationProjectionProvider = ReplicationProjectionProviderAdapter.toScala(replicationProjectionProvider))
.withIndirectReplication(indirectReplication)
.withProducerFilter(producerFilter.test)
.withInitialConsumerFilter(initialConsumerFilter.asScala.toVector)

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.projection.grpc.replication.scaladsl

import scala.concurrent.Future

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
Expand All @@ -14,14 +16,11 @@ import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.replication.internal.ReplicationImpl
import scala.concurrent.Future

import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.internal.TopicMatcher

/**
* Created using [[Replication.grpcReplication]], which starts sharding with the entity and
Expand Down Expand Up @@ -70,23 +69,8 @@ object Replication {
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command])(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] =
grpcReplication[Command, Event, State](settings, (_: EventEnvelope[Event]) => true)(replicatedBehaviorFactory)

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: EventEnvelope[Event] => Boolean)(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] = {

val replicatedEntity =
ReplicatedEntity(
settings.selfReplicaId,
Expand All @@ -100,7 +84,23 @@ object Replication {
}
}))

ReplicationImpl.grpcReplication[Command, Event, State](settings, producerFilter, replicatedEntity)
ReplicationImpl.grpcReplication[Command, Event, State](settings, replicatedEntity)
}

/**
* Called to bootstrap the entity on each cluster node in each of the replicas.
*
* Filter events matching the `producerFilter` predicate, for example based on tags.
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1")
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
producerFilter: EventEnvelope[Event] => Boolean)(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] = {
grpcReplication(settings.withProducerFilter(producerFilter))(replicatedBehaviorFactory)
}

/**
Expand All @@ -111,14 +111,11 @@ object Replication {
*
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
@deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1")
def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command], topicExpression: String)(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] = {
val topicMatcher = TopicMatcher(topicExpression)
grpcReplication(
settings,
(env: EventEnvelope[Event]) =>
topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix))(replicatedBehaviorFactory)
grpcReplication(settings.withProducerFilterTopicExpression(topicExpression))(replicatedBehaviorFactory)
}

}
Loading
Loading