From ce1dfb7a7965e2c8ee2b5182da37875923098134 Mon Sep 17 00:00:00 2001 From: JeanFrancoisGuena Date: Fri, 14 Jun 2019 11:23:36 +0200 Subject: [PATCH] Source for current Persistence Ids processed in a streamed manner --- ...ScalaDriverPersistenceReadJournaller.scala | 37 ++++--------------- 1 file changed, 8 insertions(+), 29 deletions(-) diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala index 8c935c60..ec5a2fe6 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala @@ -18,9 +18,9 @@ import org.mongodb.scala.model.Sorts._ import scala.collection.JavaConverters._ import scala.collection.immutable.Seq +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Random, Try} +import scala.util.Try object CurrentAllEvents { def source(driver: ScalaMongoDriver)(implicit m: Materializer): Source[Event, NotUsed] = { @@ -48,33 +48,12 @@ object CurrentAllEvents { object CurrentPersistenceIds { def source(driver: ScalaMongoDriver)(implicit m: Materializer): Source[String, NotUsed] = { implicit val ec: ExecutionContext = driver.querySideDispatcher - val temporaryCollectionName: String = s"persistenceids-${System.currentTimeMillis()}-${Random.nextInt(1000)}" - - Source.fromFuture(for { - collections <- driver.journalCollectionsAsFuture - tmpNames <- Future.sequence(collections.zipWithIndex.map { case (c,idx) => - val nameWithIndex = s"$temporaryCollectionName-$idx" - c.aggregate( - project(include(PROCESSOR_ID)) :: - group(s"$$$PROCESSOR_ID") :: - out(nameWithIndex) :: - Nil - ) - .asAkka - .runWith(Sink.headOption) - .map(_ => nameWithIndex) - }) - tmps <- Future.sequence(tmpNames.map(driver.collection)) - } yield tmps ) - .flatMapConcat(_.map(_.find().asAkka).reduceLeftOption(_ ++ _).getOrElse(Source.empty)) - .mapConcat(c => List(c.asDocument().getString("_id").getValue)) - .alsoTo(Sink.onComplete{ _ => - driver - .getCollectionsAsFuture(temporaryCollectionName) - .foreach(cols => - cols.foreach(_.drop().toFuture()) - ) - }) + + Source.fromFuture(driver.journalCollectionsAsFuture) + .mapConcat(identity) + .flatMapConcat(_.aggregate(project(include(PROCESSOR_ID)) :: group(s"$$$PROCESSOR_ID") :: Nil).asAkka) + .map { doc => Option(doc.getString("_id").getValue).getOrElse("") } + .filterNot(_.isEmpty) } }