Skip to content

Commit

Permalink
perf: Reducing memory cost while replaying (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam authored Sep 14, 2023
1 parent 60ecab2 commit ed541ff
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal {
journalDao
.messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, journalConfig.daoConfig.replayBatchSize, None)
.take(max)
.mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr))
.runForeach { case (repr, _) =>
recoveryCallback(repr)
.runForeach {
case Success((repr, _)) =>
recoveryCallback(repr)
case Failure(ex) => throw ex
}
.map(_ => ())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import akka.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }

import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.FiniteDuration
import scala.util.{ Failure, Success, Try }
Expand Down Expand Up @@ -67,7 +66,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
akka.pattern.after(delay, scheduler)(retrieveNextBatch())
}
}
.mapConcat(identity(_))
.mapConcat(identity)
}

}

0 comments on commit ed541ff

Please sign in to comment.