Skip to content

Commit

Permalink
fix: skipPubSubTooFarAhead per slice (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Nov 27, 2024
1 parent bde2660 commit e67b786
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,34 +552,43 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
else
Flow[EventEnvelope[Event]]
.statefulMapConcat(() => {
// track backtracking offset
var latestBacktracking = Instant.EPOCH
// track backtracking offset per slice
var latestBacktrackingPerSlice = Map.empty[Int, Instant]
def latestBacktracking(slice: Int): Instant = latestBacktrackingPerSlice.get(slice) match {
case Some(instant) => instant
case None => Instant.EPOCH
}
env => {
env.offset match {
case t: TimestampOffset =>
if (EnvelopeOrigin.fromBacktracking(env)) {
latestBacktracking = t.timestamp
if (EnvelopeOrigin.fromQuery(env)) {
env :: Nil
} else if (EnvelopeOrigin.fromHeartbeat(env)) {
latestBacktracking = t.timestamp
Nil // always drop heartbeats
} else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking == Instant.EPOCH) {
log.trace(
"Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.",
env.persistenceId,
env.sequenceNr)
Nil
} else if (EnvelopeOrigin.fromPubSub(env) && JDuration
.between(latestBacktracking, t.timestamp)
.compareTo(maxAheadOfBacktracking) > 0) {
// drop from pubsub when too far ahead from backtracking
log.debug(
"Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking.",
env.persistenceId,
env.sequenceNr)
Nil
} else {
env :: Nil
val slice = persistenceExt.sliceForPersistenceId(env.persistenceId)
if (EnvelopeOrigin.fromBacktracking(env)) {
latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp)
env :: Nil
} else if (EnvelopeOrigin.fromHeartbeat(env)) {
latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp)
Nil // always drop heartbeats
} else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking(slice) == Instant.EPOCH) {
log.trace(
"Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.",
env.persistenceId,
env.sequenceNr)
Nil
} else if (EnvelopeOrigin.fromPubSub(env) && JDuration
.between(latestBacktracking(slice), t.timestamp)
.compareTo(maxAheadOfBacktracking) > 0) {
// drop from pubsub when too far ahead from backtracking
log.debug(
"Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking.",
env.persistenceId,
env.sequenceNr)
Nil
} else {
env :: Nil
}
}
case _ =>
env :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ class EventsBySlicePubSubSpec
}

"skipPubSubTooFarAhead" in {
persistenceExt.sliceForPersistenceId(envA1.persistenceId) should not be persistenceExt.sliceForPersistenceId(
envB1.persistenceId)

val (in, out) =
TestSource[EventEnvelope[String]]()
.via(
Expand All @@ -231,27 +234,47 @@ class EventsBySlicePubSubSpec
// all pubsub events dropped before the first backtracking event
out.expectNoMessage()

val pidC = PersistenceId(entityType, "C")
in.sendNext(backtrackingEnvelope(envA1))
out.expectNext(backtrackingEnvelope(envA1))
// now the pubsub event is passed through
in.sendNext(envA1)
out.expectNext(envA1)
in.sendNext(envA2)
out.expectNext(envA2)

// but not for another slice
in.sendNext(envB1)
out.expectNoMessage()
// until we see backtracking for that slice
in.sendNext(backtrackingEnvelope(envB1))
out.expectNext(backtrackingEnvelope(envB1))
in.sendNext(envB1)
out.expectNext(envB1)

val time2 = envA1.offset
val time1 = envA1.offset
.asInstanceOf[TimestampOffset]
.timestamp
val time2 = time1
.plusMillis(settings.querySettings.backtrackingWindow.toMillis)
val envC1 = createEnvelope(pidC, 1L, "c1", time2.plusMillis(1))
val envC2 = createEnvelope(pidC, 2L, "c2", time2.plusMillis(2))
val envA3 = createEnvelope(pidA, 3L, "a3", time2.plusMillis(1))
val envA4 = createEnvelope(pidA, 4L, "a4", time2.plusMillis(2))
in.sendNext(envA3)
in.sendNext(envA4)
// dropped because > backtrackingWindow
out.expectNoMessage()

val pidCSameSlice =
randomPersistenceIdForSlice(entityType, persistenceExt.sliceForPersistenceId(pidA.id))
val envC1 = createEnvelope(pidCSameSlice, 1L, "c1", time2.plusMillis(1))
in.sendNext(envC1)
// dropped because > backtrackingWindow
out.expectNoMessage()

in.sendNext(backtrackingEnvelope(envB1))
out.expectNext(backtrackingEnvelope(envB1))
in.sendNext(envC2)
out.expectNext(envC2)
val pidDSameSlice =
randomPersistenceIdForSlice(entityType, persistenceExt.sliceForPersistenceId(pidA.id))
val envD1 = createEnvelope(pidDSameSlice, 1L, "d1", time1.plusMillis(1))
in.sendNext(envD1)
out.expectNext(envD1)
}

"dynamically enable/disable publishing based on throughput" in new Setup {
Expand Down

0 comments on commit e67b786

Please sign in to comment.