diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala index b2b4a822..16090901 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala @@ -5,24 +5,28 @@ package akka.persistence.jdbc.journal.dao +import scala.collection.immutable +import scala.collection.immutable.Nil +import scala.collection.immutable.Seq +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.Try + import akka.NotUsed import akka.dispatch.ExecutionContexts import akka.persistence.jdbc.AkkaSerialization -import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig } +import akka.persistence.jdbc.config.BaseDaoConfig +import akka.persistence.jdbc.config.JournalConfig import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow import akka.persistence.journal.Tagged -import akka.persistence.{ AtomicWrite, PersistentRepr } +import akka.persistence.AtomicWrite +import akka.persistence.PersistentRepr import akka.serialization.Serialization import akka.stream.Materializer import akka.stream.scaladsl.Source import slick.jdbc.JdbcBackend.Database import slick.jdbc.JdbcProfile -import scala.collection.immutable -import scala.collection.immutable.{ Nil, Seq } -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.Try - /** * A [[JournalDao]] that uses Akka serialization to serialize the payload and store * the manifest and serializer id used. @@ -48,21 +52,34 @@ class DefaultJournalDao( val queries = new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration) - override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = { - val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for { - _ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr) - highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId) - _ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1) - } yield () + override def delete(persistenceId: String, toSequenceNr: Long): Future[Unit] = { + + // note: the passed toSequenceNr will be Long.MaxValue when doing a 'full' journal clean-up + // see JournalSpec's test: 'not reset highestSequenceNr after journal cleanup' + val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = + highestSequenceNrAction(persistenceId) + .flatMap { + // are we trying to delete the highest or even higher seqNr ? + case highestSeqNr if highestSeqNr <= toSequenceNr => + // if so, we delete up to the before last and + // mark the last as logically deleted preserving highestSeqNr + queries + .delete(persistenceId, highestSeqNr - 1) + .flatMap(_ => queries.markAsDeleted(persistenceId, highestSeqNr)) + case _ => + // if not, we delete up to the requested seqNr + queries.delete(persistenceId, toSequenceNr) + } + .map(_ => ()) db.run(actions.transactionally) } - override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { - for { - maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result) - } yield maybeHighestSeqNo.getOrElse(0L) - } + override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = + db.run(highestSequenceNrAction(persistenceId)) + + private def highestSequenceNrAction(persistenceId: String): DBIOAction[Long, NoStream, Effect.Read] = + queries.highestSequenceNrForPersistenceId(persistenceId).result.map(_.getOrElse(0)) private def highestMarkedSequenceNr(persistenceId: String) = queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala index 5df745ee..9ccf4914 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala @@ -69,6 +69,15 @@ class JournalQueries( JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete } + private[akka] def markAsDeleted(persistenceId: String, seqNr: Long) = + JournalTable + .filter(_.persistenceId === persistenceId) + .filter(_.sequenceNumber === seqNr) + .filter(_.deleted === false) + .map(_.deleted) + .update(true) + + @deprecated(message = "Intended to be internal API", since = "5.4.2") def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) = JournalTable .filter(_.persistenceId === persistenceId) diff --git a/scripts/launch-all.sh b/scripts/launch-all.sh index f7337f87..2bf03db8 100755 --- a/scripts/launch-all.sh +++ b/scripts/launch-all.sh @@ -20,9 +20,9 @@ while true; do done; } -docker-compose -f scripts/docker-compose.yml kill -docker-compose -f scripts/docker-compose.yml rm -f -docker-compose -f scripts/docker-compose.yml up -d +docker compose -f scripts/docker-compose.yml kill +docker compose -f scripts/docker-compose.yml rm -f +docker compose -f scripts/docker-compose.yml up -d wait 3306 MySQL wait 5432 Postgres wait 1521 Oracle diff --git a/scripts/launch-mysql.sh b/scripts/launch-mysql.sh index a3025179..956c7d33 100755 --- a/scripts/launch-mysql.sh +++ b/scripts/launch-mysql.sh @@ -20,7 +20,7 @@ while true; do done; } -docker-compose -f scripts/docker-compose.yml kill mysql -docker-compose -f scripts/docker-compose.yml rm -f mysql -docker-compose -f scripts/docker-compose.yml up -d mysql +docker compose -f scripts/docker-compose.yml kill mysql +docker compose -f scripts/docker-compose.yml rm -f mysql +docker compose -f scripts/docker-compose.yml up -d mysql wait 3306 MySQL diff --git a/scripts/launch-oracle.sh b/scripts/launch-oracle.sh index 92d91ea8..6ee17434 100755 --- a/scripts/launch-oracle.sh +++ b/scripts/launch-oracle.sh @@ -20,7 +20,7 @@ while true; do done; } -docker-compose -f scripts/docker-compose.yml kill oracle -docker-compose -f scripts/docker-compose.yml rm -f oracle -docker-compose -f scripts/docker-compose.yml up -d oracle +docker compose -f scripts/docker-compose.yml kill oracle +docker compose -f scripts/docker-compose.yml rm -f oracle +docker compose -f scripts/docker-compose.yml up -d oracle wait 1521 Oracle diff --git a/scripts/launch-postgres.sh b/scripts/launch-postgres.sh index 05487e12..463e7ba8 100755 --- a/scripts/launch-postgres.sh +++ b/scripts/launch-postgres.sh @@ -20,7 +20,7 @@ while true; do done; } -docker-compose -f scripts/docker-compose.yml kill postgres -docker-compose -f scripts/docker-compose.yml rm -f postgres -docker-compose -f scripts/docker-compose.yml up -d postgres +docker compose -f scripts/docker-compose.yml kill postgres +docker compose -f scripts/docker-compose.yml rm -f postgres +docker compose -f scripts/docker-compose.yml up -d postgres wait 5432 Postgres diff --git a/scripts/launch-sqlserver.sh b/scripts/launch-sqlserver.sh index a61cbff4..3f1c7fb8 100755 --- a/scripts/launch-sqlserver.sh +++ b/scripts/launch-sqlserver.sh @@ -20,7 +20,7 @@ while true; do done; } -docker-compose -f scripts/docker-compose.yml kill sqlserver -docker-compose -f scripts/docker-compose.yml rm -f sqlserver -docker-compose -f scripts/docker-compose.yml up -d sqlserver +docker compose -f scripts/docker-compose.yml kill sqlserver +docker compose -f scripts/docker-compose.yml rm -f sqlserver +docker compose -f scripts/docker-compose.yml up -d sqlserver wait 1433 SqlServer