diff --git a/.github/workflows/integration-tests-r2dbc.yml b/.github/workflows/integration-tests-r2dbc.yml
index 5d9b88dff..c0cf227f4 100644
--- a/.github/workflows/integration-tests-r2dbc.yml
+++ b/.github/workflows/integration-tests-r2dbc.yml
@@ -144,3 +144,49 @@ jobs:
       - name: Run integration tests with with Scala and Java ${{ matrix.jdkVersion }}
         run: |-
           sbt -Dconfig.resource=application-h2.conf akka-projection-r2dbc-integration/test
+
+  test-r2dbc-sqlserver:
+    name: Run r2dbc tests with SQL Server
+    runs-on: ubuntu-22.04
+    if: github.repository == 'akka/akka-projection'
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          # - { jdkVersion: "1.11.0", jvmName: "temurin:1.11.0", extraOpts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
+          - { jdkVersion: "1.17.0", jvmName: "temurin:1.17.0", extraOpts: '' }
+
+    steps:
+      - name: Checkout
+        # https://github.com/actions/checkout/releases
+        # v4.1.1
+        uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
+        with:
+          fetch-depth: 0
+
+      - name: Checkout GitHub merge
+        if: github.event.pull_request
+        run: |-
+          git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
+          git checkout scratch
+
+      - name: Cache Coursier cache
+        # https://github.com/coursier/cache-action/releases
+        # v6.4.4
+        uses: coursier/cache-action@a0e7cd24be81bc84f0d7461e02bd1a96980553d7
+
+      - name: Set up JDK ${{ matrix.jdkVersion }}
+        # https://github.com/coursier/setup-action/releases
+        # v1.3.4
+        uses: coursier/setup-action@48280172a2c999022e42527711d6b28e4945e6f0
+        with:
+          jvm: ${{ matrix.jvmName }}
+
+      - name: Start DB
+        run: |-
+          docker compose -f docker-files/docker-compose-sqlserver.yml up --wait
+          docker exec -i sqlserver-db /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P '' -d master < akka-projection-r2dbc/ddl-scripts/create_tables_sqlserver.sql
+
+      - name: Run integration tests with Scala and Java ${{ matrix.jdkVersion }}
+        run: |-
+          sbt -Dconfig.resource=application-sqlserver.conf akka-projection-r2dbc-integration/test
diff --git a/akka-projection-r2dbc-integration/src/test/resources/application-sqlserver.conf b/akka-projection-r2dbc-integration/src/test/resources/application-sqlserver.conf
new file mode 100644
index 000000000..524cf3da0
--- /dev/null
+++ b/akka-projection-r2dbc-integration/src/test/resources/application-sqlserver.conf
@@ -0,0 +1 @@
+akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.sqlserver}
\ No newline at end of file
diff --git a/akka-projection-r2dbc-integration/src/test/resources/logback-test.xml b/akka-projection-r2dbc-integration/src/test/resources/logback-test.xml
index d3049dd9b..5a4c066fd 100644
--- a/akka-projection-r2dbc-integration/src/test/resources/logback-test.xml
+++ b/akka-projection-r2dbc-integration/src/test/resources/logback-test.xml
@@ -15,6 +15,7 @@
+
@@ -23,4 +24,4 @@
-
+
\ No newline at end of file
diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala
index 91a86f186..654b3d36e 100644
--- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -23,7 +23,7 @@ import akka.persistence.query.Offset
 import akka.persistence.query.TimestampOffset.toTimestampOffset
 import akka.persistence.query.typed.EventEnvelope
 import akka.persistence.r2dbc.R2dbcSettings
-import akka.persistence.r2dbc.internal.Sql.Interpolation
+import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
 import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import akka.persistence.typed.PersistenceId
 import akka.persistence.typed.scaladsl.Effect
@@ -40,6 +40,8 @@ import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
 import org.slf4j.LoggerFactory
 
+import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
+
 object EventSourcedEndToEndSpec {
 
   val config: Config = ConfigFactory
@@ -134,6 +136,8 @@ class EventSourcedEndToEndSpec
   private val projectionSettings = R2dbcProjectionSettings(system)
   private val stringSerializer = SerializationExtension(system).serializerFor(classOf[String])
 
+  import journalSettings.codecSettings.JournalImplicits._
+
   override protected def beforeAll(): Unit = {
     super.beforeAll()
   }
@@ -156,7 +160,7 @@ class EventSourcedEndToEndSpec
           .bind(1, entityType)
           .bind(2, persistenceId)
           .bind(3, seqNr)
-          .bind(4, timestamp)
+          .bindTimestamp(4, timestamp)
           .bind(5, stringSerializer.identifier)
           .bind(6, stringSerializer.toBinary(event))
       }
diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index b5c09692b..bf58e0514 100644
--- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -12,7 +12,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import akka.actor.typed.ActorSystem
 import akka.persistence.query.Sequence
 import akka.persistence.query.TimeBasedUUID
-import akka.persistence.r2dbc.internal.Sql.Interpolation
+import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
 import akka.projection.MergeableOffset
 import akka.projection.ProjectionId
 import akka.projection.internal.ManagementState
@@ -20,6 +20,8 @@ import akka.projection.r2dbc.internal.OffsetPidSeqNr
 import akka.projection.r2dbc.internal.R2dbcOffsetStore
 import org.scalatest.wordspec.AnyWordSpecLike
 
+import akka.persistence.r2dbc.internal.codec.QueryAdapter
+
 class R2dbcOffsetStoreSpec
     extends ScalaTestWithActorTestKit(TestConfig.config)
     with AnyWordSpecLike
@@ -34,6 +36,8 @@ class R2dbcOffsetStoreSpec
 
   private val settings = R2dbcProjectionSettings(testKit.system)
 
+  private implicit val queryAdapter: QueryAdapter = r2dbcSettings.codecSettings.queryAdapter
+
   private def createOffsetStore(projectionId: ProjectionId) =
     new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor, clock)
 
diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala
index 6ef7cb13f..1274214fc 100644
--- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala
+++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala
@@ -7,12 +7,14 @@ package akka.projection.r2dbc
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.atomic.AtomicReference
+
 import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
+
 import akka.Done
 import akka.NotUsed
 import akka.actor.testkit.typed.TestException
@@ -20,7 +22,7 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing
 import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
-import akka.persistence.r2dbc.internal.Sql.Interpolation
+import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
 import akka.persistence.r2dbc.internal.R2dbcExecutor
 import akka.projection.HandlerRecoveryStrategy
 import akka.projection.OffsetVerification
@@ -46,9 +48,12 @@ import akka.stream.testkit.TestSubscriber
 import akka.stream.testkit.scaladsl.TestSource
 import org.scalatest.wordspec.AnyWordSpecLike
 import org.slf4j.LoggerFactory
-
 import scala.util.control.NonFatal
 
+import akka.persistence.r2dbc.internal.codec.IdentityAdapter
+import akka.persistence.r2dbc.internal.codec.QueryAdapter
+import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter
+
 object R2dbcProjectionSpec {
   final case class Envelope(id: String, offset: Long, message: String)
 
@@ -86,17 +91,36 @@ object R2dbcProjectionSpec {
   object TestRepository {
     val table = "projection_spec_model"
 
-    val createTableSql: String =
-      s"""|CREATE table IF NOT EXISTS $table (
-          |  id VARCHAR(255) NOT NULL,
-          |  concatenated VARCHAR(255) NOT NULL,
-          |  PRIMARY KEY(id)
-          |);""".stripMargin
+    def createTableSql(dialectName: String): String = {
+      dialectName match {
+        case "sqlserver" =>
+          s"""|IF object_id('$table') is null
+              |  CREATE TABLE $table (
+              |    id NVARCHAR(255) NOT NULL,
+              |    concatenated NVARCHAR(255) NOT NULL,
+              |    PRIMARY KEY(id)
+              |  );""".stripMargin
+        case _ =>
+          s"""|CREATE table IF NOT EXISTS $table (
+              |  id VARCHAR(255) NOT NULL,
+              |  concatenated VARCHAR(255) NOT NULL,
+              |  PRIMARY KEY(id)
+              |);""".stripMargin
+      }
+    }
   }
 
   final case class TestRepository(session: R2dbcSession)(implicit ec: ExecutionContext, system: ActorSystem[_]) {
     import TestRepository.table
 
+    private val dialect = system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")
+
+    // no R2dbcSettings available here to get the implicits from, so pick the adapter by dialect
+    implicit private val queryAdapter: QueryAdapter = if (dialect == "sqlserver") {
+      SqlServerQueryAdapter
+    } else IdentityAdapter
+
     private val logger = LoggerFactory.getLogger(this.getClass)
 
     def concatToText(id: String, payload: String): Future[Done] = {
@@ -125,12 +149,22 @@ object R2dbcProjectionSpec {
       logger.debug("TestRepository.upsert: [{}]", concatStr)
 
       val stmtSql = {
-        if (system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect") == "h2") {
+        if (dialect == "h2") {
           sql"""
             MERGE INTO $table (id, concatenated)
             KEY (id)
             VALUES (?, ?)
           """
+        } else if (dialect == "sqlserver") {
+          sql"""
+            UPDATE $table SET
+              id = @id,
+              concatenated = @concatenated
+            WHERE id = @id
+            if @@ROWCOUNT = 0
+              INSERT INTO $table (id, concatenated)
+              VALUES (@id, @concatenated)
+          """
         } else {
           sql"""
             INSERT INTO $table (id, concatenated)
             VALUES (?, ?)
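Note: the switch from `Sql.Interpolation` to `Sql.InterpolationWithAdapter` throughout this PR is what lets the same `sql"..."` literals serve several dialects: the `QueryAdapter` in implicit scope rewrites the query text for the target database. A minimal sketch of the concept, assuming the adapter simply rewrites positional placeholders (the object names match the imports above, but this body is illustrative, not the akka-persistence-r2dbc implementation):

```scala
// Sketch of the QueryAdapter concept (assumed logic, for illustration only).
// r2dbc-mssql expects named parameters (@p0, @p1, ...), while the other
// dialects use positional `?` placeholders.
trait QueryAdapter {
  def apply(query: String): String
}

// Postgres, Yugabyte and H2: leave `?` placeholders untouched.
object IdentityAdapter extends QueryAdapter {
  override def apply(query: String): String = query
}

// SQL Server: rewrite each positional `?` into a named parameter @p0, @p1, ...
object SqlServerQueryAdapter extends QueryAdapter {
  override def apply(query: String): String = {
    val sb = new StringBuilder(query.length + 8)
    var i = 0
    query.foreach {
      case '?' =>
        sb.append("@p").append(i)
        i += 1
      case c =>
        sb.append(c)
    }
    sb.toString
  }
}

// Example: SqlServerQueryAdapter("INSERT INTO t (a, b) VALUES (?, ?)")
// returns "INSERT INTO t (a, b) VALUES (@p0, @p1)".
```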
@@ -188,7 +222,8 @@ class R2dbcProjectionSpec
     super.beforeAll()
     try {
       Await.result(
-        r2dbcExecutor.executeDdl("beforeAll createTable")(_.createStatement(TestRepository.createTableSql)),
+        r2dbcExecutor.executeDdl("beforeAll createTable")(
+          _.createStatement(TestRepository.createTableSql(r2dbcSettings.dialectName))),
         10.seconds)
       Await.result(
         r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from ${TestRepository.table}")),
diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 655ce9472..1c41eeada 100644
--- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -148,9 +148,10 @@ class R2dbcTimestampOffsetProjectionSpec
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
+    val dialectName = system.settings.config.getConfig(settings.useConnectionFactory).getString("dialect")
     Await.result(r2dbcExecutor.executeDdl("beforeAll createTable") { conn =>
-      conn.createStatement(TestRepository.createTableSql)
+      conn.createStatement(TestRepository.createTableSql(dialectName))
     }, 10.seconds)
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from ${TestRepository.table}")),
diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestDbLifecycle.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestDbLifecycle.scala
index 1b76c9ce5..29c5e29d4 100644
--- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestDbLifecycle.scala
+++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestDbLifecycle.scala
@@ -47,6 +47,9 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
       r2dbcExecutor.updateOne("beforeAll delete")(
         _.createStatement(s"delete from ${r2dbcSettings.journalTableWithSchema(0)}")),
       10.seconds)
+    Await.result(
+      r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from ${r2dbcSettings.snapshotsTable}")),
+      10.seconds)
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(
         _.createStatement(s"delete from ${r2dbcSettings.durableStateTableWithSchema(0)}")),
diff --git a/akka-projection-r2dbc/ddl-scripts/create_tables_sqlserver.sql b/akka-projection-r2dbc/ddl-scripts/create_tables_sqlserver.sql
new file mode 100644
index 000000000..4bc125d97
--- /dev/null
+++ b/akka-projection-r2dbc/ddl-scripts/create_tables_sqlserver.sql
@@ -0,0 +1,116 @@
+IF object_id('event_journal') is null
+  CREATE TABLE event_journal(
+    slice INT NOT NULL,
+    entity_type NVARCHAR(255) NOT NULL,
+    persistence_id NVARCHAR(255) NOT NULL,
+    seq_nr NUMERIC(10,0) NOT NULL,
+    db_timestamp datetime2(6) NOT NULL,
+    event_ser_id INTEGER NOT NULL,
+    event_ser_manifest NVARCHAR(255) NOT NULL,
+    event_payload VARBINARY(MAX) NOT NULL,
+    deleted BIT DEFAULT 0 NOT NULL,
+    writer NVARCHAR(255) NOT NULL,
+    adapter_manifest NVARCHAR(255) NOT NULL,
+    tags NVARCHAR(255),
+
+    meta_ser_id INTEGER,
+    meta_ser_manifest NVARCHAR(255),
+    meta_payload VARBINARY(MAX),
+    PRIMARY KEY(persistence_id, seq_nr)
+  );
+
+IF NOT EXISTS(SELECT * FROM sys.indexes WHERE name = 'event_journal_slice_idx' AND object_id = OBJECT_ID('event_journal'))
+  BEGIN
+    CREATE INDEX event_journal_slice_idx ON event_journal (slice, entity_type, db_timestamp, seq_nr);
+  END;
+
+IF object_id('snapshot') is null
+  CREATE TABLE snapshot(
+    slice INT NOT NULL,
+    entity_type NVARCHAR(255) NOT NULL,
+    persistence_id NVARCHAR(255) NOT NULL,
+    seq_nr BIGINT NOT NULL,
+    db_timestamp datetime2(6),
+    write_timestamp BIGINT NOT NULL,
+    ser_id INTEGER NOT NULL,
+    ser_manifest NVARCHAR(255) NOT NULL,
+    snapshot VARBINARY(MAX) NOT NULL,
+    tags NVARCHAR(255),
+    meta_ser_id INTEGER,
+    meta_ser_manifest NVARCHAR(255),
+    meta_payload VARBINARY(MAX),
+    PRIMARY KEY(persistence_id)
+  );
+
+-- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
+IF NOT EXISTS(SELECT * FROM sys.indexes WHERE name = 'snapshot_slice_idx' AND object_id = OBJECT_ID('snapshot'))
+  BEGIN
+    CREATE INDEX snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);
+  END;
+
+IF object_id('durable_state') is null
+  CREATE TABLE durable_state (
+    slice INT NOT NULL,
+    entity_type NVARCHAR(255) NOT NULL,
+    persistence_id NVARCHAR(255) NOT NULL,
+    revision BIGINT NOT NULL,
+    db_timestamp datetime2(6) NOT NULL,
+
+    state_ser_id INTEGER NOT NULL,
+    state_ser_manifest NVARCHAR(255),
+    state_payload VARBINARY(MAX) NOT NULL,
+    tags NVARCHAR(255),
+
+    PRIMARY KEY(persistence_id, revision)
+  );
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+IF NOT EXISTS(SELECT * FROM sys.indexes WHERE name = 'durable_state_slice_idx' AND object_id = OBJECT_ID('durable_state'))
+  BEGIN
+    CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);
+  END;
+
+-- during creation of all tables below, the following warning is logged:
+-- ###################################################################################
+-- # Warning! The maximum key length for a clustered index is 900 bytes.            #
+-- # The index '[some name]' has maximum length of [some number] bytes.             #
+-- # For some combination of large values, the insert/update operation will fail.   #
+-- ###################################################################################
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure akka.projection.r2dbc.offset-store.offset-table="" if the table is not created.
+IF object_id('akka_projection_offset_store') is null
+  CREATE TABLE akka_projection_offset_store (
+    projection_name NVARCHAR(255) NOT NULL,
+    projection_key NVARCHAR(255) NOT NULL,
+    current_offset NVARCHAR(255) NOT NULL,
+    manifest NVARCHAR(32) NOT NULL,
+    mergeable BIT DEFAULT 0 NOT NULL,
+    last_updated BIGINT NOT NULL,
+    PRIMARY KEY(projection_name, projection_key)
+  );
+
+IF object_id('akka_projection_timestamp_offset_store') is null
+  CREATE TABLE akka_projection_timestamp_offset_store (
+    projection_name NVARCHAR(255) NOT NULL,
+    projection_key NVARCHAR(255) NOT NULL,
+    slice INT NOT NULL,
+    persistence_id NVARCHAR(255) NOT NULL,
+    seq_nr BIGINT NOT NULL,
+    -- timestamp_offset is the db_timestamp of the original event
+    timestamp_offset datetime2(6) NOT NULL,
+    -- timestamp_consumed is when the offset was stored
+    -- the consumer lag is timestamp_consumed - timestamp_offset
+    timestamp_consumed datetime2(6) NOT NULL,
+    PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+  );
+
+IF object_id('akka_projection_management') is null
+  CREATE TABLE akka_projection_management (
+    projection_name NVARCHAR(255) NOT NULL,
+    projection_key NVARCHAR(255) NOT NULL,
+    paused BIT NOT NULL,
+    last_updated BIGINT NOT NULL,
+    PRIMARY KEY(projection_name, projection_key)
+  );
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/Dialect.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/Dialect.scala
index ec9ce4252..f430a87d1 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/Dialect.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/Dialect.scala
@@ -65,3 +65,17 @@ private[projection] object H2Dialect extends Dialect {
     projectionId: ProjectionId): OffsetStoreDao =
     new H2OffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId)
 }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] object SqlServerDialect extends Dialect {
+  def createOffsetStoreDao(
+      settings: R2dbcProjectionSettings,
+      sourceProvider: Option[BySlicesSourceProvider],
+      system: ActorSystem[_],
+      r2dbcExecutor: R2dbcExecutor,
+      projectionId: ProjectionId): OffsetStoreDao =
+    new SqlServerOffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId)
+}
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala
index 94609ec2a..957d5b706 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala
@@ -9,11 +9,13 @@ import akka.annotation.InternalApi
 import akka.projection.internal.ManagementState
 import akka.projection.internal.OffsetSerialization
 import io.r2dbc.spi.Connection
-
 import java.time.Instant
+
 import scala.collection.immutable
 import scala.concurrent.Future
 
+import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice
+
 /**
  * INTERNAL API
  */
@@ -33,7 +35,7 @@ private[projection] trait OffsetStoreDao {
       timestamp: Instant,
       storageRepresentation: OffsetSerialization.StorageRepresentation): Future[Done]
 
-  def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[String]): Future[Long]
+  def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[LatestBySlice]): Future[Long]
 
   def deleteNewTimestampOffsetsInTx(connection: Connection, timestamp: Instant): Future[Long]
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala
index bd2e44062..e4c54401e 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala
@@ -4,6 +4,17 @@
 
 package akka.projection.r2dbc.internal
 
+import java.time.Instant
+
+import scala.annotation.nowarn
+import scala.collection.immutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import io.r2dbc.spi.Connection
+import io.r2dbc.spi.Statement
+import org.slf4j.LoggerFactory
+
 import akka.Done
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.scaladsl.LoggerOps
@@ -11,7 +22,13 @@ import akka.annotation.InternalApi
 import akka.dispatch.ExecutionContexts
 import akka.persistence.Persistence
 import akka.persistence.r2dbc.internal.R2dbcExecutor
-import akka.persistence.r2dbc.internal.Sql.Interpolation
+import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
+import akka.persistence.r2dbc.internal.codec.IdentityAdapter
+import akka.persistence.r2dbc.internal.codec.QueryAdapter
+import akka.persistence.r2dbc.internal.codec.TimestampCodec
+import akka.persistence.r2dbc.internal.codec.TimestampCodec.PostgresTimestampCodec
+import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
+import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
 import akka.projection.BySlicesSourceProvider
 import akka.projection.ProjectionId
 import akka.projection.internal.ManagementState
@@ -20,15 +37,8 @@ import akka.projection.internal.OffsetSerialization.MultipleOffsets
 import akka.projection.internal.OffsetSerialization.SingleOffset
 import akka.projection.internal.OffsetSerialization.StorageRepresentation
 import akka.projection.r2dbc.R2dbcProjectionSettings
+import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice
 import akka.projection.r2dbc.internal.R2dbcOffsetStore.Record
-import io.r2dbc.spi.Connection
-import io.r2dbc.spi.Statement
-import org.slf4j.LoggerFactory
-
-import java.time.Instant
-import scala.collection.immutable
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
 
 /**
  * INTERNAL API
@@ -46,9 +56,12 @@ private[projection] class PostgresOffsetStoreDao(
 
   private val persistenceExt = Persistence(system)
 
-  private val timestampOffsetTable = settings.timestampOffsetTableWithSchema
-  private val offsetTable = settings.offsetTableWithSchema
-  private val managementTable = settings.managementTableWithSchema
+  protected val timestampOffsetTable: String = settings.timestampOffsetTableWithSchema
+  protected val offsetTable: String = settings.offsetTableWithSchema
+  protected val managementTable: String = settings.managementTableWithSchema
+
+  protected implicit def queryAdapter: QueryAdapter = IdentityAdapter
+  protected implicit def timestampCodec: TimestampCodec = PostgresTimestampCodec
 
   private implicit val ec: ExecutionContext = system.executionContext
 
@@ -72,12 +85,30 @@ private[projection] class PostgresOffsetStoreDao(
     """
   }
 
-  // delete less than a timestamp
-  private val deleteOldTimestampOffsetSql: String =
+  /**
+   * Delete offsets with a timestamp less than the given one.
+   * @param notInLatestBySlice not used in the Postgres dialect, but needed for SQL Server
+   */
+  @nowarn
+  protected def deleteOldTimestampOffsetSql(notInLatestBySlice: Seq[LatestBySlice]): String =
     sql"""
     DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ?
     AND NOT (persistence_id || '-' || seq_nr) = ANY (?)"""
 
+  protected def bindDeleteOldTimestampOffsetSql(
+      stmt: Statement,
+      minSlice: Int,
+      maxSlice: Int,
+      until: Instant,
+      notInLatestBySlice: Seq[LatestBySlice]): Statement = {
+    stmt
+      .bind(0, minSlice)
+      .bind(1, maxSlice)
+      .bind(2, projectionId.name)
+      .bindTimestamp(3, until)
+      .bind(4, notInLatestBySlice.iterator.map(record => s"${record.pid}-${record.seqNr}").toArray[String])
+  }
+
   // delete greater than or equal a timestamp
   private val deleteNewTimestampOffsetSql: String =
     sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset >= ?"
@@ -121,9 +152,27 @@ private[projection] class PostgresOffsetStoreDao(
       paused = excluded.paused,
       last_updated = excluded.last_updated"""
 
-  val updateManagementStateSql: String = createUpdateManagementStateSql()
+  protected def bindCreateUpdateManagementStateSql(
+      stmt: Statement,
+      projectionId: ProjectionId,
+      paused: Boolean,
+      lastUpdated: Long): Statement = {
+    bindUpdateManagementStateSql(stmt, projectionId, paused, lastUpdated)
+  }
+
+  protected def bindUpdateManagementStateSql(
+      stmt: Statement,
+      projectionId: ProjectionId,
+      paused: Boolean,
+      lastUpdated: Long): Statement = {
+    stmt
+      .bind(0, projectionId.name)
+      .bind(1, projectionId.key)
+      .bind(2, paused)
+      .bind(3, lastUpdated)
+  }
 
-  private def timestampOffsetBySlicesSourceProvider: BySlicesSourceProvider =
+  protected def timestampOffsetBySlicesSourceProvider: BySlicesSourceProvider =
     sourceProvider match {
       case Some(provider) => provider
       case None =>
@@ -152,7 +201,7 @@ private[projection] class PostgresOffsetStoreDao(
         val slice = row.get("slice", classOf[java.lang.Integer])
         val pid = row.get("persistence_id", classOf[String])
         val seqNr = row.get("seq_nr", classOf[java.lang.Long])
-        val timestamp = row.get("timestamp_offset", classOf[Instant])
+        val timestamp = row.getTimestamp("timestamp_offset")
         R2dbcOffsetStore.RecordWithProjectionKey(R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp), projectionKey)
       })
   }
@@ -193,7 +242,7 @@ private[projection] class PostgresOffsetStoreDao(
         .bind(bindStartIndex + 2, slice)
         .bind(bindStartIndex + 3, record.pid)
         .bind(bindStartIndex + 4, record.seqNr)
-        .bind(bindStartIndex + 5, record.timestamp)
+        .bindTimestamp(bindStartIndex + 5, record.timestamp)
     }
 
     require(records.nonEmpty)
@@ -241,19 +290,23 @@ private[projection] class PostgresOffsetStoreDao(
     }
   }
 
+  protected def bindUpsertOffsetSql(stmt: Statement, singleOffset: SingleOffset, toEpochMilli: Long): Statement = {
+    stmt
+      .bind(0, singleOffset.id.name)
+      .bind(1, singleOffset.id.key)
+      .bind(2, singleOffset.offsetStr)
+      .bind(3, singleOffset.manifest)
+      .bind(4, java.lang.Boolean.valueOf(singleOffset.mergeable))
+      .bind(5, toEpochMilli)
+  }
+
   override def updatePrimitiveOffsetInTx(
       connection: Connection,
       timestamp: Instant,
       storageRepresentation: StorageRepresentation): Future[Done] = {
     def upsertStmt(singleOffset: SingleOffset): Statement = {
-      connection
-        .createStatement(upsertOffsetSql)
-        .bind(0, singleOffset.id.name)
-        .bind(1, singleOffset.id.key)
-        .bind(2, singleOffset.offsetStr)
-        .bind(3, singleOffset.manifest)
-        .bind(4, java.lang.Boolean.valueOf(singleOffset.mergeable))
-        .bind(5, timestamp.toEpochMilli)
+      val stmt = connection.createStatement(upsertOffsetSql)
+      bindUpsertOffsetSql(stmt, singleOffset, timestamp.toEpochMilli)
     }
 
     val statements = storageRepresentation match {
@@ -264,17 +317,12 @@ private[projection] class PostgresOffsetStoreDao(
     R2dbcExecutor.updateInTx(statements).map(_ => Done)(ExecutionContexts.parasitic)
   }
 
-  override def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[String]): Future[Long] = {
+  override def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[LatestBySlice]): Future[Long] = {
     val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
     val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
     r2dbcExecutor.updateOne("delete old timestamp offset") { conn =>
-      conn
-        .createStatement(deleteOldTimestampOffsetSql)
-        .bind(0, minSlice)
-        .bind(1, maxSlice)
-        .bind(2, projectionId.name)
-        .bind(3, until)
-        .bind(4, notInLatestBySlice.toArray[String])
+      val stmt = conn.createStatement(deleteOldTimestampOffsetSql(notInLatestBySlice))
+      bindDeleteOldTimestampOffsetSql(stmt, minSlice, maxSlice, until, notInLatestBySlice)
     }
   }
 
@@ -287,7 +335,7 @@ private[projection] class PostgresOffsetStoreDao(
         .bind(0, minSlice)
         .bind(1, maxSlice)
         .bind(2, projectionId.name)
-        .bind(3, timestamp))
+        .bindTimestamp(3, timestamp))
   }
 
   override def clearTimestampOffset(): Future[Long] = {
@@ -326,11 +374,7 @@ private[projection] class PostgresOffsetStoreDao(
   override def updateManagementState(paused: Boolean, timestamp: Instant): Future[Long] =
     r2dbcExecutor
       .updateOne("update management state") { conn =>
-        conn
-          .createStatement(updateManagementStateSql)
-          .bind(0, projectionId.name)
-          .bind(1, projectionId.key)
-          .bind(2, paused)
-          .bind(3, timestamp.toEpochMilli)
+        val stmt = conn.createStatement(createUpdateManagementStateSql())
+        bindCreateUpdateManagementStateSql(stmt, projectionId, paused, timestamp.toEpochMilli)
       }
 }
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
index aee1c07c6..b2989d013 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -181,6 +181,8 @@ private[projection] object R2dbcOffsetStore {
   }
 
   val FutureDone: Future[Done] = Future.successful(Done)
+
+  final case class LatestBySlice(slice: Int, pid: String, seqNr: Long)
 }
 
 /**
@@ -208,17 +210,18 @@ private[projection] class R2dbcOffsetStore(
   import offsetSerialization.fromStorageRepresentation
   import offsetSerialization.toStorageRepresentation
 
+  private val dialectName = system.settings.config.getConfig(settings.useConnectionFactory).getString("dialect")
+  private val dialect =
+    dialectName match {
+      case "postgres"  => PostgresDialect
+      case "yugabyte"  => YugabyteDialect
+      case "h2"        => H2Dialect
+      case "sqlserver" => SqlServerDialect
+      case unknown =>
+        throw new IllegalArgumentException(
+          s"[$unknown] is not a dialect supported by this version of Akka Projection R2DBC")
+    }
   private val dao = {
-    val dialectName = system.settings.config.getConfig(settings.useConnectionFactory).getString("dialect")
-    val dialect =
-      dialectName match {
-        case "postgres" => PostgresDialect
-        case "yugabyte" => YugabyteDialect
-        case "h2"       => H2Dialect
-        case unknown =>
-          throw new IllegalArgumentException(
-            s"[$unknown] is not a dialect supported by this version of Akka Projection R2DBC")
-      }
     logger.debug2("Offset store [{}] created, with dialect [{}]", projectionId, dialectName)
     dialect.createOffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId)
   }
 
@@ -730,7 +733,7 @@ private[projection] class R2dbcOffsetStore(
         case record if record.timestamp.isBefore(until) =>
           // note that deleteOldTimestampOffsetSql already has `AND timestamp_offset < ?`
          // and that's why timestamp >= until don't have to be included here
-          s"${record.pid}-${record.seqNr}"
+          LatestBySlice(record.slice, record.pid, record.seqNr)
       }
       val result = dao.deleteOldTimestampOffset(until, notInLatestBySlice)
       result.failed.foreach { exc =>
@@ -849,8 +852,12 @@ private[projection] class R2dbcOffsetStore(
     dao.readManagementState()
 
   def savePaused(paused: Boolean): Future[Done] = {
-    dao
-      .updateManagementState(paused, Instant.now(clock))
+
+    val update = dao.updateManagementState(paused, Instant.now(clock))
+    (if (dialect == SqlServerDialect) {
+       // workaround for https://github.com/r2dbc/r2dbc-mssql/pull/290
+       update.map(_ => 1)
+     } else update)
       .flatMap {
         case i if i == 1 => Future.successful(Done)
         case _ =>
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala
new file mode 100644
index 000000000..1199b7b61
--- /dev/null
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala
@@ -0,0 +1,149 @@
+/*
+ * Copyright (C) 2023 Lightbend Inc.
+ */
+
+package akka.projection.r2dbc.internal
+
+import java.time.Instant
+
+import io.r2dbc.spi.Statement
+
+import akka.actor.typed.ActorSystem
+import akka.annotation.InternalApi
+import akka.persistence.r2dbc.internal.R2dbcExecutor
+import akka.persistence.r2dbc.internal.codec.QueryAdapter
+import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter
+import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
+import akka.persistence.r2dbc.internal.codec.TimestampCodec
+import akka.persistence.r2dbc.internal.codec.TimestampCodec.SqlServerTimestampCodec
+import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
+import akka.projection.r2dbc.R2dbcProjectionSettings
+import akka.projection.BySlicesSourceProvider
+import akka.projection.ProjectionId
+import akka.projection.internal.OffsetSerialization.SingleOffset
+import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] class SqlServerOffsetStoreDao(
+    settings: R2dbcProjectionSettings,
+    sourceProvider: Option[BySlicesSourceProvider],
+    system: ActorSystem[_],
+    r2dbcExecutor: R2dbcExecutor,
+    projectionId: ProjectionId)
+    extends PostgresOffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId) {
+
+  override protected implicit def queryAdapter: QueryAdapter = SqlServerQueryAdapter
+
+  override protected implicit def timestampCodec: TimestampCodec = SqlServerTimestampCodec
+
+  override protected def createUpsertOffsetSql() =
+    sql"""
+      UPDATE $offsetTable SET
+        current_offset = @currentOffset,
+        manifest = @manifest,
+        mergeable = @mergeable,
+        last_updated = @lastUpdated
+      WHERE projection_name = @projectionName AND projection_key = @projectionKey
+      if @@ROWCOUNT = 0
+        INSERT INTO $offsetTable
+        (projection_name, projection_key, current_offset, manifest, mergeable, last_updated)
+        VALUES (@projectionName, @projectionKey, @currentOffset, @manifest, @mergeable, @lastUpdated)
+      """
+
+  override protected def bindUpsertOffsetSql(
+      stmt: Statement,
+      singleOffset: SingleOffset,
+      toEpochMilli: Long): Statement = {
+    stmt
+      .bind("@projectionName", singleOffset.id.name)
+      .bind("@projectionKey", singleOffset.id.key)
+      .bind("@currentOffset", singleOffset.offsetStr)
+      .bind("@manifest", singleOffset.manifest)
+      .bind("@mergeable", java.lang.Boolean.valueOf(singleOffset.mergeable))
+      .bind("@lastUpdated", toEpochMilli)
+  }
+
+  override protected def createUpdateManagementStateSql(): String = {
+    sql"""
+      UPDATE $managementTable SET
+        paused = @paused,
+        last_updated = @lastUpdated
+      WHERE projection_name = @projectionName AND projection_key = @projectionKey
+      if @@ROWCOUNT = 0
+        INSERT INTO $managementTable
+        (projection_name, projection_key, paused, last_updated)
+        VALUES (@projectionName, @projectionKey, @paused, @lastUpdated)
+      """
+  }
+
+  override protected def bindUpdateManagementStateSql(
+      stmt: Statement,
+      projectionId: ProjectionId,
+      paused: Boolean,
+      lastUpdated: Long): Statement = {
+    stmt
+      .bind("@paused", paused)
+      .bind("@lastUpdated", lastUpdated)
+      .bind("@projectionName", projectionId.name)
+      .bind("@projectionKey", projectionId.key)
+  }
+
+  /**
+   * The r2dbc-mssql driver does not seem to support binding of arrays, so the
+   * values are baked into the statement as one named parameter per slice
+   * instead of being bound as a single array parameter.
+   *
+   * @param notInLatestBySlice when non-empty, a NOT IN clause with one named parameter per slice is appended
+   * @return the DELETE statement for old timestamp offsets
+   */
+  override protected def deleteOldTimestampOffsetSql(notInLatestBySlice: Seq[LatestBySlice]): String = {
+    val base =
+      s"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN @from AND @to AND projection_name = @projectionName AND timestamp_offset < @timestampOffset"
+    if (notInLatestBySlice.isEmpty) {
+      sql"$base"
+    } else {
+      val values = (timestampOffsetBySlicesSourceProvider.minSlice to timestampOffsetBySlicesSourceProvider.maxSlice)
+        .map { i =>
+          s"@s$i"
+        }
+        .mkString(", ")
+      sql"""
+        $base
+        AND CONCAT(persistence_id, '-', seq_nr) NOT IN ($values)"""
+    }
+  }
+
+  override protected def bindDeleteOldTimestampOffsetSql(
+      stmt: Statement,
+      minSlice: Int,
+      maxSlice: Int,
+      until: Instant,
+      notInLatestBySlice: Seq[LatestBySlice]): Statement = {
+
+    stmt
+      .bind("@from", minSlice)
+      .bind("@to", maxSlice)
+      .bind("@projectionName", projectionId.name)
+      .bindTimestamp("@timestampOffset", until)
+
+    if (notInLatestBySlice.nonEmpty) {
+      val sliceLookup = notInLatestBySlice.map { item =>
+        item.slice -> item
+      }.toMap
+
+      (timestampOffsetBySlicesSourceProvider.minSlice to timestampOffsetBySlicesSourceProvider.maxSlice).foreach { i =>
+        val bindKey = s"@s$i"
+        sliceLookup.get(i) match {
+          case Some(value) => stmt.bind(bindKey, s"${value.pid}-${value.seqNr}")
+          case None        => stmt.bind(bindKey, "-")
+        }
+      }
+    }
+
+    stmt
+  }
+
+}
diff --git a/docker-files/docker-compose-sqlserver.yml b/docker-files/docker-compose-sqlserver.yml
new file mode 100644
index 000000000..76f95fb95
--- /dev/null
+++ b/docker-files/docker-compose-sqlserver.yml
@@ -0,0 +1,10 @@
+version: '2.2'
+services:
+  sqlserver:
+    image: mcr.microsoft.com/mssql/server:2022-latest
+    container_name: sqlserver-db
+    environment:
+      - MSSQL_SA_PASSWORD=
+      - ACCEPT_EULA=Y
+    ports:
+      - 1433:1433
diff --git a/docs/src/main/paradox/r2dbc.md b/docs/src/main/paradox/r2dbc.md
index 3b06eb6f5..df40d80f2 100644
--- a/docs/src/main/paradox/r2dbc.md
+++ b/docs/src/main/paradox/r2dbc.md
@@ -57,6 +57,12 @@ The table below shows `akka-projection-r2dbc`'s direct dependencies, and the sec
 The `akka_projection_offset_store`, `akka_projection_timestamp_offset_store` and `akka_projection_management` tables need to be created in the configured database:
+
+@@@ warning
+
+The SQL Server dialect is marked `experimental` and is not yet considered production ready, until various [issues](https://github.com/akka/akka-persistence-r2dbc/issues?q=is%3Aopen+label%3Asqlserver+label%3Abug) with the integration of the `r2dbc-mssql` plugin have been resolved.
+
+@@@
+
 PostgreSQL
 :  @@snip [PostgreSQL Schema](/akka-projection-r2dbc/ddl-scripts/create_tables_postgres.sql)
 
@@ -66,6 +72,9 @@ YugaByte
 H2
 :  @@snip [H2 Schema](/akka-projection-r2dbc/src/main/resources/h2-default-projection-schema.conf) { #schema }
 
+SQLServer
+:  @@snip [SQLServer Schema](/akka-projection-r2dbc/ddl-scripts/create_tables_sqlserver.sql)
+
 For H2 the schema need to be defined as the `additional-init` setting in your config. This means it is created on
 first connection instead of up front (needed as there is no way to connect to the database from outside the JVM process):
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 11198a79e..11c19125f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -54,6 +54,8 @@ object Dependencies {
     val h2 = "com.h2database" % "h2" % "2.1.210" % Provided // EPL 1.0
     val r2dbcH2 = "io.r2dbc" % "r2dbc-h2" % "1.0.0.RELEASE" % Provided // ApacheV2
 
+    val r2dbcSqlServer = "io.r2dbc" % "r2dbc-mssql" % "1.0.2.RELEASE" % Provided // ApacheV2
+
     // pin this because testcontainers and slick brings in incompatible SLF4J 2.2
     val sl4j = "org.slf4j" % "slf4j-api" % "1.7.36"
     val slick = "com.typesafe.slick" %% "slick" % Versions.slick
@@ -266,6 +268,7 @@ object Dependencies {
       Compile.akkaPersistenceR2dbc,
       Compile.h2, // provided
       Compile.r2dbcH2, // provided
+      Compile.r2dbcSqlServer, // provided
       Compile.akkaPersistenceTyped,
       Compile.akkaStreamTyped,
       Test.akkaStreamTestkit,
@@ -281,6 +284,7 @@ object Dependencies {
       Compile.akkaPersistenceR2dbc,
       Compile.h2, // provided
       Compile.r2dbcH2, // provided
+      Compile.r2dbcSqlServer, // provided
       Compile.akkaPersistenceTyped,
       Compile.akkaStreamTyped,
       Test.akkaStreamTestkit,
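Since `r2dbc-mssql` is added with `Provided` scope above, it is available to this repository's own tests but is not pulled in transitively by `akka-projection-r2dbc`. An application that wants to try the experimental SQL Server dialect therefore has to declare the driver itself. A minimal sbt sketch, where the Akka Projection version is a placeholder to fill in (only the `r2dbc-mssql` version is taken from this PR):

```scala
// build.sbt (sketch): the projection version is a placeholder;
// r2dbc-mssql matches the version pinned in Dependencies.scala above.
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-projection-r2dbc" % "<akka-projection-version>",
  "io.r2dbc" % "r2dbc-mssql" % "1.0.2.RELEASE")
```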