Skip to content

Commit

Permalink
Merge pull request #247 from WellingR/feature/scala-2.13-support
Browse files Browse the repository at this point in the history
scala 2.13 support
  • Loading branch information
scullxbones authored Sep 2, 2019
2 parents 3fdaec7 + 4b3d685 commit 3b78137
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 49 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ services:
language: scala
scala:
- 2.11.12
- 2.12.8
- 2.12.9
- 2.13.0
env:
matrix:
- MONGODB_VERSION=3.0 MONGODB_OPTS="--storageEngine wiredTiger"
Expand All @@ -32,4 +33,4 @@ before_script:
- sleep 15
- docker exec $(docker ps -a | grep -e "--auth" | awk '{print $1;}') mongo admin --eval "db.createUser({user:'admin',pwd:'password',roles:['root']});"
script:
- travis_retry sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test"
- travis_retry ./travis_build.sh
59 changes: 36 additions & 23 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
val releaseV = "2.2.10"

val scalaV = "2.11.12"
val scala211V = "2.11.12"
val scala212V = "2.12.9"
val scala213V = "2.13.0"

val AkkaV = "2.5.12" //min version to have Serialization.withTransportInformation
val MongoJavaDriverVersion = "3.8.2"
val scalaV = scala211V


val LegacyAkkaV = "2.5.12" //min version to have Serialization.withTransportInformation
val LatestAkkaV = "2.5.25"
def akkaV(sv: String): String = sv match {
case "2.11" => LegacyAkkaV
case _ => LatestAkkaV
}

val MongoJavaDriverVersion = "3.11.0"

def commonDeps(sv:String) = Seq(
("com.typesafe.akka" %% "akka-persistence" % AkkaV)
("com.typesafe.akka" %% "akka-persistence" % akkaV(sv))
.exclude("org.iq80.leveldb", "leveldb")
.exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
(sv match {
case "2.11" => "nl.grons" %% "metrics-scala" % "3.5.5_a2.3"
case "2.12" => "nl.grons" %% "metrics-scala" % "3.5.5_a2.4"
case "2.11" => "nl.grons" %% "metrics4-akka_a24" % "4.0.8"
case "2.12" | "2.13" => "nl.grons" %% "metrics4-akka_a25" % "4.0.8"
})
.exclude("com.typesafe.akka", "akka-actor_2.11")
.exclude("com.typesafe.akka", "akka-actor_2.12"),
"com.typesafe.akka" %% "akka-persistence-query" % AkkaV % "compile",
.exclude("com.typesafe.akka", "akka-actor_2.12")
.exclude("com.typesafe.akka", "akka-actor_2.13"),
"com.typesafe.akka" %% "akka-persistence-query" % akkaV(sv) % "compile",
"com.typesafe.akka" %% "akka-persistence" % akkaV(sv) % "compile",
"com.typesafe.akka" %% "akka-actor" % akkaV(sv) % "compile",
"org.mongodb" % "mongodb-driver-core" % MongoJavaDriverVersion % "compile",
"org.mongodb" % "mongodb-driver" % MongoJavaDriverVersion % "test",
"org.slf4j" % "slf4j-api" % "1.7.22" % "test",
"org.apache.logging.log4j" % "log4j-api" % "2.5" % "test",
"org.apache.logging.log4j" % "log4j-core" % "2.5" % "test",
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.5" % "test",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"org.scalatest" %% "scalatest" % "3.0.8" % "test",
"junit" % "junit" % "4.11" % "test",
"org.mockito" % "mockito-all" % "1.9.5" % "test",
"com.typesafe.akka" %% "akka-slf4j" % AkkaV % "test",
"com.typesafe.akka" %% "akka-testkit" % AkkaV % "test",
"com.typesafe.akka" %% "akka-persistence-tck" % AkkaV % "test",
"com.typesafe.akka" %% "akka-cluster-sharding" % AkkaV % "test"
"com.typesafe.akka" %% "akka-slf4j" % akkaV(sv) % "test",
"com.typesafe.akka" %% "akka-testkit" % akkaV(sv) % "test",
"com.typesafe.akka" %% "akka-persistence-tck" % akkaV(sv) % "test",
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaV(sv) % "test"
)

lazy val Travis = config("travis").extend(Test)
Expand All @@ -39,13 +53,12 @@ ThisBuild / scalaVersion := scalaV

val commonSettings = Seq(
scalaVersion := scalaV,
crossScalaVersions := Seq("2.11.12", "2.12.8"),
dependencyOverrides += "org.mongodb" % "mongodb-driver" % "3.8.2" ,
crossScalaVersions := Seq(scala211V, scala212V, scala213V),
libraryDependencies ++= commonDeps(scalaBinaryVersion.value),
dependencyOverrides ++= Seq(
"com.typesafe" % "config" % "1.3.2",
"org.slf4j" % "slf4j-api" % "1.7.22",
"com.typesafe.akka" %% "akka-stream" % AkkaV,
"com.typesafe.akka" %% "akka-stream" % akkaV(scalaBinaryVersion.value),
"org.mongodb" % "mongo-java-driver" % MongoJavaDriverVersion
),
version := releaseV,
Expand All @@ -60,12 +73,10 @@ val commonSettings = Seq(
"-language:implicitConversions",
// "-Xfatal-warnings", Deprecations keep from enabling this
"-Xlint",
"-Yno-adapted-args",
"-Ywarn-dead-code", // N.B. doesn't work well with the ??? hole
"-Ywarn-numeric-widen",
"-Ywarn-value-discard",
"-Xfuture",
"-Ywarn-unused-import", // 2.11 only
"-target:jvm-1.8"
),
javacOptions ++= Seq(
Expand Down Expand Up @@ -94,6 +105,7 @@ lazy val `akka-persistence-mongo-common` = (project in file("common"))
lazy val `akka-persistence-mongo-casbah` = (project in file("casbah"))
.dependsOn(`akka-persistence-mongo-common` % "test->test;compile->compile")
.settings(commonSettings:_*)
.settings(crossScalaVersions := Seq(scala211V, scala212V)) // not available for 2.13
.settings(
libraryDependencies ++= Seq(
"org.mongodb" %% "casbah" % "3.1.1" % "compile"
Expand All @@ -106,8 +118,8 @@ lazy val `akka-persistence-mongo-scala` = (project in file("scala"))
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile",
"org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "compile",
"org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0" % "compile",
"org.mongodb.scala" %% "mongo-scala-bson" % "2.7.0" % "compile",
"io.netty" % "netty-buffer" % "4.1.17.Final" % "compile",
"io.netty" % "netty-transport" % "4.1.17.Final" % "compile",
"io.netty" % "netty-handler" % "4.1.17.Final" % "compile",
Expand All @@ -124,8 +136,9 @@ lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo"))
Seq("reactivemongo", "reactivemongo-akkastream")
.map("org.reactivemongo" %% _ % "0.18.4" % "compile")
.map(_.exclude("com.typesafe.akka","akka-actor_2.11")
.exclude("com.typesafe.akka","akka-actor_2.12")
.excludeAll(ExclusionRule("org.apache.logging.log4j"))
.exclude("com.typesafe.akka","akka-actor_2.12")
.exclude("com.typesafe.akka","akka-actor_2.13")
.excludeAll(ExclusionRule("org.apache.logging.log4j"))
)
)
.configs(Travis)
Expand All @@ -135,7 +148,7 @@ lazy val `akka-persistence-mongo-tools` = (project in file("tools"))
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile"
"org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0" % "compile"
)
)
.configs(Travis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.Actor
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{AtomicWrite, PersistentRepr}
import com.typesafe.config.Config
import nl.grons.metrics.scala.MetricName
import nl.grons.metrics4.scala.MetricName

import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -97,7 +97,7 @@ class MongoJournal(config: Config) extends AsyncWriteJournal {
* @param replayCallback called to replay a single message. Can be called from any
* thread.
*/
override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr Unit): Future[Unit] =
override def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr => Unit): Future[Unit] =
impl.replayJournal(processorId, fromSequenceNr, toSequenceNr, max)(replayCallback)

/**
Expand Down Expand Up @@ -148,7 +148,7 @@ trait MongoPersistenceJournallingApi {

private[mongodb] def deleteFrom(persistenceId: String, toSequenceNr: Long)(implicit ec: ExecutionContext): Future[Unit]

private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr Unit)(implicit ec: ExecutionContext): Future[Unit]
private[mongodb] def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit]

private[mongodb] def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long]

Expand Down Expand Up @@ -188,11 +188,10 @@ trait MongoPersistenceJournalMetrics extends MongoPersistenceJournallingApi with
super.deleteFrom(persistenceId, toSequenceNr)
}

private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr Unit)(implicit ec: ExecutionContext): Future[Unit]
private[mongodb] abstract override def replayJournal(pid: String, from: Long, to: Long, max: Long)(replayCallback: PersistentRepr => Unit)(implicit ec: ExecutionContext): Future[Unit]
= timeIt (replayTimer) { super.replayJournal(pid, from, to, max)(replayCallback) }

private[mongodb] abstract override def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long]
= timeIt (maxTimer) { super.maxSequenceNr(pid, from) }

}

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package akka.contrib.persistence.mongodb

import com.codahale.metrics.Timer.Context
import com.codahale.metrics.{MetricRegistry, SharedMetricRegistries}
import nl.grons.metrics.scala._
import nl.grons.metrics4.scala._

/**
* Builds timers and histograms to record metrics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.testkit._
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{FlatSpecLike, Matchers}
import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.mockito.MockitoSugar
import org.scalatestplus.mockito.MockitoSugar

import scala.concurrent.Await
import scala.util.Try
Expand All @@ -30,4 +30,4 @@ object ConfigLoanFixture {
()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn
batchFuture.andThen {
case Success(batch) =>
val f = doBatchAppend(batch, realtime)
f.onFailure {
case t =>
f.onComplete {
case scala.util.Failure(t) =>
logger.error("Error during write to realtime collection", t)
case _ => ()
}
f
}.map(squashToUnit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon
val (range, head) = await(inserted)
range should have size 1

underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure {
case t => t.printStackTrace()
underTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete {
case scala.util.Failure(t) => t.printStackTrace()
case _ => ()
}

val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect {
Expand Down Expand Up @@ -90,8 +91,9 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon
val (range, head) = await(inserted)
range should have size 1

underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _) onFailure {
case t => t.printStackTrace()
underExtendedTest.journalRange("unit-test", 1, 3, Int.MaxValue).runFold(List.empty[Event])(_ :+ _).onComplete {
case scala.util.Failure(t) => t.printStackTrace()
case _ => ()
}

val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ class ScalaDriverPersistenceJournaller(val driver: ScalaMongoDriver) extends Mon
batchFuture.andThen {
case Success(batch) =>
val f = doBatchAppend(batch, realtime)
f.onFailure {
case t =>
f.onComplete {
case scala.util.Failure(t) =>
logger.error("Error during write to realtime collection", t)
case _ => ()
}
f
}.map(squashToUnit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object CurrentAllEvents {
case d:BsonDocument => driver.deserializeJournal(d)
})
.getOrElse(Nil)
).mapConcat(xs => Seq(xs:_*))
).mapConcat(_.toVector)
).reduceLeftOption(_ concat _)
.getOrElse(Source.empty))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys
case Version(x,_) => throw new IllegalStateException(s"Don't know how to deserialize version $x of document")
}

private def extractTags(d: BsonDocument): Seq[String] =
private def extractTags(d: BsonDocument): scala.collection.Seq[String] =
Option(d.get(TAGS)).filter(_.isArray).map(_.asArray)
.map(_.getValues.asScala.collect{ case s:bson.BsonString => s.getValue })
.getOrElse(Seq.empty[String])
Expand Down Expand Up @@ -135,7 +135,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys
}

private def serializeTags(tags: Set[String]): BsonArray =
BsonArray(tags.map(BsonString(_)))
BsonArray.fromIterable(tags.map(BsonString(_)))

private def serializePayload(payload: Payload)(doc: BsonDocument): BsonDocument = {
val withType = doc.append(TYPE, BsonString(payload.hint))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class ScalaDriverBsonPayloadSpec extends BaseUnitTest with ContainerMongo with B
}

private val arrays = {
val msg1 = BsonArray(BsonInt32(1) :: BsonString("2") :: Nil)
val msg2 = BsonArray(BsonDocument("a" -> BsonInt32(2)) :: BsonDocument("b" -> BsonString("3")) :: Nil)
val msg1 = BsonArray.fromIterable(BsonInt32(1) :: BsonString("2") :: Nil)
val msg2 = BsonArray.fromIterable(BsonDocument("a" -> BsonInt32(2)) :: BsonDocument("b" -> BsonString("3")) :: Nil)
msg1 :: msg2 :: Nil
}

Expand Down Expand Up @@ -165,7 +165,7 @@ object PayloadSpec {

private def snapshotHandling(ref: Option[ActorRef]): Receive = stateless orElse {
case MakeSnapshot =>
saveSnapshot(BsonArray(state))
saveSnapshot(BsonArray.fromIterable(state))
context.become(snapshotHandling(Option(sender())))
case SaveSnapshotSuccess(m) =>
deleteMessages(m.sequenceNr) // clean journal for later testing of snapshot restore
Expand All @@ -187,4 +187,4 @@ object PayloadSpec {
sender() ! Contents(state)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ScalaDriverMigrateToSuffixedCollections(system: ActorSystem) extends Scala
.map(_.groupBy(doc => getNewCollectionName(Option(doc.getString("_id").getValue).getOrElse(""))))
.map(_.mapValues(_.foldLeft((Seq[String](), 0L)){ (acc, doc) =>
(acc._1 :+ Option(doc.getString("_id").getValue).getOrElse(""), acc._2 + doc.getInt32("count").getValue)
}))
}).toMap)
}

/**
Expand Down
6 changes: 6 additions & 0 deletions travis_build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env bash
if [[ $TRAVIS_SCALA_VERSION == 2.13* ]]; then
sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test"
else
sbt ++$TRAVIS_SCALA_VERSION ";akka-persistence-mongo-common/travis:test;akka-persistence-mongo-casbah/travis:test;akka-persistence-mongo-rxmongo/travis:test;akka-persistence-mongo-scala/travis:test;akka-persistence-mongo-tools/travis:test"
fi

0 comments on commit 3b78137

Please sign in to comment.