From c56753dab7db3e1a6a129153a673400efa234d98 Mon Sep 17 00:00:00 2001 From: Renato Cavalcanti Date: Tue, 9 Nov 2021 21:16:33 +0100 Subject: [PATCH] Migration tool (#603) Co-authored-by: Gabriele Favaretto Co-authored-by: Renato Cavalcanti Co-authored-by: Arsene Tochemey GANDOTE --- .github/workflows/checks.yml | 1 + .github/workflows/h2-test.yml | 1 + .github/workflows/mysql-tests.yml | 1 + .github/workflows/oracle-tests.yml | 1 + .github/workflows/postgres-tests.yml | 1 + .github/workflows/sqlserver-tests.yml | 1 + build.sbt | 16 +- .../db/migration/postgres/V001__test.sql | 0 .../db/migration/postgres/V002__test-2.sql | 9 - migration/src/main/resources/logback.xml | 21 - migration/src/main/resources/reference.conf | 9 - .../persistence/jdbc/migration/Main.scala | 36 -- .../postgres/V003__UpdateUsers.scala | 21 - .../src/test/resources/postgres/init.sql | 25 -- .../jdbc/migration/PostgresSpec.scala | 78 ---- .../integration/JournalMigratorTest.scala | 12 + .../integration/SnapshotMigratorTest.scala | 12 + .../jdbc/migrator/JournalMigrator.scala | 151 +++++++ .../jdbc/migrator/SnapshotMigrator.scala | 97 +++++ migrator/src/test/resources/general.conf | 47 +++ .../src/test/resources/h2-application.conf | 42 ++ .../src/test/resources/mysql-application.conf | 48 +++ .../test/resources/oracle-application.conf | 50 +++ .../test/resources/postgres-application.conf | 48 +++ .../schema/h2/h2-create-schema-legacy.sql | 34 ++ .../resources/schema/h2/h2-create-schema.sql | 54 +++ .../schema/h2/h2-drop-schema-legacy.sql | 3 + .../resources/schema/h2/h2-drop-schema.sql | 4 + .../mysql/mysql-create-schema-legacy.sql | 18 + .../schema/mysql/mysql-create-schema.sql | 38 ++ .../schema/mysql/mysql-drop-schema-legacy.sql | 2 + .../schema/mysql/mysql-drop-schema.sql | 3 + .../oracle/oracle-create-schema-legacy.sql | 44 ++ .../schema/oracle/oracle-create-schema.sql | 57 +++ .../oracle/oracle-drop-schema-legacy.sql | 21 + .../schema/oracle/oracle-drop-schema.sql | 20 + .../postgres-create-schema-legacy.sql | 32 ++ .../postgres/postgres-create-schema.sql | 62 +++ .../postgres/postgres-drop-schema-legacy.sql | 3 + .../schema/postgres/postgres-drop-schema.sql | 5 + .../sqlserver-create-schema-legacy.sql | 24 ++ .../sqlserver/sqlserver-create-schema.sql | 42 ++ .../sqlserver-drop-schema-legacy.sql | 2 + .../sqlserver/sqlserver-drop-schema.sql | 3 + .../test/resources/sqlserver-application.conf | 65 +++ .../jdbc/migrator/JournalMigratorTest.scala | 126 ++++++ .../jdbc/migrator/MigratorSpec.scala | 396 ++++++++++++++++++ .../jdbc/migrator/SnapshotMigratorTest.scala | 50 +++ project/Dependencies.scala | 2 - 49 files changed, 1631 insertions(+), 207 deletions(-) delete mode 100644 migration/src/main/resources/db/migration/postgres/V001__test.sql delete mode 100644 migration/src/main/resources/db/migration/postgres/V002__test-2.sql delete mode 100644 migration/src/main/resources/logback.xml delete mode 100644 migration/src/main/resources/reference.conf delete mode 100644 migration/src/main/scala/akka/persistence/jdbc/migration/Main.scala delete mode 100644 migration/src/main/scala/db/migration/postgres/V003__UpdateUsers.scala delete mode 100644 migration/src/test/resources/postgres/init.sql delete mode 100644 migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala create mode 100644 migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/JournalMigratorTest.scala create mode 100644 migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/SnapshotMigratorTest.scala create mode 100644 migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala create mode 100644 migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala create mode 100644 migrator/src/test/resources/general.conf create mode 100644 migrator/src/test/resources/h2-application.conf create mode 100644 migrator/src/test/resources/mysql-application.conf create mode 100644 migrator/src/test/resources/oracle-application.conf create mode 100644 migrator/src/test/resources/postgres-application.conf create mode 100644 migrator/src/test/resources/schema/h2/h2-create-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/h2/h2-create-schema.sql create mode 100644 migrator/src/test/resources/schema/h2/h2-drop-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/h2/h2-drop-schema.sql create mode 100644 migrator/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/mysql/mysql-create-schema.sql create mode 100644 migrator/src/test/resources/schema/mysql/mysql-drop-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/mysql/mysql-drop-schema.sql create mode 100644 migrator/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/oracle/oracle-create-schema.sql create mode 100644 migrator/src/test/resources/schema/oracle/oracle-drop-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/oracle/oracle-drop-schema.sql create mode 100644 migrator/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/postgres/postgres-create-schema.sql create mode 100644 migrator/src/test/resources/schema/postgres/postgres-drop-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/postgres/postgres-drop-schema.sql create mode 100644 migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql create mode 100644 migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema-legacy.sql create mode 100644 migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema.sql create mode 100644 migrator/src/test/resources/sqlserver-application.conf create mode 100644 migrator/src/test/scala/akka/persistence/jdbc/migrator/JournalMigratorTest.scala create mode 100644 migrator/src/test/scala/akka/persistence/jdbc/migrator/MigratorSpec.scala create mode 100644 migrator/src/test/scala/akka/persistence/jdbc/migrator/SnapshotMigratorTest.scala diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index a7a660ed6..8ff4fea3c 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -5,6 +5,7 @@ on: push: branches: - master + - migration-tool # remove before merging to master tags-ignore: [ v.* ] jobs: diff --git a/.github/workflows/h2-test.yml b/.github/workflows/h2-test.yml index 4202f4ef5..5bdf2e887 100644 --- a/.github/workflows/h2-test.yml +++ b/.github/workflows/h2-test.yml @@ -5,6 +5,7 @@ on: push: branches: - master + - migration-tool # remove before merging to master tags-ignore: [ v.* ] jobs: diff --git a/.github/workflows/mysql-tests.yml b/.github/workflows/mysql-tests.yml index c539fd936..6ba236b23 100644 --- a/.github/workflows/mysql-tests.yml +++ b/.github/workflows/mysql-tests.yml @@ -5,6 +5,7 @@ on: push: branches: - master + - migration-tool # remove before merging to master tags-ignore: [ v.* ] jobs: diff --git a/.github/workflows/oracle-tests.yml b/.github/workflows/oracle-tests.yml index fd58a719c..89f2a4450 100644 --- a/.github/workflows/oracle-tests.yml +++ b/.github/workflows/oracle-tests.yml @@ -5,6 +5,7 @@ on: push: branches: - master + - migration-tool # remove before merging to master tags-ignore: [ v.* ] jobs: diff --git a/.github/workflows/postgres-tests.yml b/.github/workflows/postgres-tests.yml index cdf1d39d8..b3e7bde50 100644 --- a/.github/workflows/postgres-tests.yml +++ b/.github/workflows/postgres-tests.yml @@ -5,6 +5,7 @@ on: push: branches: - master + - migration-tool # remove before merging to master tags-ignore: [ v.* ] jobs: diff --git a/.github/workflows/sqlserver-tests.yml b/.github/workflows/sqlserver-tests.yml index 67baad746..46681ad1d 100644 --- a/.github/workflows/sqlserver-tests.yml +++ b/.github/workflows/sqlserver-tests.yml @@ -5,6 +5,7 @@ on: push: branches: - master + - migration-tool # remove before merging to master tags-ignore: [ v.* ] jobs: diff --git a/build.sbt b/build.sbt index 08414dbf8..3747c2d9c 100644 --- a/build.sbt +++ b/build.sbt @@ -1,13 +1,13 @@ import com.lightbend.paradox.apidoc.ApidocPlugin.autoImport.apidocRootPackage // FIXME remove switching to final Akka version -resolvers in ThisBuild += "Akka Snapshots".at("https://oss.sonatype.org/content/repositories/snapshots/") +ThisBuild / resolvers += "Akka Snapshots".at("https://oss.sonatype.org/content/repositories/snapshots/") lazy val `akka-persistence-jdbc` = project .in(file(".")) .enablePlugins(ScalaUnidocPlugin) .disablePlugins(MimaPlugin, SitePlugin) - .aggregate(core, migration, docs) + .aggregate(core, docs, migrator) .settings(publish / skip := true) lazy val core = project @@ -24,13 +24,17 @@ lazy val core = project organization.value %% name.value % previousStableVersion.value.getOrElse( throw new Error("Unable to determine previous version for MiMa")))) -lazy val migration = project - .in(file("migration")) +lazy val migrator = project + .in(file("migrator")) .disablePlugins(SitePlugin, MimaPlugin) + .configs(IntegrationTest.extend(Test)) + .settings(Defaults.itSettings) .settings( - name := "akka-persistence-jdbc-migration", - libraryDependencies ++= Dependencies.Migration, + name := "akka-persistence-jdbc-migrator", + libraryDependencies ++= Dependencies.Migration ++ Dependencies.Libraries, + // TODO remove this when ready to publish it publish / skip := true) + .dependsOn(core % "compile->compile;test->test") lazy val docs = project .enablePlugins(ProjectAutoPlugin, AkkaParadoxPlugin, ParadoxSitePlugin, PreprocessPlugin, PublishRsyncPlugin) diff --git a/migration/src/main/resources/db/migration/postgres/V001__test.sql b/migration/src/main/resources/db/migration/postgres/V001__test.sql deleted file mode 100644 index e69de29bb..000000000 diff --git a/migration/src/main/resources/db/migration/postgres/V002__test-2.sql b/migration/src/main/resources/db/migration/postgres/V002__test-2.sql deleted file mode 100644 index 1661fb77c..000000000 --- a/migration/src/main/resources/db/migration/postgres/V002__test-2.sql +++ /dev/null @@ -1,9 +0,0 @@ -CREATE TABLE IF NOT EXISTS public.migrated2 ( - persistence_id VARCHAR(255) NOT NULL, - sequence_number BIGINT NOT NULL, - created BIGINT NOT NULL, - snapshot BYTEA NOT NULL -); -CREATE TABLE test_user ( - name VARCHAR(200) -); diff --git a/migration/src/main/resources/logback.xml b/migration/src/main/resources/logback.xml deleted file mode 100644 index 16b67de61..000000000 --- a/migration/src/main/resources/logback.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - debug - - - %date{ISO8601} - %logger -> %-5level[%thread] %logger{0} - %msg%n - - - - - - - - - - - - diff --git a/migration/src/main/resources/reference.conf b/migration/src/main/resources/reference.conf deleted file mode 100644 index 413c9f220..000000000 --- a/migration/src/main/resources/reference.conf +++ /dev/null @@ -1,9 +0,0 @@ -akka-persistence-jdbc { - migration { - # supported values: postgres - database-vendor = "" - url = "" - user = "" - password = "" - } -} diff --git a/migration/src/main/scala/akka/persistence/jdbc/migration/Main.scala b/migration/src/main/scala/akka/persistence/jdbc/migration/Main.scala deleted file mode 100644 index 55e843ca3..000000000 --- a/migration/src/main/scala/akka/persistence/jdbc/migration/Main.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (C) 2014 - 2019 Dennis Vriend - * Copyright (C) 2019 - 2021 Lightbend Inc. - */ - -package akka.persistence.jdbc.migration - -import com.typesafe.config.{ Config, ConfigFactory } -import org.flywaydb.core.Flyway -import org.flywaydb.core.api.Location - -object Main extends App { - - val config = ConfigFactory.load().getConfig("akka-persistence-jdbc.migration") - - def run(config: Config): Unit = { - val vendor = config.getString("database-vendor") - val url = config.getString("url") - val user = config.getString("user") - val password = config.getString("password") - - val flywayConfig = Flyway.configure.dataSource(url, user, password).table("apjdbc_schema_history") - - vendor match { - case "postgres" => - flywayConfig.locations(new Location("classpath:db/migration/postgres")) - case other => - sys.error(s"Akka Persistence JDBC migrations do not support `$other` (supported are `postgres`)") - } - - val flyway = flywayConfig.load - flyway.baseline() - flyway.migrate() - - } -} diff --git a/migration/src/main/scala/db/migration/postgres/V003__UpdateUsers.scala b/migration/src/main/scala/db/migration/postgres/V003__UpdateUsers.scala deleted file mode 100644 index 409fd03a9..000000000 --- a/migration/src/main/scala/db/migration/postgres/V003__UpdateUsers.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (C) 2014 - 2019 Dennis Vriend - * Copyright (C) 2019 - 2021 Lightbend Inc. - */ - -package db.migration.postgres - -import org.flywaydb.core.api.migration.{ BaseJavaMigration, Context } - -class V003__UpdateUsers extends BaseJavaMigration { - - @throws[Exception] - override def migrate(context: Context): Unit = { - try { - val statement = context.getConnection.prepareStatement("INSERT INTO test_user (name) VALUES ('Obelix')") - try statement.execute - finally if (statement != null) statement.close() - } - } - -} diff --git a/migration/src/test/resources/postgres/init.sql b/migration/src/test/resources/postgres/init.sql deleted file mode 100644 index 232eba2f3..000000000 --- a/migration/src/test/resources/postgres/init.sql +++ /dev/null @@ -1,25 +0,0 @@ -DROP TABLE IF EXISTS public.apjdbc_schema_history; - -DROP TABLE IF EXISTS public.journal; - -CREATE TABLE IF NOT EXISTS public.journal ( - ordering BIGSERIAL, - persistence_id VARCHAR(255) NOT NULL, - sequence_number BIGINT NOT NULL, - deleted BOOLEAN DEFAULT FALSE NOT NULL, - tags VARCHAR(255) DEFAULT NULL, - message BYTEA NOT NULL, - PRIMARY KEY(persistence_id, sequence_number) -); - -CREATE UNIQUE INDEX journal_ordering_idx ON public.journal(ordering); - -DROP TABLE IF EXISTS public.snapshot; - -CREATE TABLE IF NOT EXISTS public.snapshot ( - persistence_id VARCHAR(255) NOT NULL, - sequence_number BIGINT NOT NULL, - created BIGINT NOT NULL, - snapshot BYTEA NOT NULL, - PRIMARY KEY(persistence_id, sequence_number) -); diff --git a/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala b/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala deleted file mode 100644 index d2a0be04a..000000000 --- a/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (C) 2014 - 2019 Dennis Vriend - * Copyright (C) 2019 - 2021 Lightbend Inc. - */ - -package akka.persistence.jdbc.migration - -import java.sql.{ Connection, DriverManager } -import java.util.Properties - -import com.typesafe.config.{ Config, ConfigFactory } -import org.scalatest.BeforeAndAfterAll -import org.testcontainers.containers.PostgreSQLContainer -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class PostgresSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll { - - val postgres: PostgreSQLContainer[_] = { - val c = new PostgreSQLContainer("postgres:13.1") - c.withDatabaseName("public") - c.withInitScript("postgres/init.sql") - c - } - var migrationConfig: Config = null - val connectionProperties = new Properties() - - override def beforeAll(): Unit = { - postgres.start() - migrationConfig = ConfigFactory - .parseString(s"""migration { - |database-vendor = postgres - |url = "${postgres.getJdbcUrl}" - |user = "${postgres.getUsername}" - |password = "${postgres.getPassword}" - |}""".stripMargin) - .getConfig("migration") - - connectionProperties.put("user", postgres.getUsername); - connectionProperties.put("password", postgres.getPassword); - } - - override def afterAll(): Unit = { - postgres.stop() - } - - "Migration 002" should "be applied" in { - Main.run(migrationConfig) - val connection = DriverManager.getConnection(postgres.getJdbcUrl, connectionProperties); - println(existingTables(connection)) - val stmt = connection.createStatement() - stmt.executeQuery("SELECT * FROM migrated2;") - } - - "Scala migration 003" should "be applied" in { - Main.run(migrationConfig) - val connection = DriverManager.getConnection(postgres.getJdbcUrl, connectionProperties); - println(existingTables(connection)) - val stmt = connection.createStatement() - val rs = stmt.executeQuery("SELECT * FROM test_user;") - val sb = new StringBuilder() - while (rs.next()) { - sb.append(rs.getString(1)).append("\n") - } - sb.toString() shouldBe "Obelix\n" - } - - private def existingTables(connection: Connection) = { - val stmt = connection.createStatement() - val rs = stmt.executeQuery( - "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema');") - val sb = new StringBuilder("Existing tables:\n") - while (rs.next()) { - sb.append(" " + rs.getString(1) + "." + rs.getString(2) + "\n") - } - sb.toString() - } -} diff --git a/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/JournalMigratorTest.scala b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/JournalMigratorTest.scala new file mode 100644 index 000000000..1fdd34da9 --- /dev/null +++ b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/JournalMigratorTest.scala @@ -0,0 +1,12 @@ +package akka.persistence.jdbc.migrator.integration + +import akka.persistence.jdbc.migrator.MigratorSpec._ +import akka.persistence.jdbc.migrator.JournalMigratorTest + +class PostgresJournalMigratorTest extends JournalMigratorTest("postgres-application.conf") with PostgresCleaner + +class MySQLJournalMigratorTest extends JournalMigratorTest("mysql-application.conf") with MysqlCleaner + +class OracleJournalMigratorTest extends JournalMigratorTest("oracle-application.conf") with OracleCleaner + +class SqlServerJournalMigratorTest extends JournalMigratorTest("sqlserver-application.conf") with SqlServerCleaner diff --git a/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/SnapshotMigratorTest.scala b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/SnapshotMigratorTest.scala new file mode 100644 index 000000000..434f093c0 --- /dev/null +++ b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/SnapshotMigratorTest.scala @@ -0,0 +1,12 @@ +package akka.persistence.jdbc.migrator.integration + +import akka.persistence.jdbc.migrator.MigratorSpec._ +import akka.persistence.jdbc.migrator.SnapshotMigratorTest + +class PostgresSnapshotMigratorTest extends SnapshotMigratorTest("postgres-application.conf") with PostgresCleaner + +class MySQLSnapshotMigratorTest extends SnapshotMigratorTest("mysql-application.conf") with MysqlCleaner + +class OracleSnapshotMigratorTest extends SnapshotMigratorTest("oracle-application.conf") with OracleCleaner + +class SqlServerSnapshotMigratorTest extends SnapshotMigratorTest("sqlserver-application.conf") with SqlServerCleaner diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala new file mode 100644 index 000000000..ed4fa8b9e --- /dev/null +++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.migrator + +import akka.Done +import akka.actor.ActorSystem +import akka.persistence.PersistentRepr +import akka.persistence.jdbc.AkkaSerialization +import akka.persistence.jdbc.config.{ JournalConfig, ReadJournalConfig } +import akka.persistence.jdbc.db.SlickExtension +import akka.persistence.jdbc.journal.dao.JournalQueries +import akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalSerializer +import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow } +import akka.persistence.jdbc.migrator.JournalMigrator.{ JournalConfig, ReadJournalConfig } +import akka.persistence.jdbc.query.dao.legacy.ReadJournalQueries +import akka.serialization.{ Serialization, SerializationExtension } +import akka.stream.scaladsl.Source +import org.slf4j.{ Logger, LoggerFactory } +import slick.jdbc._ + +import scala.concurrent.{ ExecutionContextExecutor, Future } +import scala.util.{ Failure, Success } + +/** + * This will help migrate the legacy journal data onto the new journal schema with the + * appropriate serialization + * + * @param system the actor system + */ +final case class JournalMigrator(profile: JdbcProfile)(implicit system: ActorSystem) { + implicit val ec: ExecutionContextExecutor = system.dispatcher + + import profile.api._ + + val log: Logger = LoggerFactory.getLogger(getClass) + + // get the various configurations + private val journalConfig: JournalConfig = new JournalConfig(system.settings.config.getConfig(JournalConfig)) + private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig( + system.settings.config.getConfig(ReadJournalConfig)) + + // the journal database + private val journalDB: JdbcBackend.Database = + SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database + + // get an instance of the new journal queries + private val newJournalQueries: JournalQueries = + new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration) + + // let us get the journal reader + private val serialization: Serialization = SerializationExtension(system) + private val legacyJournalQueries: ReadJournalQueries = new ReadJournalQueries(profile, readJournalConfig) + private val serializer: ByteArrayJournalSerializer = + new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator) + + private val bufferSize: Int = journalConfig.daoConfig.bufferSize + + private val query = + legacyJournalQueries.JournalTable.result + .withStatementParameters( + rsType = ResultSetType.ForwardOnly, + rsConcurrency = ResultSetConcurrency.ReadOnly, + fetchSize = bufferSize) + .transactionally + + /** + * write all legacy events into the new journal tables applying the proper serialization + */ + def migrate(): Future[Done] = Source + .fromPublisher(journalDB.stream(query)) + .via(serializer.deserializeFlow) + .map { + case Success((repr, tags, ordering)) => (repr, tags, ordering) + case Failure(exception) => throw exception // blow-up on failure + } + .map { case (repr, tags, ordering) => serialize(repr, tags, ordering) } + // get pages of many records at once + .grouped(bufferSize) + .mapAsync(1)(records => { + val stmt: DBIO[Unit] = records + // get all the sql statements for this record as an option + .map { case (newRepr, newTags) => + log.debug(s"migrating event for PersistenceID: ${newRepr.persistenceId} with tags ${newTags.mkString(",")}") + writeJournalRowsStatements(newRepr, newTags) + } + // reduce to 1 statement + .foldLeft[DBIO[Unit]](DBIO.successful[Unit] {})((priorStmt, nextStmt) => { + priorStmt.andThen(nextStmt) + }) + + journalDB.run(stmt) + }) + .run() + + /** + * serialize the PersistentRepr and construct a JournalAkkaSerializationRow and set of matching tags + * + * @param repr the PersistentRepr + * @param tags the tags + * @param ordering the ordering of the PersistentRepr + * @return the tuple of JournalAkkaSerializationRow and set of tags + */ + private def serialize( + repr: PersistentRepr, + tags: Set[String], + ordering: Long): (JournalAkkaSerializationRow, Set[String]) = { + + val serializedPayload: AkkaSerialization.AkkaSerialized = + AkkaSerialization.serialize(serialization, repr.payload).get + + val serializedMetadata: Option[AkkaSerialization.AkkaSerialized] = + repr.metadata.flatMap(m => AkkaSerialization.serialize(serialization, m).toOption) + val row: JournalAkkaSerializationRow = JournalAkkaSerializationRow( + ordering, + repr.deleted, + repr.persistenceId, + repr.sequenceNr, + repr.writerUuid, + repr.timestamp, + repr.manifest, + serializedPayload.payload, + serializedPayload.serId, + serializedPayload.serManifest, + serializedMetadata.map(_.payload), + serializedMetadata.map(_.serId), + serializedMetadata.map(_.serManifest)) + + (row, tags) + } + + private def writeJournalRowsStatements( + journalSerializedRow: JournalAkkaSerializationRow, + tags: Set[String]): DBIO[Unit] = { + val journalInsert: DBIO[Long] = newJournalQueries.JournalTable + .returning(newJournalQueries.JournalTable.map(_.ordering)) + .forceInsert(journalSerializedRow) + + val tagInserts = + newJournalQueries.TagTable ++= tags.map(tag => TagRow(journalSerializedRow.ordering, tag)).toSeq + + journalInsert.flatMap(_ => tagInserts.asInstanceOf[DBIO[Unit]]) + } +} + +case object JournalMigrator { + final val JournalConfig: String = "jdbc-journal" + final val ReadJournalConfig: String = "jdbc-read-journal" +} diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala new file mode 100644 index 000000000..99ad97685 --- /dev/null +++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + +package akka.persistence.jdbc.migrator + +import akka.actor.ActorSystem +import akka.persistence.SnapshotMetadata +import akka.persistence.jdbc.config.{ ReadJournalConfig, SnapshotConfig } +import akka.persistence.jdbc.db.SlickExtension +import akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao +import akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao +import akka.persistence.jdbc.snapshot.dao.legacy.{ ByteArraySnapshotSerializer, SnapshotQueries } +import akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables.SnapshotRow +import akka.serialization.{ Serialization, SerializationExtension } +import akka.stream.scaladsl.{ Sink, Source } +import akka.Done +import akka.persistence.jdbc.migrator.JournalMigrator.ReadJournalConfig +import akka.persistence.jdbc.migrator.SnapshotMigrator.{ NoParallelism, SnapshotStoreConfig } +import org.slf4j.{ Logger, LoggerFactory } +import slick.jdbc +import slick.jdbc.{ JdbcBackend, JdbcProfile } + +import scala.concurrent.Future + +/** + * This will help migrate the legacy snapshot data onto the new snapshot schema with the + * appropriate serialization + * + * @param system the actor system + */ +case class SnapshotMigrator(profile: JdbcProfile)(implicit system: ActorSystem) { + val log: Logger = LoggerFactory.getLogger(getClass) + + import system.dispatcher + import profile.api._ + + private val snapshotConfig: SnapshotConfig = new SnapshotConfig(system.settings.config.getConfig(SnapshotStoreConfig)) + private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig( + system.settings.config.getConfig(ReadJournalConfig)) + + private val snapshotDB: jdbc.JdbcBackend.Database = + SlickExtension(system).database(system.settings.config.getConfig(SnapshotStoreConfig)).database + + private val journalDB: JdbcBackend.Database = + SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database + + private val serialization: Serialization = SerializationExtension(system) + private val queries: SnapshotQueries = new SnapshotQueries(profile, snapshotConfig.legacySnapshotTableConfiguration) + private val serializer: ByteArraySnapshotSerializer = new ByteArraySnapshotSerializer(serialization) + + // get the instance if the default snapshot dao + private val defaultSnapshotDao: DefaultSnapshotDao = + new DefaultSnapshotDao(snapshotDB, profile, snapshotConfig, serialization) + + // get the instance of the legacy journal DAO + private val legacyJournalDao: ByteArrayReadJournalDao = + new ByteArrayReadJournalDao(journalDB, profile, readJournalConfig, SerializationExtension(system)) + + private def toSnapshotData(row: SnapshotRow): (SnapshotMetadata, Any) = serializer.deserialize(row).get + + /** + * migrate the latest snapshot data + */ + def migrateLatest(): Future[Done] = { + legacyJournalDao + .allPersistenceIdsSource(Long.MaxValue) + .mapAsync(NoParallelism) { persistenceId => + // let us fetch the latest snapshot for each persistenceId + snapshotDB.run(queries.selectLatestByPersistenceId(persistenceId).result).map { rows => + rows.headOption.map(toSnapshotData).map { case (metadata, value) => + log.debug(s"migrating snapshot for ${metadata.toString}") + defaultSnapshotDao.save(metadata, value) + } + } + } + .runWith(Sink.ignore) + } + + /** + * migrate all the legacy snapshot schema data into the new snapshot schema + */ + def migrateAll(): Future[Done] = Source + .fromPublisher(snapshotDB.stream(queries.SnapshotTable.result)) + .mapAsync(NoParallelism) { record => + val (metadata, value) = toSnapshotData(record) + log.debug(s"migrating snapshot for ${metadata.toString}") + defaultSnapshotDao.save(metadata, value) + } + .run() +} + +case object SnapshotMigrator { + final val SnapshotStoreConfig: String = "jdbc-snapshot-store" + final val NoParallelism: Int = 1 +} diff --git a/migrator/src/test/resources/general.conf b/migrator/src/test/resources/general.conf new file mode 100644 index 000000000..8680b4e7e --- /dev/null +++ b/migrator/src/test/resources/general.conf @@ -0,0 +1,47 @@ +# Copyright (C) 2019 - 2021 Lightbend Inc. +# + +// This file contains the general settings which are shared in all akka-persistence-jdbc tests + +akka { + stdout-loglevel = off // defaults to WARNING can be disabled with off. The stdout-loglevel is only in effect during system startup and shutdown + log-dead-letters-during-shutdown = on + loglevel = debug + log-dead-letters = on + log-config-on-start = off // Log the complete configuration at INFO level when the actor system is started + + loggers = ["akka.event.slf4j.Slf4jLogger"] + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + + actor { + // Required until https://github.com/akka/akka/pull/28333 is available + allow-java-serialization = on + debug { + receive = on // log all messages sent to an actor if that actors receive method is a LoggingReceive + autoreceive = off // log all special messages like Kill, PoisoffPill etc sent to all actors + lifecycle = off // log all actor lifecycle events of all actors + fsm = off // enable logging of all events, transitioffs and timers of FSM Actors that extend LoggingFSM + event-stream = off // enable logging of subscriptions (subscribe/unsubscribe) on the ActorSystem.eventStream + } + } +} + +docker { + host = "localhost" + host = ${?VM_HOST} +} + +jdbc-journal { + event-adapters { + event-adapter = "akka.persistence.jdbc.migrator.MigratorSpec$AccountEventAdapter" + } + + event-adapter-bindings { + "akka.persistence.jdbc.migrator.MigratorSpec$AccountEvent" = event-adapter + } +} + +// Default configurations of legacy and non-legacy snapshot tables are both set with the same name (tableName = "snapshot"); So we have to distinguish them with a different name +jdbc-snapshot-store.tables.legacy_snapshot.tableName = "legacy_snapshot" + +slick.db.idleTimeout = 10000 // 10 seconds diff --git a/migrator/src/test/resources/h2-application.conf b/migrator/src/test/resources/h2-application.conf new file mode 100644 index 000000000..2c7f048bf --- /dev/null +++ b/migrator/src/test/resources/h2-application.conf @@ -0,0 +1,42 @@ +# Copyright (C) 2019 - 2021 Lightbend Inc. + +// general.conf is included only for shared settings used for the akka-persistence-jdbc tests +include "general.conf" + +akka { + persistence { + journal { + plugin = "jdbc-journal" + } + snapshot-store { + plugin = "jdbc-snapshot-store" + } + } +} + +jdbc-journal { + slick = ${slick} +} + +# the akka-persistence-snapshot-store in use +jdbc-snapshot-store { + slick = ${slick} +} + +# the akka-persistence-query provider in use +jdbc-read-journal { + slick = ${slick} +} + +slick { + profile = "slick.jdbc.H2Profile$" + db { + url = "jdbc:h2:mem:test-database;DATABASE_TO_UPPER=false;" + user = "root" + password = "root" + driver = "org.h2.Driver" + numThreads = 5 + maxConnections = 5 + minConnections = 1 + } +} diff --git a/migrator/src/test/resources/mysql-application.conf b/migrator/src/test/resources/mysql-application.conf new file mode 100644 index 000000000..6b9cf8b46 --- /dev/null +++ b/migrator/src/test/resources/mysql-application.conf @@ -0,0 +1,48 @@ +# Copyright (C) 2019 - 2021 Lightbend Inc. + +// general.conf is included only for shared settings used for the akka-persistence-jdbc tests +include "general.conf" + +akka { + persistence { + journal { + plugin = "jdbc-journal" + // Enable the line below to automatically start the journal when the actorsystem is started + // auto-start-journals = ["jdbc-journal"] + } + snapshot-store { + plugin = "jdbc-snapshot-store" + // Enable the line below to automatically start the snapshot-store when the actorsystem is started + // auto-start-snapshot-stores = ["jdbc-snapshot-store"] + } + } +} + +jdbc-journal { + slick = ${slick} +} + +# the akka-persistence-snapshot-store in use +jdbc-snapshot-store { + slick = ${slick} +} + +# the akka-persistence-query provider in use +jdbc-read-journal { + slick = ${slick} +} + +slick { + profile = "slick.jdbc.MySQLProfile$" + db { + host = ${docker.host} + host = ${?DB_HOST} + url = "jdbc:mysql://"${slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true" + user = "root" + password = "root" + driver = "com.mysql.cj.jdbc.Driver" + numThreads = 5 + maxConnections = 5 + minConnections = 1 + } +} diff --git a/migrator/src/test/resources/oracle-application.conf b/migrator/src/test/resources/oracle-application.conf new file mode 100644 index 000000000..c1e072b86 --- /dev/null +++ b/migrator/src/test/resources/oracle-application.conf @@ -0,0 +1,50 @@ +# Copyright (C) 2019 - 2021 Lightbend Inc. + +// general.conf is included only for shared settings used for the akka-persistence-jdbc tests +include "general.conf" +include "oracle-schema-overrides.conf" + +akka { + persistence { + journal { + plugin = "jdbc-journal" + // Enable the line below to automatically start the journal when the actorsystem is started + // auto-start-journals = ["jdbc-journal"] + } + snapshot-store { + plugin = "jdbc-snapshot-store" + // Enable the line below to automatically start the snapshot-store when the actorsystem is started + // auto-start-snapshot-stores = ["jdbc-snapshot-store"] + } + } +} + + +jdbc-journal { + slick = ${slick} +} + +# the akka-persistence-snapshot-store in use +jdbc-snapshot-store { + slick = ${slick} +} + +# the akka-persistence-query provider in use +jdbc-read-journal { + slick = ${slick} +} + +slick { + profile = "slick.jdbc.OracleProfile$" + db { + host = ${docker.host} + host = ${?DB_HOST} + url = "jdbc:oracle:thin:@//"${slick.db.host}":1521/xe" + user = "system" + password = "oracle" + driver = "oracle.jdbc.OracleDriver" + numThreads = 5 + maxConnections = 5 + minConnections = 1 + } +} diff --git a/migrator/src/test/resources/postgres-application.conf b/migrator/src/test/resources/postgres-application.conf new file mode 100644 index 000000000..b93acaf22 --- /dev/null +++ b/migrator/src/test/resources/postgres-application.conf @@ -0,0 +1,48 @@ +# Copyright (C) 2019 - 2021 Lightbend Inc. + +// general.conf is included only for shared settings used for the akka-persistence-jdbc tests +include "general.conf" + +akka { + persistence { + journal { + plugin = "jdbc-journal" + // Enable the line below to automatically start the journal when the actorsystem is started + // auto-start-journals = ["jdbc-journal"] + } + snapshot-store { + plugin = "jdbc-snapshot-store" + // Enable the line below to automatically start the snapshot-store when the actorsystem is started + // auto-start-snapshot-stores = ["jdbc-snapshot-store"] + } + } +} + +jdbc-journal { + slick = ${slick} +} + +# the akka-persistence-snapshot-store in use +jdbc-snapshot-store { + slick = ${slick} +} + +# the akka-persistence-query provider in use +jdbc-read-journal { + slick = ${slick} +} + +slick { + profile = "slick.jdbc.PostgresProfile$" + db { + host = "localhost" + host = ${?DB_HOST} + url = "jdbc:postgresql://"${slick.db.host}":5432/docker?reWriteBatchedInserts=true" + user = "docker" + password = "docker" + driver = "org.postgresql.Driver" + numThreads = 5 + maxConnections = 5 + minConnections = 1 + } +} diff --git a/migrator/src/test/resources/schema/h2/h2-create-schema-legacy.sql b/migrator/src/test/resources/schema/h2/h2-create-schema-legacy.sql new file mode 100644 index 000000000..2a82b090d --- /dev/null +++ b/migrator/src/test/resources/schema/h2/h2-create-schema-legacy.sql @@ -0,0 +1,34 @@ +CREATE TABLE IF NOT EXISTS PUBLIC."journal" ( + "ordering" BIGINT AUTO_INCREMENT, + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" BIGINT NOT NULL, + "deleted" BOOLEAN DEFAULT FALSE NOT NULL, + "tags" VARCHAR(255) DEFAULT NULL, + "message" BYTEA NOT NULL, + PRIMARY KEY("persistence_id", "sequence_number") +); +CREATE UNIQUE INDEX IF NOT EXISTS "journal_ordering_idx" ON PUBLIC."journal"("ordering"); + +CREATE TABLE IF NOT EXISTS PUBLIC."legacy_snapshot" ( + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" BIGINT NOT NULL, + "created" BIGINT NOT NULL, + "snapshot" BYTEA NOT NULL, + PRIMARY KEY("persistence_id", "sequence_number") +); + + +CREATE TABLE IF NOT EXISTS "durable_state" ( + "global_offset" BIGINT NOT NULL AUTO_INCREMENT, + "persistence_id" VARCHAR(255) NOT NULL, + "revision" BIGINT NOT NULL, + "state_payload" BLOB NOT NULL, + "state_serial_id" INTEGER NOT NULL, + "state_serial_manifest" VARCHAR, + "tag" VARCHAR, + "state_timestamp" BIGINT NOT NULL, + PRIMARY KEY("persistence_id") + ); + +CREATE INDEX "state_tag_idx" on "durable_state" ("tag"); +CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset"); diff --git a/migrator/src/test/resources/schema/h2/h2-create-schema.sql b/migrator/src/test/resources/schema/h2/h2-create-schema.sql new file mode 100644 index 000000000..5167d1b92 --- /dev/null +++ b/migrator/src/test/resources/schema/h2/h2-create-schema.sql @@ -0,0 +1,54 @@ +CREATE TABLE IF NOT EXISTS "event_journal" ( + "ordering" BIGINT NOT NULL AUTO_INCREMENT, + "deleted" BOOLEAN DEFAULT false NOT NULL, + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" BIGINT NOT NULL, + "writer" VARCHAR NOT NULL, + "write_timestamp" BIGINT NOT NULL, + "adapter_manifest" VARCHAR NOT NULL, + "event_payload" BLOB NOT NULL, + "event_ser_id" INTEGER NOT NULL, + "event_ser_manifest" VARCHAR NOT NULL, + "meta_payload" BLOB, + "meta_ser_id" INTEGER, + "meta_ser_manifest" VARCHAR, + PRIMARY KEY("persistence_id","sequence_number") + ); + +CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering"); + +CREATE TABLE IF NOT EXISTS "event_tag" ( + "event_id" BIGINT NOT NULL, + "tag" VARCHAR NOT NULL, + PRIMARY KEY("event_id", "tag"), + CONSTRAINT fk_event_journal + FOREIGN KEY("event_id") + REFERENCES "event_journal"("ordering") + ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS "snapshot" ( + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" BIGINT NOT NULL, + "created" BIGINT NOT NULL,"snapshot_ser_id" INTEGER NOT NULL, + "snapshot_ser_manifest" VARCHAR NOT NULL, + "snapshot_payload" BLOB NOT NULL, + "meta_ser_id" INTEGER, + "meta_ser_manifest" VARCHAR, + "meta_payload" BLOB, + PRIMARY KEY("persistence_id","sequence_number") + ); + +CREATE TABLE IF NOT EXISTS "durable_state" ( + "global_offset" BIGINT NOT NULL AUTO_INCREMENT, + "persistence_id" VARCHAR(255) NOT NULL, + "revision" BIGINT NOT NULL, + "state_payload" BLOB NOT NULL, + "state_serial_id" INTEGER NOT NULL, + "state_serial_manifest" VARCHAR, + "tag" VARCHAR, + "state_timestamp" BIGINT NOT NULL, + PRIMARY KEY("persistence_id") + ); +CREATE INDEX "state_tag_idx" on "durable_state" ("tag"); +CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset"); diff --git a/migrator/src/test/resources/schema/h2/h2-drop-schema-legacy.sql b/migrator/src/test/resources/schema/h2/h2-drop-schema-legacy.sql new file mode 100644 index 000000000..499ed5b29 --- /dev/null +++ b/migrator/src/test/resources/schema/h2/h2-drop-schema-legacy.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS PUBLIC."journal"; +DROP TABLE IF EXISTS PUBLIC."legacy_snapshot"; +DROP TABLE IF EXISTS PUBLIC."durable_state"; diff --git a/migrator/src/test/resources/schema/h2/h2-drop-schema.sql b/migrator/src/test/resources/schema/h2/h2-drop-schema.sql new file mode 100644 index 000000000..3d5ab8e97 --- /dev/null +++ b/migrator/src/test/resources/schema/h2/h2-drop-schema.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS PUBLIC."event_tag"; +DROP TABLE IF EXISTS PUBLIC."event_journal"; +DROP TABLE IF EXISTS PUBLIC."snapshot"; +DROP TABLE IF EXISTS PUBLIC."durable_state"; diff --git a/migrator/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql b/migrator/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql new file mode 100644 index 000000000..841e65561 --- /dev/null +++ b/migrator/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql @@ -0,0 +1,18 @@ +CREATE TABLE IF NOT EXISTS journal ( + ordering SERIAL, + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + deleted BOOLEAN DEFAULT FALSE NOT NULL, + tags VARCHAR(255) DEFAULT NULL, + message BLOB NOT NULL, + PRIMARY KEY(persistence_id, sequence_number) +); +CREATE UNIQUE INDEX journal_ordering_idx ON journal(ordering); + +CREATE TABLE IF NOT EXISTS legacy_snapshot ( + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + created BIGINT NOT NULL, + snapshot BLOB NOT NULL, + PRIMARY KEY (persistence_id, sequence_number) +); diff --git a/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql b/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql new file mode 100644 index 000000000..5c57be277 --- /dev/null +++ b/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql @@ -0,0 +1,38 @@ +CREATE TABLE IF NOT EXISTS event_journal( + ordering SERIAL, + deleted BOOLEAN DEFAULT false NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + writer TEXT NOT NULL, + write_timestamp BIGINT NOT NULL, + adapter_manifest TEXT NOT NULL, + event_payload BLOB NOT NULL, + event_ser_id INTEGER NOT NULL, + event_ser_manifest TEXT NOT NULL, + meta_payload BLOB, + meta_ser_id INTEGER,meta_ser_manifest TEXT, + PRIMARY KEY(persistence_id,sequence_number) +); + +CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering); + +CREATE TABLE IF NOT EXISTS event_tag ( + event_id BIGINT UNSIGNED NOT NULL, + tag VARCHAR(255) NOT NULL, + PRIMARY KEY(event_id, tag), + FOREIGN KEY (event_id) + REFERENCES event_journal(ordering) + ON DELETE CASCADE + ); + +CREATE TABLE IF NOT EXISTS snapshot ( + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + created BIGINT NOT NULL, + snapshot_ser_id INTEGER NOT NULL, + snapshot_ser_manifest TEXT NOT NULL, + snapshot_payload BLOB NOT NULL, + meta_ser_id INTEGER, + meta_ser_manifest TEXT, + meta_payload BLOB, + PRIMARY KEY (persistence_id, sequence_number)); diff --git a/migrator/src/test/resources/schema/mysql/mysql-drop-schema-legacy.sql b/migrator/src/test/resources/schema/mysql/mysql-drop-schema-legacy.sql new file mode 100644 index 000000000..7a3cc849f --- /dev/null +++ b/migrator/src/test/resources/schema/mysql/mysql-drop-schema-legacy.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS journal; +DROP TABLE IF EXISTS legacy_snapshot; diff --git a/migrator/src/test/resources/schema/mysql/mysql-drop-schema.sql b/migrator/src/test/resources/schema/mysql/mysql-drop-schema.sql new file mode 100644 index 000000000..750504e76 --- /dev/null +++ b/migrator/src/test/resources/schema/mysql/mysql-drop-schema.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS event_tag; +DROP TABLE IF EXISTS event_journal; +DROP TABLE IF EXISTS snapshot; diff --git a/migrator/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql b/migrator/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql new file mode 100644 index 000000000..8cbb05988 --- /dev/null +++ b/migrator/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql @@ -0,0 +1,44 @@ +CREATE SEQUENCE "ordering_seq" START WITH 1 INCREMENT BY 1 NOMAXVALUE +/ + +CREATE TABLE "journal" ( + "ordering" NUMERIC, + "deleted" char check ("deleted" in (0,1)) NOT NULL, + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" NUMERIC NOT NULL, + "tags" VARCHAR(255) DEFAULT NULL, + "message" BLOB NOT NULL, + PRIMARY KEY("persistence_id", "sequence_number") +) +/ + +CREATE UNIQUE INDEX "journal_ordering_idx" ON "journal"("ordering") +/ + +CREATE OR REPLACE TRIGGER "ordering_seq_trigger" +BEFORE INSERT ON "journal" +FOR EACH ROW +BEGIN + SELECT "ordering_seq".NEXTVAL INTO :NEW."ordering" FROM DUAL; +END; +/ + +CREATE OR REPLACE PROCEDURE "reset_sequence" +IS + l_value NUMBER; +BEGIN + EXECUTE IMMEDIATE 'SELECT "ordering_seq".nextval FROM dual' INTO l_value; + EXECUTE IMMEDIATE 'ALTER SEQUENCE "ordering_seq" INCREMENT BY -' || l_value || ' MINVALUE 0'; + EXECUTE IMMEDIATE 'SELECT "ordering_seq".nextval FROM dual' INTO l_value; + EXECUTE IMMEDIATE 'ALTER SEQUENCE "ordering_seq" INCREMENT BY 1 MINVALUE 0'; +END; +/ + +CREATE TABLE "legacy_snapshot" ( + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" NUMERIC NOT NULL, + "created" NUMERIC NOT NULL, + "snapshot" BLOB NOT NULL, + PRIMARY KEY ("persistence_id", "sequence_number") +) +/ \ No newline at end of file diff --git a/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql b/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql new file mode 100644 index 000000000..dde92755f --- /dev/null +++ b/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql @@ -0,0 +1,57 @@ +CREATE SEQUENCE EVENT_JOURNAL__ORDERING_SEQ START WITH 1 INCREMENT BY 1 NOMAXVALUE +/ + +CREATE TABLE EVENT_JOURNAL ( + ORDERING NUMERIC UNIQUE, + DELETED CHAR(1) DEFAULT 0 NOT NULL check (DELETED in (0, 1)), + PERSISTENCE_ID VARCHAR(255) NOT NULL, + SEQUENCE_NUMBER NUMERIC NOT NULL, + WRITER VARCHAR(255) NOT NULL, + WRITE_TIMESTAMP NUMBER(19) NOT NULL, + ADAPTER_MANIFEST VARCHAR(255), + EVENT_PAYLOAD BLOB NOT NULL, + EVENT_SER_ID NUMBER(10) NOT NULL, + EVENT_SER_MANIFEST VARCHAR(255), + META_PAYLOAD BLOB, + META_SER_ID NUMBER(10), + META_SER_MANIFEST VARCHAR(255), + PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER) + ) +/ + +CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOURNAL REFERENCING NEW AS NEW FOR EACH ROW WHEN (new.ORDERING is null) begin select EVENT_JOURNAL__ORDERING_seq.nextval into :new.ORDERING from sys.dual; end; +/ + +CREATE TABLE EVENT_TAG ( + EVENT_ID NUMERIC NOT NULL, + TAG VARCHAR(255) NOT NULL, + PRIMARY KEY(EVENT_ID, TAG), + FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING) + ON DELETE CASCADE + ) +/ + +CREATE TABLE SNAPSHOT ( + PERSISTENCE_ID VARCHAR(255) NOT NULL, + SEQUENCE_NUMBER NUMERIC NOT NULL, + CREATED NUMERIC NOT NULL, + SNAPSHOT_SER_ID NUMBER(10) NOT NULL, + SNAPSHOT_SER_MANIFEST VARCHAR(255), + SNAPSHOT_PAYLOAD BLOB NOT NULL, + META_SER_ID NUMBER(10), + META_SER_MANIFEST VARCHAR(255), + META_PAYLOAD BLOB, + PRIMARY KEY(PERSISTENCE_ID,SEQUENCE_NUMBER) + ) +/ + +CREATE OR REPLACE PROCEDURE "reset_sequence" +IS + l_value NUMBER; +BEGIN + EXECUTE IMMEDIATE 'SELECT EVENT_JOURNAL__ORDERING_SEQ.nextval FROM dual' INTO l_value; + EXECUTE IMMEDIATE 'ALTER SEQUENCE EVENT_JOURNAL__ORDERING_SEQ INCREMENT BY -' || l_value || ' MINVALUE 0'; + EXECUTE IMMEDIATE 'SELECT EVENT_JOURNAL__ORDERING_SEQ.nextval FROM dual' INTO l_value; + EXECUTE IMMEDIATE 'ALTER SEQUENCE EVENT_JOURNAL__ORDERING_SEQ INCREMENT BY 1 MINVALUE 0'; +END; +/ diff --git a/migrator/src/test/resources/schema/oracle/oracle-drop-schema-legacy.sql b/migrator/src/test/resources/schema/oracle/oracle-drop-schema-legacy.sql new file mode 100644 index 000000000..0d2ef1100 --- /dev/null +++ b/migrator/src/test/resources/schema/oracle/oracle-drop-schema-legacy.sql @@ -0,0 +1,21 @@ +-- (ddl lock timeout in seconds) this allows tests which are still writing to the db to finish gracefully +ALTER SESSION SET ddl_lock_timeout = 150 +/ + +DROP TABLE "journal" CASCADE CONSTRAINT +/ + +DROP TABLE "legacy_snapshot" CASCADE CONSTRAINT +/ + +DROP TABLE "deleted_to" CASCADE CONSTRAINT +/ + +DROP TRIGGER "ordering_seq_trigger" +/ + +DROP PROCEDURE "reset_sequence" +/ + +DROP SEQUENCE "ordering_seq" +/ diff --git a/migrator/src/test/resources/schema/oracle/oracle-drop-schema.sql b/migrator/src/test/resources/schema/oracle/oracle-drop-schema.sql new file mode 100644 index 000000000..ed69f1f0d --- /dev/null +++ b/migrator/src/test/resources/schema/oracle/oracle-drop-schema.sql @@ -0,0 +1,20 @@ +ALTER SESSION SET ddl_lock_timeout = 15 +/ + +DROP TABLE EVENT_TAG CASCADE CONSTRAINT +/ + +DROP TABLE EVENT_JOURNAL CASCADE CONSTRAINT +/ + +DROP TABLE SNAPSHOT CASCADE CONSTRAINT +/ + +DROP TABLE SNAPSHOT CASCADE CONSTRAINT +/ + +DROP SEQUENCE EVENT_JOURNAL__ORDERING_SEQ +/ + +DROP TRIGGER EVENT_JOURNAL__ORDERING_TRG +/ diff --git a/migrator/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql b/migrator/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql new file mode 100644 index 000000000..123d5dea7 --- /dev/null +++ b/migrator/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql @@ -0,0 +1,32 @@ +CREATE TABLE IF NOT EXISTS public.journal ( + ordering BIGSERIAL, + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + deleted BOOLEAN DEFAULT FALSE NOT NULL, + tags VARCHAR(255) DEFAULT NULL, + message BYTEA NOT NULL, + PRIMARY KEY(persistence_id, sequence_number) +); +CREATE UNIQUE INDEX IF NOT EXISTS journal_ordering_idx ON public.journal(ordering); + +CREATE TABLE IF NOT EXISTS public.legacy_snapshot ( + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + created BIGINT NOT NULL, + snapshot BYTEA NOT NULL, + PRIMARY KEY(persistence_id, sequence_number) +); + +CREATE TABLE IF NOT EXISTS public.durable_state ( + global_offset BIGSERIAL, + persistence_id VARCHAR(255) NOT NULL, + revision BIGINT NOT NULL, + state_payload BYTEA NOT NULL, + state_serial_id INTEGER NOT NULL, + state_serial_manifest VARCHAR(255), + tag VARCHAR, + state_timestamp BIGINT NOT NULL, + PRIMARY KEY(persistence_id) + ); +CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag); +CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset); diff --git a/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql b/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql new file mode 100644 index 000000000..7ae7e0999 --- /dev/null +++ b/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql @@ -0,0 +1,62 @@ +CREATE TABLE IF NOT EXISTS public.event_journal( + ordering BIGSERIAL, + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + deleted BOOLEAN DEFAULT FALSE NOT NULL, + + writer VARCHAR(255) NOT NULL, + write_timestamp BIGINT, + adapter_manifest VARCHAR(255), + + event_ser_id INTEGER NOT NULL, + event_ser_manifest VARCHAR(255) NOT NULL, + event_payload BYTEA NOT NULL, + + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id, sequence_number) +); + +CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering); + +CREATE TABLE IF NOT EXISTS public.event_tag( + event_id BIGINT, + tag VARCHAR(256), + PRIMARY KEY(event_id, tag), + CONSTRAINT fk_event_journal + FOREIGN KEY(event_id) + REFERENCES event_journal(ordering) + ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS public.snapshot ( + persistence_id VARCHAR(255) NOT NULL, + sequence_number BIGINT NOT NULL, + created BIGINT NOT NULL, + + snapshot_ser_id INTEGER NOT NULL, + snapshot_ser_manifest VARCHAR(255) NOT NULL, + snapshot_payload BYTEA NOT NULL, + + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id, sequence_number) +); + +CREATE TABLE IF NOT EXISTS public.durable_state ( + global_offset BIGSERIAL, + persistence_id VARCHAR(255) NOT NULL, + revision BIGINT NOT NULL, + state_payload BYTEA NOT NULL, + state_serial_id INTEGER NOT NULL, + state_serial_manifest VARCHAR(255), + tag VARCHAR, + state_timestamp BIGINT NOT NULL, + PRIMARY KEY(persistence_id) + ); +CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag); +CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset); diff --git a/migrator/src/test/resources/schema/postgres/postgres-drop-schema-legacy.sql b/migrator/src/test/resources/schema/postgres/postgres-drop-schema-legacy.sql new file mode 100644 index 000000000..ae824f1b8 --- /dev/null +++ b/migrator/src/test/resources/schema/postgres/postgres-drop-schema-legacy.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS public.journal; +DROP TABLE IF EXISTS public.legacy_snapshot; +DROP TABLE IF EXISTS public.durable_state; diff --git a/migrator/src/test/resources/schema/postgres/postgres-drop-schema.sql b/migrator/src/test/resources/schema/postgres/postgres-drop-schema.sql new file mode 100644 index 000000000..01cb9b461 --- /dev/null +++ b/migrator/src/test/resources/schema/postgres/postgres-drop-schema.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS public.event_tag; +DROP TABLE IF EXISTS public.event_journal; +DROP TABLE IF EXISTS public.snapshot; +DROP TABLE IF EXISTS public.durable_state; + diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql new file mode 100644 index 000000000..12ba4d411 --- /dev/null +++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql @@ -0,0 +1,24 @@ +IF NOT EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID(N'"journal"') AND type in (N'U')) +begin +CREATE TABLE journal ( + "ordering" BIGINT IDENTITY(1,1) NOT NULL, + "deleted" BIT DEFAULT 0 NOT NULL, + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" NUMERIC(10,0) NOT NULL, + "tags" VARCHAR(255) NULL DEFAULT NULL, + "message" VARBINARY(max) NOT NULL, + PRIMARY KEY ("persistence_id", "sequence_number") +) +CREATE UNIQUE INDEX journal_ordering_idx ON journal (ordering) +end; + + +IF NOT EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID(N'"snapshot"') AND type in (N'U')) +CREATE TABLE legacy_snapshot ( + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" NUMERIC(10,0) NOT NULL, + "created" NUMERIC NOT NULL, + "snapshot" VARBINARY(max) NOT NULL, + PRIMARY KEY ("persistence_id", "sequence_number") +); +end; diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql new file mode 100644 index 000000000..f4cf59f18 --- /dev/null +++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql @@ -0,0 +1,42 @@ +CREATE TABLE event_journal( + "ordering" BIGINT IDENTITY(1,1) NOT NULL, + "deleted" BIT DEFAULT 0 NOT NULL, + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" NUMERIC(10,0) NOT NULL, + "writer" VARCHAR(255) NOT NULL, + "write_timestamp" BIGINT NOT NULL, + "adapter_manifest" VARCHAR(MAX) NOT NULL, + "event_payload" VARBINARY(MAX) NOT NULL, + "event_ser_id" INTEGER NOT NULL, + "event_ser_manifest" VARCHAR(MAX) NOT NULL, + "meta_payload" VARBINARY(MAX), + "meta_ser_id" INTEGER, + "meta_ser_manifest" VARCHAR(MAX) + PRIMARY KEY ("persistence_id", "sequence_number") +); + +CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering); + +CREATE TABLE event_tag ( + "event_id" BIGINT NOT NULL, + "tag" VARCHAR(255) NOT NULL + PRIMARY KEY ("event_id","tag") + constraint "fk_event_journal" + foreign key("event_id") + references "dbo"."event_journal"("ordering") + on delete CASCADE +); + +CREATE TABLE "snapshot" ( + "persistence_id" VARCHAR(255) NOT NULL, + "sequence_number" NUMERIC(10,0) NOT NULL, + "created" BIGINT NOT NULL, + "snapshot_ser_id" INTEGER NOT NULL, + "snapshot_ser_manifest" VARCHAR(255) NOT NULL, + "snapshot_payload" VARBINARY(MAX) NOT NULL, + "meta_ser_id" INTEGER, + "meta_ser_manifest" VARCHAR(255), + "meta_payload" VARBINARY(MAX), + PRIMARY KEY ("persistence_id", "sequence_number") + ) + diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema-legacy.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema-legacy.sql new file mode 100644 index 000000000..7a3cc849f --- /dev/null +++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema-legacy.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS journal; +DROP TABLE IF EXISTS legacy_snapshot; diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema.sql new file mode 100644 index 000000000..750504e76 --- /dev/null +++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS event_tag; +DROP TABLE IF EXISTS event_journal; +DROP TABLE IF EXISTS snapshot; diff --git a/migrator/src/test/resources/sqlserver-application.conf b/migrator/src/test/resources/sqlserver-application.conf new file mode 100644 index 000000000..0298fd82d --- /dev/null +++ b/migrator/src/test/resources/sqlserver-application.conf @@ -0,0 +1,65 @@ +# Copyright (C) 2019 - 2021 Lightbend Inc. + +include "general.conf" + +akka { + persistence { + journal { + plugin = "jdbc-journal" + // Enable the line below to automatically start the journal when the actorsystem is started + // auto-start-journals = ["jdbc-journal"] + } + snapshot-store { + plugin = "jdbc-snapshot-store" + // Enable the line below to automatically start the snapshot-store when the actorsystem is started + // auto-start-snapshot-stores = ["jdbc-snapshot-store"] + } + } +} + +jdbc-journal { + tables { + journal { + schemaName = "dbo" + } + } + + slick = ${slick} +} + +# the akka-persistence-snapshot-store in use +jdbc-snapshot-store { + tables { + snapshot { + schemaName = "dbo" + } + } + + slick = ${slick} +} + +# the akka-persistence-query provider in use +jdbc-read-journal { + tables { + journal { + schemaName = "dbo" + } + } + + slick = ${slick} +} + +slick { + profile = "slick.jdbc.SQLServerProfile$" + db { + host = ${docker.host} + host = ${?DB_HOST} + url = "jdbc:sqlserver://"${slick.db.host}":1433;databaseName=docker;integratedSecurity=false" + user = "docker" + password = "docker" + driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" + numThreads = 5 + maxConnections = 5 + minConnections = 1 + } +} diff --git a/migrator/src/test/scala/akka/persistence/jdbc/migrator/JournalMigratorTest.scala b/migrator/src/test/scala/akka/persistence/jdbc/migrator/JournalMigratorTest.scala new file mode 100644 index 000000000..b000ee293 --- /dev/null +++ b/migrator/src/test/scala/akka/persistence/jdbc/migrator/JournalMigratorTest.scala @@ -0,0 +1,126 @@ +package akka.persistence.jdbc.migrator + +import akka.Done +import akka.pattern.ask +import akka.persistence.jdbc.db.SlickDatabase +import akka.persistence.jdbc.migrator.MigratorSpec._ + +abstract class JournalMigratorTest(configName: String) extends MigratorSpec(configName) { + + it should "migrate the event journal" in { + withLegacyActorSystem { implicit systemLegacy => + withReadJournal { implicit readJournal => + withTestActors() { (actorA1, actorA2, actorA3) => + eventually { + countJournal().futureValue shouldBe 0 + (actorA1 ? CreateAccount(1)).futureValue //balance 1 + (actorA2 ? CreateAccount(2)).futureValue //balance 2 + (actorA3 ? CreateAccount(3)).futureValue //balance 3 + (actorA1 ? Deposit(3)).futureValue //balance 4 + (actorA2 ? Deposit(2)).futureValue //balance 4 + (actorA3 ? Deposit(1)).futureValue //balance 4 + (actorA1 ? Withdraw(3)).futureValue //balance 1 + (actorA2 ? Withdraw(2)).futureValue //balance 1 + (actorA3 ? Withdraw(1)).futureValue //balance 1 + (actorA1 ? State).mapTo[Int].futureValue shouldBe 1 + (actorA2 ? State).mapTo[Int].futureValue shouldBe 2 + (actorA3 ? State).mapTo[Int].futureValue shouldBe 3 + countJournal().futureValue shouldBe 9 + } + } + } + } // legacy persistence + withActorSystem { implicit systemNew => + withReadJournal { implicit readJournal => + eventually { + countJournal().futureValue shouldBe 0 // before migration + JournalMigrator(SlickDatabase.profile(config, "slick")).migrate().futureValue shouldBe Done + countJournal().futureValue shouldBe 9 // after migration + } + withTestActors() { (actorB1, actorB2, actorB3) => + eventually { + (actorB1 ? State).mapTo[Int].futureValue shouldBe 1 + (actorB2 ? State).mapTo[Int].futureValue shouldBe 2 + (actorB3 ? State).mapTo[Int].futureValue shouldBe 3 + } + } + } + } // new persistence + } + + it should "migrate the event journal preserving the order of events" in { + withLegacyActorSystem { implicit systemLegacy => + withReadJournal { implicit readJournal => + withTestActors() { (actorA1, actorA2, actorA3) => + (actorA1 ? CreateAccount(0)).futureValue + (actorA2 ? CreateAccount(0)).futureValue + (actorA3 ? CreateAccount(0)).futureValue + for (i <- 1 to 999) { + (actorA1 ? Deposit(i)).futureValue + (actorA2 ? Deposit(i)).futureValue + (actorA3 ? Deposit(i)).futureValue + } + eventually { + countJournal().futureValue shouldBe 3000 + } + } + } + } // legacy persistence + withActorSystem { implicit systemNew => + withReadJournal { implicit readJournal => + eventually { + countJournal().futureValue shouldBe 0 // before migration + JournalMigrator(SlickDatabase.profile(config, "slick")).migrate().futureValue shouldBe Done + countJournal().futureValue shouldBe 3000 // after migration + val allEvents: Seq[Seq[AccountEvent]] = events().futureValue + allEvents.size shouldBe 3 + val seq1: Seq[Int] = allEvents.head.map(_.amount) + val seq2: Seq[Int] = allEvents(1).map(_.amount) + val seq3: Seq[Int] = allEvents(2).map(_.amount) + val expectedResult: Seq[Int] = 0 to 999 + seq1 shouldBe expectedResult + seq2 shouldBe expectedResult + seq3 shouldBe expectedResult + } + } + } // new persistence + } + + it should "migrate the event journal preserving tags" in { + withLegacyActorSystem { implicit systemLegacy => + withReadJournal { implicit readJournal => + withTestActors() { (actorA1, actorA2, actorA3) => + (actorA1 ? CreateAccount(0)).futureValue + (actorA2 ? CreateAccount(0)).futureValue + (actorA3 ? CreateAccount(0)).futureValue + for (i <- 1 to 999) { + (actorA1 ? Deposit(i)).futureValue + (actorA2 ? Deposit(i)).futureValue + (actorA3 ? Deposit(i)).futureValue + } + eventually { + countJournal().futureValue shouldBe 3000 + } + } + } + } // legacy persistence + withActorSystem { implicit systemNew => + withReadJournal { implicit readJournal => + eventually { + countJournal().futureValue shouldBe 0 // before migration + JournalMigrator(SlickDatabase.profile(config, "slick")).migrate().futureValue shouldBe Done + countJournal().futureValue shouldBe 3000 // after migration + val evenEvents: Seq[AccountEvent] = eventsByTag(MigratorSpec.Even).futureValue + evenEvents.size shouldBe 1500 + evenEvents.forall(e => e.amount % 2 == 0) shouldBe true + + val oddEvents: Seq[AccountEvent] = eventsByTag(MigratorSpec.Odd).futureValue + oddEvents.size shouldBe 1500 + oddEvents.forall(e => e.amount % 2 == 1) shouldBe true + } + } + } // new persistence + } +} + +class H2JournalMigratorTest extends JournalMigratorTest("h2-application.conf") with MigratorSpec.H2Cleaner diff --git a/migrator/src/test/scala/akka/persistence/jdbc/migrator/MigratorSpec.scala b/migrator/src/test/scala/akka/persistence/jdbc/migrator/MigratorSpec.scala new file mode 100644 index 000000000..1f512aa8d --- /dev/null +++ b/migrator/src/test/scala/akka/persistence/jdbc/migrator/MigratorSpec.scala @@ -0,0 +1,396 @@ +package akka.persistence.jdbc.migrator + +import akka.actor.{ ActorRef, ActorSystem, Props, Stash } +import akka.event.LoggingReceive +import akka.pattern.ask +import akka.persistence.jdbc.SimpleSpec +import akka.persistence.jdbc.config.{ JournalConfig, SlickConfiguration } +import akka.persistence.jdbc.db.SlickDatabase +import akka.persistence.jdbc.migrator.MigratorSpec._ +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import akka.persistence.jdbc.testkit.internal._ +import akka.persistence.journal.EventSeq.single +import akka.persistence.journal.{ EventAdapter, EventSeq, Tagged } +import akka.persistence.query.PersistenceQuery +import akka.persistence.{ PersistentActor, SaveSnapshotSuccess, SnapshotMetadata, SnapshotOffer } +import akka.stream.Materializer +import akka.stream.scaladsl.Sink +import akka.util.Timeout +import com.typesafe.config.{ Config, ConfigFactory, ConfigValue, ConfigValueFactory } +import org.scalatest.BeforeAndAfterEach +import org.slf4j.{ Logger, LoggerFactory } +import slick.jdbc.JdbcBackend.{ Database, Session } + +import java.sql.Statement +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ ExecutionContextExecutor, Future } + +abstract class MigratorSpec(val config: Config) extends SimpleSpec with BeforeAndAfterEach { + + // The db is initialized in the before and after each bocks + var dbOpt: Option[Database] = None + + implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds) + implicit val timeout: Timeout = Timeout(1.minute) + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + + private val cfg: Config = config.getConfig("jdbc-journal") + private val journalConfig: JournalConfig = new JournalConfig(cfg) + + protected val newJournalTableName: String = journalConfig.eventJournalTableConfiguration.tableName + protected val legacyJournalTableName: String = journalConfig.journalTableConfiguration.tableName + + protected val newTables: Seq[String] = + List(journalConfig.eventTagTableConfiguration.tableName, journalConfig.eventJournalTableConfiguration.tableName) + protected val legacyTables: Seq[String] = List(journalConfig.journalTableConfiguration.tableName) + protected val tables: Seq[String] = legacyTables ++ newTables + + def this(config: String = "postgres-application.conf", configOverrides: Map[String, ConfigValue] = Map.empty) = + this(configOverrides.foldLeft(ConfigFactory.load(config)) { case (conf, (path, configValue)) => + conf.withValue(path, configValue) + }) + + def db: Database = dbOpt.getOrElse { + val db = SlickDatabase.database(cfg, new SlickConfiguration(cfg.getConfig("slick")), "slick.db") + dbOpt = Some(db) + db + } + + protected def dropAndCreate(schemaType: SchemaType): Unit = { + // blocking calls, usually done in our before test methods + // legacy + SchemaUtilsImpl.dropWithSlick(schemaType, logger, db, legacy = true) + SchemaUtilsImpl.createWithSlick(schemaType, logger, db, legacy = true) + // new + SchemaUtilsImpl.dropWithSlick(schemaType, logger, db, legacy = false) + SchemaUtilsImpl.createWithSlick(schemaType, logger, db, legacy = false) + } + + def withSession[A](f: Session => A)(db: Database): A = { + val session = db.createSession() + try f(session) + finally session.close() + } + + def withStatement[A](f: Statement => A)(db: Database): A = + withSession(session => session.withStatement()(f))(db) + + def closeDb(): Unit = { + dbOpt.foreach(_.close()) + dbOpt = None + } + + override protected def afterEach(): Unit = { + super.afterEach() + closeDb() + } + + override protected def afterAll(): Unit = { + super.afterAll() + closeDb() + } + + protected def setupEmpty(persistenceId: Int)(implicit system: ActorSystem): ActorRef = + system.actorOf(Props(new TestAccountActor(persistenceId))) + + def withTestActors(seq: Int = 1)(f: (ActorRef, ActorRef, ActorRef) => Unit)(implicit system: ActorSystem): Unit = { + implicit val ec: ExecutionContextExecutor = system.dispatcher + val refs = (seq until seq + 3).map(setupEmpty).toList + try { + // make sure we notice early if the actors failed to start (because of issues with journal) makes debugging + // failing tests easier as we know it is not the actual interaction from the test that is the problem + Future.sequence(refs.map(_ ? State)).futureValue + + f(refs.head, refs.drop(1).head, refs.drop(2).head) + } finally killActors(refs: _*) + } + + def withActorSystem(f: ActorSystem => Unit): Unit = { + implicit val system: ActorSystem = ActorSystem("migrator-test", config) + f(system) + system.terminate().futureValue + } + + def withLegacyActorSystem(f: ActorSystem => Unit): Unit = { + + val configOverrides: Map[String, ConfigValue] = Map( + "jdbc-journal.dao" -> ConfigValueFactory.fromAnyRef( + "akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao"), + "jdbc-snapshot-store.dao" -> ConfigValueFactory.fromAnyRef( + "akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao"), + "jdbc-read-journal.dao" -> ConfigValueFactory.fromAnyRef( + "akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao")) + + val legacyDAOConfig = configOverrides.foldLeft(ConfigFactory.load(config)) { case (conf, (path, configValue)) => + conf.withValue(path, configValue) + } + + implicit val system: ActorSystem = ActorSystem("migrator-test", legacyDAOConfig) + f(system) + system.terminate().futureValue + } + + def withReadJournal(f: JdbcReadJournal => Unit)(implicit system: ActorSystem): Unit = { + val readJournal: JdbcReadJournal = + PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier) + f(readJournal) + } + + def countJournal(filterPid: String => Boolean = _ => true)( + implicit system: ActorSystem, + mat: Materializer, + readJournal: JdbcReadJournal): Future[Long] = + readJournal + .currentPersistenceIds() + .filter(filterPid(_)) + .mapAsync(1) { pid => + readJournal + .currentEventsByPersistenceId(pid, 0, Long.MaxValue) + .map(_ => 1L) + .runWith(Sink.seq) + .map(_.sum)(system.dispatcher) + } + .runWith(Sink.seq) + .map(_.sum)(system.dispatcher) + + def eventsByTag(tag: String)(implicit mat: Materializer, readJournal: JdbcReadJournal): Future[Seq[AccountEvent]] = + readJournal + .currentEventsByTag(tag, offset = 0) + .map(_.event) + .collect { case e: AccountEvent => + e + } + .runWith(Sink.seq) + + def events(filterPid: String => Boolean = _ => true)( + implicit mat: Materializer, + readJournal: JdbcReadJournal): Future[Seq[Seq[AccountEvent]]] = + readJournal + .currentPersistenceIds() + .filter(filterPid(_)) + .mapAsync(1) { pid => + readJournal + .currentEventsByPersistenceId(pid, fromSequenceNr = 0, toSequenceNr = Long.MaxValue) + .map(e => e.event) + .collect { case e: AccountEvent => + e + } + .runWith(Sink.seq) + } + .runWith(Sink.seq) + +} + +object MigratorSpec { + + private final val Zero: Int = 0 + + private final val SnapshotInterval: Int = 10 + + val Even: String = "EVEN" + val Odd: String = "ODD" + + /** Commands */ + sealed trait AccountCommand extends Serializable + + final case class CreateAccount(amount: Int) extends AccountCommand + + final case class Deposit(amount: Int) extends AccountCommand + + final case class Withdraw(amount: Int) extends AccountCommand + + final object State extends AccountCommand + + /** Events */ + sealed trait AccountEvent extends Serializable { + val amount: Int + } + + final case class AccountCreated(override val amount: Int) extends AccountEvent + + final case class Deposited(override val amount: Int) extends AccountEvent + + final case class Withdrawn(override val amount: Int) extends AccountEvent + + /** Reply */ + final case class CurrentBalance(balance: Int) + + class AccountEventAdapter extends EventAdapter { + + override def manifest(event: Any): String = event.getClass.getSimpleName + + def fromJournal(event: Any, manifest: String): EventSeq = event match { + case event: AccountEvent => single(event) + case _ => sys.error(s"Unexpected case '${event.getClass.getName}'") + } + + def toJournal(event: Any): Any = event match { + case event: AccountEvent => + val tag: String = if (event.amount % 2 == 0) Even else Odd + Tagged(event, Set(tag)) + case _ => sys.error(s"Unexpected case '${event.getClass.getName}'") + } + } + + /** Actor */ + class TestAccountActor(id: Int) extends PersistentActor with Stash { + override val persistenceId: String = s"test-account-$id" + + var state: Int = Zero + + private def saveSnapshot(): Unit = { + if (state % SnapshotInterval == 0) { + saveSnapshot(state) + } + } + + override def receiveCommand: Receive = + LoggingReceive { + + case SaveSnapshotSuccess(_: SnapshotMetadata) => () + + case CreateAccount(balance) => + persist(AccountCreated(balance)) { (event: AccountCreated) => + updateState(event) + saveSnapshot() + sender() ! akka.actor.Status.Success(event) + } + case Deposit(balance) => + persist(Deposited(balance)) { (event: Deposited) => + updateState(event) + saveSnapshot() + sender() ! akka.actor.Status.Success(event) + } + case Withdraw(balance) => + persist(Withdrawn(balance)) { (event: Withdrawn) => + updateState(event) + saveSnapshot() + sender() ! akka.actor.Status.Success(event) + } + case State => + sender() ! akka.actor.Status.Success(state) + } + + def updateState(event: AccountEvent): Unit = event match { + case AccountCreated(amount) => state = state + amount + case Deposited(amount) => state = state + amount + case Withdrawn(amount) => state = state - amount + } + + override def receiveRecover: Receive = + LoggingReceive { + case SnapshotOffer(_, snapshot: Int) => + state = snapshot + case event: AccountEvent => updateState(event) + } + } + + trait PostgresCleaner extends MigratorSpec { + + def clearPostgres(): Unit = { + tables.foreach { name => + withStatement(stmt => stmt.executeUpdate(s"DELETE FROM $name"))(db) + } + } + + override def beforeAll(): Unit = { + dropAndCreate(Postgres) + super.beforeAll() + } + + override def beforeEach(): Unit = { + dropAndCreate(Postgres) + super.beforeEach() + } + } + + trait MysqlCleaner extends MigratorSpec { + + def clearMySQL(): Unit = { + withStatement { stmt => + stmt.execute("SET FOREIGN_KEY_CHECKS = 0") + tables.foreach { name => stmt.executeUpdate(s"TRUNCATE $name") } + stmt.execute("SET FOREIGN_KEY_CHECKS = 1") + }(db) + } + + override def beforeAll(): Unit = { + dropAndCreate(MySQL) + super.beforeAll() + } + + override def beforeEach(): Unit = { + clearMySQL() + super.beforeEach() + } + } + + trait OracleCleaner extends MigratorSpec { + + def clearOracle(): Unit = { + tables.foreach { name => + withStatement(stmt => stmt.executeUpdate(s"""DELETE FROM "$name" """))(db) + } + withStatement(stmt => stmt.executeUpdate("""BEGIN "reset_sequence"; END; """))(db) + } + + override def beforeAll(): Unit = { + dropAndCreate(Oracle) + super.beforeAll() + } + + override def beforeEach(): Unit = { + clearOracle() + super.beforeEach() + } + } + + trait SqlServerCleaner extends MigratorSpec { + + var initial = true + + def clearSqlServer(): Unit = { + val reset = if (initial) { + initial = false + 1 + } else { + 0 + } + withStatement { stmt => + tables.foreach { name => stmt.executeUpdate(s"DELETE FROM $name") } + stmt.executeUpdate(s"DBCC CHECKIDENT('$legacyJournalTableName', RESEED, $reset)") + stmt.executeUpdate(s"DBCC CHECKIDENT('$newJournalTableName', RESEED, $reset)") + }(db) + } + + override def beforeAll(): Unit = { + dropAndCreate(SqlServer) + super.beforeAll() + } + + override def afterAll(): Unit = { + dropAndCreate(SqlServer) + super.afterAll() + } + + override def beforeEach(): Unit = { + clearSqlServer() + super.beforeEach() + } + } + + trait H2Cleaner extends MigratorSpec { + + def clearH2(): Unit = { + tables.foreach { name => + withStatement(stmt => stmt.executeUpdate(s"DELETE FROM $name"))(db) + } + } + + override def beforeEach(): Unit = { + dropAndCreate(H2) + super.beforeEach() + } + } +} diff --git a/migrator/src/test/scala/akka/persistence/jdbc/migrator/SnapshotMigratorTest.scala b/migrator/src/test/scala/akka/persistence/jdbc/migrator/SnapshotMigratorTest.scala new file mode 100644 index 000000000..8682cf9ae --- /dev/null +++ b/migrator/src/test/scala/akka/persistence/jdbc/migrator/SnapshotMigratorTest.scala @@ -0,0 +1,50 @@ +package akka.persistence.jdbc.migrator + +import akka.Done +import akka.pattern.ask +import akka.persistence.jdbc.db.SlickDatabase +import akka.persistence.jdbc.migrator.MigratorSpec._ + +abstract class SnapshotMigratorTest(configName: String) extends MigratorSpec(configName) { + + it should "migrate snapshots" in { + withLegacyActorSystem { implicit systemLegacy => + withReadJournal { implicit readJournal => + withTestActors() { (actorA1, actorA2, actorA3) => + (actorA1 ? CreateAccount(1)).futureValue + (actorA2 ? CreateAccount(1)).futureValue + (actorA3 ? CreateAccount(1)).futureValue + for (_ <- 1 to 99) { + (actorA1 ? Deposit(1)).futureValue + (actorA2 ? Deposit(1)).futureValue + (actorA3 ? Deposit(1)).futureValue + } + eventually { + (actorA1 ? State).mapTo[Int].futureValue shouldBe 100 + (actorA2 ? State).mapTo[Int].futureValue shouldBe 100 + (actorA3 ? State).mapTo[Int].futureValue shouldBe 100 + countJournal().futureValue shouldBe 300 + } + } + } + } // legacy persistence + withActorSystem { implicit systemNew => + withReadJournal { implicit readJournal => + eventually { + countJournal().futureValue shouldBe 0 // before migration + SnapshotMigrator(SlickDatabase.profile(config, "slick")).migrateAll().futureValue shouldBe Done + countJournal().futureValue shouldBe 0 // after migration + } + withTestActors() { (actorB1, actorB2, actorB3) => + eventually { + (actorB1 ? State).mapTo[Int].futureValue shouldBe 100 + (actorB2 ? State).mapTo[Int].futureValue shouldBe 100 + (actorB3 ? State).mapTo[Int].futureValue shouldBe 100 + } + } + } + } // new persistence + } +} + +class H2SnapshotMigratorTest extends SnapshotMigratorTest("h2-application.conf") with MigratorSpec.H2Cleaner diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0c24e1b22..1f14f3e91 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,5 +1,4 @@ import sbt._ -import Keys._ object Dependencies { val Nightly = sys.env.get("TRAVIS_EVENT_TYPE").contains("cron") @@ -33,7 +32,6 @@ object Dependencies { "org.scalatest" %% "scalatest" % ScalaTestVersion % Test) ++ JdbcDrivers.map(_ % Test) val Migration: Seq[ModuleID] = Seq( - "org.flywaydb" % "flyway-core" % "7.14.0", "com.typesafe" % "config" % "1.4.1", "ch.qos.logback" % "logback-classic" % "1.2.6", "org.testcontainers" % "postgresql" % "1.16.1" % Test,