diff --git a/.travis.yml b/.travis.yml index cce44245..1fd60681 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,8 @@ services: language: scala scala: - 2.11.12 - - 2.12.8 + - 2.12.9 + - 2.13.0 env: matrix: - MONGODB_VERSION=3.0 MONGODB_OPTS="--storageEngine wiredTiger" @@ -32,4 +33,4 @@ before_script: - sleep 15 - docker exec $(docker ps -a | grep -e "--auth" | awk '{print $1;}') mongo admin --eval "db.createUser({user:'admin',pwd:'password',roles:['root']});" script: - - travis_retry sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test" + - travis_retry ./travis_build.sh diff --git a/build.sbt b/build.sbt index 9a46218e..a61f7be7 100644 --- a/build.sbt +++ b/build.sbt @@ -2,6 +2,7 @@ val releaseV = "2.2.10" val scala211V = "2.11.12" val scala212V = "2.12.9" +val scala213V = "2.13.0" val scalaV = scala211V @@ -52,7 +53,7 @@ ThisBuild / scalaVersion := scalaV val commonSettings = Seq( scalaVersion := scalaV, - crossScalaVersions := Seq(scala211V, scala212V), + crossScalaVersions := Seq(scala211V, scala212V, scala213V), libraryDependencies ++= commonDeps(scalaBinaryVersion.value), dependencyOverrides ++= Seq( "com.typesafe" % "config" % "1.3.2", diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala index c6e1665e..1bb1003f 100755 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoJournal.scala @@ -97,7 +97,7 @@ class MongoJournal(config: Config) extends AsyncWriteJournal { * @param replayCallback called to replay a single message. Can be called from any * thread. */ - override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = + override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr => Unit): Future[Unit] = impl.replayJournal(processorId, fromSequenceNr, toSequenceNr, max)(replayCallback) /** @@ -148,7 +148,7 @@ trait MongoPersistenceJournallingApi { private[mongodb] def deleteFrom(persistenceId: String, toSequenceNr: Long)(implicit ec: ExecutionContext): Future[Unit] - private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit)(implicit ec: ExecutionContext): Future[Unit] + private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit] private[mongodb] def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long] @@ -188,7 +188,7 @@ trait MongoPersistenceJournalMetrics extends MongoPersistenceJournallingApi with super.deleteFrom(persistenceId, toSequenceNr) } - private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit)(implicit ec: ExecutionContext): Future[Unit] + private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit] = timeIt (replayTimer) { super.replayJournal(pid, from, to, max)(replayCallback) } private[mongodb] abstract override def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long] diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala index c74f8a67..09a35dd4 100755 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala @@ -116,9 +116,10 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn batchFuture.andThen { case Success(batch) => val f = doBatchAppend(batch, realtime) - f.onFailure { - case t => + f.onComplete { + case scala.util.Failure(t) => logger.error("Error during write to realtime collection", t) + case _ => () } f }.map(squashToUnit) diff --git a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala index fafd94aa..363e8ba9 100644 --- a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala +++ b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala @@ -56,8 +56,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon val (range, head) = await(inserted) range should have size 1 - underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure { - case t => t.printStackTrace() + underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete { + case scala.util.Failure(t) => t.printStackTrace() + case _ => () } val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { @@ -90,8 +91,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon val (range, head) = await(inserted) range should have size 1 - underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure { - case t => t.printStackTrace() + underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete { + case scala.util.Failure(t) => t.printStackTrace() + case _ => () } val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala index 6fe07a20..1f0c7e87 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala @@ -118,9 +118,10 @@ class ScalaDriverPersistenceJournaller(val driver: ScalaMongoDriver) extends Mon batchFuture.andThen { case Success(batch) => val f = doBatchAppend(batch, realtime) - f.onFailure { - case t => + f.onComplete { + case scala.util.Failure(t) => logger.error("Error during write to realtime collection", t) + case _ => () } f }.map(squashToUnit) diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala index ec5a2fe6..f53c9f49 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala @@ -39,7 +39,7 @@ object CurrentAllEvents { case d:BsonDocument => driver.deserializeJournal(d) }) .getOrElse(Nil) - ).mapConcat(xs => Seq(xs:_*)) + ).mapConcat(_.toVector) ).reduceLeftOption(_ concat _) .getOrElse(Source.empty)) } diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala index 8ca0b8cb..7d511d86 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala @@ -39,7 +39,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys case Version(x,_) => throw new IllegalStateException(s"Don't know how to deserialize version $x of document") } - private def extractTags(d: BsonDocument): Seq[String] = + private def extractTags(d: BsonDocument): scala.collection.Seq[String] = Option(d.get(TAGS)).filter(_.isArray).map(_.asArray) .map(_.getValues.asScala.collect{ case s:bson.BsonString => s.getValue }) .getOrElse(Seq.empty[String]) diff --git a/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala b/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala index a9ab9e45..5f279b33 100644 --- a/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala +++ b/tools/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverMigrateToSuffixedCollections.scala @@ -201,7 +201,7 @@ class ScalaDriverMigrateToSuffixedCollections(system: ActorSystem) extends Scala .map(_.groupBy(doc => getNewCollectionName(Option(doc.getString("_id").getValue).getOrElse("")))) .map(_.mapValues(_.foldLeft((Seq[String](), 0L)){ (acc, doc) => (acc._1 :+ Option(doc.getString("_id").getValue).getOrElse(""), acc._2 + doc.getInt32("count").getValue) - })) + }).toMap) } /** diff --git a/travis_build.sh b/travis_build.sh new file mode 100755 index 00000000..4f7588a3 --- /dev/null +++ b/travis_build.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +if [[ $TRAVIS_SCALA_VERSION == 2.13* ]]; then + sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test" +else + sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test" +fi