From 48e921f050fa470e2b612796b2fa826457a74e2f Mon Sep 17 00:00:00 2001 From: Ruud Welling Date: Wed, 28 Aug 2019 20:39:40 +0200 Subject: [PATCH 1/2] Libary upgrades in preparation for scala 2.13 --- build.sbt | 58 +++++++++++-------- .../persistence/mongodb/MongoJournal.scala | 3 +- .../persistence/mongodb/MongoMetrics.scala | 2 +- .../persistence/mongodb/BaseUnitTest.scala | 4 +- .../mongodb/ScalaDriverSerializers.scala | 2 +- .../mongodb/ScalaDriverBsonPayloadSpec.scala | 8 +-- 6 files changed, 44 insertions(+), 33 deletions(-) diff --git a/build.sbt b/build.sbt index 83f194e9..9a46218e 100644 --- a/build.sbt +++ b/build.sbt @@ -1,34 +1,47 @@ val releaseV = "2.2.10" -val scalaV = "2.11.12" +val scala211V = "2.11.12" +val scala212V = "2.12.9" -val AkkaV = "2.5.12" //min version to have Serialization.withTransportInformation -val MongoJavaDriverVersion = "3.8.2" +val scalaV = scala211V + + +val LegacyAkkaV = "2.5.12" //min version to have Serialization.withTransportInformation +val LatestAkkaV = "2.5.25" +def akkaV(sv: String): String = sv match { + case "2.11" => LegacyAkkaV + case _ => LatestAkkaV +} + +val MongoJavaDriverVersion = "3.11.0" def commonDeps(sv:String) = Seq( - ("com.typesafe.akka" %% "akka-persistence" % AkkaV) + ("com.typesafe.akka" %% "akka-persistence" % akkaV(sv)) .exclude("org.iq80.leveldb", "leveldb") .exclude("org.fusesource.leveldbjni", "leveldbjni-all"), (sv match { - case "2.11" => "nl.grons" %% "metrics-scala" % "3.5.5_a2.3" - case "2.12" => "nl.grons" %% "metrics-scala" % "3.5.5_a2.4" + case "2.11" => "nl.grons" %% "metrics4-akka_a24" % "4.0.8" + case "2.12" | "2.13" => "nl.grons" %% "metrics4-akka_a25" % "4.0.8" }) .exclude("com.typesafe.akka", "akka-actor_2.11") - .exclude("com.typesafe.akka", "akka-actor_2.12"), - "com.typesafe.akka" %% "akka-persistence-query" % AkkaV % "compile", + .exclude("com.typesafe.akka", "akka-actor_2.12") + .exclude("com.typesafe.akka", "akka-actor_2.13"), + "com.typesafe.akka" %% "akka-persistence-query" % akkaV(sv) % "compile", + "com.typesafe.akka" %% "akka-persistence" % akkaV(sv) % "compile", + "com.typesafe.akka" %% "akka-actor" % akkaV(sv) % "compile", "org.mongodb" % "mongodb-driver-core" % MongoJavaDriverVersion % "compile", "org.mongodb" % "mongodb-driver" % MongoJavaDriverVersion % "test", "org.slf4j" % "slf4j-api" % "1.7.22" % "test", "org.apache.logging.log4j" % "log4j-api" % "2.5" % "test", "org.apache.logging.log4j" % "log4j-core" % "2.5" % "test", "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.5" % "test", - "org.scalatest" %% "scalatest" % "3.0.1" % "test", + "org.scalatest" %% "scalatest" % "3.0.8" % "test", "junit" % "junit" % "4.11" % "test", "org.mockito" % "mockito-all" % "1.9.5" % "test", - "com.typesafe.akka" %% "akka-slf4j" % AkkaV % "test", - "com.typesafe.akka" %% "akka-testkit" % AkkaV % "test", - "com.typesafe.akka" %% "akka-persistence-tck" % AkkaV % "test", - "com.typesafe.akka" %% "akka-cluster-sharding" % AkkaV % "test" + "com.typesafe.akka" %% "akka-slf4j" % akkaV(sv) % "test", + "com.typesafe.akka" %% "akka-testkit" % akkaV(sv) % "test", + "com.typesafe.akka" %% "akka-persistence-tck" % akkaV(sv) % "test", + "com.typesafe.akka" %% "akka-cluster-sharding" % akkaV(sv) % "test" ) lazy val Travis = config("travis").extend(Test) @@ -39,13 +52,12 @@ ThisBuild / scalaVersion := scalaV val commonSettings = Seq( scalaVersion := scalaV, - crossScalaVersions := Seq("2.11.12", "2.12.8"), - dependencyOverrides += "org.mongodb" % "mongodb-driver" % "3.8.2" , + crossScalaVersions := Seq(scala211V, scala212V), libraryDependencies ++= commonDeps(scalaBinaryVersion.value), dependencyOverrides ++= Seq( "com.typesafe" % "config" % "1.3.2", "org.slf4j" % "slf4j-api" % "1.7.22", - "com.typesafe.akka" %% "akka-stream" % AkkaV, + "com.typesafe.akka" %% "akka-stream" % akkaV(scalaBinaryVersion.value), "org.mongodb" % "mongo-java-driver" % MongoJavaDriverVersion ), version := releaseV, @@ -60,12 +72,10 @@ val commonSettings = Seq( "-language:implicitConversions", // "-Xfatal-warnings", Deprecations keep from enabling this "-Xlint", - "-Yno-adapted-args", "-Ywarn-dead-code", // N.B. doesn't work well with the ??? hole "-Ywarn-numeric-widen", "-Ywarn-value-discard", "-Xfuture", - "-Ywarn-unused-import", // 2.11 only "-target:jvm-1.8" ), javacOptions ++= Seq( @@ -94,6 +104,7 @@ lazy val `akka-persistence-mongo-common` = (project in file("common")) lazy val `akka-persistence-mongo-casbah` = (project in file("casbah")) .dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile") .settings(commonSettings:_*) + .settings(crossScalaVersions := Seq(scala211V, scala212V)) // not available for 2.13 .settings( libraryDependencies ++= Seq( "org.mongodb" %% "casbah" % "3.1.1" % "compile" @@ -106,8 +117,8 @@ lazy val `akka-persistence-mongo-scala` = (project in file("scala")) .settings(commonSettings:_*) .settings( libraryDependencies ++= Seq( - "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile", - "org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "compile", + "org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0" % "compile", + "org.mongodb.scala" %% "mongo-scala-bson" % "2.7.0" % "compile", "io.netty" % "netty-buffer" % "4.1.17.Final" % "compile", "io.netty" % "netty-transport" % "4.1.17.Final" % "compile", "io.netty" % "netty-handler" % "4.1.17.Final" % "compile", @@ -124,8 +135,9 @@ lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo")) Seq("reactivemongo", "reactivemongo-akkastream") .map("org.reactivemongo" %% _ % "0.18.4" % "compile") .map(_.exclude("com.typesafe.akka","akka-actor_2.11") - .exclude("com.typesafe.akka","akka-actor_2.12") - .excludeAll(ExclusionRule("org.apache.logging.log4j")) + .exclude("com.typesafe.akka","akka-actor_2.12") + .exclude("com.typesafe.akka","akka-actor_2.13") + .excludeAll(ExclusionRule("org.apache.logging.log4j")) ) ) .configs(Travis) @@ -135,7 +147,7 @@ lazy val `akka-persistence-mongo-tools` = (project in file("tools")) .settings(commonSettings:_*) .settings( libraryDependencies ++= Seq( - "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile" + "org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0" % "compile" ) ) .configs(Travis) diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala index a59201f9..c6e1665e 100755 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala @@ -4,7 +4,7 @@ import akka.actor.Actor import akka.persistence.journal.AsyncWriteJournal import akka.persistence.{AtomicWrite, PersistentRepr} import com.typesafe.config.Config -import nl.grons.metrics.scala.MetricName +import nl.grons.metrics4.scala.MetricName import scala.collection.immutable.Seq import scala.concurrent.{ExecutionContext, Future} @@ -195,4 +195,3 @@ trait MongoPersistenceJournalMetrics extends MongoPersistenceJournallingApi with = timeIt (maxTimer) { super.maxSequenceNr(pid, from) } } - \ No newline at end of file diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala index dc992559..c6b4b673 100644 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala @@ -2,7 +2,7 @@ package akka.contrib.persistence.mongodb import com.codahale.metrics.Timer.Context import com.codahale.metrics.{MetricRegistry, SharedMetricRegistries} -import nl.grons.metrics.scala._ +import nl.grons.metrics4.scala._ /** * Builds timers and histograms to record metrics. diff --git a/common/src/test/scala/akka/contrib/persistence/mongodb/BaseUnitTest.scala b/common/src/test/scala/akka/contrib/persistence/mongodb/BaseUnitTest.scala index 084e783b..54bbc022 100644 --- a/common/src/test/scala/akka/contrib/persistence/mongodb/BaseUnitTest.scala +++ b/common/src/test/scala/akka/contrib/persistence/mongodb/BaseUnitTest.scala @@ -5,7 +5,7 @@ import akka.testkit._ import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.{FlatSpecLike, Matchers} import org.scalatest.concurrent.PatienceConfiguration -import org.scalatest.mockito.MockitoSugar +import org.scalatestplus.mockito.MockitoSugar import scala.concurrent.Await import scala.util.Try @@ -30,4 +30,4 @@ object ConfigLoanFixture { () } } -} \ No newline at end of file +} diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala index ed40fbe0..8ca0b8cb 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala @@ -135,7 +135,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys } private def serializeTags(tags: Set[String]): BsonArray = - BsonArray(tags.map(BsonString(_))) + BsonArray.fromIterable(tags.map(BsonString(_))) private def serializePayload(payload: Payload)(doc: BsonDocument): BsonDocument = { val withType = doc.append(TYPE, BsonString(payload.hint)) diff --git a/scala/src/test/scala/akka/contrib/persistence/mongodb/ScalaDriverBsonPayloadSpec.scala b/scala/src/test/scala/akka/contrib/persistence/mongodb/ScalaDriverBsonPayloadSpec.scala index bf87891c..bd3dfec1 100644 --- a/scala/src/test/scala/akka/contrib/persistence/mongodb/ScalaDriverBsonPayloadSpec.scala +++ b/scala/src/test/scala/akka/contrib/persistence/mongodb/ScalaDriverBsonPayloadSpec.scala @@ -94,8 +94,8 @@ class ScalaDriverBsonPayloadSpec extends BaseUnitTest with ContainerMongo with B } private val arrays = { - val msg1 = BsonArray(BsonInt32(1) :: BsonString("2") :: Nil) - val msg2 = BsonArray(BsonDocument("a" -> BsonInt32(2)) :: BsonDocument("b" -> BsonString("3")) :: Nil) + val msg1 = BsonArray.fromIterable(BsonInt32(1) :: BsonString("2") :: Nil) + val msg2 = BsonArray.fromIterable(BsonDocument("a" -> BsonInt32(2)) :: BsonDocument("b" -> BsonString("3")) :: Nil) msg1 :: msg2 :: Nil } @@ -165,7 +165,7 @@ object PayloadSpec { private def snapshotHandling(ref: Option[ActorRef]): Receive = stateless orElse { case MakeSnapshot => - saveSnapshot(BsonArray(state)) + saveSnapshot(BsonArray.fromIterable(state)) context.become(snapshotHandling(Option(sender()))) case SaveSnapshotSuccess(m) => deleteMessages(m.sequenceNr) // clean journal for later testing of snapshot restore @@ -187,4 +187,4 @@ object PayloadSpec { sender() ! Contents(state) } } -} \ No newline at end of file +} From 4b3d6854005c49c834067ca82d03d770fc06469f Mon Sep 17 00:00:00 2001 From: Ruud Welling Date: Thu, 29 Aug 2019 21:23:23 +0200 Subject: [PATCH 2/2] Support scala 2.13 --- .travis.yml | 5 +++-- build.sbt | 3 ++- .../contrib/persistence/mongodb/MongoJournal.scala | 6 +++--- .../persistence/mongodb/RxMongoJournaller.scala | 5 +++-- .../persistence/mongodb/RxMongoJournallerSpec.scala | 10 ++++++---- .../mongodb/ScalaDriverPersistenceJournaller.scala | 5 +++-- .../mongodb/ScalaDriverPersistenceReadJournaller.scala | 2 +- .../persistence/mongodb/ScalaDriverSerializers.scala | 2 +- .../ScalaDriverMigrateToSuffixedCollections.scala | 2 +- travis_build.sh | 6 ++++++ 10 files changed, 29 insertions(+), 17 deletions(-) create mode 100755 travis_build.sh diff --git a/.travis.yml b/.travis.yml index cce44245..1fd60681 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,8 @@ services: language: scala scala: - 2.11.12 - - 2.12.8 + - 2.12.9 + - 2.13.0 env: matrix: - MONGODB_VERSION=3.0 MONGODB_OPTS="--storageEngine wiredTiger" @@ -32,4 +33,4 @@ before_script: - sleep 15 - docker exec $(docker ps -a | grep -e "--auth" | awk '{print $1;}') mongo admin --eval "db.createUser({user:'admin',pwd:'password',roles:['root']});" script: - - travis_retry sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test" + - travis_retry ./travis_build.sh diff --git a/build.sbt b/build.sbt index 9a46218e..a61f7be7 100644 --- a/build.sbt +++ b/build.sbt @@ -2,6 +2,7 @@ val releaseV = "2.2.10" val scala211V = "2.11.12" val scala212V = "2.12.9" +val scala213V = "2.13.0" val scalaV = scala211V @@ -52,7 +53,7 @@ ThisBuild / scalaVersion := scalaV val commonSettings = Seq( scalaVersion := scalaV, - crossScalaVersions := Seq(scala211V, scala212V), + crossScalaVersions := Seq(scala211V, scala212V, scala213V), libraryDependencies ++= commonDeps(scalaBinaryVersion.value), dependencyOverrides ++= Seq( "com.typesafe" % "config" % "1.3.2", diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala index c6e1665e..1bb1003f 100755 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala @@ -97,7 +97,7 @@ class MongoJournal(config: Config) extends AsyncWriteJournal { * @param replayCallback called to replay a single message. Can be called from any * thread. */ - override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = + override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr => Unit): Future[Unit] = impl.replayJournal(processorId, fromSequenceNr, toSequenceNr, max)(replayCallback) /** @@ -148,7 +148,7 @@ trait MongoPersistenceJournallingApi { private[mongodb] def deleteFrom(persistenceId: String, toSequenceNr: Long)(implicit ec: ExecutionContext): Future[Unit] - private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit)(implicit ec: ExecutionContext): Future[Unit] + private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit] private[mongodb] def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long] @@ -188,7 +188,7 @@ trait MongoPersistenceJournalMetrics extends MongoPersistenceJournallingApi with super.deleteFrom(persistenceId, toSequenceNr) } - private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit)(implicit ec: ExecutionContext): Future[Unit] + private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit] = timeIt (replayTimer) { super.replayJournal(pid, from, to, max)(replayCallback) } private[mongodb] abstract override def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long] diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala index c74f8a67..09a35dd4 100755 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala @@ -116,9 +116,10 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn batchFuture.andThen { case Success(batch) => val f = doBatchAppend(batch, realtime) - f.onFailure { - case t => + f.onComplete { + case scala.util.Failure(t) => logger.error("Error during write to realtime collection", t) + case _ => () } f }.map(squashToUnit) diff --git a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala index fafd94aa..363e8ba9 100644 --- a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala +++ b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala @@ -56,8 +56,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon val (range, head) = await(inserted) range should have size 1 - underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure { - case t => t.printStackTrace() + underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete { + case scala.util.Failure(t) => t.printStackTrace() + case _ => () } val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { @@ -90,8 +91,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon val (range, head) = await(inserted) range should have size 1 - underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure { - case t => t.printStackTrace() + underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete { + case scala.util.Failure(t) => t.printStackTrace() + case _ => () } val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala index 6fe07a20..1f0c7e87 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala @@ -118,9 +118,10 @@ class ScalaDriverPersistenceJournaller(val driver: ScalaMongoDriver) extends Mon batchFuture.andThen { case Success(batch) => val f = doBatchAppend(batch, realtime) - f.onFailure { - case t => + f.onComplete { + case scala.util.Failure(t) => logger.error("Error during write to realtime collection", t) + case _ => () } f }.map(squashToUnit) diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala index ec5a2fe6..f53c9f49 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala @@ -39,7 +39,7 @@ object CurrentAllEvents { case d:BsonDocument => driver.deserializeJournal(d) }) .getOrElse(Nil) - ).mapConcat(xs => Seq(xs:_*)) + ).mapConcat(_.toVector) ).reduceLeftOption(_ concat _) .getOrElse(Source.empty)) } diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala index 8ca0b8cb..7d511d86 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala @@ -39,7 +39,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys case Version(x,_) => throw new IllegalStateException(s"Don't know how to deserialize version $x of document") } - private def extractTags(d: BsonDocument): Seq[String] = + private def extractTags(d: BsonDocument): scala.collection.Seq[String] = Option(d.get(TAGS)).filter(_.isArray).map(_.asArray) .map(_.getValues.asScala.collect{ case s:bson.BsonString => s.getValue }) .getOrElse(Seq.empty[String]) diff --git a/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala b/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala index a9ab9e45..5f279b33 100644 --- a/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala +++ b/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala @@ -201,7 +201,7 @@ class ScalaDriverMigrateToSuffixedCollections(system: ActorSystem) extends Scala .map(_.groupBy(doc => getNewCollectionName(Option(doc.getString("_id").getValue).getOrElse("")))) .map(_.mapValues(_.foldLeft((Seq[String](), 0L)){ (acc, doc) => (acc._1 :+ Option(doc.getString("_id").getValue).getOrElse(""), acc._2 + doc.getInt32("count").getValue) - })) + }).toMap) } /** diff --git a/travis_build.sh b/travis_build.sh new file mode 100755 index 00000000..4f7588a3 --- /dev/null +++ b/travis_build.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +if [[ $TRAVIS_SCALA_VERSION == 2.13* ]]; then + sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test" +else + sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test" +fi