diff --git a/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala b/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala index 53e8d8f..b2a5f17 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala @@ -552,34 +552,43 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg else Flow[EventEnvelope[Event]] .statefulMapConcat(() => { - // track backtracking offset - var latestBacktracking = Instant.EPOCH + // track backtracking offset per slice + var latestBacktrackingPerSlice = Map.empty[Int, Instant] + def latestBacktracking(slice: Int): Instant = latestBacktrackingPerSlice.get(slice) match { + case Some(instant) => instant + case None => Instant.EPOCH + } env => { env.offset match { case t: TimestampOffset => - if (EnvelopeOrigin.fromBacktracking(env)) { - latestBacktracking = t.timestamp + if (EnvelopeOrigin.fromQuery(env)) { env :: Nil - } else if (EnvelopeOrigin.fromHeartbeat(env)) { - latestBacktracking = t.timestamp - Nil // always drop heartbeats - } else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking == Instant.EPOCH) { - log.trace( - "Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.", - env.persistenceId, - env.sequenceNr) - Nil - } else if (EnvelopeOrigin.fromPubSub(env) && JDuration - .between(latestBacktracking, t.timestamp) - .compareTo(maxAheadOfBacktracking) > 0) { - // drop from pubsub when too far ahead from backtracking - log.debug( - "Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking.", - env.persistenceId, - env.sequenceNr) - Nil } else { - env :: Nil + val slice = persistenceExt.sliceForPersistenceId(env.persistenceId) + if (EnvelopeOrigin.fromBacktracking(env)) { + latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp) + env :: Nil + } else if (EnvelopeOrigin.fromHeartbeat(env)) { + latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp) + Nil // always drop heartbeats + } else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking(slice) == Instant.EPOCH) { + log.trace( + "Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.", + env.persistenceId, + env.sequenceNr) + Nil + } else if (EnvelopeOrigin.fromPubSub(env) && JDuration + .between(latestBacktracking(slice), t.timestamp) + .compareTo(maxAheadOfBacktracking) > 0) { + // drop from pubsub when too far ahead from backtracking + log.debug( + "Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking.", + env.persistenceId, + env.sequenceNr) + Nil + } else { + env :: Nil + } } case _ => env :: Nil diff --git a/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubSpec.scala b/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubSpec.scala index 892d9e0..7a185ea 100644 --- a/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubSpec.scala +++ b/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubSpec.scala @@ -216,6 +216,9 @@ class EventsBySlicePubSubSpec } "skipPubSubTooFarAhead" in { + persistenceExt.sliceForPersistenceId(envA1.persistenceId) should not be persistenceExt.sliceForPersistenceId( + envB1.persistenceId) + val (in, out) = TestSource[EventEnvelope[String]]() .via( @@ -231,27 +234,47 @@ class EventsBySlicePubSubSpec // all pubsub events dropped before the first backtracking event out.expectNoMessage() - val pidC = PersistenceId(entityType, "C") in.sendNext(backtrackingEnvelope(envA1)) out.expectNext(backtrackingEnvelope(envA1)) // now the pubsub event is passed through + in.sendNext(envA1) + out.expectNext(envA1) + in.sendNext(envA2) + out.expectNext(envA2) + + // but not for another slice + in.sendNext(envB1) + out.expectNoMessage() + // until we see backtracking for that slice + in.sendNext(backtrackingEnvelope(envB1)) + out.expectNext(backtrackingEnvelope(envB1)) in.sendNext(envB1) out.expectNext(envB1) - val time2 = envA1.offset + val time1 = envA1.offset .asInstanceOf[TimestampOffset] .timestamp + val time2 = time1 .plusMillis(settings.querySettings.backtrackingWindow.toMillis) - val envC1 = createEnvelope(pidC, 1L, "c1", time2.plusMillis(1)) - val envC2 = createEnvelope(pidC, 2L, "c2", time2.plusMillis(2)) + val envA3 = createEnvelope(pidA, 3L, "a3", time2.plusMillis(1)) + val envA4 = createEnvelope(pidA, 4L, "a4", time2.plusMillis(2)) + in.sendNext(envA3) + in.sendNext(envA4) + // dropped because > backtrackingWindow + out.expectNoMessage() + + val pidCSameSlice = + randomPersistenceIdForSlice(entityType, persistenceExt.sliceForPersistenceId(pidA.id)) + val envC1 = createEnvelope(pidCSameSlice, 1L, "c1", time2.plusMillis(1)) in.sendNext(envC1) // dropped because > backtrackingWindow out.expectNoMessage() - in.sendNext(backtrackingEnvelope(envB1)) - out.expectNext(backtrackingEnvelope(envB1)) - in.sendNext(envC2) - out.expectNext(envC2) + val pidDSameSlice = + randomPersistenceIdForSlice(entityType, persistenceExt.sliceForPersistenceId(pidA.id)) + val envD1 = createEnvelope(pidDSameSlice, 1L, "d1", time1.plusMillis(1)) + in.sendNext(envD1) + out.expectNext(envD1) } "dynamically enable/disable publishing based on throughput" in new Setup {