diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 31d6b17e7..4ffbdced1 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -63,10 +63,13 @@ jobs: with: jvm: temurin:1.11.0 - - name: Compile all code with fatal warnings for Java 11, Scala 2.13 + - name: Compile all code with fatal warnings for Java 11 and Scala 2.13 # Run locally with: sbt 'clean ; +Test/compile ; +It/compile' run: sbt "; Test/compile" + - name: Compile all code with fatal warnings for Java 11 and Scala 3.3 + run: sbt "++3.3; Test/compile" + check-docs: name: Check Docs runs-on: ubuntu-22.04 diff --git a/build.sbt b/build.sbt index 21f479985..f0abaecc4 100644 --- a/build.sbt +++ b/build.sbt @@ -18,9 +18,15 @@ lazy val core = project name := "akka-persistence-jdbc", libraryDependencies ++= Dependencies.Libraries, mimaReportSignatureProblems := true, - mimaPreviousArtifacts := Set( - organization.value %% name.value % previousStableVersion.value.getOrElse( - throw new Error("Unable to determine previous version for MiMa")))) + mimaPreviousArtifacts := { + if (scalaVersion.value.startsWith("3")) { + Set.empty + } else { + Set( + organization.value %% name.value % previousStableVersion.value.getOrElse( + throw new Error("Unable to determine previous version for MiMa"))) + } + }) lazy val integration = project .in(file("integration")) diff --git a/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes new file mode 100644 index 000000000..72a5768e5 --- /dev/null +++ b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes @@ -0,0 +1,43 @@ +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.unapply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.tupled") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.curried") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.database") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy$default$1") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.this") + +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.LazySlickDatabase.database") + +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.forConfig") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.database") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.db.SlickDatabase.database") + +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.JdbcAsyncWriteJournal.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.snapshot.JdbcSnapshotStore.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.state.JdbcDurableStateStoreProvider.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.this") + +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.receive") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.receive$default$4") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.findGaps") + + + + diff --git a/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala b/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala index c0ea00411..1a2300944 100644 --- a/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala +++ b/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala @@ -6,6 +6,8 @@ package akka.persistence.jdbc.db import akka.actor.ActorSystem +import akka.annotation.InternalApi + import javax.naming.InitialContext import akka.persistence.jdbc.config.SlickConfiguration import com.typesafe.config.Config @@ -82,6 +84,7 @@ trait SlickDatabase { def allowShutdown: Boolean } +@InternalApi case class EagerSlickDatabase(database: Database, profile: JdbcProfile) extends SlickDatabase { override def allowShutdown: Boolean = true } @@ -90,6 +93,7 @@ case class EagerSlickDatabase(database: Database, profile: JdbcProfile) extends * A LazySlickDatabase lazily initializes a database, it also manages the shutdown of the database * @param config The configuration used to create the database */ +@InternalApi class LazySlickDatabase(config: Config, system: ActorSystem) extends SlickDatabase { val profile: JdbcProfile = SlickDatabase.profile(config, path = "") diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala index b6b32cc27..3b816a12d 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala @@ -64,7 +64,7 @@ trait JournalTables { eventSerManifest, metaPayload, metaSerId, - metaSerManifest) <> (JournalAkkaSerializationRow.tupled, JournalAkkaSerializationRow.unapply) + metaSerManifest) <> ((JournalAkkaSerializationRow.apply _).tupled, JournalAkkaSerializationRow.unapply) val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc) val persistenceId: Rep[String] = @@ -91,17 +91,17 @@ trait JournalTables { lazy val JournalTable = new TableQuery(tag => new JournalEvents(tag)) class EventTags(_tableTag: Tag) extends Table[TagRow](_tableTag, tagTableCfg.schemaName, tagTableCfg.tableName) { - override def * = (eventId, persistenceId, sequenceNumber, tag) <> (TagRow.tupled, TagRow.unapply) + override def * = (eventId, persistenceId, sequenceNumber, tag) <> ((TagRow.apply _).tupled, TagRow.unapply) // allow null value insert. - val eventId: Rep[Option[Long]] = column[Long](tagTableCfg.columnNames.eventId) - val persistenceId: Rep[Option[String]] = column[String](tagTableCfg.columnNames.persistenceId) - val sequenceNumber: Rep[Option[Long]] = column[Long](tagTableCfg.columnNames.sequenceNumber) + val eventId: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.eventId) + val persistenceId: Rep[Option[String]] = column[Option[String]](tagTableCfg.columnNames.persistenceId) + val sequenceNumber: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.sequenceNumber) val tag: Rep[String] = column[String](tagTableCfg.columnNames.tag) val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (persistenceId, sequenceNumber, tag)) val journalEvent = foreignKey(s"fk_${journalTableCfg.tableName}", (persistenceId, sequenceNumber), JournalTable)(e => - (e.persistenceId, e.sequenceNumber)) + (Rep.Some(e.persistenceId), Rep.Some(e.sequenceNumber))) } lazy val TagTable = new TableQuery(tag => new EventTags(tag)) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index 5a4591792..6d88716dd 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -28,7 +28,8 @@ class ByteArrayJournalDao( serialization: Serialization)(implicit val ec: ExecutionContext, val mat: Materializer) extends BaseByteArrayJournalDao { val queries = new JournalQueries(profile, journalConfig.journalTableConfiguration) - val serializer = new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator) + val serializer: ByteArrayJournalSerializer = + new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator) } /** diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala index 16a507c8b..1efd76024 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala @@ -19,7 +19,13 @@ trait JournalTables { _tableTag, _schemaName = journalTableCfg.schemaName, _tableName = journalTableCfg.tableName) { - def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> (JournalRow.tupled, JournalRow.unapply) + def * = ( + ordering, + deleted, + persistenceId, + sequenceNumber, + message, + tags) <> ((JournalRow.apply _).tupled, JournalRow.unapply) val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc) val persistenceId: Rep[String] = diff --git a/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala b/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala index dddb99ad9..800aec84e 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala @@ -11,7 +11,8 @@ import com.typesafe.config.Config class JdbcReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String) extends ReadJournalProvider { - override val scaladslReadJournal = new scaladsl.JdbcReadJournal(config, configPath)(system) + override def scaladslReadJournal(): scaladsl.JdbcReadJournal = + new scaladsl.JdbcReadJournal(config, configPath)(system) - override val javadslReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal) + override def javadslReadJournal(): javadsl.JdbcReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal()) } diff --git a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala index 370de6708..390902a1d 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala @@ -84,7 +84,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen * @param moduloCounter A counter which is incremented every time a new query have been executed, modulo `maxTries` * @param previousDelay The last used delay (may change in case failures occur) */ - def receive( + private def receive( currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], moduloCounter: Int, @@ -129,7 +129,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen /** * This method that implements the "find gaps" algo. It's the meat and main purpose of this actor. */ - def findGaps( + private def findGaps( elements: Seq[OrderingId], currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala index b6fc76209..129f9b3b5 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala @@ -112,7 +112,8 @@ trait OracleReadJournalDao extends ReadJournalDao { } } - implicit val getJournalRow = GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<)) + implicit val getJournalRow: GetResult[JournalRow] = + GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<)) abstract override def eventsByTag( tag: String, @@ -155,5 +156,6 @@ class ByteArrayReadJournalDao( extends BaseByteArrayReadJournalDao with OracleReadJournalDao { val queries = new ReadJournalQueries(profile, readJournalConfig) - val serializer = new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator) + val serializer: ByteArrayJournalSerializer = + new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator) } diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala index e2d0b53f0..7bb4378f1 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala @@ -21,6 +21,7 @@ object SnapshotTables { metaSerId: Option[Int], metaSerManifest: Option[String], metaPayload: Option[Array[Byte]]) + } trait SnapshotTables { @@ -43,7 +44,7 @@ trait SnapshotTables { snapshotPayload, metaSerId, metaSerManifest, - metaPayload) <> (SnapshotRow.tupled, SnapshotRow.unapply) + metaPayload) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply) val persistenceId: Rep[String] = column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true)) diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala index 87fde25ff..16d36e379 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala @@ -31,7 +31,7 @@ trait SnapshotTables { _tableTag, _schemaName = snapshotTableCfg.schemaName, _tableName = snapshotTableCfg.tableName) { - def * = (persistenceId, sequenceNumber, created, snapshot) <> (SnapshotRow.tupled, SnapshotRow.unapply) + def * = (persistenceId, sequenceNumber, created, snapshot) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply) val persistenceId: Rep[String] = column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true)) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala index c4bb6a034..6a98adad2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala @@ -42,7 +42,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration case _ => ??? } - implicit val uuidSetter = SetParameter[Array[Byte]] { case (bytes, params) => + implicit val uuidSetter: SetParameter[Array[Byte]] = SetParameter[Array[Byte]] { case (bytes, params) => params.setBytes(bytes) } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala index d76b57e24..e6ef2afaa 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala @@ -41,7 +41,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration def * = (globalOffset, persistenceId, revision, statePayload, tag, stateSerId, stateSerManifest, stateTimestamp) - .<>(DurableStateRow.tupled, DurableStateRow.unapply) + .<>((DurableStateRow.apply _).tupled, DurableStateRow.unapply) val globalOffset: Rep[Long] = column[Long](durableStateTableCfg.columnNames.globalOffset, O.AutoInc) val persistenceId: Rep[String] = diff --git a/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala b/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala index 5c961de8e..bf664d7a2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala @@ -33,10 +33,10 @@ class JdbcDurableStateStoreProvider[A](system: ExtendedActorSystem) extends Dura lazy val serialization = SerializationExtension(system) val profile: JdbcProfile = slickDb.profile - override val scaladslDurableStateStore: DurableStateStore[Any] = + override def scaladslDurableStateStore(): DurableStateStore[Any] = new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig, serialization)(system) - override val javadslDurableStateStore: JDurableStateStore[AnyRef] = + override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = new javadsl.JdbcDurableStateStore[AnyRef]( profile, durableStateConfig, diff --git a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala index fe91acb29..9b41bed37 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala @@ -46,7 +46,7 @@ import akka.annotation.InternalApi * Efficient representation of missing elements using NumericRanges. * It can be seen as a collection of GlobalOffset */ - private case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) { + case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) { def addRange(from: GlobalOffset, until: GlobalOffset): MissingElements = { val newRange = from.until(until) MissingElements(elements :+ newRange) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala index fc51b8cce..ea14cffbd 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala @@ -25,7 +25,6 @@ import akka.serialization.Serialization import akka.stream.scaladsl.{ Sink, Source } import akka.stream.{ Materializer, SystemMaterializer } import akka.util.Timeout -import DurableStateSequenceActor._ import OffsetSyntax._ import akka.annotation.ApiMayChange import akka.persistence.query.UpdatedDurableState @@ -45,6 +44,7 @@ class JdbcDurableStateStore[A]( serialization: Serialization)(implicit val system: ExtendedActorSystem) extends DurableStateUpdateStore[A] with DurableStateStoreQuery[A] { + import DurableStateSequenceActor._ import FlowControl._ import profile.api._ @@ -211,20 +211,16 @@ class JdbcDurableStateStore[A]( } private def updateDurableState(row: DurableStateTables.DurableStateRow) = { - import queries._ - for { - s <- getSequenceNextValueExpr() - u <- updateDbWithDurableState(row, s.head) + s <- queries.getSequenceNextValueExpr() + u <- queries.updateDbWithDurableState(row, s.head) } yield u } private def insertDurableState(row: DurableStateTables.DurableStateRow) = { - import queries._ - for { - s <- getSequenceNextValueExpr() - u <- insertDbWithDurableState(row, s.head) + s <- queries.getSequenceNextValueExpr() + u <- queries.insertDbWithDurableState(row, s.head) } yield u } diff --git a/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala b/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala index 225a026e2..96c531c12 100644 --- a/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala @@ -28,7 +28,7 @@ abstract class SharedActorSystemTestSpec(val config: Config) extends SimpleSpec implicit lazy val ec: ExecutionContext = system.dispatcher implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute) - implicit val timeout = Timeout(1.minute) + implicit val timeout: Timeout = Timeout(1.minute) lazy val serialization = SerializationExtension(system) diff --git a/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala b/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala index 5ddac2aa2..133b9cf44 100644 --- a/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala @@ -26,7 +26,7 @@ abstract class SingleActorSystemPerTestSpec(val config: Config) conf.withValue(path, configValue) }) - implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 1.minute) implicit val timeout: Timeout = Timeout(1.minute) val cfg = config.getConfig("jdbc-journal") diff --git a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala index a14f47c40..bbaed3024 100644 --- a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala @@ -18,6 +18,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType) @@ -29,7 +30,7 @@ abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType) with DropCreate { override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true - implicit lazy val ec = system.dispatcher + implicit lazy val ec: ExecutionContext = system.dispatcher implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes) diff --git a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala index b129a9885..7c01be9ba 100644 --- a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala @@ -15,6 +15,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType) @@ -28,7 +29,7 @@ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType) implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds) - implicit lazy val ec = system.dispatcher + implicit lazy val ec: ExecutionContext = system.dispatcher lazy val cfg = system.settings.config.getConfig("jdbc-journal") diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index f2389dd20..c8337c3a6 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -193,7 +193,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp => // The stream must complete within the given amount of time // This make take a while in case the journal sequence actor detects gaps - val allEvents = tp.toStrict(atMost = 20.seconds) + val allEvents = tp.toStrict(atMost = 40.seconds) allEvents.size should be >= 600 val expectedOffsets = 1L.to(allEvents.size).map(Sequence.apply) allEvents.map(_.offset) shouldBe expectedOffsets diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala index 7447ed7f6..a7d160f7c 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala @@ -28,7 +28,7 @@ object EventsByTagMigrationTest { "jdbc-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString)) } -abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(config, migrationConfigOverride) { +abstract class EventsByTagMigrationTest(configS: String) extends QueryTestSpec(configS, migrationConfigOverride) { final val NoMsgTime: FiniteDuration = 100.millis val tagTableCfg = journalConfig.eventTagTableConfiguration @@ -158,7 +158,7 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co // override this, so we can reset the value. def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = { - val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) { + val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(configS)) { case (conf, (path, configValue)) => conf.withValue(path, configValue) } diff --git a/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala index cac2ff1dc..ebfbc90c7 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala @@ -12,7 +12,7 @@ import scala.concurrent.duration._ import org.scalatest.matchers.should.Matchers abstract class HardDeleteQueryTest(config: String) extends QueryTestSpec(config) with Matchers { - implicit val askTimeout = 500.millis + implicit val askTimeout: FiniteDuration = 500.millis it should "not return deleted events when using CurrentEventsByTag" in withActorSystem { implicit system => val journalOps = new ScalaJdbcReadJournalOperations(system) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala index 66870c136..fdd66adfd 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -43,7 +43,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration val journalTableCfg = journalConfig.journalTableConfiguration - implicit val askTimeout = 50.millis + implicit val askTimeout: FiniteDuration = 50.millis def generateId: Int = 0 diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala index 0ff220c5b..1638c7c4b 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala @@ -33,7 +33,7 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) import profile.api._ - implicit val askTimeout = 50.millis + implicit val askTimeout: FiniteDuration = 50.millis def generateId: Int = 0 @@ -75,11 +75,12 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) val startTime = System.currentTimeMillis() withJournalSequenceActor(db, maxTries = 100) { actor => - val patienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) + implicit val patienceConfig: PatienceConfig = + PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe elements - }(patienceConfig, implicitly, implicitly) + } } val timeTaken = System.currentTimeMillis() - startTime log.info(s"Recovered all events in $timeTaken ms") @@ -109,11 +110,12 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) withJournalSequenceActor(db, maxTries = 2) { actor => // Should normally recover after `maxTries` seconds - val patienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) + implicit val patienceConfig: PatienceConfig = + PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe lastElement - }(patienceConfig, implicitly, implicitly) + } } } } @@ -145,11 +147,11 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) withJournalSequenceActor(db, maxTries = 2) { actor => // The actor should assume the max after 2 seconds - val patienceConfig = PatienceConfig(3.seconds) + implicit val patienceConfig: PatienceConfig = PatienceConfig(3.seconds) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe highestValue - }(patienceConfig, implicitly, implicitly) + } } } } diff --git a/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala b/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala index 717c68cfb..73b8356f6 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala @@ -27,7 +27,7 @@ object TestProbeReadJournalDao { */ class TestProbeReadJournalDao(val probe: TestProbe) extends ReadJournalDao { // Since the testprobe is instrumented by the test, it should respond very fast - implicit val askTimeout = Timeout(100.millis) + implicit val askTimeout: Timeout = Timeout(100.millis) /** * Returns distinct stream of persistenceIds diff --git a/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala b/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala index 65146e4da..f4c942c6d 100644 --- a/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala @@ -34,7 +34,7 @@ abstract class StoreOnlySerializableMessagesTest(config: String, schemaType: Sch override val receiveCommand: Receive = LoggingReceive { case msg => persist(msg) { _ => - sender ! akka.actor.Status.Success("") + sender() ! akka.actor.Status.Success("") } } diff --git a/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala b/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala index b20fba0d6..208d5a0f2 100644 --- a/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala @@ -18,6 +18,8 @@ import scala.concurrent.duration._ import akka.persistence.jdbc.testkit.internal.H2 import akka.persistence.jdbc.testkit.internal.SchemaType +import scala.concurrent.ExecutionContext + abstract class JdbcSnapshotStoreSpec(config: Config, schemaType: SchemaType) extends SnapshotStoreSpec(config) with BeforeAndAfterAll @@ -26,7 +28,7 @@ abstract class JdbcSnapshotStoreSpec(config: Config, schemaType: SchemaType) with DropCreate { implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds) - implicit lazy val ec = system.dispatcher + implicit lazy val ec: ExecutionContext = system.dispatcher lazy val cfg = system.settings.config.getConfig("jdbc-journal") diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala index 5da6e3c9c..809981339 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala @@ -26,7 +26,7 @@ abstract class DurableStateSequenceActorTest(config: Config, schemaType: SchemaT val durableStateSequenceActorConfig = durableStateConfig.stateSequenceConfig - implicit val askTimeout = 50.millis + implicit val askTimeout: FiniteDuration = 50.millis implicit val timeout: Timeout = Timeout(1.minute) "A DurableStateSequenceActor" must { diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala index 3eccb4c87..00c68b1af 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala @@ -17,6 +17,8 @@ import org.scalatest.time.Millis import org.scalatest.time.Seconds import org.scalatest.time.Span +import scala.concurrent.Future + abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) extends StateSpecBase(config, schemaType) { override implicit val defaultPatience = @@ -284,7 +286,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte whenReady { currentChanges("t1", NoOffset) .collect { case u: UpdatedDurableState[String] => u } - .runWith(Sink.seq[UpdatedDurableState[String]]) + .runWith(Sink.seq): Future[Seq[UpdatedDurableState[String]]] } { chgs => chgs.map(_.offset.value) shouldBe sorted chgs.map(_.offset.value).max shouldBe 3000 @@ -293,7 +295,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte whenReady { currentChanges("t1", Sequence(2000)) .collect { case u: UpdatedDurableState[String] => u } - .runWith(Sink.seq[UpdatedDurableState[String]]) + .runWith(Sink.seq): Future[Seq[UpdatedDurableState[String]]] } { chgs => chgs.map(_.offset.value) shouldBe sorted chgs.map(_.offset.value).max shouldBe 3000 diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala index 156e0bf3c..7ea961952 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala @@ -7,6 +7,7 @@ package akka.persistence.jdbc.state.scaladsl import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext import scala.util.{ Failure, Success } import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -32,7 +33,7 @@ abstract class StateSpecBase(val config: Config, schemaType: SchemaType) with DataGenerationHelper { implicit def system: ActorSystem - implicit lazy val e = system.dispatcher + implicit lazy val e: ExecutionContext = system.dispatcher private[jdbc] def schemaTypeToProfile(s: SchemaType) = s match { case H2 => slick.jdbc.H2Profile diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala index 8596cd6d4..73b76a640 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala @@ -32,7 +32,7 @@ class TestProbeDurableStateStoreQuery( serialization: Serialization)(override implicit val system: ExtendedActorSystem) extends JdbcDurableStateStore[String](db, profile, durableStateConfig, serialization)(system) { - implicit val askTimeout = Timeout(100.millis) + implicit val askTimeout: Timeout = Timeout(100.millis) override def getObject(persistenceId: String): Future[GetObjectResult[String]] = ??? override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[String], NotUsed] = ??? diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala index 605f0e491..b0f3a2a0f 100644 --- a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala +++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala @@ -14,7 +14,6 @@ import akka.persistence.jdbc.db.SlickExtension import akka.persistence.jdbc.journal.dao.JournalQueries import akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalSerializer import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow } -import akka.persistence.jdbc.migrator.JournalMigrator.{ JournalConfig, ReadJournalConfig } import akka.persistence.jdbc.query.dao.legacy.ReadJournalQueries import akka.serialization.{ Serialization, SerializationExtension } import akka.stream.scaladsl.Source @@ -38,13 +37,14 @@ final case class JournalMigrator(profile: JdbcProfile)(implicit system: ActorSys val log: Logger = LoggerFactory.getLogger(getClass) // get the various configurations - private val journalConfig: JournalConfig = new JournalConfig(system.settings.config.getConfig(JournalConfig)) + private val journalConfig: JournalConfig = new JournalConfig( + system.settings.config.getConfig(JournalMigrator.JournalConfig)) private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig( - system.settings.config.getConfig(ReadJournalConfig)) + system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)) // the journal database private val journalDB: JdbcBackend.Database = - SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database + SlickExtension(system).database(system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)).database // get an instance of the new journal queries private val newJournalQueries: JournalQueries = diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala index 7ea5fdfc9..bb40e71ca 100644 --- a/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala +++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala @@ -16,7 +16,6 @@ import akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables.SnapshotRow import akka.serialization.{ Serialization, SerializationExtension } import akka.stream.scaladsl.{ Sink, Source } import akka.Done -import akka.persistence.jdbc.migrator.JournalMigrator.ReadJournalConfig import akka.persistence.jdbc.migrator.SnapshotMigrator.{ NoParallelism, SnapshotStoreConfig } import org.slf4j.{ Logger, LoggerFactory } import slick.jdbc @@ -38,13 +37,13 @@ case class SnapshotMigrator(profile: JdbcProfile)(implicit system: ActorSystem) private val snapshotConfig: SnapshotConfig = new SnapshotConfig(system.settings.config.getConfig(SnapshotStoreConfig)) private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig( - system.settings.config.getConfig(ReadJournalConfig)) + system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)) private val snapshotDB: jdbc.JdbcBackend.Database = SlickExtension(system).database(system.settings.config.getConfig(SnapshotStoreConfig)).database private val journalDB: JdbcBackend.Database = - SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database + SlickExtension(system).database(system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)).database private val serialization: Serialization = SerializationExtension(system) private val queries: SnapshotQueries = new SnapshotQueries(profile, snapshotConfig.legacySnapshotTableConfiguration) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 76dc4a42c..cbbfc18da 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,13 +3,14 @@ import sbt._ object Dependencies { val Scala213 = "2.13.12" + val Scala3 = "3.3.1" - val ScalaVersions = Seq(Scala213) + val ScalaVersions = Seq(Scala213, Scala3) val AkkaVersion = "2.9.0" val AkkaBinaryVersion = AkkaVersion.take(3) - val SlickVersion = "3.4.1" + val SlickVersion = "3.5.0" val ScalaTestVersion = "3.2.18" val JdbcDrivers = Seq( diff --git a/project/ProjectAutoPlugin.scala b/project/ProjectAutoPlugin.scala index 9c7735a46..cb07d605f 100644 --- a/project/ProjectAutoPlugin.scala +++ b/project/ProjectAutoPlugin.scala @@ -108,6 +108,7 @@ object ProjectAutoPlugin extends AutoPlugin { "-Ywarn-nullary-unit", "-Ywarn-unused:_", "-Ypartial-unification", - "-Ywarn-extra-implicit") + "-Ywarn-extra-implicit", + "-Xsource:3") }