Skip to content

Commit

Permalink
Support scala 2.13
Browse files Browse the repository at this point in the history
  • Loading branch information
WellingR committed Sep 1, 2019
1 parent 48e921f commit 4b3d685
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 17 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ services:
language: scala
scala:
- 2.11.12
- 2.12.8
- 2.12.9
- 2.13.0
env:
matrix:
- MONGODB_VERSION=3.0 MONGODB_OPTS="--storageEngine wiredTiger"
Expand All @@ -32,4 +33,4 @@ before_script:
- sleep 15
- docker exec $(docker ps -a | grep -e "--auth" | awk '{print $1;}') mongo admin --eval "db.createUser({user:'admin',pwd:'password',roles:['root']});"
script:
- travis_retry sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test"
- travis_retry ./travis_build.sh
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ val releaseV = "2.2.10"

val scala211V = "2.11.12"
val scala212V = "2.12.9"
val scala213V = "2.13.0"

val scalaV = scala211V

Expand Down Expand Up @@ -52,7 +53,7 @@ ThisBuild / scalaVersion := scalaV

val commonSettings = Seq(
scalaVersion := scalaV,
crossScalaVersions := Seq(scala211V, scala212V),
crossScalaVersions := Seq(scala211V, scala212V, scala213V),
libraryDependencies ++= commonDeps(scalaBinaryVersion.value),
dependencyOverrides ++= Seq(
"com.typesafe" % "config" % "1.3.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class MongoJournal(config: Config) extends AsyncWriteJournal {
* @param replayCallback called to replay a single message. Can be called from any
* thread.
*/
override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr Unit): Future[Unit] =
override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr => Unit): Future[Unit] =
impl.replayJournal(processorId, fromSequenceNr, toSequenceNr, max)(replayCallback)

/**
Expand Down Expand Up @@ -148,7 +148,7 @@ trait MongoPersistenceJournallingApi {

private[mongodb] def deleteFrom(persistenceId: String, toSequenceNr: Long)(implicit ec: ExecutionContext): Future[Unit]

private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr Unit)(implicit ec: ExecutionContext): Future[Unit]
private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit]

private[mongodb] def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long]

Expand Down Expand Up @@ -188,7 +188,7 @@ trait MongoPersistenceJournalMetrics extends MongoPersistenceJournallingApi with
super.deleteFrom(persistenceId, toSequenceNr)
}

private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr Unit)(implicit ec: ExecutionContext): Future[Unit]
private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit]
= timeIt (replayTimer) { super.replayJournal(pid, from, to, max)(replayCallback) }

private[mongodb] abstract override def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn
batchFuture.andThen {
case Success(batch) =>
val f = doBatchAppend(batch, realtime)
f.onFailure {
case t =>
f.onComplete {
case scala.util.Failure(t) =>
logger.error("Error during write to realtime collection", t)
case _ => ()
}
f
}.map(squashToUnit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon
val (range, head) = await(inserted)
range should have size 1

underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure {
case t => t.printStackTrace()
underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete {
case scala.util.Failure(t) => t.printStackTrace()
case _ => ()
}

val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect {
Expand Down Expand Up @@ -90,8 +91,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon
val (range, head) = await(inserted)
range should have size 1

underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure {
case t => t.printStackTrace()
underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete {
case scala.util.Failure(t) => t.printStackTrace()
case _ => ()
}

val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ class ScalaDriverPersistenceJournaller(val driver: ScalaMongoDriver) extends Mon
batchFuture.andThen {
case Success(batch) =>
val f = doBatchAppend(batch, realtime)
f.onFailure {
case t =>
f.onComplete {
case scala.util.Failure(t) =>
logger.error("Error during write to realtime collection", t)
case _ => ()
}
f
}.map(squashToUnit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object CurrentAllEvents {
case d:BsonDocument => driver.deserializeJournal(d)
})
.getOrElse(Nil)
).mapConcat(xs => Seq(xs:_*))
).mapConcat(_.toVector)
).reduceLeftOption(_ concat _)
.getOrElse(Source.empty))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys
case Version(x,_) => throw new IllegalStateException(s"Don't know how to deserialize version $x of document")
}

private def extractTags(d: BsonDocument): Seq[String] =
private def extractTags(d: BsonDocument): scala.collection.Seq[String] =
Option(d.get(TAGS)).filter(_.isArray).map(_.asArray)
.map(_.getValues.asScala.collect{ case s:bson.BsonString => s.getValue })
.getOrElse(Seq.empty[String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ScalaDriverMigrateToSuffixedCollections(system: ActorSystem) extends Scala
.map(_.groupBy(doc => getNewCollectionName(Option(doc.getString("_id").getValue).getOrElse(""))))
.map(_.mapValues(_.foldLeft((Seq[String](), 0L)){ (acc, doc) =>
(acc._1 :+ Option(doc.getString("_id").getValue).getOrElse(""), acc._2 + doc.getInt32("count").getValue)
}))
}).toMap)
}

/**
Expand Down
6 changes: 6 additions & 0 deletions travis_build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env bash
if [[ $TRAVIS_SCALA_VERSION == 2.13* ]]; then
sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test"
else
sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test"
fi

0 comments on commit 4b3d685

Please sign in to comment.