Skip to content

Commit

Permalink
feat: support for r2dbc sqlserver (#1122)
Browse files Browse the repository at this point in the history
Co-authored-by: Patrik Nordwall <[email protected]>
Co-authored-by: Johan Andrén <[email protected]>
  • Loading branch information
3 people authored May 6, 2024
1 parent 236f6bc commit 764bc38
Show file tree
Hide file tree
Showing 17 changed files with 521 additions and 71 deletions.
46 changes: 46 additions & 0 deletions .github/workflows/integration-tests-r2dbc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,49 @@ jobs:
- name: Run integration tests with with Scala and Java ${{ matrix.jdkVersion }}
run: |-
sbt -Dconfig.resource=application-h2.conf akka-projection-r2dbc-integration/test
test-r2dbc-sqlserver:
name: Run r2dbc tests with SQL Server
runs-on: ubuntu-22.04
if: github.repository == 'akka/akka-projection'
strategy:
fail-fast: false
matrix:
include:
# - { jdkVersion: "1.11.0", jvmName: "temurin:1.11.0", extraOpts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
- { jdkVersion: "1.17.0", jvmName: "temurin:1.17.0", extraOpts: '' }

steps:
- name: Checkout
# https://github.com/actions/checkout/releases
# v4.1.1
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
with:
fetch-depth: 0

- name: Checkout GitHub merge
if: github.event.pull_request
run: |-
git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
git checkout scratch
- name: Cache Coursier cache
# https://github.com/coursier/cache-action/releases
# v6.4.4
uses: coursier/cache-action@a0e7cd24be81bc84f0d7461e02bd1a96980553d7

- name: Set up JDK ${{ matrix.jdkVersion }}
# https://github.com/coursier/setup-action/releases
# v1.3.4
uses: coursier/setup-action@48280172a2c999022e42527711d6b28e4945e6f0
with:
jvm: ${{ matrix.jvmName }}

- name: Start DB
run: |-
docker compose -f docker-files/docker-compose-sqlserver.yml up --wait
docker exec -i sqlserver-db /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P '<YourStrong@Passw0rd>' -d master < akka-projection-r2dbc/ddl-scripts/create_tables_sqlserver.sql
- name: Run integration tests with with Scala and Java ${{ matrix.jdkVersion }}
run: |-
sbt -Dconfig.resource=application-sqlserver.conf akka-projection-r2dbc-integration/test
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.sqlserver}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<logger name="akka.projection" level="DEBUG" />
<logger name="akka.persistence.r2dbc" level="DEBUG" />
<!-- <logger name="io.r2dbc.postgresql.QUERY" level="DEBUG" />-->
<!-- <logger name="io.r2dbc.mssql.QUERY" level="DEBUG" />-->
<!-- <logger name="io.r2dbc.pool" level="DEBUG" />-->


Expand All @@ -23,4 +24,4 @@
<!-- <appender-ref ref="STDOUT"/>-->
</root>

</configuration>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import akka.persistence.query.Offset
import akka.persistence.query.TimestampOffset.toTimestampOffset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
Expand All @@ -40,6 +40,8 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement

object EventSourcedEndToEndSpec {

val config: Config = ConfigFactory
Expand Down Expand Up @@ -134,6 +136,8 @@ class EventSourcedEndToEndSpec
private val projectionSettings = R2dbcProjectionSettings(system)
private val stringSerializer = SerializationExtension(system).serializerFor(classOf[String])

import journalSettings.codecSettings.JournalImplicits._

override protected def beforeAll(): Unit = {
super.beforeAll()
}
Expand All @@ -156,7 +160,7 @@ class EventSourcedEndToEndSpec
.bind(1, entityType)
.bind(2, persistenceId)
.bind(3, seqNr)
.bind(4, timestamp)
.bindTimestamp(4, timestamp)
.bind(5, stringSerializer.identifier)
.bind(6, stringSerializer.toBinary(event))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.query.Sequence
import akka.persistence.query.TimeBasedUUID
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.projection.MergeableOffset
import akka.projection.ProjectionId
import akka.projection.internal.ManagementState
import akka.projection.r2dbc.internal.OffsetPidSeqNr
import akka.projection.r2dbc.internal.R2dbcOffsetStore
import org.scalatest.wordspec.AnyWordSpecLike

import akka.persistence.r2dbc.internal.codec.QueryAdapter

class R2dbcOffsetStoreSpec
extends ScalaTestWithActorTestKit(TestConfig.config)
with AnyWordSpecLike
Expand All @@ -34,6 +36,8 @@ class R2dbcOffsetStoreSpec

private val settings = R2dbcProjectionSettings(testKit.system)

private implicit val queryAdapter: QueryAdapter = r2dbcSettings.codecSettings.queryAdapter

private def createOffsetStore(projectionId: ProjectionId) =
new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor, clock)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ package akka.projection.r2dbc
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.Done
import akka.NotUsed
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.projection.HandlerRecoveryStrategy
import akka.projection.OffsetVerification
Expand All @@ -46,9 +48,12 @@ import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSource
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

import scala.util.control.NonFatal

import akka.persistence.r2dbc.internal.codec.IdentityAdapter
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter

object R2dbcProjectionSpec {
final case class Envelope(id: String, offset: Long, message: String)

Expand Down Expand Up @@ -86,17 +91,36 @@ object R2dbcProjectionSpec {
object TestRepository {
val table = "projection_spec_model"

val createTableSql: String =
s"""|CREATE table IF NOT EXISTS $table (
| id VARCHAR(255) NOT NULL,
| concatenated VARCHAR(255) NOT NULL,
| PRIMARY KEY(id)
|);""".stripMargin
def createTableSql(dialectName: String): String = {
dialectName match {
case "sqlserver" =>
s"""|IF object_id('$table') is null
| CREATE TABLE $table (
| id NVARCHAR(255) NOT NULL,
| concatenated NVARCHAR(255) NOT NULL
| PRIMARY KEY(id)
| );""".stripMargin
case _ =>
s"""|CREATE table IF NOT EXISTS $table (
| id VARCHAR(255) NOT NULL,
| concatenated VARCHAR(255) NOT NULL,
| PRIMARY KEY(id)
|);""".stripMargin
}

}
}

final case class TestRepository(session: R2dbcSession)(implicit ec: ExecutionContext, system: ActorSystem[_]) {
import TestRepository.table

private val dialect = system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")

// no R2dbcSettings available to get the implicits
implicit private val queryAdapter: QueryAdapter = if (dialect == "sqlserver") {
SqlServerQueryAdapter
} else IdentityAdapter

private val logger = LoggerFactory.getLogger(this.getClass)

def concatToText(id: String, payload: String): Future[Done] = {
Expand Down Expand Up @@ -125,12 +149,22 @@ object R2dbcProjectionSpec {
logger.debug("TestRepository.upsert: [{}]", concatStr)

val stmtSql = {
if (system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect") == "h2") {
if (dialect == "h2") {
sql"""
MERGE INTO $table (id, concatenated)
KEY (id)
VALUES (?, ?)
"""
} else if (dialect == "sqlserver") {
sql"""
UPDATE $table SET
id = @id,
concatenated = @concatenated
WHERE id = @id
if @@ROWCOUNT = 0
INSERT INTO $table (id, concatenated)
VALUES (@id, @concatenated)
"""
} else {
sql"""
INSERT INTO $table (id, concatenated) VALUES (?, ?)
Expand Down Expand Up @@ -188,7 +222,8 @@ class R2dbcProjectionSpec
super.beforeAll()
try {
Await.result(
r2dbcExecutor.executeDdl("beforeAll createTable")(_.createStatement(TestRepository.createTableSql)),
r2dbcExecutor.executeDdl("beforeAll createTable")(
_.createStatement(TestRepository.createTableSql(r2dbcSettings.dialectName))),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from ${TestRepository.table}")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ class R2dbcTimestampOffsetProjectionSpec

override protected def beforeAll(): Unit = {
super.beforeAll()
val dialectName = system.settings.config.getConfig(settings.useConnectionFactory).getString("dialect")

Await.result(r2dbcExecutor.executeDdl("beforeAll createTable") { conn =>
conn.createStatement(TestRepository.createTableSql)
conn.createStatement(TestRepository.createTableSql(dialectName))
}, 10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from ${TestRepository.table}")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
r2dbcExecutor.updateOne("beforeAll delete")(
_.createStatement(s"delete from ${r2dbcSettings.journalTableWithSchema(0)}")),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from ${r2dbcSettings.snapshotsTable}")),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
_.createStatement(s"delete from ${r2dbcSettings.durableStateTableWithSchema(0)}")),
Expand Down
116 changes: 116 additions & 0 deletions akka-projection-r2dbc/ddl-scripts/create_tables_sqlserver.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
IF object_id('event_journal') is null
CREATE TABLE event_journal(
slice INT NOT NULL,
entity_type NVARCHAR(255) NOT NULL,
persistence_id NVARCHAR(255) NOT NULL,
seq_nr NUMERIC(10,0) NOT NULL,
db_timestamp datetime2(6) NOT NULL,
event_ser_id INTEGER NOT NULL,
event_ser_manifest NVARCHAR(255) NOT NULL,
event_payload VARBINARY(MAX) NOT NULL,
deleted BIT DEFAULT 0 NOT NULL,
writer NVARCHAR(255) NOT NULL,
adapter_manifest NVARCHAR(255) NOT NULL,
tags NVARCHAR(255),

meta_ser_id INTEGER,
meta_ser_manifest NVARCHAR(255),
meta_payload VARBINARY(MAX),
PRIMARY KEY(persistence_id, seq_nr)
);

IF NOT EXISTS(SELECT * FROM sys.indexes WHERE name = 'event_journal_slice_idx' AND object_id = OBJECT_ID('event_journal'))
BEGIN
CREATE INDEX event_journal_slice_idx ON event_journal (slice, entity_type, db_timestamp, seq_nr);
END;

IF object_id('snapshot') is null
CREATE TABLE snapshot(
slice INT NOT NULL,
entity_type NVARCHAR(255) NOT NULL,
persistence_id NVARCHAR(255) NOT NULL,
seq_nr BIGINT NOT NULL,
db_timestamp datetime2(6),
write_timestamp BIGINT NOT NULL,
ser_id INTEGER NOT NULL,
ser_manifest NVARCHAR(255) NOT NULL,
snapshot VARBINARY(MAX) NOT NULL,
tags NVARCHAR(255),
meta_ser_id INTEGER,
meta_ser_manifest NVARCHAR(255),
meta_payload VARBINARY(MAX),
PRIMARY KEY(persistence_id)
);

-- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
IF NOT EXISTS(SELECT * FROM sys.indexes WHERE name = 'snapshot_slice_idx' AND object_id = OBJECT_ID('snapshot'))
BEGIN
CREATE INDEX snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);
END;

IF object_id('durable_state') is null
CREATE TABLE durable_state (
slice INT NOT NULL,
entity_type NVARCHAR(255) NOT NULL,
persistence_id NVARCHAR(255) NOT NULL,
revision BIGINT NOT NULL,
db_timestamp datetime2(6) NOT NULL,

state_ser_id INTEGER NOT NULL,
state_ser_manifest NVARCHAR(255),
state_payload VARBINARY(MAX) NOT NULL,
tags NVARCHAR(255),

PRIMARY KEY(persistence_id, revision)
);

-- `durable_state_slice_idx` is only needed if the slice based queries are used
IF NOT EXISTS(SELECT * FROM sys.indexes WHERE name = 'durable_state_slice_idx' AND object_id = OBJECT_ID('durable_state'))
BEGIN
CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);
END;

-- during creation of all tables below, the following warning is logged:
-- ###################################################################################
-- # Warning! The maximum key length for a clustered index is 900 bytes. #
-- # The index '[some name]' has maximum length of [some number] bytes. #
-- # For some combination of large values, the insert/update operation will fail. #
-- ###################################################################################

-- Primitive offset types are stored in this table.
-- If only timestamp based offsets are used this table is optional.
-- Configure akka.projection.r2dbc.offset-store.offset-table="" if the table is not created.
IF object_id('akka_projection_offset_store') is null
CREATE TABLE akka_projection_offset_store (
projection_name NVARCHAR(255) NOT NULL,
projection_key NVARCHAR(255) NOT NULL,
current_offset NVARCHAR(255) NOT NULL,
manifest NVARCHAR(32) NOT NULL,
mergeable BIT DEFAULT 0 NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);

IF object_id('akka_projection_timestamp_offset_store') is null
CREATE TABLE akka_projection_timestamp_offset_store (
projection_name NVARCHAR(255) NOT NULL,
projection_key NVARCHAR(255) NOT NULL,
slice INT NOT NULL,
persistence_id NVARCHAR(255) NOT NULL,
seq_nr BIGINT NOT NULL,
-- timestamp_offset is the db_timestamp of the original event
timestamp_offset datetime2(6) NOT NULL,
-- timestamp_consumed is when the offset was stored
-- the consumer lag is timestamp_consumed - timestamp_offset
timestamp_consumed datetime2(6) NOT NULL,
PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);

IF object_id('akka_projection_management') is null
CREATE TABLE akka_projection_management (
projection_name NVARCHAR(255) NOT NULL,
projection_key NVARCHAR(255) NOT NULL,
paused BIT NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
Loading

0 comments on commit 764bc38

Please sign in to comment.