Skip to content

Commit

Permalink
javadsl
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 17, 2023
1 parent b90c5ad commit aa62b55
Showing 1 changed file with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ object ReplicationSettings {
parallelUpdates,
replicationProjectionProvider,
Optional.empty(),
identity)
identity,
indirectReplication = false)
}

/**
Expand Down Expand Up @@ -117,7 +118,8 @@ object ReplicationSettings {
parallelUpdates = config.getInt("parallel-updates"),
replicationProjectionProvider = replicationProjectionProvider,
Optional.empty(),
identity)
identity,
indirectReplication = false)
}
}

Expand All @@ -138,7 +140,8 @@ final class ReplicationSettings[Command] private (
val eventProducerInterceptor: Optional[EventProducerInterceptor],
val configureEntity: java.util.function.Function[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]]) {
Entity[Command, ShardingEnvelope[Command]]],
val indirectReplication: Boolean) {

def withSelfReplicaId(selfReplicaId: ReplicaId): ReplicationSettings[Command] =
copy(selfReplicaId = selfReplicaId)
Expand Down Expand Up @@ -186,6 +189,14 @@ final class ReplicationSettings[Command] private (
Entity[Command, ShardingEnvelope[Command]]]): ReplicationSettings[Command] =
copy(configureEntity = configure)

/**
* When enabled all events will be transferred from all replicas, otherwise only events from the origin
* replica will be transferred from the origin replica.
* When each replica is connected to each other replica it's most efficient to disable indirect replication.
*/
def withIndirectReplication(enabled: Boolean): ReplicationSettings[Command] =
copy(indirectReplication = enabled)

private def copy(
selfReplicaId: ReplicaId = selfReplicaId,
entityTypeKey: EntityTypeKey[Command] = entityTypeKey,
Expand All @@ -198,7 +209,8 @@ final class ReplicationSettings[Command] private (
eventProducerInterceptor: Optional[EventProducerInterceptor] = eventProducerInterceptor,
configureEntity: java.util.function.Function[
Entity[Command, ShardingEnvelope[Command]],
Entity[Command, ShardingEnvelope[Command]]] = configureEntity): ReplicationSettings[Command] =
Entity[Command, ShardingEnvelope[Command]]] = configureEntity,
indirectReplication: Boolean = indirectReplication): ReplicationSettings[Command] =
new ReplicationSettings[Command](
selfReplicaId,
entityTypeKey,
Expand All @@ -209,7 +221,8 @@ final class ReplicationSettings[Command] private (
parallelUpdates,
projectionProvider,
eventProducerInterceptor,
configureEntity)
configureEntity,
indirectReplication)

override def toString =
s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, ${otherReplicas.asScala.mkString(", ")})"
Expand All @@ -227,5 +240,6 @@ final class ReplicationSettings[Command] private (
entityEventReplicationTimeout = entityEventReplicationTimeout.asScala,
parallelUpdates = parallelUpdates,
replicationProjectionProvider = ReplicationProjectionProviderAdapter.toScala(replicationProjectionProvider))
.withIndirectReplication(indirectReplication)

}

0 comments on commit aa62b55

Please sign in to comment.