Commit 388254f

Clean up outstanding warnings, deprecations, etc

scullxbones committed Dec 22, 2019
1 parent 268abd4

Showing 18 changed files with 230 additions and 213 deletions.
build.sbt (4 additions, 4 deletions)
@@ -1,8 +1,8 @@
val releaseV = "2.3.1"

val scala211V = "2.11.12"
val scala212V = "2.12.9"
val scala213V = "2.13.0"
val scala212V = "2.12.10"
val scala213V = "2.13.1"

val scalaV = scala211V

@@ -85,8 +85,8 @@ val commonSettings = Seq(
"-Xlint"
),
resolvers ++= Seq(
"Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/",
"Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/",
"Typesafe Releases" at "https://repo.typesafe.com/typesafe/releases/",
"Typesafe Snapshots" at "https://repo.typesafe.com/typesafe/snapshots/",
"Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
),
parallelExecution in Test := true,
--- next changed file (name not shown) ---
@@ -12,7 +12,6 @@ import akka.serialization.{Serialization, SerializerWithStringManifest}

import scala.collection.immutable.{Seq => ISeq}
import scala.language.existentials
-import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

sealed trait Payload {
@@ -45,7 +44,7 @@ case class Serialized[C <: AnyRef](bytes: Array[Byte],
className: String,
tags: Set[String],
serializerId: Option[Int],
-serializedManifest: Option[String])(implicit ser: Serialization, loadClass: LoadClass, ct: ClassTag[C]) extends Payload {
+serializedManifest: Option[String])(implicit ser: Serialization, loadClass: LoadClass) extends Payload {
type Content = C

val hint = "ser"
@@ -150,7 +149,7 @@ object Payload {

import language.implicitConversions

-implicit def bson2payload[D](document: D)(implicit ev: Manifest[D], dt: DocumentType[D]): Bson[D] = Bson(document, Set.empty[String])
+implicit def bson2payload[D](document: D)(implicit dt: DocumentType[D]): Bson[D] = Bson(document, Set.empty[String])

implicit def str2payload(string: String): StringPayload = StringPayload(string, Set.empty[String])
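The Manifest evidence on bson2payload was never used, so dropping it silences an unused-implicit warning without changing call sites; only the DocumentType type class is consulted. A minimal sketch of the conversion, assuming an implicit DocumentType[BSONDocument] instance is in scope (the RxMongo driver supplies one for its own document type):

    import reactivemongo.api.bson.BSONDocument

    // assumption: an implicit DocumentType[BSONDocument] is in scope
    val doc = BSONDocument("k" -> "v")
    val payload: Bson[BSONDocument] = Payload.bson2payload(doc) // or implicitly, as a Payload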

--- next changed file (name not shown) ---
@@ -163,13 +163,13 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)
* Convenient methods to retrieve EXISTING journal collection from persistenceId.
* CAUTION: this method does NOT create the journal and its indexes.
*/
-private[mongodb] def getJournal(persistenceId: String)(implicit ec: ExecutionContext): C = collection(getJournalCollectionName(persistenceId))
+private[mongodb] def getJournal(persistenceId: String): C = collection(getJournalCollectionName(persistenceId))

/**
* Convenient methods to retrieve EXISTING snapshot collection from persistenceId.
* CAUTION: this method does NOT create the snapshot and its indexes.
*/
-private[mongodb] def getSnaps(persistenceId: String)(implicit ec: ExecutionContext): C = collection(getSnapsCollectionName(persistenceId))
+private[mongodb] def getSnaps(persistenceId: String): C = collection(getSnapsCollectionName(persistenceId))

private[mongodb] lazy val indexes: Seq[IndexSettings] = Seq(
IndexSettings(journalIndexName, unique = true, sparse = false, JournallingFieldNames.PROCESSOR_ID -> 1, FROM -> 1, TO -> 1),
@@ -179,7 +179,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)

private[this] val journalCache = MongoCollectionCache[C](settings.CollectionCache, "journal", actorSystem)

-private[mongodb] def journal(implicit ec: ExecutionContext): C = journal("")
+private[mongodb] def journal: C = journal("")

private[mongodb] def journal(persistenceId: String)(implicit ec: ExecutionContext): C = {
val collectionName = getJournalCollectionName(persistenceId)
@@ -201,7 +201,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)

private[this] val snapsCache = MongoCollectionCache[C](settings.CollectionCache, "snaps", actorSystem)

-private[mongodb] def snaps(implicit ec: ExecutionContext): C = snaps("")
+private[mongodb] def snaps: C = snaps("")

private[mongodb] def snaps(persistenceId: String)(implicit ec: ExecutionContext): C = {
val collectionName = getSnapsCollectionName(persistenceId)
@@ -222,7 +222,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)

private[this] val realtimeCache = MongoCollectionCache[C](settings.CollectionCache, "realtime", actorSystem)

-private[mongodb] def realtime(implicit ec: ExecutionContext): C =
+private[mongodb] def realtime: C =
realtimeCache.getOrElseCreate(realtimeCollectionName, collectionName => cappedCollection(collectionName))

private[mongodb] val querySideDispatcher = actorSystem.dispatchers.lookup("akka-contrib-persistence-query-dispatcher")
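The cache-backed accessors journal, snaps, and realtime never used their ExecutionContext, so the parameter is dropped and they become plain members. A sketch of the resulting calling convention (driver stands for any concrete MongoPersistenceDriver; C is its abstract collection type):

    // before: driver.journal required an implicit ExecutionContext in scope
    // after: plain accessors
    val defaultJournal = driver.journal   // C
    val defaultSnaps   = driver.snaps     // C
    val realtimeColl   = driver.realtime  // capped collection backing the realtime stream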
--- next changed file (name not shown) ---
@@ -121,7 +121,6 @@ class ScalaDslMongoReadJournal(impl: MongoPersistenceReadJournallingApi)(implici
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
require(tag != null, "Tag must not be null")
require(impl.checkOffsetIsSupported(offset), s"Offset $offset is not supported by read journal")
-implicit val ordering: Ordering[Offset] = implicitly[Ordering[Offset]]
val pastSource =
impl.currentEventsByTag(tag, offset)
.toEventEnvelopes
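The deleted ordering val re-bound an implicit that is already available: the package object (touched later in this commit) defines the Ordering[Offset] used when merging the past and realtime sources, so no local binding is needed. A minimal sketch of leaning on the ambient instance, with a hypothetical latest helper:

    import akka.persistence.query.Offset

    // assumption: the package object's implicit Ordering[Offset] is imported
    def latest(offsets: Seq[Offset])(implicit ord: Ordering[Offset]): Option[Offset] =
      offsets.sorted.lastOption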
--- next changed file (name not shown) ---
@@ -1,9 +1,10 @@
package akka.contrib.persistence.mongodb

import akka.actor.ActorSystem
-import com.typesafe.config.{Config, ConfigException}
+import com.typesafe.config.Config

import scala.concurrent.ExecutionContextExecutor
+import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

abstract class WithMongoPersistencePluginDispatcher(actorSystem: ActorSystem, config: Config) {
@@ -12,9 +13,9 @@ abstract class WithMongoPersistencePluginDispatcher(actorSystem: ActorSystem, co
Try(actorSystem.dispatchers.lookup(config.getString("plugin-dispatcher"))) match {
case Success(configuredPluginDispatcher) =>
configuredPluginDispatcher
-case Failure(_ : ConfigException) =>
-actorSystem.log.warning("plugin-dispatcher not configured for akka-contrib-mongodb-persistence. " +
-"Using actor system dispatcher.")
+case Failure(NonFatal(_)) =>
+actorSystem.log.warning("plugin-dispatcher not configured for akka-contrib-mongodb-persistence. Using actor system dispatcher.")
actorSystem.dispatcher
case Failure(t) => throw t
}
}
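Matching Failure(NonFatal(_)) instead of Failure(_: ConfigException) broadens the fallback to any recoverable lookup error, while the final case still rethrows fatal throwables such as OutOfMemoryError. A standalone sketch of the pattern; lookupDispatcher and its failure are hypothetical:

    import scala.util.control.NonFatal
    import scala.util.{Failure, Success, Try}

    def lookupDispatcher(): String = sys.error("plugin-dispatcher key missing") // hypothetical

    val dispatcher: String = Try(lookupDispatcher()) match {
      case Success(d)           => d
      case Failure(NonFatal(_)) => "akka.actor.default-dispatcher" // warn and fall back
      case Failure(t)           => throw t                         // fatal: rethrow
    }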
--- next changed file (name not shown) ---
@@ -30,7 +30,7 @@ package object mongodb {
}

implicit class OffsetWithObjectIdToo(val offsets: Offset.type) extends AnyVal {
-def objectId(hexStr: String, time: Long) =
+def objectId(hexStr: String, time: Long): ObjectIdOffset =
ObjectIdOffset(hexStr, time)
}

@@ -49,6 +49,9 @@ package object mongodb {
case (a:ObjectIdOffset, b:ObjectIdOffset) => a compareTo b
case (_:ObjectIdOffset, _) => 0 // Can't compare
case (_, _:ObjectIdOffset) => 0 // Can't compare
+case _ =>
+// Per j.u.Comparator contract
+throw new ClassCastException(s"Unsupported offset types ${x.getClass.getName} ${y.getClass.getName}")
}
}
}
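The added catch-all makes the ordering honest about its partiality: the java.util.Comparator contract explicitly permits compare to throw ClassCastException when the arguments' types prevent comparison, which is clearer than silently returning 0 for every unknown pair. A behavioral sketch, assuming this Ordering[Offset] is in implicit scope:

    import akka.persistence.query.{NoOffset, Offset, Sequence}
    import scala.util.Try

    // ObjectIdOffset vs ObjectIdOffset   -> compared by id
    // ObjectIdOffset vs anything else    -> 0 (tolerated: "can't compare")
    // any other pair, e.g. Sequence pair -> ClassCastException per the contract
    val outcome = Try(implicitly[Ordering[Offset]].compare(Sequence(1L), NoOffset))
    // outcome: Failure(java.lang.ClassCastException)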
--- next changed file (name not shown) ---
@@ -15,9 +15,9 @@ import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer}
import org.slf4j.{Logger, LoggerFactory}
import reactivemongo.akkastream._
-import reactivemongo.api.collections.bson.BSONCollection
+import reactivemongo.api.bson.collection.BSONCollection
import reactivemongo.api.commands.{LastError, WriteResult}
-import reactivemongo.bson.{BSONDocument, _}
+import reactivemongo.api.bson.{BSONDocument, _}

import scala.collection.immutable.Seq
import scala.concurrent._
@@ -37,10 +37,12 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn

private[this] def metadata(implicit ec: ExecutionContext) = driver.metadata

-private[this] def journalRangeQuery(pid: String, from: Long, to: Long) = BSONDocument(
-PROCESSOR_ID -> pid,
-TO -> BSONDocument("$gte" -> from),
-FROM -> BSONDocument("$lte" -> to))
+private[this] def journalRangeQuery(pid: String, from: Long, to: Long) =
+BSONDocument(
+PROCESSOR_ID -> pid,
+TO -> BSONDocument("$gte" -> from),
+FROM -> BSONDocument("$lte" -> to)
+)

private[this] implicit val system: ActorSystem = driver.actorSystem
private[this] implicit val materializer: Materializer = ActorMaterializer()
@@ -60,7 +62,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn
)

val flow = Flow[BSONDocument]
-.mapConcat(_.getAs[BSONArray](EVENTS).map(_.values.collect {
+.mapConcat(_.getAsOpt[BSONArray](EVENTS).map(_.values.collect {
case d: BSONDocument => driver.deserializeJournal(d)
}).getOrElse(Stream.empty[Event]))
.filter(_.sn >= from)
@@ -129,7 +131,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn

private[this] def findMaxSequence(persistenceId: String, maxSequenceNr: Long)(implicit ec: ExecutionContext): Future[Option[Long]] = {
def performAggregation(j: BSONCollection): Future[Option[Long]] = {
-import j.BatchCommands.AggregationFramework.{GroupField, Match, MaxField}
+import j.aggregationFramework.{GroupField, Match, MaxField}

j.aggregatorContext[BSONDocument](
Match(BSONDocument(PROCESSOR_ID -> persistenceId, TO -> BSONDocument("$lte" -> maxSequenceNr))),
@@ -138,7 +140,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn
).prepared
.cursor
.headOption
-.map(_.flatMap(_.getAs[Long]("max")))
+.map(_.flatMap(_.getAsOpt[Long]("max")))
}
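BatchCommands was removed from the collection API; the aggregation builders now hang off the collection's aggregationFramework member, while the pipeline itself is unchanged. A condensed sketch mirroring the call above, with literal field names standing in for the PROCESSOR_ID/TO constants:

    import reactivemongo.api.bson.BSONDocument
    import reactivemongo.api.bson.collection.BSONCollection
    import scala.concurrent.{ExecutionContext, Future}

    def maxTo(j: BSONCollection, pid: String, maxSn: Long)(implicit ec: ExecutionContext): Future[Option[Long]] = {
      import j.aggregationFramework.{GroupField, Match, MaxField}
      j.aggregatorContext[BSONDocument](
        Match(BSONDocument("pid" -> pid, "to" -> BSONDocument("$lte" -> maxSn))),
        List(GroupField("pid")("max" -> MaxField("to")))
      ).prepared.cursor.headOption.map(_.flatMap(_.getAsOpt[Long]("max")))
    }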

for {
@@ -214,7 +216,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn
metadata.flatMap(_.find(BSONDocument(PROCESSOR_ID -> pid), Option(BSONDocument(MAX_SN -> 1)))
.cursor[BSONDocument]()
.headOption
-.map(d => d.flatMap(_.getAs[Long](MAX_SN)))))(l => Future.successful(Option(l)))
+.map(d => d.flatMap(_.getAsOpt[Long](MAX_SN)))))(l => Future.successful(Option(l)))
}

private[mongodb] override def maxSequenceNr(pid: String, from: Long)(implicit ec: ExecutionContext): Future[Long] = {
@@ -223,7 +225,7 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn
.sort(BSONDocument(TO -> -1))
.cursor[BSONDocument]()
.headOption
-.map(d => d.flatMap(_.getAs[Long](TO)))
+.map(d => d.flatMap(_.getAsOpt[Long](TO)))
.flatMap(maxSequenceFromMetadata(pid))
.map(_.getOrElse(0L)))
}
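All the getAs calls in this file move to getAsOpt, the like-for-like replacement introduced when the BSON API relocated to reactivemongo.api.bson; both return Option, and getAsTry is available when the decode failure itself matters. A minimal sketch:

    import reactivemongo.api.bson._

    val doc = BSONDocument("to" -> 42L)
    val present: Option[Long] = doc.getAsOpt[Long]("to")   // Some(42L)
    val absent:  Option[Long] = doc.getAsOpt[Long]("nope") // None
    // doc.getAsTry[Long]("nope")                          // Failure, if the error matters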
--- next changed file (name not shown) ---
@@ -10,10 +10,10 @@ import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}
import reactivemongo.api._
-import reactivemongo.api.collections.bson.BSONCollection
+import reactivemongo.api.bson.collection.{BSONCollection, BSONSerializationPack}
import reactivemongo.api.commands.{Command, CommandError, WriteConcern}
import reactivemongo.api.indexes.{Index, IndexType}
-import reactivemongo.bson.{BSONDocument, _}
+import reactivemongo.api.bson.{BSONDocument, _}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
@@ -35,9 +35,9 @@ object RxMongoPersistenceDriver {
}

class RxMongoDriverProvider(actorSystem: ActorSystem) {
-val driver: MongoDriver = {
-val md = MongoDriver()
-actorSystem.registerOnTermination(driver.close())
+val driver: AsyncDriver = {
+val md = AsyncDriver()
+actorSystem.registerOnTermination(driver.close()(actorSystem.dispatcher))
md
}
}
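MongoDriver is deprecated in ReactiveMongo 0.19 in favor of AsyncDriver, whose close() returns a Future and therefore needs an ExecutionContext, hence the explicit actorSystem.dispatcher above. A sketch of the provider wiring under that assumption:

    import akka.actor.ActorSystem
    import reactivemongo.api.AsyncDriver

    class DriverProvider(system: ActorSystem) {
      val driver: AsyncDriver = {
        val md = AsyncDriver()
        // registerOnTermination takes a by-name block, so referencing the
        // fully initialized `driver` val at shutdown time is safe
        system.registerOnTermination(driver.close()(system.dispatcher))
        md
      }
    }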
@@ -62,10 +62,11 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo
implicit val waitFor: FiniteDuration = 10.seconds

private[mongodb] lazy val connection: Future[MongoConnection] =
-Future.fromTry(driver.connection(parsedMongoUri, strictUri = false))
+driver.connect(parsedMongoUri)
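AsyncDriver.connect is natively asynchronous, returning Future[MongoConnection] directly, so the Future.fromTry wrapper and the strictUri flag both disappear. A fragment sketch; parsedMongoUri stands for the driver's already-parsed connection URI:

    import reactivemongo.api.{AsyncDriver, MongoConnection}
    import scala.concurrent.Future

    // before (0.18): Future.fromTry(driver.connection(parsedMongoUri, strictUri = false))
    // after  (0.19):
    val connection: Future[MongoConnection] = driver.connect(parsedMongoUri)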

private[mongodb] def closeConnections(): Unit = {
driver.close(5.seconds)
+()
}

private[mongodb] def dbName: String = databaseName.getOrElse(parsedMongoUri.db.getOrElse(DEFAULT_DB_NAME))
@@ -101,12 +102,17 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo

private[mongodb] override def ensureIndex(indexName: String, unique: Boolean, sparse: Boolean, keys: (String, Int)*)(implicit ec: ExecutionContext) = { collection =>
val ky = keys.toSeq.map { case (f, o) => f -> (if (o > 0) IndexType.Ascending else IndexType.Descending) }
-collection.flatMap(c => c.indexesManager.ensure(Index(
+collection.flatMap(c => c.indexesManager.ensure(Index(BSONSerializationPack)(
key = ky,
background = true,
unique = unique,
sparse = sparse,
-name = Some(indexName))).map(_ => c))
+name = Some(indexName),
+dropDups = true,
+version = None,
+partialFilter = None,
+options = BSONDocument.empty
+)).map(_ => c))
}
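Index is now parameterized by a serialization pack, and several formerly defaulted fields (dropDups, version, partialFilter, options) must be spelled out. A sketch mirroring the call above; the key and index name are illustrative:

    import reactivemongo.api.bson.BSONDocument
    import reactivemongo.api.bson.collection.BSONSerializationPack
    import reactivemongo.api.indexes.{Index, IndexType}

    val journalIndex = Index(BSONSerializationPack)(
      key = Seq("pid" -> IndexType.Ascending, "from" -> IndexType.Ascending),
      name = Some("akka_persistence_journal_index"), // hypothetical name
      unique = true,
      background = true,
      dropDups = true,
      sparse = false,
      version = None,
      partialFilter = None,
      options = BSONDocument.empty
    )
    // usage: collection.flatMap(_.indexesManager.ensure(journalIndex))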

override private[mongodb] def cappedCollection(name: String)(implicit ec: ExecutionContext) =
@@ -156,7 +162,7 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo
val runner = Command.run(BSONSerializationPack, FailoverStrategy())
runner.apply(database, runner.rawCommand(BSONDocument("buildInfo" -> 1)))
.one[BSONDocument](ReadPreference.Primary)
-.map(_.getAs[BSONString]("version").getOrElse(BSONString("")).value)
+.map(_.getAsOpt[BSONString]("version").getOrElse(BSONString("")).value)
.map { v =>
mongoVersion = Some(v)
v
--- next changed file (name not shown) ---
@@ -15,7 +15,7 @@ import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import reactivemongo.akkastream._
import reactivemongo.api.QueryOpts
-import reactivemongo.bson._
+import reactivemongo.api.bson._

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
@@ -31,10 +31,10 @@ object CurrentAllEvents {
.cursor[BSONDocument]()
.documentSource()
.map { doc =>
-doc.getAs[BSONArray](EVENTS)
-.map(_.elements
-.map(_.value)
-.collect{ case d:BSONDocument => driver.deserializeJournal(d) })
+doc.getAsOpt[BSONArray](EVENTS)
+.map(_.values.collect{
+case d:BSONDocument => driver.deserializeJournal(d)
+})
.getOrElse(Nil)
}.mapConcat(identity)
}.reduceLeftOption(_ concat _)
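Alongside the getAsOpt rename, BSONArray now exposes its contents as values: Seq[BSONValue] directly, replacing the old elements/.value two-step. A minimal sketch:

    import reactivemongo.api.bson._

    val events = BSONArray(BSONDocument("sn" -> 1L), BSONString("not a document"))
    // old API: events.elements.map(_.value).collect { ... }
    val docs = events.values.collect { case d: BSONDocument => d } // Seq[BSONDocument]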
@@ -50,7 +50,7 @@ object CurrentPersistenceIds {
Source.fromFuture(for {
collections <- driver.journalCollectionsAsFuture
tmpNames <- Future.sequence(collections.zipWithIndex.map { case (c,idx) =>
-import c.BatchCommands.AggregationFramework.{Group, Out, Project}
+import c.aggregationFramework.{Group, Out, Project}
val nameWithIndex = s"$temporaryCollectionName-$idx"
c.aggregatorContext[BSONDocument](
Project(BSONDocument(PROCESSOR_ID -> 1)),
@@ -66,7 +66,7 @@ object CurrentPersistenceIds {
tmps <- Future.sequence(tmpNames.map(driver.collection))
} yield tmps )
.flatMapConcat(cols => cols.map(_.find(BSONDocument(), Option.empty[BSONDocument]).cursor[BSONDocument]().documentSource()).reduce(_ ++ _))
-.mapConcat(_.getAs[String]("_id").toList)
+.mapConcat(_.getAsOpt[String]("_id").toList)
.alsoTo(Sink.onComplete{ _ =>
driver
.getCollectionsAsFuture(temporaryCollectionName)
@@ -78,7 +78,7 @@ object CurrentPersistenceIds {
}

object CurrentEventsByPersistenceId {
-def queryFor(persistenceId: String, fromSeq: Long, toSeq: Long) = BSONDocument(
+def queryFor(persistenceId: String, fromSeq: Long, toSeq: Long): BSONDocument = BSONDocument(
PROCESSOR_ID -> persistenceId,
TO -> BSONDocument("$gte" -> fromSeq),
FROM -> BSONDocument("$lte" -> toSeq)
@@ -97,10 +97,10 @@ object CurrentEventsByPersistenceId {
.cursor[BSONDocument]()
.documentSource()
).map( doc =>
-doc.getAs[BSONArray](EVENTS)
-.map(_.elements
-.map(_.value)
-.collect{ case d:BSONDocument => driver.deserializeJournal(d) })
+doc.getAsOpt[BSONArray](EVENTS)
+.map(_.values.collect{
+case d:BSONDocument => driver.deserializeJournal(d)
+})
.getOrElse(Nil)
).mapConcat(identity)
}
@@ -117,7 +117,7 @@ object CurrentEventsByTag {
}
val query = BSONDocument(
TAGS -> tag
-).merge(offset.fold(BSONDocument.empty)(id => BSONDocument(ID -> BSONDocument("$gt" -> id))))
+) ++ offset.fold(BSONDocument.empty)(id => BSONDocument(ID -> BSONDocument("$gt" -> id)))
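BSONDocument.merge did not survive the move to reactivemongo.api.bson; the ++ operator concatenates documents instead. A minimal sketch of building the same tag query (field names illustrative):

    import reactivemongo.api.bson._

    val offset: Option[BSONObjectID] = Some(BSONObjectID.generate())
    val query = BSONDocument("_tg" -> "blue") ++
      offset.fold(BSONDocument.empty)(id => BSONDocument("_id" -> BSONDocument("$gt" -> id)))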

Source.fromFuture(driver.journalCollectionsAsFuture)
.flatMapConcat{ xs =>
@@ -129,13 +129,11 @@ object CurrentEventsByTag {
).reduceLeftOption(_ ++ _)
.getOrElse(Source.empty)
}.map{ doc =>
-val id = doc.getAs[BSONObjectID](ID).get
-doc.getAs[BSONArray](EVENTS)
-.map(_.elements
-.map(_.value)
-.collect{ case d:BSONDocument => driver.deserializeJournal(d) -> ObjectIdOffset(id.stringify, id.time) }
-.filter(_._1.tags.contains(tag))
-)
+val id = doc.getAsOpt[BSONObjectID](ID).get
+doc.getAsOpt[BSONArray](EVENTS)
+.map(_.values.collect{
+case d:BSONDocument => driver.deserializeJournal(d) -> ObjectIdOffset(id.stringify, id.time)
+}.filter(_._1.tags.contains(tag)))
.getOrElse(Nil)
}.mapConcat(identity)
}
@@ -171,7 +169,7 @@ class RxMongoRealtimeGraphStage(driver: RxMongoDriver, bufsz: Int = 16)(factory:
}
else
buffer = buffer ::: List(doc)
-lastId = doc.getAs[BSONObjectID]("_id")
+lastId = doc.getAsOpt[BSONObjectID]("_id")
}

private def errAc = getAsyncCallback[Throwable](failStage)
@@ -249,8 +247,8 @@ class RxMongoJournalStream(driver: RxMongoDriver)(implicit m: Materializer) exte
)
.via(killSwitch.flow)
.mapConcat { d =>
-val id = d.getAs[BSONObjectID](ID).get
-d.getAs[BSONArray](EVENTS).map(_.elements.map(e => e.value).collect {
+val id = d.getAsOpt[BSONObjectID](ID).get
+d.getAsOpt[BSONArray](EVENTS).map(_.values.collect {
case d: BSONDocument => driver.deserializeJournal(d) -> ObjectIdOffset(id.stringify, id.time)
}).getOrElse(Nil)
}