From 93e83afbd8deeec47d9ed41a1adee76901298f85 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 19 Apr 2024 16:34:08 +0800 Subject: [PATCH] perf: improve deletion, avoid multiple rows updating. --- .../deletion-optimize.excludes | 12 +++++ .../jdbc/journal/dao/DefaultJournalDao.scala | 9 ++-- .../jdbc/journal/dao/JournalQueries.scala | 44 +++++++----------- .../dao/legacy/ByteArrayJournalDao.scala | 9 ++-- .../journal/dao/legacy/JournalQueries.scala | 45 +++++++------------ 5 files changed, 48 insertions(+), 71 deletions(-) create mode 100644 core/src/main/mima-filters/5.4.1.backwards.excludes/deletion-optimize.excludes diff --git a/core/src/main/mima-filters/5.4.1.backwards.excludes/deletion-optimize.excludes b/core/src/main/mima-filters/5.4.1.backwards.excludes/deletion-optimize.excludes new file mode 100644 index 000000000..11464cfb8 --- /dev/null +++ b/core/src/main/mima-filters/5.4.1.backwards.excludes/deletion-optimize.excludes @@ -0,0 +1,12 @@ +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalQueries.markJournalMessagesAsDeleted") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.JournalQueries.markJournalMessagesAsDeleted") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalQueries.highestMarkedSequenceNrForPersistenceId") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.JournalQueries.highestMarkedSequenceNrForPersistenceId") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalQueries.selectByPersistenceIdAndMaxSequenceNumber") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.JournalQueries.selectByPersistenceIdAndMaxSequenceNumber") 
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalQueries.allPersistenceIdsDistinct") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.JournalQueries.allPersistenceIdsDistinct") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalQueries.journalRowByPersistenceIds") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.JournalQueries.journalRowByPersistenceIds") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.dao.ReadJournalQueries.journalRowByPersistenceIds") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.ReadJournalQueries.journalRowByPersistenceIds") diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala index b2b4a8221..94ed2b68e 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala @@ -50,9 +50,9 @@ class DefaultJournalDao( override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = { val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for { - _ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr) - highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId) - _ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1) + highestSequenceNr <- queries.highestSequenceNrForPersistenceIdBefore((persistenceId, maxSequenceNr)).result + _ <- queries.delete(persistenceId, highestSequenceNr - 1) + _ <- queries.markJournalMessageAsDeleted(persistenceId, highestSequenceNr) } yield () db.run(actions.transactionally) @@ -64,9 +64,6 @@ class 
DefaultJournalDao( } yield maybeHighestSeqNo.getOrElse(0L) } - private def highestMarkedSequenceNr(persistenceId: String) = - queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result - override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { def serializeAtomicWrite(aw: AtomicWrite): Try[Seq[(JournalAkkaSerializationRow, Set[String])]] = { diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala index 5df745eeb..9413ef398 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala @@ -23,6 +23,10 @@ class JournalQueries( private val insertAndReturn = JournalTable.returning(JournalTable.map(_.ordering)) private val TagTableC = Compiled(TagTable) + val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) + val highestSequenceNrForPersistenceIdBefore = Compiled(_highestSequenceNrForPersistenceIdBefore _) + val messagesQuery = Compiled(_messagesQuery _) + def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])( implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { val sorted = xs.sortBy(event => event._1.sequenceNumber) @@ -59,9 +63,6 @@ class JournalQueries( } } - private def selectAllJournalForPersistenceIdDesc(persistenceId: Rep[String]) = - selectAllJournalForPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc) - private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) = JournalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc) @@ -69,10 +70,10 @@ class JournalQueries( JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete } - def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) = + def 
markJournalMessageAsDeleted(persistenceId: String, sequenceNr: Long) = JournalTable .filter(_.persistenceId === persistenceId) - .filter(_.sequenceNumber <= maxSequenceNr) + .filter(_.sequenceNumber === sequenceNr) .filter(_.deleted === false) .map(_.deleted) .update(true) @@ -80,28 +81,15 @@ class JournalQueries( private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = selectAllJournalForPersistenceId(persistenceId).take(1).map(_.sequenceNumber).max - private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = - selectAllJournalForPersistenceId(persistenceId).filter(_.deleted === true).take(1).map(_.sequenceNumber).max - - val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) - - val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _) - - private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) = - selectAllJournalForPersistenceIdDesc(persistenceId).filter(_.sequenceNumber <= maxSequenceNr) - - val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _) - - private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] = - JournalTable.map(_.persistenceId).distinct - - val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct) - - def journalRowByPersistenceIds(persistenceIds: Iterable[String]): Query[Rep[String], String, Seq] = - for { - query <- JournalTable.map(_.persistenceId) - if query.inSetBind(persistenceIds) - } yield query + private def _highestSequenceNrForPersistenceIdBefore( + persistenceId: Rep[String], + maxSequenceNr: Rep[Long]): Rep[Long] = + selectAllJournalForPersistenceId(persistenceId) + .filter(_.sequenceNumber <= maxSequenceNr) + .take(1) + .map(_.sequenceNumber) + .max + .getOrElse(0L) private def _messagesQuery( persistenceId: Rep[String], @@ -116,6 +104,4 @@ class 
JournalQueries( .sortBy(_.sequenceNumber.asc) .take(max) - val messagesQuery = Compiled(_messagesQuery _) - } diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index a001d6df5..eb9d126aa 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -81,9 +81,9 @@ trait BaseByteArrayJournalDao // We should keep journal record with highest sequence number in order to be compliant // with @see [[akka.persistence.journal.JournalSpec]] val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for { - _ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr) - highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId) - _ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1) + highestSequenceNr <- queries.highestSequenceNrForPersistenceIdBefore((persistenceId, maxSequenceNr)).result + _ <- queries.delete(persistenceId, highestSequenceNr - 1) + _ <- queries.markJournalMessageAsDeleted(persistenceId, highestSequenceNr) } yield () db.run(actions.transactionally) @@ -101,9 +101,6 @@ trait BaseByteArrayJournalDao db.run(queries.update(persistenceId, sequenceNr, serializedRow.message).map(_ => Done)) } - private def highestMarkedSequenceNr(persistenceId: String) = - queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result - override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = for { maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalQueries.scala index c16db9ede..1782f3e21 100644 --- 
a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalQueries.scala @@ -15,12 +15,13 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg private val JournalTableC = Compiled(JournalTable) + val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) + val highestSequenceNrForPersistenceIdBefore = Compiled(_highestSequenceNrForPersistenceIdBefore _) + val messagesQuery = Compiled(_messagesQuery _) + def writeJournalRows(xs: Seq[JournalRow]) = JournalTableC ++= xs.sortBy(_.sequenceNumber) - private def selectAllJournalForPersistenceIdDesc(persistenceId: Rep[String]) = - selectAllJournalForPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc) - private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) = JournalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc) @@ -38,10 +39,10 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg baseQuery.map(_.message).update(replacement) } - def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) = + def markJournalMessageAsDeleted(persistenceId: String, sequenceNr: Long) = JournalTable .filter(_.persistenceId === persistenceId) - .filter(_.sequenceNumber <= maxSequenceNr) + .filter(_.sequenceNumber === sequenceNr) .filter(_.deleted === false) .map(_.deleted) .update(true) @@ -49,28 +50,15 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = selectAllJournalForPersistenceId(persistenceId).take(1).map(_.sequenceNumber).max - private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = - selectAllJournalForPersistenceId(persistenceId).filter(_.deleted === true).take(1).map(_.sequenceNumber).max - - val 
highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) - - val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _) - - private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) = - selectAllJournalForPersistenceIdDesc(persistenceId).filter(_.sequenceNumber <= maxSequenceNr) - - val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _) - - private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] = - JournalTable.map(_.persistenceId).distinct - - val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct) - - def journalRowByPersistenceIds(persistenceIds: Iterable[String]): Query[Rep[String], String, Seq] = - for { - query <- JournalTable.map(_.persistenceId) - if query.inSetBind(persistenceIds) - } yield query + private def _highestSequenceNrForPersistenceIdBefore( + persistenceId: Rep[String], + maxSequenceNr: Rep[Long]): Rep[Long] = + selectAllJournalForPersistenceId(persistenceId) + .filter(_.sequenceNumber <= maxSequenceNr) + .take(1) + .map(_.sequenceNumber) + .max + .getOrElse(0L) private def _messagesQuery( persistenceId: Rep[String], @@ -84,7 +72,4 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg .filter(_.sequenceNumber <= toSequenceNr) .sortBy(_.sequenceNumber.asc) .take(max) - - val messagesQuery = Compiled(_messagesQuery _) - }