fix: DynamoDB replay rejected events #1275
Conversation
  case RejectedBacktrackingSeqNr =>
-   triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map {
+   replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map {
there are more places, but wanted to try this one first
Refs #1276
Also added a test for replaying on rejection due to clock skew on event writes (for akka/akka-persistence-dynamodb#108).
Looks good.
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
Would be useful to have a debug log (or possibly warning) that replay has been triggered, with the details.
And to give us something to search for in logs to check how often this is happening.
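For illustration, a minimal sketch of what such a log line could look like (hypothetical, not the actual change: it assumes an SLF4J-style logger and that the persistenceId, fromSeqNr and toSeqNr values from the snippet above are in scope):

// Sketch only: a searchable log line for when replay of rejected events is triggered.
// `log`, `persistenceId`, `fromSeqNr` and `toSeqNr` are assumed to be in scope here.
log.debug(
  "Triggering replay for persistenceId [{}] from seqNr [{}] to [{}] after rejected envelope",
  persistenceId,
  fromSeqNr,
  toSeqNr)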
maybe off by one?
// FIXME: should we be grouping the replayed envelopes? based on what?
loadEnvelope(envelope.asInstanceOf[Envelope], sourceProvider).flatMap { loadedEnvelope =>
  val offset = extractOffsetPidSeqNr(sourceProvider, loadedEnvelope)
  if (isFilteredEvent(loadedEnvelope)) {
    offsetStore.saveOffset(offset)
  } else {
    delegate
      .process(Seq(loadedEnvelope))
      .flatMap { _ =>
        offsetStore.saveOffset(offset)
      }
  }
}
Only the accepted-envelope processing for grouped differs from at-least-once, so the rest could probably be shared between these implementations, at least.
But this is currently processing the envelopes one at a time. It could make sense to group the replayed envelopes, since it's a grouped handler, but I'm not sure what we would base the grouping on, as we don't have knowledge here of what that is for the projection.
Starting to look at what the flow handler would need makes me realise that it would be useful to have projection context passing and projection telemetry specific to replays.
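For illustration, a rough sketch of what batching the replayed envelopes for the grouped delegate could look like (hypothetical helper, not the actual implementation; names follow the snippet above, an implicit ExecutionContext is assumed, and all replayed envelopes are assumed to belong to the same persistence id since they come from a single events-by-persistence-id replay):

// Hypothetical sketch: hand the replayed envelopes to the grouped delegate as one
// batch instead of one call per envelope. Assumes `replayed` is non-empty and all
// for the same persistence id, so saving the offset of the last envelope covers the batch.
def processReplayedAsGroup(replayed: Seq[Envelope]): Future[Done] = {
  val toProcess = replayed.filterNot(isFilteredEvent)
  val processed = if (toProcess.isEmpty) FutureDone else delegate.process(toProcess)
  processed.flatMap { _ =>
    offsetStore.saveOffset(extractOffsetPidSeqNr(sourceProvider, replayed.last))
  }
}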
isAcceptedEnvelopes.foldLeft(FutureDone) {
  case (previous, (env, RejectedSeqNr)) =>
    previous.flatMap { _ =>
      replayIfPossible(env).map(_ => Done)(ExecutionContext.parasitic)
It might feel strange at first to replay the rejected envelopes first and then process the accepted ones, but as far as I can see that should be fine. The order per pid is still preserved, and the replay starts from the previously stored seqNr, so it may also cover some of the accepted envelopes for that pid. I added a second validation step before processing the accepted envelopes to remove such duplicates.
Yeah, saw that was already there and it seemed to make sense. And then the second validation.
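To illustrate the effect of that second validation step (the PR does this via a second validation pass, this is just a hypothetical sketch of the idea): an accepted envelope becomes a duplicate when the replay for its persistence id has already reached its sequence number.

import akka.persistence.query.typed.EventEnvelope

// Illustration only: drop accepted envelopes already covered by the replay,
// given the highest replayed seqNr per persistence id.
def notCoveredByReplay[Event](
    accepted: Seq[EventEnvelope[Event]],
    replayedUpToSeqNr: Map[String, Long]): Seq[EventEnvelope[Event]] =
  accepted.filterNot { env =>
    replayedUpToSeqNr.get(env.persistenceId).exists(_ >= env.sequenceNr)
  }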
- triggerReplayIfPossible(sourceProvider, offsetStore, env).map {
-   case true  => Done
-   case false => throwRejectedEnvelope(sourceProvider, env)
+ isAcceptedEnvelopes.foldLeft(FutureDone) {
It's important to run the replays sequentially so that we don't have concurrent processing and offset save.
👍🏼
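A self-contained sketch of the pattern being used here (a hypothetical generic helper, not the projection code): chaining one Future per rejected envelope with foldLeft and flatMap means the next replay only starts after the previous one, including its offset save, has completed.

import scala.concurrent.{ ExecutionContext, Future }

// Generic sketch of the sequential foldLeft pattern: each step starts only
// after the Future from the previous step has completed.
def runSequentially[A](items: Seq[A])(step: A => Future[Unit])(
    implicit ec: ExecutionContext): Future[Unit] =
  items.foldLeft(Future.successful(())) { (previous, item) =>
    previous.flatMap(_ => step(item))
  }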
  }
}

private def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = {
there is duplication between these different replayIfPossible methods, but let's try to extract that in a separate PR so that we can release this asap
Yes, and we could also see if we can refactor things to have group processing and include telemetry.
LGTM
To handle unforeseen cases of rejection, it will replay the missing events via CurrentEventsByPersistenceIdTypedQuery.
Also bumps akka-persistence-dynamodb to 2.0.3, since CurrentEventsByPersistenceIdTypedQuery is implemented in that version.
Refs #1276
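As a rough, self-contained sketch of that replay mechanism (hypothetical helper and parameter names; the real code lives in DynamoDBProjectionImpl): the missing events for a persistence id are streamed back with currentEventsByPersistenceIdTyped, from the last stored seqNr + 1 up to the rejected envelope's seqNr, and handled in order.

import akka.Done
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

// Sketch with assumed names (not the internal implementation): replay the missing
// events for one persistence id and run each replayed envelope through a handler.
def replayMissing[Event](
    query: CurrentEventsByPersistenceIdTypedQuery,
    persistenceId: String,
    fromSeqNr: Long,
    toSeqNr: Long)(handle: EventEnvelope[Event] => Future[Unit])(
    implicit system: ActorSystem[_]): Future[Done] = {
  val source: Source[EventEnvelope[Event], NotUsed] =
    query.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSeqNr, toSeqNr)
  // mapAsync(1) preserves the per-persistence-id ordering of the replayed events
  source.mapAsync(1)(handle).runWith(Sink.ignore)
}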