-
Notifications
You must be signed in to change notification settings - Fork 142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: delete all but last #847
Conversation
def markAsDeleted(persistenceId: String, highestMarkedSequenceNr: Long) = | ||
JournalTable | ||
.filter(_.persistenceId === persistenceId) | ||
.filter(_.sequenceNumber <= maxSequenceNr) | ||
.filter(_.sequenceNumber === highestMarkedSequenceNr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
being more explicitly and only marking a single message as delete.
Only used when deleted events though and what we want is to delete the latest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this will probably not work as expected. Looking further in the code.
core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala
Outdated
Show resolved
Hide resolved
bbfb7ed
to
39684aa
Compare
core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala
Show resolved
Hide resolved
// note: the passed toSequenceNr will be Long.MaxValue when doing a 'full' journal clean-up | ||
// see JournalSpec's test: 'not reset highestSequenceNr after journal cleanup' | ||
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = | ||
highestSequenceNrAction(persistenceId) | ||
.flatMap { | ||
// are we trying to delete the highest or even higher seqNr ? | ||
case highestSeqNr if highestSeqNr <= toSequenceNr => | ||
// if so, we delete up to the before last and | ||
// mark the last as logically deleted preserving highestSeqNr | ||
queries | ||
.delete(persistenceId, highestSeqNr - 1) | ||
.flatMap(_ => queries.markAsDeleted(persistenceId, highestSeqNr)) | ||
case _ => | ||
// if not, we delete up to the requested seqNr | ||
queries.delete(persistenceId, toSequenceNr) | ||
} | ||
.map(_ => ()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests for h2 are passing now locally. Should pass for the the other databases as well. All failures were comping from the akka journal specs.
_ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr) | ||
highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId) | ||
_ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was indeed not efficient and there was also a little bug here.
For a journal with events: 1, 2, 3, 4, 5
Deleting up to 3, would first:
logically delete: 1, 2, 3
collect the highest logically deleted nr, thus 3
delete everything until 3, so delete 1 and 2.
The journal remains with: 3, 4, 5 while 3 is invisible because logically deleted.
There is no reason to keep 3, because it's not the highest anyway.
e1abbc6
to
01d5d32
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but two questions
core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala
Show resolved
Hide resolved
core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala
Show resolved
Hide resolved
We are having issues with mysql tests. Container not starting. 3 days ago it passed. Might be a intermittent issue. |
First delete all event but the last and then mark the last as delete.