
Allow projections to trigger replay of events by persistence ID for read journals that expose currentEventsByPersistenceId #1256

Closed · wants to merge 1 commit

Conversation

leviramsey (Contributor)

The present behavior of projections is to fail when a gap in sequence numbers is detected by a backtracking query (e.g. from R2DBC or DynamoDB), relying on restart-with-backoff to clear the deck and hopefully succeed on the next attempt.

For gRPC projections, persistence IDs can be filtered, so gaps are a normal, expected thing; the CanTriggerReplay mechanism was added to fill those gaps in. To my mind, though, the intuitive thing to do in the interest of resilience is to do the same even when projecting from a source other than gRPC, provided the source exposes currentEventsByPersistenceId.

Unit tests etc. are TODO; posting this now for socialization.
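For context, here is a minimal self-contained sketch (plain Scala; `Validation`, `ReplayNeeded`, and `validateSeqNr` are illustrative names, not the actual Akka Projection internals) of the per-persistence-ID sequence-number check a projection applies, and the replay request it could issue instead of failing:

```scala
// Simplified model of per-persistence-ID sequence-number validation.
sealed trait Validation
case object Accepted extends Validation
final case class ReplayNeeded(fromSeqNr: Long, triggeredBySeqNr: Long) extends Validation

// A projection expects seqNr == lastSeqNr + 1 for a given persistence ID.
// On a gap, a provider exposing currentEventsByPersistenceId could be asked
// to replay the missing range instead of failing the projection.
def validateSeqNr(lastSeqNr: Long, seqNr: Long): Validation =
  if (seqNr == lastSeqNr + 1) Accepted
  else ReplayNeeded(fromSeqNr = lastSeqNr + 1, triggeredBySeqNr = seqNr)
```

For example, seeing seqNr 7 when 3 was last processed would yield `ReplayNeeded(4, 7)`, i.e. a request to replay events 4 through 7 for that pid.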

…stenceId to have replay triggered by projection
@johanandren (Member)

The scenario where this is needed is when in-order events were not yet visible from the underlying journal (because the specific db settings were tuned too tightly?) and restarting the projection is too expensive?

@johanandren left a review comment

Complex enough that it makes me think we should maybe take a step back and see if we could do this in a less duct-tapey way. I don't have a clear idea of what that solution would be, though.

override def byPersistenceIdQuery: CurrentEventsByPersistenceIdTypedQuery = query

override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long, triggeredBySeqNr: Long): Unit =
  fill(persistenceId, fromSeqNr, triggeredBySeqNr)
Member:

So this would trigger a replay for that pid in any source created from this provider instance. That might be fine, because we generally show creating a provider instance specifically where it is used, so it's not really expected to be shared.

I guess that explains why you made the replay logic an actor that can be shared and needs to manage its lifecycle. Quite inconvenient that it cannot be about a specific running query, when that is what we'd really want.


/** INTERNAL API */
@InternalApi
override private[scaladsl] val gapFillerRef = new AtomicReference()
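The AtomicReference above implies a lazily created, shared gap filler. A minimal sketch of that get-or-create pattern in plain Scala (`GapFiller` and `getOrCreate` are hypothetical names; the real field would hold an actor reference):

```scala
import java.util.concurrent.atomic.AtomicReference

// Stand-in for the shared replay/gap-filler actor discussed here.
final class GapFiller(val id: Int)

val gapFillerRef = new AtomicReference[GapFiller]()

// First caller installs the gap filler; concurrent callers reuse the winner's.
def getOrCreate(create: () => GapFiller): GapFiller = {
  val existing = gapFillerRef.get()
  if (existing ne null) existing
  else {
    val created = create()
    if (gapFillerRef.compareAndSet(null, created)) created
    else gapFillerRef.get() // lost the race; use the instance that won
  }
}
```

Note that the losing caller's instance is simply discarded; with a real actor it would have to be stopped, which is part of the lifecycle awkwardness this thread points out.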
Member:

Messy to have this kind of stateful thing on an interface that is meant to be stateless and without lifecycle. If the provider had a lifecycle, it would be fine to start this on provider start (or lazily) and stop it on provider stop. :/

Member:

Or even better, if we could tie it to the specific query

@patriknw (Member)

I understand that it's tempting to work around the problems with this replay mechanism, but it doesn't feel good that we don't understand the reason for the missed events. A major drawback is that with this workaround we wouldn't notice the problem and solve the root cause.

Also note that there could be a missing event with no further events for that pid; then this replay would never be triggered and the event would never be processed.

override val maxSlice: Int,
adjustStartOffset: Option[Offset] => Future[Option[Offset]])
extends EventsBySlicesSourceProvider[Event](system, query, entityType, minSlice, maxSlice, adjustStartOffset)
with CanTriggerReplay
Member:

I wonder if there could be an easier way? The SourceProvider could expose the underlying query (internal API). Then we could implement all of it in DynamoDBProjectionImpl when it sees the RejectedBacktrackingSeqNr: check whether the underlying query implements CurrentEventsByPersistenceIdTypedQuery and request the missing events via that.

Then call the same kind of offsetStore.validate and delegate.process for each of the missing events. All of that happens within the original AdaptedHandler, so it completes the Future when everything is done.
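A self-contained sketch of that fallback path (plain Scala; `Envelope`, `fillGap`, and the function parameters are illustrative stand-ins for the real query and the `offsetStore.validate`/`delegate.process` calls):

```scala
// Minimal model of an event envelope; the real type carries offsets, tags, etc.
final case class Envelope(persistenceId: String, seqNr: Long, event: String)

// On a rejected backtracking seqNr, fetch the missing range for the pid via a
// currentEventsByPersistenceId-style query, push each event through the normal
// processing path, and report how many events were filled in.
def fillGap(
    currentEventsByPersistenceId: (String, Long, Long) => Seq[Envelope],
    process: Envelope => Unit)(pid: String, fromSeqNr: Long, toSeqNr: Long): Int = {
  val missing = currentEventsByPersistenceId(pid, fromSeqNr, toSeqNr)
  missing.foreach(process) // same validate/process path as ordinary envelopes
  missing.size
}
```

Running this inside the handler that detected the rejection keeps the completion semantics described above: the handler's Future only completes once the missing events have gone through the same processing path.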

Contributor (author):

That might well be an easier way. The complexity in this one arises from the possibility of the same source provider providing multiple sources, which isn't really prevented by the current API, though we do demonstrate the pattern of a new provider per projection start.

Contributor (author):

But yeah, if there were an alternative interface that a provider could expose (CanQueryToFillGaps?), that would remove the constraint.
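For illustration, one possible shape of such an interface; CanQueryToFillGaps is the name floated above, but the method name and signature here are purely assumptions, not an actual Akka API:

```scala
// Hypothetical envelope; a real API would use the journal's envelope type.
final case class Envelope(persistenceId: String, seqNr: Long)

// A provider exposing this would let the projection fetch missing events
// itself, instead of the provider holding shared replay state.
trait CanQueryToFillGaps {
  def eventsForGap(persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Seq[Envelope]
}
```

Because the projection would drive the query per detected gap, nothing stateful needs to live on the provider, sidestepping the lifecycle concerns raised in the review.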

@patriknw (Member)

Closing this, since we have found the root cause and fixed it in #1260.

@patriknw patriknw closed this Nov 25, 2024