diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala index 6a98adad..87f339ae 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala @@ -39,6 +39,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match { case "H2" => new H2SequenceNextValUpdater(profile, durableStateTableCfg) case "Postgres" => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg) + case "MySQL" => new MySQLSequenceNextValUpdater(profile, durableStateTableCfg) case _ => ??? } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala index 65cbb1be..3feeb408 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala @@ -51,3 +51,19 @@ import slick.sql.SqlStreamingAction def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] } + +/** + * INTERNAL API + */ +@InternalApi private[jdbc] class MySQLSequenceNextValUpdater( + profile: JdbcProfile, + val durableStateTableCfg: DurableStateTableConfiguration) + extends SequenceNextValUpdater { + import profile.api._ + private val schema = durableStateTableCfg.schemaName.map(n => s"'$n'").getOrElse("DATABASE()") + // Note: for actual MySQL servers (i.e. not MariaDB) the variable information_schema_stats_expiry should be set to zero. + final val nextValFetcher = + s"""(SELECT AUTO_INCREMENT FROM information_schema.tables WHERE table_name = '${durableStateTableCfg.tableName}' AND table_schema = ${schema})""" + + def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] +} diff --git a/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLDurableStateStorePluginSpec.scala b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLDurableStateStorePluginSpec.scala new file mode 100644 index 00000000..c3ef795f --- /dev/null +++ b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLDurableStateStorePluginSpec.scala @@ -0,0 +1,8 @@ +package akka.persistence.jdbc.integration + +import com.typesafe.config.ConfigFactory +import slick.jdbc.MySQLProfile +import akka.persistence.jdbc.state.scaladsl.DurableStateStorePluginSpec + +class MySQLDurableStateStorePluginSpec + extends DurableStateStorePluginSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQLProfile) {} diff --git a/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLScalaJdbcDurableStateChangesByTagTest.scala b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLScalaJdbcDurableStateChangesByTagTest.scala new file mode 100644 index 00000000..2d43d741 --- /dev/null +++ b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLScalaJdbcDurableStateChangesByTagTest.scala @@ -0,0 +1,12 @@ +package akka.persistence.jdbc.integration + +import com.typesafe.config.ConfigFactory +import akka.actor.ActorSystem +import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateSpec +import akka.persistence.jdbc.testkit.internal.Mysql + +class MySQLScalaJdbcDurableStateStoreQueryTest + extends JdbcDurableStateSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQL) { + implicit lazy val system: ActorSystem = + ActorSystem("JdbcDurableStateSpec", config.withFallback(customSerializers)) +}