Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delete all but last #847

Merged
merged 8 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@

package akka.persistence.jdbc.journal.dao

import scala.collection.immutable
import scala.collection.immutable.Nil
import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Try

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.config.BaseDaoConfig
import akka.persistence.jdbc.config.JournalConfig
import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow
import akka.persistence.journal.Tagged
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.persistence.AtomicWrite
import akka.persistence.PersistentRepr
import akka.serialization.Serialization
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile

import scala.collection.immutable
import scala.collection.immutable.{ Nil, Seq }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

/**
* A [[JournalDao]] that uses Akka serialization to serialize the payload and store
* the manifest and serializer id used.
Expand All @@ -48,21 +52,34 @@ class DefaultJournalDao(
val queries =
new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration)

override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = {
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for {
_ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)
highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId)
_ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1)
Comment on lines -53 to -55
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was indeed not efficient and there was also a little bug here.

For a journal with events: 1, 2, 3, 4, 5

Deleting up to 3, would first:
logically delete: 1, 2, 3
collect the highest logically deleted nr, thus 3
delete everything until 3, so delete 1 and 2.

The journal remains with: 3, 4, 5 while 3 is invisible because logically deleted.

There is no reason to keep 3, because it's not the highest anyway.

} yield ()
override def delete(persistenceId: String, toSequenceNr: Long): Future[Unit] = {

// note: the passed toSequenceNr will be Long.MaxValue when doing a 'full' journal clean-up
// see JournalSpec's test: 'not reset highestSequenceNr after journal cleanup'
octonato marked this conversation as resolved.
Show resolved Hide resolved
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] =
highestSequenceNrAction(persistenceId)
.flatMap {
// are we trying to delete the highest or even higher seqNr ?
case highestSeqNr if highestSeqNr <= toSequenceNr =>
// if so, we delete up to the before last and
// mark the last as logically deleted preserving highestSeqNr
queries
.delete(persistenceId, highestSeqNr - 1)
.flatMap(_ => queries.markAsDeleted(persistenceId, highestSeqNr))
case _ =>
// if not, we delete up to the requested seqNr
queries.delete(persistenceId, toSequenceNr)
}
.map(_ => ())
Comment on lines +57 to +73
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests for h2 are passing now locally. Should pass for the the other databases as well. All failures were comping from the akka journal specs.


db.run(actions.transactionally)
}

override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
for {
maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
} yield maybeHighestSeqNo.getOrElse(0L)
}
override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
db.run(highestSequenceNrAction(persistenceId))

private def highestSequenceNrAction(persistenceId: String): DBIOAction[Long, NoStream, Effect.Read] =
queries.highestSequenceNrForPersistenceId(persistenceId).result.map(_.getOrElse(0))

private def highestMarkedSequenceNr(persistenceId: String) =
queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ class JournalQueries(
JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete
}

private[akka] def markAsDeleted(persistenceId: String, seqNr: Long) =
JournalTable
.filter(_.persistenceId === persistenceId)
.filter(_.sequenceNumber === seqNr)
.filter(_.deleted === false)
.map(_.deleted)
.update(true)
octonato marked this conversation as resolved.
Show resolved Hide resolved

@deprecated(message = "Intended to be internal API", since = "5.4.2")
def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) =
octonato marked this conversation as resolved.
Show resolved Hide resolved
JournalTable
.filter(_.persistenceId === persistenceId)
Expand Down
6 changes: 3 additions & 3 deletions scripts/launch-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ while true; do
done;
}

docker-compose -f scripts/docker-compose.yml kill
docker-compose -f scripts/docker-compose.yml rm -f
docker-compose -f scripts/docker-compose.yml up -d
docker compose -f scripts/docker-compose.yml kill
docker compose -f scripts/docker-compose.yml rm -f
docker compose -f scripts/docker-compose.yml up -d
wait 3306 MySQL
wait 5432 Postgres
wait 1521 Oracle
Expand Down
6 changes: 3 additions & 3 deletions scripts/launch-mysql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ while true; do
done;
}

docker-compose -f scripts/docker-compose.yml kill mysql
docker-compose -f scripts/docker-compose.yml rm -f mysql
docker-compose -f scripts/docker-compose.yml up -d mysql
docker compose -f scripts/docker-compose.yml kill mysql
docker compose -f scripts/docker-compose.yml rm -f mysql
docker compose -f scripts/docker-compose.yml up -d mysql
wait 3306 MySQL
6 changes: 3 additions & 3 deletions scripts/launch-oracle.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ while true; do
done;
}

docker-compose -f scripts/docker-compose.yml kill oracle
docker-compose -f scripts/docker-compose.yml rm -f oracle
docker-compose -f scripts/docker-compose.yml up -d oracle
docker compose -f scripts/docker-compose.yml kill oracle
docker compose -f scripts/docker-compose.yml rm -f oracle
docker compose -f scripts/docker-compose.yml up -d oracle
wait 1521 Oracle
6 changes: 3 additions & 3 deletions scripts/launch-postgres.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ while true; do
done;
}

docker-compose -f scripts/docker-compose.yml kill postgres
docker-compose -f scripts/docker-compose.yml rm -f postgres
docker-compose -f scripts/docker-compose.yml up -d postgres
docker compose -f scripts/docker-compose.yml kill postgres
docker compose -f scripts/docker-compose.yml rm -f postgres
docker compose -f scripts/docker-compose.yml up -d postgres
wait 5432 Postgres
6 changes: 3 additions & 3 deletions scripts/launch-sqlserver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ while true; do
done;
}

docker-compose -f scripts/docker-compose.yml kill sqlserver
docker-compose -f scripts/docker-compose.yml rm -f sqlserver
docker-compose -f scripts/docker-compose.yml up -d sqlserver
docker compose -f scripts/docker-compose.yml kill sqlserver
docker compose -f scripts/docker-compose.yml rm -f sqlserver
docker compose -f scripts/docker-compose.yml up -d sqlserver
wait 1433 SqlServer