From dd1540c5635bebfedf7d0069bc049f6bf36acc43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Mon, 16 Oct 2023 10:52:27 +0100 Subject: [PATCH 01/15] compiles --- .../jdbc/journal/dao/JournalTables.scala | 7 +++++++ .../journal/dao/legacy/ByteArrayJournalDao.scala | 3 ++- .../jdbc/journal/dao/legacy/package.scala | 3 +++ .../jdbc/query/JdbcReadJournalProvider.scala | 5 +++-- .../jdbc/query/JournalSequenceActor.scala | 4 ++-- .../query/dao/legacy/ByteArrayReadJournalDao.scala | 6 ++++-- .../jdbc/snapshot/dao/SnapshotTables.scala | 4 ++++ .../jdbc/snapshot/dao/legacy/SnapshotTables.scala | 3 +++ .../jdbc/state/DurableStateQueries.scala | 2 +- .../jdbc/state/DurableStateTables.scala | 3 +++ .../jdbc/state/JdbcDurableStateStoreProvider.scala | 4 ++-- .../state/scaladsl/DurableStateSequenceActor.scala | 2 +- .../state/scaladsl/JdbcDurableStateStore.scala | 12 ++++-------- .../jdbc/SharedActorSystemTestSpec.scala | 2 +- .../jdbc/SingleActorSystemPerTestSpec.scala | 2 +- .../jdbc/journal/JdbcJournalPerfSpec.scala | 3 ++- .../persistence/jdbc/journal/JdbcJournalSpec.scala | 3 ++- .../jdbc/query/HardDeleteQueryTest.scala | 2 +- .../query/JournalDaoStreamMessagesMemoryTest.scala | 2 +- .../jdbc/query/JournalSequenceActorTest.scala | 14 +++++++------- .../jdbc/query/dao/TestProbeReadJournalDao.scala | 2 +- .../StoreOnlySerializableMessagesTest.scala | 2 +- .../jdbc/snapshot/JdbcSnapshotStoreSpec.scala | 4 +++- .../scaladsl/DurableStateSequenceActorTest.scala | 2 +- .../jdbc/state/scaladsl/JdbcDurableStateSpec.scala | 6 ++++-- .../jdbc/state/scaladsl/StateSpecBase.scala | 3 ++- .../scaladsl/TestProbeDurableStateStoreQuery.scala | 2 +- .../jdbc/migrator/JournalMigrator.scala | 8 ++++---- .../jdbc/migrator/SnapshotMigrator.scala | 5 ++--- project/Dependencies.scala | 5 +++-- project/ProjectAutoPlugin.scala | 3 ++- 31 files changed, 78 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala index c98de1a20..18165a97e 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala @@ -29,7 +29,14 @@ object JournalTables { metaSerId: Option[Int], metaSerManifest: Option[String]) + object JournalAkkaSerializationRow { + def tupled = (JournalAkkaSerializationRow.apply _).tupled + } + case class TagRow(eventId: Long, tag: String) + object TagRow { + def tupled = (TagRow.apply _).tupled + } } /** diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index 5a4591792..6d88716dd 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -28,7 +28,8 @@ class ByteArrayJournalDao( serialization: Serialization)(implicit val ec: ExecutionContext, val mat: Materializer) extends BaseByteArrayJournalDao { val queries = new JournalQueries(profile, journalConfig.journalTableConfiguration) - val serializer = new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator) + val serializer: ByteArrayJournalSerializer = + new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator) } /** diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala index daa55e209..de4e98753 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala @@ -15,6 +15,9 @@ package object legacy { sequenceNumber: Long, message: Array[Byte], tags: Option[String] = None) + object JournalRow { + def tupled = (JournalRow.apply _).tupled + } def encodeTags(tags: Set[String], separator: String): Option[String] = if (tags.isEmpty) None else Option(tags.mkString(separator)) diff --git a/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala b/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala index dddb99ad9..800aec84e 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/JdbcReadJournalProvider.scala @@ -11,7 +11,8 @@ import com.typesafe.config.Config class JdbcReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String) extends ReadJournalProvider { - override val scaladslReadJournal = new scaladsl.JdbcReadJournal(config, configPath)(system) + override def scaladslReadJournal(): scaladsl.JdbcReadJournal = + new scaladsl.JdbcReadJournal(config, configPath)(system) - override val javadslReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal) + override def javadslReadJournal(): javadsl.JdbcReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal()) } diff --git a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala index 370de6708..5d00553c0 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala @@ -38,7 +38,7 @@ object JournalSequenceActor { * Efficient representation of missing elements using NumericRanges. * It can be seen as a collection of OrderingIds */ - private case class MissingElements(elements: Seq[NumericRange[OrderingId]]) { + case class MissingElements(elements: Seq[NumericRange[OrderingId]]) { def addRange(from: OrderingId, until: OrderingId): MissingElements = { val newRange = from.until(until) MissingElements(elements :+ newRange) @@ -46,7 +46,7 @@ object JournalSequenceActor { def contains(id: OrderingId): Boolean = elements.exists(_.containsTyped(id)) def isEmpty: Boolean = elements.forall(_.isEmpty) } - private object MissingElements { + object MissingElements { def empty: MissingElements = MissingElements(Vector.empty) } } diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala index b6fc76209..129f9b3b5 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala @@ -112,7 +112,8 @@ trait OracleReadJournalDao extends ReadJournalDao { } } - implicit val getJournalRow = GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<)) + implicit val getJournalRow: GetResult[JournalRow] = + GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<)) abstract override def eventsByTag( tag: String, @@ -155,5 +156,6 @@ class ByteArrayReadJournalDao( extends BaseByteArrayReadJournalDao with OracleReadJournalDao { val queries = new ReadJournalQueries(profile, readJournalConfig) - val serializer = new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator) + val serializer: ByteArrayJournalSerializer = + new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator) } diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala index e2d0b53f0..0a9814fef 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala @@ -21,6 +21,10 @@ object SnapshotTables { metaSerId: Option[Int], metaSerManifest: Option[String], metaPayload: Option[Array[Byte]]) + + object SnapshotRow { + def tupled = (SnapshotRow.apply _).tupled + } } trait SnapshotTables { diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala index 87fde25ff..1a76a1fbd 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala @@ -12,6 +12,9 @@ import slick.jdbc.JdbcProfile object SnapshotTables { case class SnapshotRow(persistenceId: String, sequenceNumber: Long, created: Long, snapshot: Array[Byte]) + object SnapshotRow { + def tupled = (SnapshotRow.apply _).tupled + } def isOracleDriver(profile: JdbcProfile): Boolean = profile match { case _: slick.jdbc.OracleProfile => true diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala index c4bb6a034..6a98adad2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala @@ -42,7 +42,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration case _ => ??? } - implicit val uuidSetter = SetParameter[Array[Byte]] { case (bytes, params) => + implicit val uuidSetter: SetParameter[Array[Byte]] = SetParameter[Array[Byte]] { case (bytes, params) => params.setBytes(bytes) } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala index d76b57e24..6231f39f6 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala @@ -21,6 +21,9 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration stateSerId: Int, stateSerManifest: Option[String], stateTimestamp: Long) + object DurableStateRow { + def tupled = (DurableStateRow.apply _).tupled + } } /** diff --git a/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala b/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala index 5c961de8e..bf664d7a2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/JdbcDurableStateStoreProvider.scala @@ -33,10 +33,10 @@ class JdbcDurableStateStoreProvider[A](system: ExtendedActorSystem) extends Dura lazy val serialization = SerializationExtension(system) val profile: JdbcProfile = slickDb.profile - override val scaladslDurableStateStore: DurableStateStore[Any] = + override def scaladslDurableStateStore(): DurableStateStore[Any] = new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig, serialization)(system) - override val javadslDurableStateStore: JDurableStateStore[AnyRef] = + override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = new javadsl.JdbcDurableStateStore[AnyRef]( profile, durableStateConfig, diff --git a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala index fe91acb29..9b41bed37 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala @@ -46,7 +46,7 @@ import akka.annotation.InternalApi * Efficient representation of missing elements using NumericRanges. * It can be seen as a collection of GlobalOffset */ - private case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) { + case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) { def addRange(from: GlobalOffset, until: GlobalOffset): MissingElements = { val newRange = from.until(until) MissingElements(elements :+ newRange) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala index fc51b8cce..884ad44e8 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala @@ -211,20 +211,16 @@ class JdbcDurableStateStore[A]( } private def updateDurableState(row: DurableStateTables.DurableStateRow) = { - import queries._ - for { - s <- getSequenceNextValueExpr() - u <- updateDbWithDurableState(row, s.head) + s <- queries.getSequenceNextValueExpr() + u <- queries.updateDbWithDurableState(row, s.head) } yield u } private def insertDurableState(row: DurableStateTables.DurableStateRow) = { - import queries._ - for { - s <- getSequenceNextValueExpr() - u <- insertDbWithDurableState(row, s.head) + s <- queries.getSequenceNextValueExpr() + u <- queries.insertDbWithDurableState(row, s.head) } yield u } diff --git a/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala b/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala index 225a026e2..96c531c12 100644 --- a/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/SharedActorSystemTestSpec.scala @@ -28,7 +28,7 @@ abstract class SharedActorSystemTestSpec(val config: Config) extends SimpleSpec implicit lazy val ec: ExecutionContext = system.dispatcher implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute) - implicit val timeout = Timeout(1.minute) + implicit val timeout: Timeout = Timeout(1.minute) lazy val serialization = SerializationExtension(system) diff --git a/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala b/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala index 5ddac2aa2..133b9cf44 100644 --- a/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala @@ -26,7 +26,7 @@ abstract class SingleActorSystemPerTestSpec(val config: Config) conf.withValue(path, configValue) }) - implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 1.minute) implicit val timeout: Timeout = Timeout(1.minute) val cfg = config.getConfig("jdbc-journal") diff --git a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala index a14f47c40..bbaed3024 100644 --- a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala @@ -18,6 +18,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType) @@ -29,7 +30,7 @@ abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType) with DropCreate { override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true - implicit lazy val ec = system.dispatcher + implicit lazy val ec: ExecutionContext = system.dispatcher implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes) diff --git a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala index b129a9885..7c01be9ba 100644 --- a/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala @@ -15,6 +15,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType) @@ -28,7 +29,7 @@ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType) implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds) - implicit lazy val ec = system.dispatcher + implicit lazy val ec: ExecutionContext = system.dispatcher lazy val cfg = system.settings.config.getConfig("jdbc-journal") diff --git a/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala index cac2ff1dc..ebfbc90c7 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala @@ -12,7 +12,7 @@ import scala.concurrent.duration._ import org.scalatest.matchers.should.Matchers abstract class HardDeleteQueryTest(config: String) extends QueryTestSpec(config) with Matchers { - implicit val askTimeout = 500.millis + implicit val askTimeout: FiniteDuration = 500.millis it should "not return deleted events when using CurrentEventsByTag" in withActorSystem { implicit system => val journalOps = new ScalaJdbcReadJournalOperations(system) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala index 66870c136..fdd66adfd 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -43,7 +43,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration val journalTableCfg = journalConfig.journalTableConfiguration - implicit val askTimeout = 50.millis + implicit val askTimeout: FiniteDuration = 50.millis def generateId: Int = 0 diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala index 0ff220c5b..8115fd2df 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala @@ -33,7 +33,7 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) import profile.api._ - implicit val askTimeout = 50.millis + implicit val askTimeout: FiniteDuration = 50.millis def generateId: Int = 0 @@ -75,11 +75,11 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) val startTime = System.currentTimeMillis() withJournalSequenceActor(db, maxTries = 100) { actor => - val patienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) + implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe elements - }(patienceConfig, implicitly, implicitly) + } } val timeTaken = System.currentTimeMillis() - startTime log.info(s"Recovered all events in $timeTaken ms") @@ -109,11 +109,11 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) withJournalSequenceActor(db, maxTries = 2) { actor => // Should normally recover after `maxTries` seconds - val patienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) + implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe lastElement - }(patienceConfig, implicitly, implicitly) + } } } } @@ -145,11 +145,11 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) withJournalSequenceActor(db, maxTries = 2) { actor => // The actor should assume the max after 2 seconds - val patienceConfig = PatienceConfig(3.seconds) + implicit val patienceConfig: PatienceConfig = PatienceConfig(3.seconds) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe highestValue - }(patienceConfig, implicitly, implicitly) + } } } } diff --git a/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala b/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala index 717c68cfb..73b8356f6 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala @@ -27,7 +27,7 @@ object TestProbeReadJournalDao { */ class TestProbeReadJournalDao(val probe: TestProbe) extends ReadJournalDao { // Since the testprobe is instrumented by the test, it should respond very fast - implicit val askTimeout = Timeout(100.millis) + implicit val askTimeout: Timeout = Timeout(100.millis) /** * Returns distinct stream of persistenceIds diff --git a/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala b/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala index 65146e4da..f4c942c6d 100644 --- a/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala @@ -34,7 +34,7 @@ abstract class StoreOnlySerializableMessagesTest(config: String, schemaType: Sch override val receiveCommand: Receive = LoggingReceive { case msg => persist(msg) { _ => - sender ! akka.actor.Status.Success("") + sender() ! akka.actor.Status.Success("") } } diff --git a/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala b/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala index b20fba0d6..208d5a0f2 100644 --- a/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/snapshot/JdbcSnapshotStoreSpec.scala @@ -18,6 +18,8 @@ import scala.concurrent.duration._ import akka.persistence.jdbc.testkit.internal.H2 import akka.persistence.jdbc.testkit.internal.SchemaType +import scala.concurrent.ExecutionContext + abstract class JdbcSnapshotStoreSpec(config: Config, schemaType: SchemaType) extends SnapshotStoreSpec(config) with BeforeAndAfterAll @@ -26,7 +28,7 @@ abstract class JdbcSnapshotStoreSpec(config: Config, schemaType: SchemaType) with DropCreate { implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds) - implicit lazy val ec = system.dispatcher + implicit lazy val ec: ExecutionContext = system.dispatcher lazy val cfg = system.settings.config.getConfig("jdbc-journal") diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala index 5da6e3c9c..809981339 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/DurableStateSequenceActorTest.scala @@ -26,7 +26,7 @@ abstract class DurableStateSequenceActorTest(config: Config, schemaType: SchemaT val durableStateSequenceActorConfig = durableStateConfig.stateSequenceConfig - implicit val askTimeout = 50.millis + implicit val askTimeout: FiniteDuration = 50.millis implicit val timeout: Timeout = Timeout(1.minute) "A DurableStateSequenceActor" must { diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala index 3eccb4c87..00c68b1af 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala @@ -17,6 +17,8 @@ import org.scalatest.time.Millis import org.scalatest.time.Seconds import org.scalatest.time.Span +import scala.concurrent.Future + abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) extends StateSpecBase(config, schemaType) { override implicit val defaultPatience = @@ -284,7 +286,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte whenReady { currentChanges("t1", NoOffset) .collect { case u: UpdatedDurableState[String] => u } - .runWith(Sink.seq[UpdatedDurableState[String]]) + .runWith(Sink.seq): Future[Seq[UpdatedDurableState[String]]] } { chgs => chgs.map(_.offset.value) shouldBe sorted chgs.map(_.offset.value).max shouldBe 3000 @@ -293,7 +295,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte whenReady { currentChanges("t1", Sequence(2000)) .collect { case u: UpdatedDurableState[String] => u } - .runWith(Sink.seq[UpdatedDurableState[String]]) + .runWith(Sink.seq): Future[Seq[UpdatedDurableState[String]]] } { chgs => chgs.map(_.offset.value) shouldBe sorted chgs.map(_.offset.value).max shouldBe 3000 diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala index 156e0bf3c..7ea961952 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/StateSpecBase.scala @@ -7,6 +7,7 @@ package akka.persistence.jdbc.state.scaladsl import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext import scala.util.{ Failure, Success } import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -32,7 +33,7 @@ abstract class StateSpecBase(val config: Config, schemaType: SchemaType) with DataGenerationHelper { implicit def system: ActorSystem - implicit lazy val e = system.dispatcher + implicit lazy val e: ExecutionContext = system.dispatcher private[jdbc] def schemaTypeToProfile(s: SchemaType) = s match { case H2 => slick.jdbc.H2Profile diff --git a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala index 8596cd6d4..73b76a640 100644 --- a/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala +++ b/core/src/test/scala/akka/persistence/jdbc/state/scaladsl/TestProbeDurableStateStoreQuery.scala @@ -32,7 +32,7 @@ class TestProbeDurableStateStoreQuery( serialization: Serialization)(override implicit val system: ExtendedActorSystem) extends JdbcDurableStateStore[String](db, profile, durableStateConfig, serialization)(system) { - implicit val askTimeout = Timeout(100.millis) + implicit val askTimeout: Timeout = Timeout(100.millis) override def getObject(persistenceId: String): Future[GetObjectResult[String]] = ??? override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[String], NotUsed] = ??? diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala index b51cf133b..56bc9da56 100644 --- a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala +++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala @@ -14,7 +14,6 @@ import akka.persistence.jdbc.db.SlickExtension import akka.persistence.jdbc.journal.dao.JournalQueries import akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalSerializer import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow } -import akka.persistence.jdbc.migrator.JournalMigrator.{ JournalConfig, ReadJournalConfig } import akka.persistence.jdbc.query.dao.legacy.ReadJournalQueries import akka.serialization.{ Serialization, SerializationExtension } import akka.stream.scaladsl.Source @@ -38,13 +37,14 @@ final case class JournalMigrator(profile: JdbcProfile)(implicit system: ActorSys val log: Logger = LoggerFactory.getLogger(getClass) // get the various configurations - private val journalConfig: JournalConfig = new JournalConfig(system.settings.config.getConfig(JournalConfig)) + private val journalConfig: JournalConfig = new JournalConfig( + system.settings.config.getConfig(JournalMigrator.JournalConfig)) private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig( - system.settings.config.getConfig(ReadJournalConfig)) + system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)) // the journal database private val journalDB: JdbcBackend.Database = - SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database + SlickExtension(system).database(system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)).database // get an instance of the new journal queries private val newJournalQueries: JournalQueries = diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala index 7ea5fdfc9..bb40e71ca 100644 --- a/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala +++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala @@ -16,7 +16,6 @@ import akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables.SnapshotRow import akka.serialization.{ Serialization, SerializationExtension } import akka.stream.scaladsl.{ Sink, Source } import akka.Done -import akka.persistence.jdbc.migrator.JournalMigrator.ReadJournalConfig import akka.persistence.jdbc.migrator.SnapshotMigrator.{ NoParallelism, SnapshotStoreConfig } import org.slf4j.{ Logger, LoggerFactory } import slick.jdbc @@ -38,13 +37,13 @@ case class SnapshotMigrator(profile: JdbcProfile)(implicit system: ActorSystem) private val snapshotConfig: SnapshotConfig = new SnapshotConfig(system.settings.config.getConfig(SnapshotStoreConfig)) private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig( - system.settings.config.getConfig(ReadJournalConfig)) + system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)) private val snapshotDB: jdbc.JdbcBackend.Database = SlickExtension(system).database(system.settings.config.getConfig(SnapshotStoreConfig)).database private val journalDB: JdbcBackend.Database = - SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database + SlickExtension(system).database(system.settings.config.getConfig(JournalMigrator.ReadJournalConfig)).database private val serialization: Serialization = SerializationExtension(system) private val queries: SnapshotQueries = new SnapshotQueries(profile, snapshotConfig.legacySnapshotTableConfiguration) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c3736e60e..24a1477f6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,13 +3,14 @@ import sbt._ object Dependencies { val Scala213 = "2.13.12" + val Scala3 = "3.3.1" - val ScalaVersions = Seq(Scala213) + val ScalaVersions = Seq(Scala213, Scala3) val AkkaVersion = "2.9.0-M3" val AkkaBinaryVersion = AkkaVersion.take(3) - val SlickVersion = "3.4.1" + val SlickVersion = "3.5.0-M4" val ScalaTestVersion = "3.2.17" val JdbcDrivers = Seq( diff --git a/project/ProjectAutoPlugin.scala b/project/ProjectAutoPlugin.scala index 9c7735a46..cb07d605f 100644 --- a/project/ProjectAutoPlugin.scala +++ b/project/ProjectAutoPlugin.scala @@ -108,6 +108,7 @@ object ProjectAutoPlugin extends AutoPlugin { "-Ywarn-nullary-unit", "-Ywarn-unused:_", "-Ypartial-unification", - "-Ywarn-extra-implicit") + "-Ywarn-extra-implicit", + "-Xsource:3") } From 245acc3d4eaea1d9835f62d386be1f047bd1e1b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Mon, 16 Oct 2023 13:01:09 +0100 Subject: [PATCH 02/15] scalafmt --- .../persistence/jdbc/query/JournalSequenceActorTest.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala index 8115fd2df..1638c7c4b 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala @@ -75,7 +75,8 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) val startTime = System.currentTimeMillis() withJournalSequenceActor(db, maxTries = 100) { actor => - implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) + implicit val patienceConfig: PatienceConfig = + PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe elements @@ -109,7 +110,8 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) withJournalSequenceActor(db, maxTries = 2) { actor => // Should normally recover after `maxTries` seconds - implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) + implicit val patienceConfig: PatienceConfig = + PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis)) eventually { val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering currentMax shouldBe lastElement From 33aec5975ac0c7656bc6b3eb19c5bdfdf1926533 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 14 Feb 2024 15:49:15 +0100 Subject: [PATCH 03/15] slick 3.5.0-RC1 out now --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 24a1477f6..fdd727800 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -10,7 +10,7 @@ object Dependencies { val AkkaVersion = "2.9.0-M3" val AkkaBinaryVersion = AkkaVersion.take(3) - val SlickVersion = "3.5.0-M4" + val SlickVersion = "3.5.0-RC1" val ScalaTestVersion = "3.2.17" val JdbcDrivers = Seq( From 87cedb0b9c7f4e331419332e9dcfdfbbc78abb32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Thu, 15 Feb 2024 16:47:34 +0000 Subject: [PATCH 04/15] mima filters --- .../issue-775-slick-3.50.excludes | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes diff --git a/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes new file mode 100644 index 000000000..41d841fa2 --- /dev/null +++ b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes @@ -0,0 +1,43 @@ +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.unapply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.tupled") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.curried") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.database") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy$default$1") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.this") + +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.LazySlickDatabase.database") + +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.forConfig") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.database") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.db.SlickDatabase.database") + +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.JdbcAsyncWriteJournal.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.snapshot.JdbcSnapshotStore.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.state.JdbcDurableStateStoreProvider.db") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.this") + +ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.JournalTables$JournalAkkaSerializationRow$") +ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.legacy.package$JournalRow$") +ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.snapshot.dao.SnapshotTables$SnapshotRow$") +ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables$SnapshotRow$") +ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.state.DurableStateTables$DurableStateRow$") + + From 1f123edf8884531e49901d464e5573d65b15a519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Thu, 15 Feb 2024 17:22:45 +0000 Subject: [PATCH 05/15] remove tupled methods to reduce the number of mima violations --- .../issue-775-slick-3.50.excludes | 5 ----- .../persistence/jdbc/journal/dao/JournalTables.scala | 11 ++--------- .../jdbc/journal/dao/legacy/JournalTables.scala | 2 +- .../persistence/jdbc/journal/dao/legacy/package.scala | 3 --- .../jdbc/snapshot/dao/SnapshotTables.scala | 5 +---- .../jdbc/snapshot/dao/legacy/SnapshotTables.scala | 5 +---- .../persistence/jdbc/state/DurableStateTables.scala | 5 +---- 7 files changed, 6 insertions(+), 30 deletions(-) diff --git a/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes index 41d841fa2..a1e6381d8 100644 --- a/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes +++ b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes @@ -34,10 +34,5 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snaps ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.state.JdbcDurableStateStoreProvider.db") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.this") -ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.JournalTables$JournalAkkaSerializationRow$") -ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.legacy.package$JournalRow$") -ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.snapshot.dao.SnapshotTables$SnapshotRow$") -ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables$SnapshotRow$") -ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.state.DurableStateTables$DurableStateRow$") diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala index cbcf6f82e..3b816a12d 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala @@ -29,14 +29,7 @@ object JournalTables { metaSerId: Option[Int], metaSerManifest: Option[String]) - object JournalAkkaSerializationRow { - def tupled = (JournalAkkaSerializationRow.apply _).tupled - } - case class TagRow(eventId: Option[Long], persistenceId: Option[String], sequenceNumber: Option[Long], tag: String) - object TagRow { - def tupled = (TagRow.apply _).tupled - } } /** @@ -71,7 +64,7 @@ trait JournalTables { eventSerManifest, metaPayload, metaSerId, - metaSerManifest) <> (JournalAkkaSerializationRow.tupled, JournalAkkaSerializationRow.unapply) + metaSerManifest) <> ((JournalAkkaSerializationRow.apply _).tupled, JournalAkkaSerializationRow.unapply) val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc) val persistenceId: Rep[String] = @@ -98,7 +91,7 @@ trait JournalTables { lazy val JournalTable = new TableQuery(tag => new JournalEvents(tag)) class EventTags(_tableTag: Tag) extends Table[TagRow](_tableTag, tagTableCfg.schemaName, tagTableCfg.tableName) { - override def * = (eventId, persistenceId, sequenceNumber, tag) <> (TagRow.tupled, TagRow.unapply) + override def * = (eventId, persistenceId, sequenceNumber, tag) <> ((TagRow.apply _).tupled, TagRow.unapply) // allow null value insert. val eventId: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.eventId) val persistenceId: Rep[Option[String]] = column[Option[String]](tagTableCfg.columnNames.persistenceId) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala index 16a507c8b..7db2dca0c 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala @@ -19,7 +19,7 @@ trait JournalTables { _tableTag, _schemaName = journalTableCfg.schemaName, _tableName = journalTableCfg.tableName) { - def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> (JournalRow.tupled, JournalRow.unapply) + def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> ((JournalRow.apply _).tupled, JournalRow.unapply) val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc) val persistenceId: Rep[String] = diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala index de4e98753..daa55e209 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/package.scala @@ -15,9 +15,6 @@ package object legacy { sequenceNumber: Long, message: Array[Byte], tags: Option[String] = None) - object JournalRow { - def tupled = (JournalRow.apply _).tupled - } def encodeTags(tags: Set[String], separator: String): Option[String] = if (tags.isEmpty) None else Option(tags.mkString(separator)) diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala index 0a9814fef..7bb4378f1 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala @@ -22,9 +22,6 @@ object SnapshotTables { metaSerManifest: Option[String], metaPayload: Option[Array[Byte]]) - object SnapshotRow { - def tupled = (SnapshotRow.apply _).tupled - } } trait SnapshotTables { @@ -47,7 +44,7 @@ trait SnapshotTables { snapshotPayload, metaSerId, metaSerManifest, - metaPayload) <> (SnapshotRow.tupled, SnapshotRow.unapply) + metaPayload) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply) val persistenceId: Rep[String] = column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true)) diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala index 1a76a1fbd..16d36e379 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/SnapshotTables.scala @@ -12,9 +12,6 @@ import slick.jdbc.JdbcProfile object SnapshotTables { case class SnapshotRow(persistenceId: String, sequenceNumber: Long, created: Long, snapshot: Array[Byte]) - object SnapshotRow { - def tupled = (SnapshotRow.apply _).tupled - } def isOracleDriver(profile: JdbcProfile): Boolean = profile match { case _: slick.jdbc.OracleProfile => true @@ -34,7 +31,7 @@ trait SnapshotTables { _tableTag, _schemaName = snapshotTableCfg.schemaName, _tableName = snapshotTableCfg.tableName) { - def * = (persistenceId, sequenceNumber, created, snapshot) <> (SnapshotRow.tupled, SnapshotRow.unapply) + def * = (persistenceId, sequenceNumber, created, snapshot) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply) val persistenceId: Rep[String] = column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true)) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala index 6231f39f6..e6ef2afaa 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateTables.scala @@ -21,9 +21,6 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration stateSerId: Int, stateSerManifest: Option[String], stateTimestamp: Long) - object DurableStateRow { - def tupled = (DurableStateRow.apply _).tupled - } } /** @@ -44,7 +41,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration def * = (globalOffset, persistenceId, revision, statePayload, tag, stateSerId, stateSerManifest, stateTimestamp) - .<>(DurableStateRow.tupled, DurableStateRow.unapply) + .<>((DurableStateRow.apply _).tupled, DurableStateRow.unapply) val globalOffset: Rep[Long] = column[Long](durableStateTableCfg.columnNames.globalOffset, O.AutoInc) val persistenceId: Rep[String] = From 42d18ad5776fe1db18782d58ddd671e433c1bcd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Thu, 15 Feb 2024 17:26:01 +0000 Subject: [PATCH 06/15] no mimaPreviousArtifacts if running scala3 --- build.sbt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 21f479985..f0abaecc4 100644 --- a/build.sbt +++ b/build.sbt @@ -18,9 +18,15 @@ lazy val core = project name := "akka-persistence-jdbc", libraryDependencies ++= Dependencies.Libraries, mimaReportSignatureProblems := true, - mimaPreviousArtifacts := Set( - organization.value %% name.value % previousStableVersion.value.getOrElse( - throw new Error("Unable to determine previous version for MiMa")))) + mimaPreviousArtifacts := { + if (scalaVersion.value.startsWith("3")) { + Set.empty + } else { + Set( + organization.value %% name.value % previousStableVersion.value.getOrElse( + throw new Error("Unable to determine previous version for MiMa"))) + } + }) lazy val integration = project .in(file("integration")) From f8c9ff3904cf775f47845a9206bcfa8e509a2308 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 16 Feb 2024 09:03:55 +0100 Subject: [PATCH 07/15] Formatting --- .../jdbc/journal/dao/legacy/JournalTables.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala index 7db2dca0c..1efd76024 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/JournalTables.scala @@ -19,7 +19,13 @@ trait JournalTables { _tableTag, _schemaName = journalTableCfg.schemaName, _tableName = journalTableCfg.tableName) { - def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> ((JournalRow.apply _).tupled, JournalRow.unapply) + def * = ( + ordering, + deleted, + persistenceId, + sequenceNumber, + message, + tags) <> ((JournalRow.apply _).tupled, JournalRow.unapply) val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc) val persistenceId: Rep[String] = From 2fc72ee042f887ccb2397816b62ae918ee6f525e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 16 Feb 2024 09:12:14 +0100 Subject: [PATCH 08/15] PR validation with added 3.3 Test/compile --- .github/workflows/checks.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 31d6b17e7..4ffbdced1 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -63,10 +63,13 @@ jobs: with: jvm: temurin:1.11.0 - - name: Compile all code with fatal warnings for Java 11, Scala 2.13 + - name: Compile all code with fatal warnings for Java 11 and Scala 2.13 # Run locally with: sbt 'clean ; +Test/compile ; +It/compile' run: sbt "; Test/compile" + - name: Compile all code with fatal warnings for Java 11 and Scala 3.3 + run: sbt "++3.3; Test/compile" + check-docs: name: Check Docs runs-on: ubuntu-22.04 From 45bbe4a7de22e077fb8dd3b7adb217bd519653e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Fri, 16 Feb 2024 09:41:27 +0000 Subject: [PATCH 09/15] fix cyclic reference compiler error by moving import of DurableStateSequenceActor --- .../persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala index 884ad44e8..ea14cffbd 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala @@ -25,7 +25,6 @@ import akka.serialization.Serialization import akka.stream.scaladsl.{ Sink, Source } import akka.stream.{ Materializer, SystemMaterializer } import akka.util.Timeout -import DurableStateSequenceActor._ import OffsetSyntax._ import akka.annotation.ApiMayChange import akka.persistence.query.UpdatedDurableState @@ -45,6 +44,7 @@ class JdbcDurableStateStore[A]( serialization: Serialization)(implicit val system: ExtendedActorSystem) extends DurableStateUpdateStore[A] with DurableStateStoreQuery[A] { + import DurableStateSequenceActor._ import FlowControl._ import profile.api._ From bafa917f9c6093fc6f66298584ba45f94e4b2085 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Fri, 16 Feb 2024 09:50:28 +0000 Subject: [PATCH 10/15] mark EagerSlickDatabase and LazySlickDatabase as internal --- .../main/scala/akka/persistence/jdbc/db/SlickDatabase.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala b/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala index c0ea00411..1a2300944 100644 --- a/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala +++ b/core/src/main/scala/akka/persistence/jdbc/db/SlickDatabase.scala @@ -6,6 +6,8 @@ package akka.persistence.jdbc.db import akka.actor.ActorSystem +import akka.annotation.InternalApi + import javax.naming.InitialContext import akka.persistence.jdbc.config.SlickConfiguration import com.typesafe.config.Config @@ -82,6 +84,7 @@ trait SlickDatabase { def allowShutdown: Boolean } +@InternalApi case class EagerSlickDatabase(database: Database, profile: JdbcProfile) extends SlickDatabase { override def allowShutdown: Boolean = true } @@ -90,6 +93,7 @@ case class EagerSlickDatabase(database: Database, profile: JdbcProfile) extends * A LazySlickDatabase lazily initializes a database, it also manages the shutdown of the database * @param config The configuration used to create the database */ +@InternalApi class LazySlickDatabase(config: Config, system: ActorSystem) extends SlickDatabase { val profile: JdbcProfile = SlickDatabase.profile(config, path = "") From e6422a4a866590b8e748116a13aad65067628586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Fri, 16 Feb 2024 09:54:08 +0000 Subject: [PATCH 11/15] make MissingElements private again --- .../persistence/jdbc/query/JournalSequenceActor.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala index 5d00553c0..390902a1d 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala @@ -38,7 +38,7 @@ object JournalSequenceActor { * Efficient representation of missing elements using NumericRanges. * It can be seen as a collection of OrderingIds */ - case class MissingElements(elements: Seq[NumericRange[OrderingId]]) { + private case class MissingElements(elements: Seq[NumericRange[OrderingId]]) { def addRange(from: OrderingId, until: OrderingId): MissingElements = { val newRange = from.until(until) MissingElements(elements :+ newRange) @@ -46,7 +46,7 @@ object JournalSequenceActor { def contains(id: OrderingId): Boolean = elements.exists(_.containsTyped(id)) def isEmpty: Boolean = elements.forall(_.isEmpty) } - object MissingElements { + private object MissingElements { def empty: MissingElements = MissingElements(Vector.empty) } } @@ -84,7 +84,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen * @param moduloCounter A counter which is incremented every time a new query have been executed, modulo `maxTries` * @param previousDelay The last used delay (may change in case failures occur) */ - def receive( + private def receive( currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], moduloCounter: Int, @@ -129,7 +129,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen /** * This method that implements the "find gaps" algo. It's the meat and main purpose of this actor. */ - def findGaps( + private def findGaps( elements: Seq[OrderingId], currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], From 5ceb36e1ab9241b8a47fc56275f6bff5c70f3648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Fri, 16 Feb 2024 11:36:46 +0000 Subject: [PATCH 12/15] fix mima after making some methods private in JournalSequenceActor --- .../5.4.0.backwards.excludes/issue-775-slick-3.50.excludes | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes index a1e6381d8..72a5768e5 100644 --- a/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes +++ b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-775-slick-3.50.excludes @@ -34,5 +34,10 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snaps ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.state.JdbcDurableStateStoreProvider.db") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.receive") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.receive$default$4") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.findGaps") + + From 7199f967ae3861af115f64bb828027adf6cbd1f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Fri, 16 Feb 2024 14:25:15 +0000 Subject: [PATCH 13/15] fix timeout in teste "complete without any gaps in case events are being persisted when the query is executed" increase toStrict timeout --- .../akka/persistence/jdbc/query/CurrentEventsByTagTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index f2389dd20..c8337c3a6 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -193,7 +193,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp => // The stream must complete within the given amount of time // This make take a while in case the journal sequence actor detects gaps - val allEvents = tp.toStrict(atMost = 20.seconds) + val allEvents = tp.toStrict(atMost = 40.seconds) allEvents.size should be >= 600 val expectedOffsets = 1L.to(allEvents.size).map(Sequence.apply) allEvents.map(_.offset) shouldBe expectedOffsets From 20f619f6ea2dba7ce1087345afdda1d106fb0850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Sat, 17 Feb 2024 00:44:59 +0000 Subject: [PATCH 14/15] workaround scala3 bug https://github.com/lampepfl/dotty/issues/19711 --- .../persistence/jdbc/query/EventsByTagMigrationTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala index 7447ed7f6..a7d160f7c 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala @@ -28,7 +28,7 @@ object EventsByTagMigrationTest { "jdbc-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString)) } -abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(config, migrationConfigOverride) { +abstract class EventsByTagMigrationTest(configS: String) extends QueryTestSpec(configS, migrationConfigOverride) { final val NoMsgTime: FiniteDuration = 100.millis val tagTableCfg = journalConfig.eventTagTableConfiguration @@ -158,7 +158,7 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co // override this, so we can reset the value. def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = { - val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) { + val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(configS)) { case (conf, (path, configValue)) => conf.withValue(path, configValue) } From 19f484118e19bc5e977ee392e550eb28a485ca61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 8 Mar 2024 09:58:27 +0100 Subject: [PATCH 15/15] bump: slick 3.5.0 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 957ed1e3c..cbbfc18da 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -10,7 +10,7 @@ object Dependencies { val AkkaVersion = "2.9.0" val AkkaBinaryVersion = AkkaVersion.take(3) - val SlickVersion = "3.5.0-RC1" + val SlickVersion = "3.5.0" val ScalaTestVersion = "3.2.18" val JdbcDrivers = Seq(