From 388254f953a174749dcd411b5535a349e171fad7 Mon Sep 17 00:00:00 2001 From: Brian Scully Date: Sat, 21 Dec 2019 23:47:38 -0500 Subject: [PATCH] Clean up outstanding warnings, deprecations, etc --- build.sbt | 8 +- .../persistence/mongodb/MongoDataModel.scala | 5 +- .../mongodb/MongoPersistence.scala | 10 +- .../mongodb/MongoReadJournal.scala | 1 - ...WithMongoPersistencePluginDispatcher.scala | 9 +- .../contrib/persistence/mongodb/package.scala | 5 +- .../mongodb/RxMongoJournaller.scala | 24 +-- .../mongodb/RxMongoPersistenceExtension.scala | 24 +-- .../mongodb/RxMongoReadJournaller.scala | 44 +++-- .../mongodb/RxMongoSerializers.scala | 150 +++++++++--------- .../mongodb/RxMongoSnapshotter.scala | 19 ++- .../mongodb/RxMongoJournallerSpec.scala | 70 ++++---- .../mongodb/RxMongoPersistenceSpec.scala | 2 +- .../mongodb/RxMongoSnapshotterSpec.scala | 19 +-- .../ScalaDriverPersistenceExtension.scala | 25 ++- .../ScalaDriverPersistenceJournaller.scala | 4 +- ...ScalaDriverPersistenceReadJournaller.scala | 19 +-- .../mongodb/ScalaDriverSettings.scala | 5 + 18 files changed, 230 insertions(+), 213 deletions(-) diff --git a/build.sbt b/build.sbt index 2181e172..8ef0ca3f 100644 --- a/build.sbt +++ b/build.sbt @@ -1,8 +1,8 @@ val releaseV = "2.3.1" val scala211V = "2.11.12" -val scala212V = "2.12.9" -val scala213V = "2.13.0" +val scala212V = "2.12.10" +val scala213V = "2.13.1" val scalaV = scala211V @@ -85,8 +85,8 @@ val commonSettings = Seq( "-Xlint" ), resolvers ++= Seq( - "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/", - "Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/", + "Typesafe Releases" at "https://repo.typesafe.com/typesafe/releases/", + "Typesafe Snapshots" at "https://repo.typesafe.com/typesafe/snapshots/", "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots" ), parallelExecution in Test := true, diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoDataModel.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoDataModel.scala index 2884d62f..9d410a32 100644 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoDataModel.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoDataModel.scala @@ -12,7 +12,6 @@ import akka.serialization.{Serialization, SerializerWithStringManifest} import scala.collection.immutable.{Seq => ISeq} import scala.language.existentials -import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} sealed trait Payload { @@ -45,7 +44,7 @@ case class Serialized[C <: AnyRef](bytes: Array[Byte], className: String, tags: Set[String], serializerId: Option[Int], - serializedManifest: Option[String])(implicit ser: Serialization, loadClass: LoadClass, ct: ClassTag[C]) extends Payload { + serializedManifest: Option[String])(implicit ser: Serialization, loadClass: LoadClass) extends Payload { type Content = C val hint = "ser" @@ -150,7 +149,7 @@ object Payload { import language.implicitConversions - implicit def bson2payload[D](document: D)(implicit ev: Manifest[D], dt: DocumentType[D]): Bson[D] = Bson(document, Set.empty[String]) + implicit def bson2payload[D](document: D)(implicit dt: DocumentType[D]): Bson[D] = Bson(document, Set.empty[String]) implicit def str2payload(string: String): StringPayload = StringPayload(string, Set.empty[String]) diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala index 0a1e3681..adb8d23d 100755 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala @@ -163,13 +163,13 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) * Convenient methods to retrieve EXISTING journal collection from persistenceId. * CAUTION: this method does NOT create the journal and its indexes. */ - private[mongodb] def getJournal(persistenceId: String)(implicit ec: ExecutionContext): C = collection(getJournalCollectionName(persistenceId)) + private[mongodb] def getJournal(persistenceId: String): C = collection(getJournalCollectionName(persistenceId)) /** * Convenient methods to retrieve EXISTING snapshot collection from persistenceId. * CAUTION: this method does NOT create the snapshot and its indexes. */ - private[mongodb] def getSnaps(persistenceId: String)(implicit ec: ExecutionContext): C = collection(getSnapsCollectionName(persistenceId)) + private[mongodb] def getSnaps(persistenceId: String): C = collection(getSnapsCollectionName(persistenceId)) private[mongodb] lazy val indexes: Seq[IndexSettings] = Seq( IndexSettings(journalIndexName, unique = true, sparse = false, JournallingFieldNames.PROCESSOR_ID -> 1, FROM -> 1, TO -> 1), @@ -179,7 +179,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) private[this] val journalCache = MongoCollectionCache[C](settings.CollectionCache, "journal", actorSystem) - private[mongodb] def journal(implicit ec: ExecutionContext): C = journal("") + private[mongodb] def journal: C = journal("") private[mongodb] def journal(persistenceId: String)(implicit ec: ExecutionContext): C = { val collectionName = getJournalCollectionName(persistenceId) @@ -201,7 +201,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) private[this] val snapsCache = MongoCollectionCache[C](settings.CollectionCache, "snaps", actorSystem) - private[mongodb] def snaps(implicit ec: ExecutionContext): C = snaps("") + private[mongodb] def snaps: C = snaps("") private[mongodb] def snaps(persistenceId: String)(implicit ec: ExecutionContext): C = { val collectionName = getSnapsCollectionName(persistenceId) @@ -222,7 +222,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) private[this] val realtimeCache = MongoCollectionCache[C](settings.CollectionCache, "realtime", actorSystem) - private[mongodb] def realtime(implicit ec: ExecutionContext): C = + private[mongodb] def realtime: C = realtimeCache.getOrElseCreate(realtimeCollectionName, collectionName => cappedCollection(collectionName)) private[mongodb] val querySideDispatcher = actorSystem.dispatchers.lookup("akka-contrib-persistence-query-dispatcher") diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoReadJournal.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoReadJournal.scala index 5a817998..7660ae24 100644 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoReadJournal.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoReadJournal.scala @@ -121,7 +121,6 @@ class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implici override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = { require(tag != null, "Tag must not be null") require(impl.checkOffsetIsSupported(offset), s"Offset $offset is not supported by read journal") - implicit val ordering: Ordering[Offset] = implicitly[Ordering[Offset]] val pastSource = impl.currentEventsByTag(tag, offset) .toEventEnvelopes diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/WithMongoPersistencePluginDispatcher.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/WithMongoPersistencePluginDispatcher.scala index adbf4828..ebf5bc33 100644 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/WithMongoPersistencePluginDispatcher.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/WithMongoPersistencePluginDispatcher.scala @@ -1,9 +1,10 @@ package akka.contrib.persistence.mongodb import akka.actor.ActorSystem -import com.typesafe.config.{Config, ConfigException} +import com.typesafe.config.Config import scala.concurrent.ExecutionContextExecutor +import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} abstract class WithMongoPersistencePluginDispatcher(actorSystem: ActorSystem, config: Config) { @@ -12,9 +13,9 @@ abstract class WithMongoPersistencePluginDispatcher(actorSystem: ActorSystem, co Try(actorSystem.dispatchers.lookup(config.getString("plugin-dispatcher"))) match { case Success(configuredPluginDispatcher) => configuredPluginDispatcher - case Failure(_ : ConfigException) => - actorSystem.log.warning("plugin-dispatcher not configured for akka-contrib-mongodb-persistence. " + - "Using actor system dispatcher.") + case Failure(NonFatal(_)) => + actorSystem.log.warning("plugin-dispatcher not configured for akka-contrib-mongodb-persistence. Using actor system dispatcher.") actorSystem.dispatcher + case Failure(t) => throw t } } diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/package.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/package.scala index 537cd38c..5100b055 100644 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/package.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/package.scala @@ -30,7 +30,7 @@ package object mongodb { } implicit class OffsetWithObjectIdToo(val offsets: Offset.type) extends AnyVal { - def objectId(hexStr: String, time: Long) = + def objectId(hexStr: String, time: Long): ObjectIdOffset = ObjectIdOffset(hexStr, time) } @@ -49,6 +49,9 @@ package object mongodb { case (a:ObjectIdOffset, b:ObjectIdOffset) => a compareTo b case (_:ObjectIdOffset, _) => 0 // Can't compare case (_, _:ObjectIdOffset) => 0 // Can't compare + case _ => + // Per j.u.Comparator contract + throw new ClassCastException(s"Unsupported offset types ${x.getClass.getName} ${y.getClass.getName}") } } } diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala index 22e6e265..0198be06 100755 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoJournaller.scala @@ -15,9 +15,9 @@ import akka.stream.scaladsl.{Flow, Sink, Source} import akka.stream.{ActorMaterializer, Materializer} import org.slf4j.{Logger, LoggerFactory} import reactivemongo.akkastream._ -import reactivemongo.api.collections.bson.BSONCollection +import reactivemongo.api.bson.collection.BSONCollection import reactivemongo.api.commands.{LastError, WriteResult} -import reactivemongo.bson.{BSONDocument, _} +import reactivemongo.api.bson.{BSONDocument, _} import scala.collection.immutable.Seq import scala.concurrent._ @@ -37,10 +37,12 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn private[this] def metadata(implicit ec: ExecutionContext) = driver.metadata - private[this] def journalRangeQuery(pid: String, from: Long, to: Long) = BSONDocument( - PROCESSOR_ID -> pid, - TO -> BSONDocument("$gte" -> from), - FROM -> BSONDocument("$lte" -> to)) + private[this] def journalRangeQuery(pid: String, from: Long, to: Long) = + BSONDocument( + PROCESSOR_ID -> pid, + TO -> BSONDocument("$gte" -> from), + FROM -> BSONDocument("$lte" -> to) + ) private[this] implicit val system: ActorSystem = driver.actorSystem private[this] implicit val materializer: Materializer = ActorMaterializer() @@ -60,7 +62,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn ) val flow = Flow[BSONDocument] - .mapConcat(_.getAs[BSONArray](EVENTS).map(_.values.collect { + .mapConcat(_.getAsOpt[BSONArray](EVENTS).map(_.values.collect { case d: BSONDocument => driver.deserializeJournal(d) }).getOrElse(Stream.empty[Event])) .filter(_.sn >= from) @@ -129,7 +131,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn private[this] def findMaxSequence(persistenceId: String, maxSequenceNr: Long)(implicit ec: ExecutionContext): Future[Option[Long]] = { def performAggregation(j: BSONCollection): Future[Option[Long]] = { - import j.BatchCommands.AggregationFramework.{GroupField, Match, MaxField} + import j.aggregationFramework.{GroupField, Match, MaxField} j.aggregatorContext[BSONDocument]( Match(BSONDocument(PROCESSOR_ID -> persistenceId, TO -> BSONDocument("$lte" -> maxSequenceNr))), @@ -138,7 +140,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn ).prepared .cursor .headOption - .map(_.flatMap(_.getAs[Long]("max"))) + .map(_.flatMap(_.getAsOpt[Long]("max"))) } for { @@ -214,7 +216,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn metadata.flatMap(_.find(BSONDocument(PROCESSOR_ID -> pid), Option(BSONDocument(MAX_SN -> 1))) .cursor[BSONDocument]() .headOption - .map(d => d.flatMap(_.getAs[Long](MAX_SN)))))(l => Future.successful(Option(l))) + .map(d => d.flatMap(_.getAsOpt[Long](MAX_SN)))))(l => Future.successful(Option(l))) } private[mongodb] override def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long] = { @@ -223,7 +225,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn .sort(BSONDocument(TO -> -1)) .cursor[BSONDocument]() .headOption - .map(d => d.flatMap(_.getAs[Long](TO))) + .map(d => d.flatMap(_.getAsOpt[Long](TO))) .flatMap(maxSequenceFromMetadata(pid)) .map(_.getOrElse(0L))) } diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala index 5ccb2862..a89298f9 100755 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala @@ -10,10 +10,10 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import com.typesafe.config.{Config, ConfigFactory} import reactivemongo.api._ -import reactivemongo.api.collections.bson.BSONCollection +import reactivemongo.api.bson.collection.{BSONCollection, BSONSerializationPack} import reactivemongo.api.commands.{Command, CommandError, WriteConcern} import reactivemongo.api.indexes.{Index, IndexType} -import reactivemongo.bson.{BSONDocument, _} +import reactivemongo.api.bson.{BSONDocument, _} import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @@ -35,9 +35,9 @@ object RxMongoPersistenceDriver { } class RxMongoDriverProvider(actorSystem: ActorSystem) { - val driver: MongoDriver = { - val md = MongoDriver() - actorSystem.registerOnTermination(driver.close()) + val driver: AsyncDriver = { + val md = AsyncDriver() + actorSystem.registerOnTermination(driver.close()(actorSystem.dispatcher)) md } } @@ -62,10 +62,11 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo implicit val waitFor: FiniteDuration = 10.seconds private[mongodb] lazy val connection: Future[MongoConnection] = - Future.fromTry(driver.connection(parsedMongoUri, strictUri = false)) + driver.connect(parsedMongoUri) private[mongodb] def closeConnections(): Unit = { driver.close(5.seconds) + () } private[mongodb] def dbName: String = databaseName.getOrElse(parsedMongoUri.db.getOrElse(DEFAULT_DB_NAME)) @@ -101,12 +102,17 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo private[mongodb] override def ensureIndex(indexName: String, unique: Boolean, sparse: Boolean, keys: (String, Int)*)(implicit ec: ExecutionContext) = { collection => val ky = keys.toSeq.map { case (f, o) => f -> (if (o > 0) IndexType.Ascending else IndexType.Descending) } - collection.flatMap(c => c.indexesManager.ensure(Index( + collection.flatMap(c => c.indexesManager.ensure(Index(BSONSerializationPack)( key = ky, background = true, unique = unique, sparse = sparse, - name = Some(indexName))).map(_ => c)) + name = Some(indexName), + dropDups = true, + version = None, + partialFilter = None, + options = BSONDocument.empty + )).map(_ => c)) } override private[mongodb] def cappedCollection(name: String)(implicit ec: ExecutionContext) = @@ -156,7 +162,7 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo val runner = Command.run(BSONSerializationPack, FailoverStrategy()) runner.apply(database, runner.rawCommand(BSONDocument("buildInfo" -> 1))) .one[BSONDocument](ReadPreference.Primary) - .map(_.getAs[BSONString]("version").getOrElse(BSONString("")).value) + .map(_.getAsOpt[BSONString]("version").getOrElse(BSONString("")).value) .map { v => mongoVersion = Some(v) v diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoReadJournaller.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoReadJournaller.scala index 15b9613d..c31c0957 100755 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoReadJournaller.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoReadJournaller.scala @@ -15,7 +15,7 @@ import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} import org.reactivestreams.{Publisher, Subscriber, Subscription} import reactivemongo.akkastream._ import reactivemongo.api.QueryOpts -import reactivemongo.bson._ +import reactivemongo.api.bson._ import scala.concurrent.{ExecutionContext, Future} import scala.util.Random @@ -31,10 +31,10 @@ object CurrentAllEvents { .cursor[BSONDocument]() .documentSource() .map { doc => - doc.getAs[BSONArray](EVENTS) - .map(_.elements - .map(_.value) - .collect{ case d:BSONDocument => driver.deserializeJournal(d) }) + doc.getAsOpt[BSONArray](EVENTS) + .map(_.values.collect{ + case d:BSONDocument => driver.deserializeJournal(d) + }) .getOrElse(Nil) }.mapConcat(identity) }.reduceLeftOption(_ concat _) @@ -50,7 +50,7 @@ object CurrentPersistenceIds { Source.fromFuture(for { collections <- driver.journalCollectionsAsFuture tmpNames <- Future.sequence(collections.zipWithIndex.map { case (c,idx) => - import c.BatchCommands.AggregationFramework.{Group, Out, Project} + import c.aggregationFramework.{Group, Out, Project} val nameWithIndex = s"$temporaryCollectionName-$idx" c.aggregatorContext[BSONDocument]( Project(BSONDocument(PROCESSOR_ID -> 1)), @@ -66,7 +66,7 @@ object CurrentPersistenceIds { tmps <- Future.sequence(tmpNames.map(driver.collection)) } yield tmps ) .flatMapConcat(cols => cols.map(_.find(BSONDocument(), Option.empty[BSONDocument]).cursor[BSONDocument]().documentSource()).reduce(_ ++ _)) - .mapConcat(_.getAs[String]("_id").toList) + .mapConcat(_.getAsOpt[String]("_id").toList) .alsoTo(Sink.onComplete{ _ => driver .getCollectionsAsFuture(temporaryCollectionName) @@ -78,7 +78,7 @@ object CurrentPersistenceIds { } object CurrentEventsByPersistenceId { - def queryFor(persistenceId: String, fromSeq: Long, toSeq: Long) = BSONDocument( + def queryFor(persistenceId: String, fromSeq: Long, toSeq: Long): BSONDocument = BSONDocument( PROCESSOR_ID -> persistenceId, TO -> BSONDocument("$gte" -> fromSeq), FROM -> BSONDocument("$lte" -> toSeq) @@ -97,10 +97,10 @@ object CurrentEventsByPersistenceId { .cursor[BSONDocument]() .documentSource() ).map( doc => - doc.getAs[BSONArray](EVENTS) - .map(_.elements - .map(_.value) - .collect{ case d:BSONDocument => driver.deserializeJournal(d) }) + doc.getAsOpt[BSONArray](EVENTS) + .map(_.values.collect{ + case d:BSONDocument => driver.deserializeJournal(d) + }) .getOrElse(Nil) ).mapConcat(identity) } @@ -117,7 +117,7 @@ object CurrentEventsByTag { } val query = BSONDocument( TAGS -> tag - ).merge(offset.fold(BSONDocument.empty)(id => BSONDocument(ID -> BSONDocument("$gt" -> id)))) + ) ++ offset.fold(BSONDocument.empty)(id => BSONDocument(ID -> BSONDocument("$gt" -> id))) Source.fromFuture(driver.journalCollectionsAsFuture) .flatMapConcat{ xs => @@ -129,13 +129,11 @@ object CurrentEventsByTag { ).reduceLeftOption(_ ++ _) .getOrElse(Source.empty) }.map{ doc => - val id = doc.getAs[BSONObjectID](ID).get - doc.getAs[BSONArray](EVENTS) - .map(_.elements - .map(_.value) - .collect{ case d:BSONDocument => driver.deserializeJournal(d) -> ObjectIdOffset(id.stringify, id.time) } - .filter(_._1.tags.contains(tag)) - ) + val id = doc.getAsOpt[BSONObjectID](ID).get + doc.getAsOpt[BSONArray](EVENTS) + .map(_.values.collect{ + case d:BSONDocument => driver.deserializeJournal(d) -> ObjectIdOffset(id.stringify, id.time) + }.filter(_._1.tags.contains(tag))) .getOrElse(Nil) }.mapConcat(identity) } @@ -171,7 +169,7 @@ class RxMongoRealtimeGraphStage(driver: RxMongoDriver, bufsz: Int = 16)(factory: } else buffer = buffer ::: List(doc) - lastId = doc.getAs[BSONObjectID]("_id") + lastId = doc.getAsOpt[BSONObjectID]("_id") } private def errAc = getAsyncCallback[Throwable](failStage) @@ -249,8 +247,8 @@ class RxMongoJournalStream(driver: RxMongoDriver)(implicit m: Materializer) exte ) .via(killSwitch.flow) .mapConcat { d => - val id = d.getAs[BSONObjectID](ID).get - d.getAs[BSONArray](EVENTS).map(_.elements.map(e => e.value).collect { + val id = d.getAsOpt[BSONObjectID](ID).get + d.getAsOpt[BSONArray](EVENTS).map(_.values.collect { case d: BSONDocument => driver.deserializeJournal(d) -> ObjectIdOffset(id.stringify, id.time) }).getOrElse(Nil) } diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala index b5725566..75677e28 100644 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSerializers.scala @@ -8,11 +8,10 @@ import akka.actor.{ActorRef, ActorSystem, DynamicAccess, ExtendedActorSystem, Ex import akka.persistence.serialization.Snapshot import akka.persistence.{PersistentRepr, SelectedSnapshot, SnapshotMetadata} import akka.serialization.{Serialization, SerializationExtension} -import reactivemongo.bson._ -import DefaultBSONHandlers._ +import reactivemongo.api.bson._ object RxMongoSerializersExtension extends ExtensionId[RxMongoSerializers] with ExtensionIdProvider { - override def lookup = RxMongoSerializersExtension + override def lookup: ExtensionId[RxMongoSerializers] = RxMongoSerializersExtension override def createExtension(system: ExtendedActorSystem) = new RxMongoSerializers(system.dynamicAccess, system) @@ -23,8 +22,8 @@ object RxMongoSerializersExtension extends ExtensionId[RxMongoSerializers] with object RxMongoSerializers { implicit class PimpedBSONDocument(val doc: BSONDocument) extends AnyVal { - def as[A](key: String)(implicit ev: Manifest[A], reader: BSONReader[_ <: BSONValue, A]): A = - doc.getAs[A](key) + def as[A](key: String)(implicit ev: Manifest[A], reader: BSONReader[A]): A = + doc.getAsOpt[A](key) .getOrElse(throw new IllegalArgumentException(s"Could not deserialize required key $key of type ${ev.runtimeClass.getName}")) } @@ -35,56 +34,60 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem) implicit val loadClass: LoadClass = dynamicAccess private implicit val system: ActorSystem = actorSystem - implicit val serialization = SerializationExtension(actorSystem) + implicit val serialization: Serialization = SerializationExtension(actorSystem) implicit val dt: DocumentType[BSONDocument] = new DocumentType[BSONDocument] { } object Version { def unapply(d: BSONDocument): Option[(Int,BSONDocument)] = { - d.getAs[Int](JournallingFieldNames.VERSION).orElse(Option(0)).map(_ -> d) + d.getAsOpt[Int](JournallingFieldNames.VERSION).orElse(Option(0)).map(_ -> d) } } - implicit object RxMongoSnapshotSerialization extends BSONDocumentReader[SelectedSnapshot] with BSONDocumentWriter[SelectedSnapshot] with SnapshottingFieldNames { - - override def read(doc: BSONDocument): SelectedSnapshot = { - val content = doc.getAs[Array[Byte]](V1.SERIALIZED) - if (content.isDefined) { - serialization.deserialize(content.get, classOf[SelectedSnapshot]).get - } else { - val pid = doc.as[String](PROCESSOR_ID) - val sn = doc.as[Long](SEQUENCE_NUMBER) - val timestamp = doc.as[Long](TIMESTAMP) - - val content = doc.get(V2.SERIALIZED) match { - case Some(b: BSONDocument) => - b - case Some(_) => - val snapshot = doc.as[Array[Byte]](V2.SERIALIZED) - val deserialized = serialization.deserialize(snapshot, classOf[Snapshot]).get - deserialized.data - case None => - throw new IllegalStateException(s"Snapshot unreadable, missing serialized snapshot field ${V2.SERIALIZED}") - } + implicit val SelectedSnapshotReader: BSONDocumentReader[SelectedSnapshot] = BSONDocumentReader { doc => + import SnapshottingFieldNames._ - SelectedSnapshot(SnapshotMetadata(pid,sn,timestamp),content) - } - } + val content = doc.getAsOpt[Array[Byte]](V1.SERIALIZED) + if (content.isDefined) { + serialization.deserialize(content.get, classOf[SelectedSnapshot]).get + } else { + val pid = doc.as[String](PROCESSOR_ID) + val sn = doc.as[Long](SEQUENCE_NUMBER) + val timestamp = doc.as[Long](TIMESTAMP) - override def write(snap: SelectedSnapshot): BSONDocument = { - val content: BSONValue = snap.snapshot match { - case b: BSONDocument => + val content = doc.get(V2.SERIALIZED) match { + case Some(b: BSONDocument) => b - case _ => - Serialization.withTransportInformation(serialization.system) { () => - BSON.write(serialization.serialize(Snapshot(snap.snapshot)).get) - } + case Some(_) => + val snapshot = doc.as[Array[Byte]](V2.SERIALIZED) + val deserialized = serialization.deserialize(snapshot, classOf[Snapshot]).get + deserialized.data + case None => + throw new IllegalStateException(s"Snapshot unreadable, missing serialized snapshot field ${V2.SERIALIZED}") } - BSONDocument(PROCESSOR_ID -> snap.metadata.persistenceId, - SEQUENCE_NUMBER -> snap.metadata.sequenceNr, - TIMESTAMP -> snap.metadata.timestamp, - V2.SERIALIZED -> content) + + SelectedSnapshot(SnapshotMetadata(pid,sn,timestamp),content) } + } + + implicit val SelectedSnapshotWriter: BSONDocumentWriter[SelectedSnapshot] = BSONDocumentWriter { snap => + import SnapshottingFieldNames._ + + val content: BSONValue = snap.snapshot match { + case b: BSONDocument => + b + case _ => + Serialization.withTransportInformation(serialization.system) { () => + BSONBinary(serialization.serialize(Snapshot(snap.snapshot)).get, Subtype.OldBinarySubtype) + } + } + BSONDocument(PROCESSOR_ID -> snap.metadata.persistenceId, + SEQUENCE_NUMBER -> snap.metadata.sequenceNr, + TIMESTAMP -> snap.metadata.timestamp, + V2.SERIALIZED -> content) + } + + implicit object LegacyRxMongoSnapshotSerialization extends SnapshottingFieldNames { @deprecated("Use v2 write instead", "0.3.0") def legacyWrite(snap: SelectedSnapshot): BSONDocument = { @@ -112,40 +115,42 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem) payload = deserializePayload( d.get(PayloadKey).get, d.as[String](TYPE), - d.getAs[BSONArray](TAGS).toList.flatMap(_.values.collect{ case BSONString(s) => s }).toSet, - d.getAs[String](HINT), - d.getAs[Int](SER_ID), - d.getAs[String](SER_MANIFEST) + d.getAsOpt[BSONArray](TAGS).toList.flatMap(_.values.collect{ case BSONString(s) => s }).toSet, + d.getAsOpt[String](HINT), + d.getAsOpt[Int](SER_ID), + d.getAsOpt[String](SER_MANIFEST) ), - sender = d.getAs[Array[Byte]](SenderKey).flatMap(serialization.deserialize(_, classOf[ActorRef]).toOption), - manifest = d.getAs[String](MANIFEST), - writerUuid = d.getAs[String](WRITER_UUID) + sender = d.getAsOpt[Array[Byte]](SenderKey).flatMap(serialization.deserialize(_, classOf[ActorRef]).toOption), + manifest = d.getAsOpt[String](MANIFEST), + writerUuid = d.getAsOpt[String](WRITER_UUID) ) - private def deserializePayload(b: BSONValue, clue: String, tags: Set[String], clazzName: Option[String], serializerId: Option[Int], serializedManifest: Option[String])(implicit serialization: Serialization): Payload = (clue,b) match { - case ("ser",BSONBinary(bfr, _)) => - Serialized(bfr.readArray(bfr.size), clazzName.getOrElse(classOf[AnyRef].getName), tags, serializerId, serializedManifest) - case ("bson",d:BSONDocument) => Bson(d, tags) - case ("bin",BSONBinary(bfr, _)) => Bin(bfr.readArray(bfr.size), tags) - case ("repr",BSONBinary(bfr, _)) => Legacy(bfr.readArray(bfr.size), tags) - case ("s",BSONString(s)) => StringPayload(s, tags) - case ("d",BSONDouble(d)) => FloatingPointPayload(d, tags) - case ("l",BSONLong(l)) => FixedPointPayload(l, tags) - case ("b",BSONBoolean(bln)) => BooleanPayload(bln, tags) - case (x,y) => throw new IllegalArgumentException(s"Unknown hint $x or type for payload content $y") - } + private def deserializePayload(b: BSONValue, clue: String, tags: Set[String], clazzName: Option[String], serializerId: Option[Int], serializedManifest: Option[String])(implicit serialization: Serialization): Payload = + (clue,b) match { + case ("ser",b:BSONBinary) => + val bfr = b.byteArray + Serialized(bfr, clazzName.getOrElse(classOf[AnyRef].getName), tags, serializerId, serializedManifest) + case ("bson",d:BSONDocument) => Bson(d, tags) + case ("bin",b:BSONBinary) => Bin(b.byteArray, tags) + case ("repr",b:BSONBinary) => Legacy(b.byteArray, tags) + case ("s",BSONString(s)) => StringPayload(s, tags) + case ("d",BSONDouble(d)) => FloatingPointPayload(d, tags) + case ("l",BSONLong(l)) => FixedPointPayload(l, tags) + case ("b",BSONBoolean(bln)) => BooleanPayload(bln, tags) + case (x,y) => throw new IllegalArgumentException(s"Unknown hint $x or type for payload content $y") + } private def deserializeDocumentLegacy(document: BSONDocument)(implicit serialization: Serialization, system: ActorSystem): Event = { val persistenceId = document.as[String](PROCESSOR_ID) val sequenceNr = document.as[Long](SEQUENCE_NUMBER) - val tags = document.getAs[BSONArray](TAGS).toList.flatMap(_.values.collect{ case BSONString(s) => s }).toSet + val tags = document.getAsOpt[BSONArray](TAGS).toList.flatMap(_.values.collect{ case BSONString(s) => s }).toSet document.get(SERIALIZED) match { case Some(b: BSONDocument) => Event(pid = persistenceId, sn = sequenceNr, payload = Bson(b.as[BSONDocument](PayloadKey), tags), - sender = b.getAs[Array[Byte]](SenderKey).flatMap(serialization.deserialize(_, classOf[ActorRef]).toOption), + sender = b.getAsOpt[Array[Byte]](SenderKey).flatMap(serialization.deserialize(_, classOf[ActorRef]).toOption), manifest = None) case Some(ser: BSONBinary) => val repr = serialization.deserialize(ser.byteArray, classOf[PersistentRepr]) @@ -171,28 +176,27 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem) EVENTS -> BSONArray(atom.events.map(serializeEvent)), VERSION -> 1 ) - ){ case(d,tags) => d.merge(TAGS -> serializeTags(tags)) } + ){ case(d,tags) => d ++ BSONDocument(TAGS -> serializeTags(tags)) } } - import Producer._ private def serializeEvent(event: Event): BSONDocument = { val doc = serializePayload(event.payload)( BSONDocument(VERSION -> 1, PROCESSOR_ID -> event.pid, SEQUENCE_NUMBER -> event.sn)) (for { d <- Option(doc) - d <- Option(event.tags).filter(_.nonEmpty).map(tags => d.merge(TAGS -> serializeTags(tags))).orElse(Option(d)) - d <- event.manifest.map(m => d.merge(MANIFEST -> m)).orElse(Option(d)) - d <- event.writerUuid.map(u => d.merge(WRITER_UUID -> u)).orElse(Option(d)) + d <- Option(event.tags).filter(_.nonEmpty).map(tags => d ++ BSONDocument(TAGS -> serializeTags(tags))).orElse(Option(d)) + d <- event.manifest.map(m => d ++ BSONDocument(MANIFEST -> m)).orElse(Option(d)) + d <- event.writerUuid.map(u => d ++ BSONDocument(WRITER_UUID -> u)).orElse(Option(d)) d <- event.sender .filterNot(_ == actorSystem.deadLetters) .flatMap(serialization.serialize(_).toOption) - .map(BSON.write(_)) - .map(b => d.merge(SenderKey -> b)).orElse(Option(d)) + .map(BSONBinary(_, Subtype.GenericBinarySubtype)) + .map(b => d ++ BSONDocument(SenderKey -> b)).orElse(Option(d)) } yield d).getOrElse(doc) } private def serializeTags(tags: Set[String]): BSONArray = - BSONArray(tags.map(BSONString)) + BSONArray(tags.map(BSONString(_))) private def serializePayload(payload: Payload)(document: BSONDocument) = { val asDoc = payload match { @@ -200,7 +204,7 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem) case Bin(bytes, _) => BSONDocument(PayloadKey -> bytes) case Legacy(bytes, _) => BSONDocument(PayloadKey -> bytes) case s: Serialized[_] => - BSONDocument(PayloadKey -> BSON.write(s.bytes), + BSONDocument(PayloadKey -> BSONBinary(s.bytes, Subtype.GenericBinarySubtype), HINT -> s.className, SER_ID -> s.serializerId, SER_MANIFEST -> s.serializedManifest) @@ -211,7 +215,7 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem) case x => throw new IllegalArgumentException(s"Unable to serialize payload of type $x") } - document.merge(BSONDocument(TYPE -> payload.hint).merge(asDoc)) + document ++ BSONDocument(TYPE -> payload.hint) ++ asDoc } } diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotter.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotter.scala index 53d2ab33..4be0d0ad 100644 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotter.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotter.scala @@ -8,7 +8,8 @@ package akka.contrib.persistence.mongodb import akka.persistence.SelectedSnapshot import reactivemongo.api.indexes._ -import reactivemongo.bson._ +import reactivemongo.api.bson._ +import reactivemongo.api.bson.collection.BSONSerializationPack import scala.concurrent._ @@ -47,8 +48,8 @@ class RxMongoSnapshotter(driver: RxMongoDriver) extends MongoPersistenceSnapshot private[mongodb] def deleteSnapshot(pid: String, seq: Long, ts: Long)(implicit ec: ExecutionContext) = { val criteria = - Seq[Producer[BSONElement]](PROCESSOR_ID -> pid, SEQUENCE_NUMBER -> seq) ++ - Option[Producer[BSONElement]](TIMESTAMP -> ts).filter(_ => ts > 0).toSeq + Seq[ElementProducer](PROCESSOR_ID -> pid, SEQUENCE_NUMBER -> seq) ++ + Option[ElementProducer](TIMESTAMP -> ts).filter(_ => ts > 0).toSeq for { s <- snaps(pid) @@ -78,15 +79,21 @@ class RxMongoSnapshotter(driver: RxMongoDriver) extends MongoPersistenceSnapshot } } - private[this] def snaps(suffix: String)(implicit ec: ExecutionContext) = { + private[this] def snaps(suffix: String)(implicit ec: ExecutionContext): driver.C = { val snaps = driver.getSnaps(suffix) - snaps.flatMap(_.indexesManager.ensure(Index( + snaps.flatMap(_.indexesManager.ensure(Index(BSONSerializationPack)( key = Seq((PROCESSOR_ID, IndexType.Ascending), (SEQUENCE_NUMBER, IndexType.Descending), (TIMESTAMP, IndexType.Descending)), background = true, unique = true, - name = Some(driver.snapsIndexName)))) + name = Some(driver.snapsIndexName), + dropDups = true, + sparse = false, + version = None, + partialFilter = None, + options = BSONDocument.empty + ))) snaps } diff --git a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala index 363e8ba9..784bb404 100644 --- a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala +++ b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoJournallerSpec.scala @@ -8,11 +8,11 @@ package akka.contrib.persistence.mongodb import akka.actor.ActorSystem import akka.persistence.{AtomicWrite, PersistentRepr} -import akka.serialization.SerializationExtension +import akka.serialization.{Serialization, SerializationExtension} import akka.stream.{ActorMaterializer, Materializer} import akka.testkit._ import reactivemongo.api.Cursor -import reactivemongo.bson._ +import reactivemongo.api.bson._ import scala.collection.immutable.{Seq => ISeq} import scala.concurrent._ @@ -24,7 +24,7 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon override def embedDB = "persistence-journaller-rxmongo" - implicit val serialization = SerializationExtension(system) + implicit val serialization: Serialization = SerializationExtension(system) implicit val as: ActorSystem = system implicit val am: Materializer = ActorMaterializer() @@ -50,8 +50,8 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon withJournal { journal => val inserted = for { _ <- underTest.batchAppend(ISeq(AtomicWrite(records))) - range <- journal.find(BSONDocument()).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) - head <- journal.find(BSONDocument()).cursor().headOption + range <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) + head <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor().headOption } yield (range, head) val (range, head) = await(inserted) range should have size 1 @@ -61,11 +61,11 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon case _ => () } - val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { + val recone = head.get.getAsOpt[BSONArray](EVENTS).toStream.flatMap(_.values.collect { case e: BSONDocument => e }).head - recone.getAs[String](PROCESSOR_ID) shouldBe Some("unit-test") - recone.getAs[Long](SEQUENCE_NUMBER) shouldBe Some(1) + recone.getAsOpt[String](PROCESSOR_ID) shouldBe Some("unit-test") + recone.getAsOpt[Long](SEQUENCE_NUMBER) shouldBe Some(1) } } @@ -85,8 +85,8 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon // should 'retrieve' (and not 'build') the suffixed journal journal <- drv.getJournal("unit-test") - range <- journal.find(BSONDocument()).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) - head <- journal.find(BSONDocument()).cursor().headOption + range <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) + head <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor().headOption } yield (range, head) val (range, head) = await(inserted) range should have size 1 @@ -96,11 +96,11 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon case _ => () } - val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { + val recone = head.get.getAsOpt[BSONArray](EVENTS).toStream.flatMap(_.values.collect { case e: BSONDocument => e }).head - recone.getAs[String](PROCESSOR_ID) shouldBe Some("unit-test") - recone.getAs[Long](SEQUENCE_NUMBER) shouldBe Some(1) + recone.getAsOpt[String](PROCESSOR_ID) shouldBe Some("unit-test") + recone.getAsOpt[Long](SEQUENCE_NUMBER) shouldBe Some(1) } } @@ -112,19 +112,19 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon withJournal { journal => val inserted = for { _ <- underTest.batchAppend(ISeq(AtomicWrite(documents))) - range <- journal.find(BSONDocument()).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) - head <- journal.find(BSONDocument()).cursor().headOption + range <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) + head <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor().headOption } yield (range, head) val (range, head) = await(inserted) range should have size 1 - val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { + val recone = head.get.getAsOpt[BSONArray](EVENTS).toStream.flatMap(_.values.collect { case e: BSONDocument => e }).head - recone.getAs[String](PROCESSOR_ID) shouldBe Some("unit-test") - recone.getAs[Long](SEQUENCE_NUMBER) shouldBe Some(10) - recone.getAs[String](TYPE) shouldBe Some("bson") - recone.getAs[BSONDocument](PayloadKey) shouldBe Some(BSONDocument("foo" -> "bar", "baz" -> 1)) + recone.getAsOpt[String](PROCESSOR_ID) shouldBe Some("unit-test") + recone.getAsOpt[Long](SEQUENCE_NUMBER) shouldBe Some(10) + recone.getAsOpt[String](TYPE) shouldBe Some("bson") + recone.getAsOpt[BSONDocument](PayloadKey) shouldBe Some(BSONDocument("foo" -> "bar", "baz" -> 1)) () } } @@ -142,19 +142,19 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon val inserted: Future[(List[BSONDocument],Option[BSONDocument])] = for { _ <- underTest.batchAppend(ISeq(AtomicWrite(withSerializedObjects))) - range <- journal.find(BSONDocument()).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) - head <- journal.find(BSONDocument()).cursor().headOption + range <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) + head <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor().headOption } yield (range, head) val (range, head) = await(inserted) range should have size 1 val allDocuments = for { h <- head.toList - ev <- h.getAs[BSONArray](EVENTS).toList + ev <- h.getAsOpt[BSONArray](EVENTS).toList doc <- ev.values.collect { case e: BSONDocument => e } } yield doc - allDocuments.map(_.getAs[Int](SER_ID).get) should contain theSameElementsInOrderAs records.map(_=> serializer.identifier) + allDocuments.map(_.getAsOpt[Int](SER_ID).get) should contain theSameElementsInOrderAs records.map(_=> serializer.identifier) () } } @@ -174,19 +174,19 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon val inserted: Future[(List[BSONDocument],Option[BSONDocument])] = for { _ <- underExtendedTest.batchAppend(ISeq(AtomicWrite(events))) journal <- drv.getJournal(persistenceId) - range <- journal.find(BSONDocument()).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) - head <- journal.find(BSONDocument()).cursor().headOption + range <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) + head <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor().headOption } yield (range, head) val (range, head) = await(inserted) range should have size 1 - val recone = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { + val recone = head.get.getAsOpt[BSONArray](EVENTS).toStream.flatMap(_.values.collect { case e: BSONDocument => e }).head - recone.getAs[String](PROCESSOR_ID) shouldBe Some(persistenceId) - recone.getAs[Long](SEQUENCE_NUMBER) shouldBe Some(10) - recone.getAs[String](TYPE) shouldBe Some("bson") - recone.getAs[BSONDocument](PayloadKey) shouldBe Some(BSONDocument("foo" -> "bar", "baz" -> 1)) + recone.getAsOpt[String](PROCESSOR_ID) shouldBe Some(persistenceId) + recone.getAsOpt[Long](SEQUENCE_NUMBER) shouldBe Some(10) + recone.getAsOpt[String](TYPE) shouldBe Some("bson") + recone.getAsOpt[BSONDocument](PayloadKey) shouldBe Some(BSONDocument("foo" -> "bar", "baz" -> 1)) () } } @@ -207,17 +207,17 @@ class RxMongoJournallerSpec extends TestKit(ActorSystem("unit-test")) with RxMon val inserted: Future[(List[BSONDocument],Option[BSONDocument])] = for { _ <- underExtendedTest.batchAppend(ISeq(AtomicWrite(events))) journal <- drv.getJournal(persistenceId) - range <- journal.find(BSONDocument()).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) - head <- journal.find(BSONDocument()).cursor().headOption + range <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor[BSONDocument]().collect[List](maxDocs = 2, err = Cursor.FailOnError[List[BSONDocument]]()) + head <- journal.find(BSONDocument.empty, Option.empty[BSONDocument]).cursor().headOption } yield (range, head) val (range, head) = await(inserted) range should have size 1 - val allDocuments = head.get.getAs[BSONArray](EVENTS).toStream.flatMap(_.values.collect { + val allDocuments = head.get.getAsOpt[BSONArray](EVENTS).toStream.flatMap(_.values.collect { case e: BSONDocument => e }) - allDocuments.map(_.getAs[Int](SER_ID).get).toList should contain theSameElementsInOrderAs records.map(_=> serializer.identifier) + allDocuments.map(_.getAsOpt[Int](SER_ID).get).toList should contain theSameElementsInOrderAs records.map(_=> serializer.identifier) () } } diff --git a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceSpec.scala b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceSpec.scala index f106d25a..62d0e81b 100644 --- a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceSpec.scala +++ b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceSpec.scala @@ -8,7 +8,7 @@ package akka.contrib.persistence.mongodb import akka.testkit.{TestKit, _} import com.typesafe.config.ConfigFactory -import reactivemongo.api.collections.bson.BSONCollection +import reactivemongo.api.bson.collection.BSONCollection import reactivemongo.api.{DefaultDB, FailoverStrategy} import scala.concurrent._ diff --git a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotterSpec.scala b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotterSpec.scala index 4ae1e36e..78b54c9b 100644 --- a/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotterSpec.scala +++ b/rxmongo/src/test/scala/akka/contrib/persistence/mongodb/RxMongoSnapshotterSpec.scala @@ -12,9 +12,9 @@ import akka.serialization.{Serialization, SerializationExtension} import akka.testkit._ import org.junit.runner.RunWith import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} -import org.scalatest.junit.JUnitRunner +import org.scalatestplus.junit.JUnitRunner import reactivemongo.api.Cursor -import reactivemongo.bson.BSONDocument +import reactivemongo.api.bson.BSONDocument import scala.concurrent._ import duration._ @@ -25,7 +25,8 @@ class RxMongoSnapshotterSpec extends TestKit(ActorSystem("unit-test")) with RxMo override def embedDB = "persistence-snapshotter-rxmongo" implicit val serialization: Serialization = SerializationExtension.get(system) - implicit val serializer = RxMongoSerializersExtension(system).RxMongoSnapshotSerialization + val serializers: RxMongoSerializers = RxMongoSerializersExtension(system) + import serializers._ implicit val as: ActorSystem = system val pid = "unit-test" @@ -35,7 +36,7 @@ class RxMongoSnapshotterSpec extends TestKit(ActorSystem("unit-test")) with RxMo val metadata = (1L to 10L).map(i => SnapshotMetadata("p-1", i, i)) val snapshots = metadata.map(SelectedSnapshot(_, "snapshot-1")) - val legacyDocs = snapshots.map(serializer.legacyWrite) + val legacyDocs = snapshots.map(serializers.LegacyRxMongoSnapshotSerialization.legacyWrite) Await.result(ss.insert(ordered = true).many(legacyDocs), 3.seconds.dilated).n should be(metadata.size) @@ -52,7 +53,7 @@ class RxMongoSnapshotterSpec extends TestKit(ActorSystem("unit-test")) with RxMo val metadata = (1L to 10L).map(i => SnapshotMetadata("p-2", i, i)) val snapshots = metadata.map(SelectedSnapshot(_, "snapshot-2")) - val legacyDocs = snapshots.map(serializer.legacyWrite) + val legacyDocs = snapshots.map(serializers.LegacyRxMongoSnapshotSerialization.legacyWrite) Await.result(ss.insert(ordered = true).many(legacyDocs), 3.seconds.dilated).n should be(metadata.size) @@ -75,8 +76,8 @@ class RxMongoSnapshotterSpec extends TestKit(ActorSystem("unit-test")) with RxMo val metadata = (1L to 10L).map(i => SnapshotMetadata("p-3", i, i)) val snapshots = metadata.map(SelectedSnapshot(_, "snapshot-3")) - val legacyDocs = snapshots.take(5).map(serializer.legacyWrite) - val newDocs = snapshots.drop(5).map(serializer.write) + val legacyDocs = snapshots.take(5).map(serializers.LegacyRxMongoSnapshotSerialization.legacyWrite) + val newDocs = snapshots.drop(5).flatMap(serializers.SelectedSnapshotWriter.writeOpt(_).toIndexedSeq) whenReady(ss.insert(ordered = true).many(legacyDocs ++ newDocs), PatienceConfiguration.Timeout(3.seconds.dilated)){ _.n shouldBe metadata.size @@ -98,8 +99,8 @@ class RxMongoSnapshotterSpec extends TestKit(ActorSystem("unit-test")) with RxMo val metadata = (1L to 10L).map(i => SnapshotMetadata("p-4", i, i)) val snapshots = metadata.map(SelectedSnapshot(_, "snapshot-4")) - val legacyDocs = snapshots.take(5).map(serializer.legacyWrite) - val newDocs = snapshots.drop(5).map(serializer.write) + val legacyDocs = snapshots.take(5).map(serializers.LegacyRxMongoSnapshotSerialization.legacyWrite) + val newDocs = snapshots.drop(5).flatMap(serializers.SelectedSnapshotWriter.writeOpt(_).toIndexedSeq) Await.result(ss.insert(ordered = true).many(legacyDocs ++ newDocs), 3.seconds.dilated).n should be(metadata.size) diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala index 9aa8f702..5eb29fb0 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala @@ -44,8 +44,7 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist override private[mongodb] def ensureCollection(name: String)(implicit ec: ExecutionContext): C = ensureCollection(name, db.createCollection) - private[this] def ensureCollection(name: String, collectionCreator: String => SingleObservable[Completed]) - (implicit ec: ExecutionContext): C = + private[this] def ensureCollection(name: String, collectionCreator: String => SingleObservable[Completed]): C = for { _ <- collectionCreator(name).toFuture().recover { case MongoErrors.NamespaceExists() => Completed } mongoCollection <- collection(name) @@ -80,11 +79,11 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist .toFuture() .map(stats => stats.get("capped").exists(_.asBoolean.getValue)) - private[mongodb] def getCollectionsAsFuture(collectionName: String)(implicit ec: ExecutionContext): Future[List[MongoCollection[D]]] = { + private[mongodb] def getCollectionsAsFuture(collectionName: String): Future[List[MongoCollection[D]]] = { getAllCollectionsAsFuture(Option(_.startsWith(collectionName))) } - private[mongodb] def getAllCollectionsAsFuture(nameFilter: Option[String => Boolean])(implicit ec: ExecutionContext): Future[List[MongoCollection[D]]] = { + private[mongodb] def getAllCollectionsAsFuture(nameFilter: Option[String => Boolean]): Future[List[MongoCollection[D]]] = { def excluded(name: String): Boolean = name == realtimeCollectionName || name == metadataCollectionName || @@ -99,18 +98,18 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist } yield xs.toList } - private[mongodb] def journalCollectionsAsFuture(implicit ec: ExecutionContext) = getCollectionsAsFuture(journalCollectionName) + private[mongodb] def journalCollectionsAsFuture = getCollectionsAsFuture(journalCollectionName) - private[mongodb] def snapshotCollectionsAsFuture(implicit ec: ExecutionContext) = getCollectionsAsFuture(snapsCollectionName) + private[mongodb] def snapshotCollectionsAsFuture = getCollectionsAsFuture(snapsCollectionName) - private[mongodb] def removeEmptyJournal(jnl: MongoCollection[D])(implicit ec: ExecutionContext): Future[Unit] = + private[mongodb] def removeEmptyJournal(jnl: MongoCollection[D]): Future[Unit] = removeEmptyCollection(jnl, journalIndexName) - private[mongodb] def removeEmptySnapshot(snp: MongoCollection[D])(implicit ec: ExecutionContext): Future[Unit] = + private[mongodb] def removeEmptySnapshot(snp: MongoCollection[D]): Future[Unit] = removeEmptyCollection(snp, snapsIndexName) private[this] var mongoVersion: Option[String] = None - private[this] def getMongoVersion(implicit ec: ExecutionContext): Future[String] = mongoVersion match { + private[this] def getMongoVersion: Future[String] = mongoVersion match { case Some(v) => Future.successful(v) case None => db.runCommand(BsonDocument("buildInfo" -> 1)).toFuture() @@ -121,7 +120,7 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist } } - private[this] def isMongoVersionAtLeast(inputNbs: Int*)(implicit ec: ExecutionContext): Future[Boolean] = + private[this] def isMongoVersionAtLeast(inputNbs: Int*): Future[Boolean] = getMongoVersion.map { case str if str.isEmpty => false case str => @@ -129,13 +128,13 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist inputNbs.zip(versionNbs).forall { case (i,v) => v >= i } } - private[this] def getLocalCount(collection: MongoCollection[D])(implicit ec: ExecutionContext): Future[Long] = { + private[this] def getLocalCount(collection: MongoCollection[D]): Future[Long] = { db.runCommand(BsonDocument("count" -> s"${collection.namespace.getCollectionName}", "readConcern" -> BsonDocument("level" -> "local"))) .toFuture() .map(_.getOrElse("n", 0L).asInt32().longValue()) } - private[this] def getIndexAsBson(collection: MongoCollection[D], indexName: String)(implicit ec: ExecutionContext): Future[Option[BsonDocument]] = + private[this] def getIndexAsBson(collection: MongoCollection[D], indexName: String): Future[Option[BsonDocument]] = for { indexList <- collection.listIndexes[BsonDocument]().toFuture() indexDoc = indexList.find(_.get("name").asString().getValue.equals(indexName)) @@ -145,7 +144,7 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist } } yield indexKey - private[this] def removeEmptyCollection(collection: MongoCollection[D], indexName: String)(implicit ec: ExecutionContext): Future[Unit] = + private[this] def removeEmptyCollection(collection: MongoCollection[D], indexName: String): Future[Unit] = for { b403 <- isMongoVersionAtLeast(4,0,3) // first count, may be inaccurate in cluster environment diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala index d2024dc5..e2b955a2 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceJournaller.scala @@ -36,7 +36,7 @@ class ScalaDriverPersistenceJournaller(val driver: ScalaMongoDriver) extends Mon private[this] def journal(implicit ec: ExecutionContext): driver.C = driver.journal.map(_.withWriteConcern(driver.journalWriteConcern)) - private[this] def realtime(implicit ec: ExecutionContext): driver.C = driver.realtime + private[this] def realtime: driver.C = driver.realtime private[this] def metadata(implicit ec: ExecutionContext): driver.C = driver.metadata.map(_.withWriteConcern(driver.metadataWriteConcern)) @@ -50,7 +50,7 @@ class ScalaDriverPersistenceJournaller(val driver: ScalaMongoDriver) extends Mon private[this] implicit val system: ActorSystem = driver.actorSystem private[this] implicit val materializer: Materializer = ActorMaterializer() - private[mongodb] def journalRange(pid: String, from: Long, to: Long, max: Int)(implicit ec: ExecutionContext) = { + private[mongodb] def journalRange(pid: String, from: Long, to: Long, max: Int) = { val journal = driver.getJournal(pid) val source = Source diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala index f53c9f49..e261cd4e 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceReadJournaller.scala @@ -7,7 +7,6 @@ import akka.stream.scaladsl._ import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} import akka.stream.{Attributes, Materializer, Outlet, SourceShape} import com.mongodb.CursorType -import com.mongodb.async.client.Subscription import org.bson.types.ObjectId import org.mongodb.scala._ import org.mongodb.scala.bson._ @@ -17,15 +16,13 @@ import org.mongodb.scala.model.Projections._ import org.mongodb.scala.model.Sorts._ import scala.collection.JavaConverters._ -import scala.collection.immutable.Seq import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.Try object CurrentAllEvents { - def source(driver: ScalaMongoDriver)(implicit m: Materializer): Source[Event, NotUsed] = { + def source(driver: ScalaMongoDriver): Source[Event, NotUsed] = { import driver.ScalaSerializers._ - implicit val ec: ExecutionContext = driver.querySideDispatcher Source.fromFuture(driver.journalCollectionsAsFuture) .flatMapConcat(_.map( @@ -46,9 +43,7 @@ object CurrentAllEvents { } object CurrentPersistenceIds { - def source(driver: ScalaMongoDriver)(implicit m: Materializer): Source[String, NotUsed] = { - implicit val ec: ExecutionContext = driver.querySideDispatcher - + def source(driver: ScalaMongoDriver): Source[String, NotUsed] = { Source.fromFuture(driver.journalCollectionsAsFuture) .mapConcat(identity) .flatMapConcat(_.aggregate(project(include(PROCESSOR_ID)) :: group(s"$$$PROCESSOR_ID") :: Nil).asAkka) @@ -65,9 +60,8 @@ object CurrentEventsByPersistenceId { lte(FROM, toSeq) ) - def source(driver: ScalaMongoDriver, persistenceId: String, fromSeq: Long, toSeq: Long)(implicit m: Materializer): Source[Event, NotUsed] = { + def source(driver: ScalaMongoDriver, persistenceId: String, fromSeq: Long, toSeq: Long): Source[Event, NotUsed] = { import driver.ScalaSerializers._ - implicit val ec: ExecutionContext = driver.querySideDispatcher val query = queryFor(persistenceId, fromSeq, toSeq) @@ -91,9 +85,8 @@ object CurrentEventsByPersistenceId { } object CurrentEventsByTag { - def source(driver: ScalaMongoDriver, tag: String, fromOffset: Offset)(implicit m: Materializer): Source[(Event, Offset), NotUsed] = { + def source(driver: ScalaMongoDriver, tag: String, fromOffset: Offset): Source[(Event, Offset), NotUsed] = { import driver.ScalaSerializers._ - implicit val ec: ExecutionContext = driver.querySideDispatcher val offset = fromOffset match { case NoOffset => None @@ -216,7 +209,7 @@ class ScalaDriverRealtimeGraphStage(driver: ScalaMongoDriver, bufsz: Int = 16)(f } -class ScalaDriverJournalStream(driver: ScalaMongoDriver)(implicit m: Materializer) extends JournalStream[Source[(Event, Offset), NotUsed]] { +class ScalaDriverJournalStream(driver: ScalaMongoDriver) extends JournalStream[Source[(Event, Offset), NotUsed]] { import driver.ScalaSerializers._ implicit val ec: ExecutionContext = driver.querySideDispatcher @@ -258,7 +251,7 @@ class ScalaDriverJournalStream(driver: ScalaMongoDriver)(implicit m: Materialize class ScalaDriverPersistenceReadJournaller(driver: ScalaMongoDriver, m: Materializer) extends MongoPersistenceReadJournallingApi { val journalStream: ScalaDriverJournalStream = { - val stream = new ScalaDriverJournalStream(driver)(m) + val stream = new ScalaDriverJournalStream(driver) driver.actorSystem.registerOnTermination( stream.stopAllStreams() ) stream } diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala index d45d7d9e..f4afe874 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala @@ -50,6 +50,7 @@ class ScalaDriverSettings(config: Config) extends OfficialDriverSettings(config) override def apply(t: ClusterSettings.Builder): Unit = { t.serverSelectionTimeout(getLongQueryProperty("serverselectiontimeoutms").getOrElse(ServerSelectionTimeout.toMillis), TimeUnit.MILLISECONDS) .maxWaitQueueSize(getIntQueryProperty("waitqueuemultiple").getOrElse(ThreadsAllowedToBlockforConnectionMultiplier) * getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost)) + () } } ).applyToConnectionPoolSettings(new Block[ConnectionPoolSettings.Builder]{ @@ -59,24 +60,28 @@ class ScalaDriverSettings(config: Config) extends OfficialDriverSettings(config) .maxConnectionLifeTime(getLongQueryProperty("maxlifetimems").getOrElse(MaxConnectionLifeTime.toMillis), TimeUnit.MILLISECONDS) .minSize(getIntQueryProperty("minpoolsize").getOrElse(MinConnectionsPerHost)) .maxSize(getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost)) + () } } ).applyToServerSettings(new Block[ServerSettings.Builder]{ override def apply(t: ServerSettings.Builder): Unit = { t.heartbeatFrequency(getLongQueryProperty("heartbeatfrequencyms").getOrElse(HeartbeatFrequency.toMillis), TimeUnit.MILLISECONDS) .minHeartbeatFrequency(MinHeartbeatFrequency.toMillis, TimeUnit.MILLISECONDS) // no 'minHeartbeatFrequency' in ConnectionString + () } } ).applyToSocketSettings(new Block[SocketSettings.Builder] { override def apply(t: SocketSettings.Builder): Unit = { t.connectTimeout(getLongQueryProperty("connecttimeoutms").getOrElse(ConnectTimeout.toMillis).toIntWithoutWrapping, TimeUnit.MILLISECONDS) .readTimeout(getLongQueryProperty("sockettimeoutms").getOrElse(SocketTimeout.toMillis).toIntWithoutWrapping, TimeUnit.MILLISECONDS) + () } } ).applyToSslSettings(new Block[SslSettings.Builder]{ override def apply(t: SslSettings.Builder): Unit = { t.enabled(getBooleanQueryProperty("ssl").getOrElse(SslEnabled)) .invalidHostNameAllowed(getBooleanQueryProperty("sslinvalidhostnameallowed").getOrElse(SslInvalidHostNameAllowed)) + () } } )