Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DynamoDB replay rejected events #1275

Merged
merged 28 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f172c54
fix: DynamoDB Opt-in to replay rejected events
patriknw Nov 28, 2024
85b2450
better recover
patriknw Nov 28, 2024
56b5552
more providers
patriknw Nov 28, 2024
8424450
bump: akka-persistence-dynamodb 2.0.3
patriknw Nov 28, 2024
cb9063c
test
patriknw Nov 28, 2024
a882e01
verify that it finds all events
patriknw Nov 28, 2024
5281789
unrelated test fail
patriknw Nov 28, 2024
df90091
add test for clock skew on event writes
pvlugter Nov 29, 2024
5984826
move replayIfPossible to adapted handler implementation
pvlugter Nov 29, 2024
5d9dcb8
add opt-in config setting for replay
pvlugter Nov 29, 2024
05e65ec
throw on replay failed, fix off-by-one, add offset store log prefix
pvlugter Nov 29, 2024
3bd2dc8
only fail replay for queries on missing replay (unexpected count)
pvlugter Nov 29, 2024
886fba0
add initial replay and tests for at-least-once grouped
pvlugter Nov 29, 2024
fa7f1d7
remove unnecessary implicits
pvlugter Nov 29, 2024
67fa7de
add initial replay implementation and tests for flow handler
pvlugter Nov 29, 2024
2576390
downgrade expected replay count warning to debug for flow handler
pvlugter Nov 29, 2024
df0217f
cleanup envelope source, log
patriknw Nov 29, 2024
5bdf207
replay sequentially in grouped at-least-once
patriknw Nov 29, 2024
3e0425e
additional validation to avoid duplicates for grouped at-least-once
patriknw Nov 29, 2024
362e543
no need to loadEnvelope of replayed
patriknw Nov 29, 2024
fa031e1
minor test cleanup
patriknw Nov 29, 2024
c68d93d
enable by default
patriknw Nov 29, 2024
673e606
comments in validation
patriknw Nov 29, 2024
2f237b9
log warning every 1000
patriknw Nov 29, 2024
ecf092c
exactly once
patriknw Nov 29, 2024
ccfbf4b
exactly once grouped
patriknw Nov 29, 2024
ce483e5
extract some trivial logging
patriknw Nov 29, 2024
2951934
enable mima
patriknw Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider
import akka.projection.scaladsl.SourceProvider
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

class TestSourceProviderWithInput()(implicit val system: ActorSystem[_])
class TestSourceProviderWithInput(enableCurrentEventsByPersistenceId: Boolean)(implicit val system: ActorSystem[_])
extends SourceProvider[TimestampOffset, EventEnvelope[String]]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
with LoadEventQuery
with LoadEventsByPersistenceIdSourceProvider[String] {

def this()(implicit system: ActorSystem[_]) = this(enableCurrentEventsByPersistenceId = false)

private implicit val ec: ExecutionContext = system.executionContext
private val persistenceExt = Persistence(system)
Expand Down Expand Up @@ -96,4 +100,22 @@ class TestSourceProviderWithInput()(implicit val system: ActorSystem[_])
s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found."))
}
}

override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = {
if (enableCurrentEventsByPersistenceId)
Some(
Source(
envelopes
.iterator()
.asScala
.filter { env =>
env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr
}
.toVector))
else
None
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#State.evict")
3 changes: 3 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ akka.projection.dynamodb {
}
}

# Replay missed events for a particular persistence id when a sequence number is rejected by validation.
replay-on-rejected-sequence-numbers = on

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
# To use a separate client for projections this can be
# set to another config path that defines the config based on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ object DynamoDBProjectionSettings {
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")),
retrySettings = RetrySettings(config.getConfig("offset-store.retries")))
retrySettings = RetrySettings(config.getConfig("offset-store.retries")),
replayOnRejectedSequenceNumbers = config.getBoolean("replay-on-rejected-sequence-numbers"))
}

/**
Expand All @@ -72,7 +73,8 @@ final class DynamoDBProjectionSettings private (
val offsetBatchSize: Int,
val offsetSliceReadParallelism: Int,
val timeToLiveSettings: TimeToLiveSettings,
val retrySettings: RetrySettings) {
val retrySettings: RetrySettings,
val replayOnRejectedSequenceNumbers: Boolean) {

// 25 is a hard limit of batch writes in DynamoDB
require(offsetBatchSize <= 25, s"offset-batch-size must be <= 25, was [$offsetBatchSize]")
Expand Down Expand Up @@ -122,6 +124,9 @@ final class DynamoDBProjectionSettings private (
def withRetrySettings(retrySettings: RetrySettings): DynamoDBProjectionSettings =
copy(retrySettings = retrySettings)

def withReplayOnRejectedSequenceNumbers(replayOnRejectedSequenceNumbers: Boolean): DynamoDBProjectionSettings =
copy(replayOnRejectedSequenceNumbers = replayOnRejectedSequenceNumbers)

@nowarn("msg=deprecated")
private def copy(
timestampOffsetTable: String = timestampOffsetTable,
Expand All @@ -132,7 +137,8 @@ final class DynamoDBProjectionSettings private (
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings,
retrySettings: RetrySettings = retrySettings) =
retrySettings: RetrySettings = retrySettings,
replayOnRejectedSequenceNumbers: Boolean = replayOnRejectedSequenceNumbers) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
useClient,
Expand All @@ -144,10 +150,24 @@ final class DynamoDBProjectionSettings private (
offsetBatchSize,
offsetSliceReadParallelism,
timeToLiveSettings,
retrySettings)
retrySettings,
replayOnRejectedSequenceNumbers)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $backtrackingWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
@nowarn("msg=deprecated")
override def toString: String =
s"DynamoDBProjectionSettings(" +
s"timestampOffsetTable=$timestampOffsetTable, " +
s"useClient=$useClient, " +
s"timeWindow=$timeWindow, " +
s"backtrackingWindow=$backtrackingWindow, " +
s"keepNumberOfEntries=$keepNumberOfEntries, " +
s"evictInterval=$evictInterval, " +
s"warnAboutFilteredEventsInFlow=$warnAboutFilteredEventsInFlow, " +
s"offsetBatchSize=$offsetBatchSize, " +
s"offsetSliceReadParallelism=$offsetSliceReadParallelism, " +
s"timeToLiveSettings=$timeToLiveSettings, " +
s"retrySettings=$retrySettings, " +
s"replayOnRejectedSequenceNumbers=$replayOnRejectedSequenceNumbers)"
}

object TimeToLiveSettings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private[projection] class DynamoDBOffsetStore(
projectionId: ProjectionId,
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: DynamoDBProjectionSettings,
val settings: DynamoDBProjectionSettings,
client: DynamoDbAsyncClient,
clock: Clock = Clock.systemUTC()) {

Expand All @@ -217,7 +217,7 @@ private[projection] class DynamoDBOffsetStore(
}

private val logger = LoggerFactory.getLogger(this.getClass)
private val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:"
val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:"

private val dao = new OffsetStoreDao(system, settings, projectionId, client)

Expand Down Expand Up @@ -605,10 +605,14 @@ private[projection] class DynamoDBOffsetStore(
FutureAccepted
} else if (!recordWithOffset.fromBacktracking) {
logUnexpected()
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
// and SourceProvider supports it.
FutureRejectedSeqNr
} else {
logUnexpected()
// This will result in projection restart (with normal configuration)
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
// and SourceProvider supports it.
// Otherwise this will result in projection restart (with normal configuration).
FutureRejectedBacktrackingSeqNr
}
} else if (seqNr == 1) {
Expand Down Expand Up @@ -682,6 +686,8 @@ private[projection] class DynamoDBOffsetStore(
previousTimestamp,
currentState.startTimestampBySlice(slice),
settings.backtrackingWindow)
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
// and SourceProvider supports it.
RejectedBacktrackingSeqNr
} else {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
Expand All @@ -690,6 +696,8 @@ private[projection] class DynamoDBOffsetStore(
seqNr,
pid,
recordWithOffset.offset)
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
// and SourceProvider supports it.
// Backtracking will emit missed event again.
RejectedSeqNr
}
Expand Down
Loading