From 83d00359ec665406e7805b1711e2da69c143291a Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Thu, 21 Dec 2023 17:15:38 +0000 Subject: [PATCH 1/4] Convert Dialect to interface --- .../DefaultMigrationManager.java | 182 ++---------- .../gruelbox/transactionoutbox/Dialect.java | 273 ++++++++++++++---- .../gruelbox/transactionoutbox/Migration.java | 17 ++ 3 files changed, 271 insertions(+), 201 deletions(-) create mode 100644 transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java index a3b0e134..99b74382 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java @@ -1,16 +1,13 @@ package com.gruelbox.transactionoutbox; +import static com.gruelbox.transactionoutbox.spi.Utils.uncheck; + import java.io.PrintWriter; import java.io.Writer; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import lombok.AllArgsConstructor; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; /** @@ -20,92 +17,15 @@ @Slf4j class DefaultMigrationManager { - /** Migrations can be dialect specific * */ - private static final List MIGRATIONS = - List.of( - new Migration( - 1, - "Create outbox table", - "CREATE TABLE TXNO_OUTBOX (\n" - + " id VARCHAR(36) PRIMARY KEY,\n" - + " invocation TEXT,\n" - + " nextAttemptTime TIMESTAMP(6),\n" - + " attempts INT,\n" - + " blacklisted BOOLEAN,\n" - + " version INT\n" - + ")", - Map.of( - Dialect.ORACLE, - "CREATE TABLE TXNO_OUTBOX (\n" - + " id VARCHAR2(36) PRIMARY KEY,\n" - + " invocation CLOB,\n" - + " nextAttemptTime TIMESTAMP(6),\n" - + " attempts NUMBER,\n" - + " blacklisted NUMBER(1),\n" - + " version NUMBER\n" - + ")")), - new Migration( - 2, - "Add unique request id", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN uniqueRequestId VARCHAR(100) NULL UNIQUE", - Map.of( - Dialect.ORACLE, - "ALTER TABLE TXNO_OUTBOX ADD uniqueRequestId VARCHAR(100) NULL UNIQUE")), - new Migration( - 3, - "Add processed flag", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN processed BOOLEAN", - Map.of(Dialect.ORACLE, "ALTER TABLE TXNO_OUTBOX ADD processed NUMBER(1)")), - new Migration( - 4, - "Add flush index", - "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blacklisted, nextAttemptTime)"), - new Migration( - 5, - "Increase size of uniqueRequestId", - "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN uniqueRequestId VARCHAR(250)", - Map.of( - Dialect.POSTGRESQL_9, - "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId TYPE VARCHAR(250)", - Dialect.H2, - "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId VARCHAR(250)", - Dialect.ORACLE, - "ALTER TABLE TXNO_OUTBOX MODIFY uniqueRequestId VARCHAR2(250)")), - new Migration( - 6, - "Rename column blacklisted to blocked", - "ALTER TABLE TXNO_OUTBOX CHANGE COLUMN blacklisted blocked VARCHAR(250)", - Map.of( - Dialect.POSTGRESQL_9, - "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked", - Dialect.ORACLE, - "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked", - Dialect.H2, - "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked")), - new Migration( - 7, - "Add lastAttemptTime column to outbox", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation", - Map.of( - Dialect.POSTGRESQL_9, - "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)", - Dialect.ORACLE, - "ALTER TABLE TXNO_OUTBOX ADD lastAttemptTime TIMESTAMP(6)")), - new Migration( - 8, - "Update length of invocation column on outbox for MySQL dialects only.", - "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT", - Map.of( - Dialect.POSTGRESQL_9, "", Dialect.H2, "", Dialect.ORACLE, "SELECT * FROM dual"))); - static void migrate(TransactionManager transactionManager, Dialect dialect) { transactionManager.inTransaction( transaction -> { try { int currentVersion = currentVersion(transaction.connection(), dialect); - MIGRATIONS.stream() - .filter(migration -> migration.version > currentVersion) - .forEach(migration -> runSql(transaction.connection(), migration, dialect)); + dialect + .getMigrations() + .filter(migration -> migration.getVersion() > currentVersion) + .forEach(migration -> uncheck(() -> runSql(transaction.connection(), migration))); } catch (Exception e) { throw new RuntimeException("Migrations failed", e); } @@ -114,86 +34,46 @@ static void migrate(TransactionManager transactionManager, Dialect dialect) { static void writeSchema(Writer writer, Dialect dialect) { PrintWriter printWriter = new PrintWriter(writer); - MIGRATIONS.forEach( - migration -> { - printWriter.print("-- "); - printWriter.print(migration.version); - printWriter.print(": "); - printWriter.println(migration.name); - String sql = migration.sqlFor(dialect); - if (sql.isEmpty()) { - printWriter.println("-- Nothing for " + dialect); - } else { - printWriter.println(sql); - } - printWriter.println(); - }); + dialect + .getMigrations() + .forEach( + migration -> { + printWriter.print("-- "); + printWriter.print(migration.getVersion()); + printWriter.print(": "); + printWriter.println(migration.getName()); + if (migration.getSql() == null || migration.getSql().isEmpty()) { + printWriter.println("-- Nothing for " + dialect); + } else { + printWriter.println(migration.getSql()); + } + printWriter.println(); + }); printWriter.flush(); } - @SneakyThrows - private static void runSql(Connection connection, Migration migration, Dialect dialect) { - log.info("Running migration: {}", migration.name); + private static void runSql(Connection connection, Migration migration) throws SQLException { + log.info("Running migration: {}", migration.getName()); try (Statement s = connection.createStatement()) { - s.execute(migration.sqlFor(dialect)); - if (s.executeUpdate("UPDATE TXNO_VERSION SET version = " + migration.version) != 1) { - s.execute("INSERT INTO TXNO_VERSION VALUES (" + migration.version + ")"); + if (migration.getSql() != null && !migration.getSql().isEmpty()) { + s.execute(migration.getSql()); + } + if (s.executeUpdate("UPDATE TXNO_VERSION SET version = " + migration.getVersion()) != 1) { + // TODO shouldn't be necessary if the lock is done correctly + s.execute("INSERT INTO TXNO_VERSION VALUES (" + migration.getVersion() + ")"); } } } private static int currentVersion(Connection connection, Dialect dialect) throws SQLException { - createVersionTableIfNotExists(connection, dialect); + dialect.createVersionTableIfNotExists(connection); try (Statement s = connection.createStatement(); ResultSet rs = s.executeQuery("SELECT version FROM TXNO_VERSION FOR UPDATE")) { if (!rs.next()) { + // TODO should attempt to "win" at creating the record and then lock it return 0; } return rs.getInt(1); } } - - private static void createVersionTableIfNotExists(Connection connection, Dialect dialect) - throws SQLException { - try (Statement s = connection.createStatement()) { - switch (dialect) { - case ORACLE: - try { - s.execute("CREATE TABLE TXNO_VERSION (version NUMBER)"); - } catch (SQLException e) { - // oracle code for name already used by an existing object - if (!e.getMessage().contains("955")) { - throw e; - } - } - break; - case MY_SQL_5: - case H2: - case MY_SQL_8: - case POSTGRESQL_9: - default: - s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)"); - break; - } - } - } - - @AllArgsConstructor - private static final class Migration { - private final int version; - - private final String name; - - private final String sql; - - private final Map dialectSpecific; - - Migration(int version, String name, String sql) { - this(version, name, sql, Collections.emptyMap()); - } - - String sqlFor(Dialect dialect) { - return dialectSpecific.getOrDefault(dialect, sql); - } - } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 3a30bd08..470c665a 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -1,77 +1,250 @@ package com.gruelbox.transactionoutbox; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Stream; +import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; -/** - * The SQL dialects supported by {@link DefaultPersistor}. Currently this is only used to determine - * whether {@code SKIP LOCKED} is available, so using the wrong dialect may work for unsupported - * database platforms. However, in future this is likely to extend to other SQL features and - * possibly be expanded to an interface to allow easier extension. - */ -@AllArgsConstructor -@Getter -public enum Dialect { - MY_SQL_5( - false, - Constants.DEFAULT_DELETE_EXPIRED_STMT, - Constants.DEFAULT_LIMIT_CRITERIA, - Constants.DEFAULT_CHECK_SQL), // - MY_SQL_8( - true, - Constants.DEFAULT_DELETE_EXPIRED_STMT, - Constants.DEFAULT_LIMIT_CRITERIA, - Constants.DEFAULT_CHECK_SQL), // - POSTGRESQL_9( - true, - "DELETE FROM {{table}} WHERE id IN (SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)", - Constants.DEFAULT_LIMIT_CRITERIA, - Constants.DEFAULT_CHECK_SQL), // - H2( - false, - Constants.DEFAULT_DELETE_EXPIRED_STMT, - Constants.DEFAULT_LIMIT_CRITERIA, - Constants.DEFAULT_CHECK_SQL), // - ORACLE( - true, - "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 AND ROWNUM <= ?", - Constants.ORACLE_LIMIT_CRITERIA, - "SELECT 1 FROM DUAL"); +/** The SQL dialects supported by {@link DefaultPersistor}. */ +public interface Dialect { /** * @return True if hot row support ({@code SKIP LOCKED}) is available, increasing performance when * there are multiple instances of the application potentially competing to process the same * task. */ - @SuppressWarnings("JavaDoc") - private final boolean supportsSkipLock; + boolean isSupportsSkipLock(); /** * @return Format string for the SQL required to delete expired retained records. */ - @SuppressWarnings("JavaDoc") - private final String deleteExpired; + String getDeleteExpired(); - private final String limitCriteria; + String getLimitCriteria(); - private final String checkSql; + String getCheckSql(); - private static class Constants { - static final String DEFAULT_DELETE_EXPIRED_STMT = - "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?"; + String booleanValue(boolean criteriaValue); + + void createVersionTableIfNotExists(Connection connection) throws SQLException; + + Stream getMigrations(); - static final String DEFAULT_LIMIT_CRITERIA = " LIMIT ?"; + Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build(); + Dialect MY_SQL_8 = DefaultDialect.builder("MY_SQL_8").supportsSkipLock(true).build(); + Dialect POSTGRESQL_9 = + DefaultDialect.builder("POSTGRESQL_9") + .supportsSkipLock(true) + .deleteExpired( + "DELETE FROM {{table}} WHERE id IN (SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)") + .changeMigration( + 5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId TYPE VARCHAR(250)") + .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") + .changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)") + .disableMigration(8) + .build(); - static final String ORACLE_LIMIT_CRITERIA = " AND ROWNUM <= ?"; + Dialect H2 = + DefaultDialect.builder("H2") + .changeMigration(5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId VARCHAR(250)") + .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") + .disableMigration(8) + .build(); + Dialect ORACLE = + DefaultDialect.builder("ORACLE") + .supportsSkipLock(true) + .deleteExpired( + "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 AND ROWNUM <= ?") + .limitCriteria(" AND ROWNUM <= ?") + .checkSql("SELECT 1 FROM DUAL") + .changeMigration( + 1, + "CREATE TABLE TXNO_OUTBOX (\n" + + " id VARCHAR2(36) PRIMARY KEY,\n" + + " invocation CLOB,\n" + + " nextAttemptTime TIMESTAMP(6),\n" + + " attempts NUMBER,\n" + + " blacklisted NUMBER(1),\n" + + " version NUMBER\n" + + ")") + .changeMigration( + 2, "ALTER TABLE TXNO_OUTBOX ADD uniqueRequestId VARCHAR(100) NULL UNIQUE") + .changeMigration(3, "ALTER TABLE TXNO_OUTBOX ADD processed NUMBER(1)") + .changeMigration(5, "ALTER TABLE TXNO_OUTBOX MODIFY uniqueRequestId VARCHAR2(250)") + .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") + .changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD lastAttemptTime TIMESTAMP(6)") + .disableMigration(8) + .booleanValueFrom(v -> v ? "1" : "0") + .createVersionTableBy( + connection -> { + try (Statement s = connection.createStatement()) { + try { + s.execute("CREATE TABLE TXNO_VERSION (version NUMBER)"); + } catch (SQLException e) { + // oracle code for name already used by an existing object + if (!e.getMessage().contains("955")) { + throw e; + } + } + } + }) + .build(); +} + +@FunctionalInterface +interface SQLAction { + void doAction(Connection connection) throws SQLException; +} - static final String DEFAULT_CHECK_SQL = "SELECT 1"; +@AllArgsConstructor(access = AccessLevel.PRIVATE) +class DefaultDialect implements Dialect { + + static Builder builder(String name) { + return new Builder(name); } + @Getter private final String name; + @Getter private final boolean supportsSkipLock; + @Getter private final String deleteExpired; + @Getter private final String limitCriteria; + @Getter private final String checkSql; + private final Collection migrations; + + @Override public String booleanValue(boolean criteriaValue) { - String valueToReturn; - if (this == ORACLE) valueToReturn = criteriaValue ? "1" : "0"; - else valueToReturn = criteriaValue ? Boolean.TRUE.toString() : Boolean.FALSE.toString(); + return criteriaValue ? Boolean.TRUE.toString() : Boolean.FALSE.toString(); + } + + @Override + public void createVersionTableIfNotExists(Connection connection) throws SQLException { + try (Statement s = connection.createStatement()) { + s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)"); + } + } + + @Override + public String toString() { + return name; + } + + @Override + public Stream getMigrations() { + return migrations.stream(); + } + + @Setter + @Accessors(fluent = true) + static final class Builder { + private final String name; + private boolean supportsSkipLock = false; + private String deleteExpired = + "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?"; + private String limitCriteria = " LIMIT ?"; + private String checkSql = "SELECT 1"; + private Map migrations; + private Function booleanValueFrom; + private SQLAction createVersionTableBy; + + Builder(String name) { + this.name = name; + this.migrations = new TreeMap<>(); + migrations.put( + 1, + new Migration( + 1, + "Create outbox table", + "CREATE TABLE TXNO_OUTBOX (\n" + + " id VARCHAR(36) PRIMARY KEY,\n" + + " invocation TEXT,\n" + + " nextAttemptTime TIMESTAMP(6),\n" + + " attempts INT,\n" + + " blacklisted BOOLEAN,\n" + + " version INT\n" + + ")")); + migrations.put( + 2, + new Migration( + 2, + "Add unique request id", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN uniqueRequestId VARCHAR(100) NULL UNIQUE")); + migrations.put( + 3, + new Migration( + 3, "Add processed flag", "ALTER TABLE TXNO_OUTBOX ADD COLUMN processed BOOLEAN")); + migrations.put( + 4, + new Migration( + 4, + "Add flush index", + "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blacklisted, nextAttemptTime)")); + migrations.put( + 5, + new Migration( + 5, + "Increase size of uniqueRequestId", + "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN uniqueRequestId VARCHAR(250)")); + migrations.put( + 6, + new Migration( + 6, + "Rename column blacklisted to blocked", + "ALTER TABLE TXNO_OUTBOX CHANGE COLUMN blacklisted blocked VARCHAR(250)")); + migrations.put( + 7, + new Migration( + 7, + "Add lastAttemptTime column to outbox", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation")); + migrations.put( + 8, + new Migration( + 8, + "Update length of invocation column on outbox for MySQL dialects only.", + "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT")); + } + + Builder setMigration(Migration migration) { + this.migrations.put(migration.getVersion(), migration); + return this; + } + + Builder changeMigration(int version, String sql) { + return setMigration(this.migrations.get(version).withSql(sql)); + } + + Builder disableMigration(@SuppressWarnings("SameParameterValue") int version) { + return setMigration(this.migrations.get(version).withSql(null)); + } + + Dialect build() { + return new DefaultDialect( + name, supportsSkipLock, deleteExpired, limitCriteria, checkSql, migrations.values()) { + @Override + public String booleanValue(boolean criteriaValue) { + if (booleanValueFrom != null) { + return booleanValueFrom.apply(criteriaValue); + } + return super.booleanValue(criteriaValue); + } - return valueToReturn; + @Override + public void createVersionTableIfNotExists(Connection connection) throws SQLException { + if (createVersionTableBy != null) { + createVersionTableBy.doAction(connection); + } else { + super.createVersionTableIfNotExists(connection); + } + } + }; + } } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java new file mode 100644 index 00000000..ef2bc3e5 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java @@ -0,0 +1,17 @@ +package com.gruelbox.transactionoutbox; + +import lombok.Value; + +/** + * A database migration script entry. See {@link Dialect#getMigrations()}. + */ +@Value +public class Migration { + int version; + String name; + String sql; + + public Migration withSql(String sql) { + return new Migration(version, name, sql); + } +} From d0ea8ea2dda0333277bdac3c2002a22e7e59f873 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Thu, 21 Dec 2023 17:18:07 +0000 Subject: [PATCH 2/4] Fix formatting --- .../main/java/com/gruelbox/transactionoutbox/Migration.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java index ef2bc3e5..39dd9950 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Migration.java @@ -2,9 +2,7 @@ import lombok.Value; -/** - * A database migration script entry. See {@link Dialect#getMigrations()}. - */ +/** A database migration script entry. See {@link Dialect#getMigrations()}. */ @Value public class Migration { int version; From 6622356d68b18715b38198d55423ac73242760e8 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Thu, 21 Dec 2023 17:56:03 +0000 Subject: [PATCH 3/4] Move classes to top level --- .../transactionoutbox/DefaultDialect.java | 159 ++++++++++++++++++ .../gruelbox/transactionoutbox/Dialect.java | 158 ----------------- .../gruelbox/transactionoutbox/SQLAction.java | 9 + 3 files changed, 168 insertions(+), 158 deletions(-) create mode 100644 transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java create mode 100644 transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SQLAction.java diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java new file mode 100644 index 00000000..e877de52 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -0,0 +1,159 @@ +package com.gruelbox.transactionoutbox; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Stream; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +class DefaultDialect implements Dialect { + + static Builder builder(String name) { + return new Builder(name); + } + + @Getter private final String name; + @Getter private final boolean supportsSkipLock; + @Getter private final String deleteExpired; + @Getter private final String limitCriteria; + @Getter private final String checkSql; + private final Collection migrations; + + @Override + public String booleanValue(boolean criteriaValue) { + return criteriaValue ? Boolean.TRUE.toString() : Boolean.FALSE.toString(); + } + + @Override + public void createVersionTableIfNotExists(Connection connection) throws SQLException { + try (Statement s = connection.createStatement()) { + s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)"); + } + } + + @Override + public String toString() { + return name; + } + + @Override + public Stream getMigrations() { + return migrations.stream(); + } + + @Setter + @Accessors(fluent = true) + static final class Builder { + private final String name; + private boolean supportsSkipLock = false; + private String deleteExpired = + "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?"; + private String limitCriteria = " LIMIT ?"; + private String checkSql = "SELECT 1"; + private Map migrations; + private Function booleanValueFrom; + private SQLAction createVersionTableBy; + + Builder(String name) { + this.name = name; + this.migrations = new TreeMap<>(); + migrations.put( + 1, + new Migration( + 1, + "Create outbox table", + "CREATE TABLE TXNO_OUTBOX (\n" + + " id VARCHAR(36) PRIMARY KEY,\n" + + " invocation TEXT,\n" + + " nextAttemptTime TIMESTAMP(6),\n" + + " attempts INT,\n" + + " blacklisted BOOLEAN,\n" + + " version INT\n" + + ")")); + migrations.put( + 2, + new Migration( + 2, + "Add unique request id", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN uniqueRequestId VARCHAR(100) NULL UNIQUE")); + migrations.put( + 3, + new Migration( + 3, "Add processed flag", "ALTER TABLE TXNO_OUTBOX ADD COLUMN processed BOOLEAN")); + migrations.put( + 4, + new Migration( + 4, + "Add flush index", + "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blacklisted, nextAttemptTime)")); + migrations.put( + 5, + new Migration( + 5, + "Increase size of uniqueRequestId", + "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN uniqueRequestId VARCHAR(250)")); + migrations.put( + 6, + new Migration( + 6, + "Rename column blacklisted to blocked", + "ALTER TABLE TXNO_OUTBOX CHANGE COLUMN blacklisted blocked VARCHAR(250)")); + migrations.put( + 7, + new Migration( + 7, + "Add lastAttemptTime column to outbox", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation")); + migrations.put( + 8, + new Migration( + 8, + "Update length of invocation column on outbox for MySQL dialects only.", + "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT")); + } + + Builder setMigration(Migration migration) { + this.migrations.put(migration.getVersion(), migration); + return this; + } + + Builder changeMigration(int version, String sql) { + return setMigration(this.migrations.get(version).withSql(sql)); + } + + Builder disableMigration(@SuppressWarnings("SameParameterValue") int version) { + return setMigration(this.migrations.get(version).withSql(null)); + } + + Dialect build() { + return new DefaultDialect( + name, supportsSkipLock, deleteExpired, limitCriteria, checkSql, migrations.values()) { + @Override + public String booleanValue(boolean criteriaValue) { + if (booleanValueFrom != null) { + return booleanValueFrom.apply(criteriaValue); + } + return super.booleanValue(criteriaValue); + } + + @Override + public void createVersionTableIfNotExists(Connection connection) throws SQLException { + if (createVersionTableBy != null) { + createVersionTableBy.doAction(connection); + } else { + super.createVersionTableIfNotExists(connection); + } + } + }; + } + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 470c665a..7f54401f 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -3,16 +3,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; -import java.util.Collection; -import java.util.Map; -import java.util.TreeMap; -import java.util.function.Function; import java.util.stream.Stream; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; -import lombok.experimental.Accessors; /** The SQL dialects supported by {@link DefaultPersistor}. */ public interface Dialect { @@ -99,152 +90,3 @@ public interface Dialect { }) .build(); } - -@FunctionalInterface -interface SQLAction { - void doAction(Connection connection) throws SQLException; -} - -@AllArgsConstructor(access = AccessLevel.PRIVATE) -class DefaultDialect implements Dialect { - - static Builder builder(String name) { - return new Builder(name); - } - - @Getter private final String name; - @Getter private final boolean supportsSkipLock; - @Getter private final String deleteExpired; - @Getter private final String limitCriteria; - @Getter private final String checkSql; - private final Collection migrations; - - @Override - public String booleanValue(boolean criteriaValue) { - return criteriaValue ? Boolean.TRUE.toString() : Boolean.FALSE.toString(); - } - - @Override - public void createVersionTableIfNotExists(Connection connection) throws SQLException { - try (Statement s = connection.createStatement()) { - s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)"); - } - } - - @Override - public String toString() { - return name; - } - - @Override - public Stream getMigrations() { - return migrations.stream(); - } - - @Setter - @Accessors(fluent = true) - static final class Builder { - private final String name; - private boolean supportsSkipLock = false; - private String deleteExpired = - "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?"; - private String limitCriteria = " LIMIT ?"; - private String checkSql = "SELECT 1"; - private Map migrations; - private Function booleanValueFrom; - private SQLAction createVersionTableBy; - - Builder(String name) { - this.name = name; - this.migrations = new TreeMap<>(); - migrations.put( - 1, - new Migration( - 1, - "Create outbox table", - "CREATE TABLE TXNO_OUTBOX (\n" - + " id VARCHAR(36) PRIMARY KEY,\n" - + " invocation TEXT,\n" - + " nextAttemptTime TIMESTAMP(6),\n" - + " attempts INT,\n" - + " blacklisted BOOLEAN,\n" - + " version INT\n" - + ")")); - migrations.put( - 2, - new Migration( - 2, - "Add unique request id", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN uniqueRequestId VARCHAR(100) NULL UNIQUE")); - migrations.put( - 3, - new Migration( - 3, "Add processed flag", "ALTER TABLE TXNO_OUTBOX ADD COLUMN processed BOOLEAN")); - migrations.put( - 4, - new Migration( - 4, - "Add flush index", - "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blacklisted, nextAttemptTime)")); - migrations.put( - 5, - new Migration( - 5, - "Increase size of uniqueRequestId", - "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN uniqueRequestId VARCHAR(250)")); - migrations.put( - 6, - new Migration( - 6, - "Rename column blacklisted to blocked", - "ALTER TABLE TXNO_OUTBOX CHANGE COLUMN blacklisted blocked VARCHAR(250)")); - migrations.put( - 7, - new Migration( - 7, - "Add lastAttemptTime column to outbox", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation")); - migrations.put( - 8, - new Migration( - 8, - "Update length of invocation column on outbox for MySQL dialects only.", - "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT")); - } - - Builder setMigration(Migration migration) { - this.migrations.put(migration.getVersion(), migration); - return this; - } - - Builder changeMigration(int version, String sql) { - return setMigration(this.migrations.get(version).withSql(sql)); - } - - Builder disableMigration(@SuppressWarnings("SameParameterValue") int version) { - return setMigration(this.migrations.get(version).withSql(null)); - } - - Dialect build() { - return new DefaultDialect( - name, supportsSkipLock, deleteExpired, limitCriteria, checkSql, migrations.values()) { - @Override - public String booleanValue(boolean criteriaValue) { - if (booleanValueFrom != null) { - return booleanValueFrom.apply(criteriaValue); - } - return super.booleanValue(criteriaValue); - } - - @Override - public void createVersionTableIfNotExists(Connection connection) throws SQLException { - if (createVersionTableBy != null) { - createVersionTableBy.doAction(connection); - } else { - super.createVersionTableIfNotExists(connection); - } - } - }; - } - } -} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SQLAction.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SQLAction.java new file mode 100644 index 00000000..87a6bb9b --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SQLAction.java @@ -0,0 +1,9 @@ +package com.gruelbox.transactionoutbox; + +import java.sql.Connection; +import java.sql.SQLException; + +@FunctionalInterface +interface SQLAction { + void doAction(Connection connection) throws SQLException; +} From e29b932c9e561aaabdbc4fe31ac193f40eec0a96 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Thu, 21 Dec 2023 20:53:53 +0000 Subject: [PATCH 4/4] Fix concurrent upgrades --- .../transactionoutbox/DefaultDialect.java | 3 +- .../DefaultMigrationManager.java | 116 +++++++++++++++--- .../TestDefaultMigrationManager.java | 77 ++++++++++++ 3 files changed, 178 insertions(+), 18 deletions(-) create mode 100644 transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestDefaultMigrationManager.java diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index e877de52..53d512bb 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -36,7 +36,8 @@ public String booleanValue(boolean criteriaValue) { @Override public void createVersionTableIfNotExists(Connection connection) throws SQLException { try (Statement s = connection.createStatement()) { - s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)"); + s.execute( + "CREATE TABLE IF NOT EXISTS TXNO_VERSION (id INT DEFAULT 0, version INT, PRIMARY KEY (id))"); } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java index 99b74382..c0ab162c 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java @@ -5,9 +5,14 @@ import java.io.PrintWriter; import java.io.Writer; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; /** @@ -17,6 +22,25 @@ @Slf4j class DefaultMigrationManager { + private static final Executor basicExecutor = + runnable -> { + new Thread(runnable).start(); + }; + + private static CountDownLatch waitLatch; + private static CountDownLatch readyLatch; + + static void withLatch(CountDownLatch readyLatch, Consumer runnable) { + waitLatch = new CountDownLatch(1); + DefaultMigrationManager.readyLatch = readyLatch; + try { + runnable.accept(waitLatch); + } finally { + waitLatch = null; + DefaultMigrationManager.readyLatch = null; + } + } + static void migrate(TransactionManager transactionManager, Dialect dialect) { transactionManager.inTransaction( transaction -> { @@ -25,7 +49,10 @@ static void migrate(TransactionManager transactionManager, Dialect dialect) { dialect .getMigrations() .filter(migration -> migration.getVersion() > currentVersion) - .forEach(migration -> uncheck(() -> runSql(transaction.connection(), migration))); + .forEach( + migration -> + uncheck( + () -> runSql(transactionManager, transaction.connection(), migration))); } catch (Exception e) { throw new RuntimeException("Migrations failed", e); } @@ -52,28 +79,83 @@ static void writeSchema(Writer writer, Dialect dialect) { printWriter.flush(); } - private static void runSql(Connection connection, Migration migration) throws SQLException { - log.info("Running migration: {}", migration.getName()); - try (Statement s = connection.createStatement()) { - if (migration.getSql() != null && !migration.getSql().isEmpty()) { - s.execute(migration.getSql()); - } - if (s.executeUpdate("UPDATE TXNO_VERSION SET version = " + migration.getVersion()) != 1) { - // TODO shouldn't be necessary if the lock is done correctly - s.execute("INSERT INTO TXNO_VERSION VALUES (" + migration.getVersion() + ")"); + private static void runSql(TransactionManager txm, Connection connection, Migration migration) + throws SQLException { + log.info("Running migration {}: {}", migration.getVersion(), migration.getName()); + + if (migration.getSql() != null && !migration.getSql().isEmpty()) { + CompletableFuture.runAsync( + () -> { + try { + txm.inTransactionThrows( + tx -> { + try (var s = tx.connection().prepareStatement(migration.getSql())) { + s.execute(); + } + }); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }, + basicExecutor) + .join(); + } + + try (var s = connection.prepareStatement("UPDATE TXNO_VERSION SET version = ?")) { + s.setInt(1, migration.getVersion()); + if (s.executeUpdate() != 1) { + throw new IllegalStateException("Version table should already exist"); } } } private static int currentVersion(Connection connection, Dialect dialect) throws SQLException { dialect.createVersionTableIfNotExists(connection); - try (Statement s = connection.createStatement(); - ResultSet rs = s.executeQuery("SELECT version FROM TXNO_VERSION FOR UPDATE")) { - if (!rs.next()) { - // TODO should attempt to "win" at creating the record and then lock it - return 0; + int version = fetchCurrentVersion(connection); + if (version >= 0) { + return version; + } + try { + log.info("No version record found. Attempting to create"); + if (waitLatch != null) { + log.info("Stopping at latch"); + readyLatch.countDown(); + if (!waitLatch.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("Latch not released in 10 seconds"); + } + log.info("Latch released"); + } + try (var s = connection.prepareStatement("INSERT INTO TXNO_VERSION (version) VALUES (0)")) { + s.executeUpdate(); + } + log.info("Created version record."); + return fetchCurrentVersion(connection); + } catch (Exception e) { + log.info( + "Error attempting to create ({} - {}). May have been beaten to it, attempting second fetch", + e.getClass().getSimpleName(), + e.getMessage()); + version = fetchCurrentVersion(connection); + if (version >= 0) { + return version; + } + throw new IllegalStateException("Unable to read or create version record", e); + } + } + + private static int fetchCurrentVersion(Connection connection) throws SQLException { + try (PreparedStatement s = + connection.prepareStatement("SELECT version FROM TXNO_VERSION FOR UPDATE"); + ResultSet rs = s.executeQuery()) { + if (rs.next()) { + var version = rs.getInt(1); + log.info("Current version is {}, obtained lock", version); + if (rs.next()) { + throw new IllegalStateException("More than one version record"); + } + return version; } - return rs.getInt(1); + return -1; } } } diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestDefaultMigrationManager.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestDefaultMigrationManager.java new file mode 100644 index 00000000..1c9a1a6d --- /dev/null +++ b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestDefaultMigrationManager.java @@ -0,0 +1,77 @@ +package com.gruelbox.transactionoutbox; + +import static com.gruelbox.transactionoutbox.Dialect.H2; +import static org.junit.jupiter.api.Assertions.fail; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import java.util.concurrent.*; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +@Slf4j +public class TestDefaultMigrationManager { + + private static HikariDataSource dataSource; + + @BeforeAll + static void beforeAll() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl( + "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DEFAULT_LOCK_TIMEOUT=60000;LOB_TIMEOUT=2000;MV_STORE=TRUE;DATABASE_TO_UPPER=FALSE"); + config.setUsername("test"); + config.setPassword("test"); + config.addDataSourceProperty("cachePrepStmts", "true"); + dataSource = new HikariDataSource(config); + } + + @AfterAll + static void afterAll() { + dataSource.close(); + } + + @Test + void parallelMigrations() { + CountDownLatch readyLatch = new CountDownLatch(2); + DefaultMigrationManager.withLatch( + readyLatch, + waitLatch -> { + Executor executor = runnable -> new Thread(runnable).start(); + TransactionManager txm = TransactionManager.fromDataSource(dataSource); + CompletableFuture threads = + CompletableFuture.allOf( + CompletableFuture.runAsync( + () -> { + try { + DefaultMigrationManager.migrate(txm, H2); + } catch (Exception e) { + log.error("Thread 1 failed", e); + throw e; + } + }, + executor), + CompletableFuture.runAsync( + () -> { + try { + DefaultMigrationManager.migrate(txm, H2); + } catch (Exception e) { + log.error("Thread 2 failed", e); + throw e; + } + }, + executor)); + try { + if (!readyLatch.await(15, TimeUnit.SECONDS)) { + throw new TimeoutException(); + } + waitLatch.countDown(); + } catch (InterruptedException | TimeoutException e) { + fail("Timed out or interrupted waiting for ready latch"); + } finally { + threads.join(); + } + }); + } +}