Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored the Dialect to use an interface to generate the DB specific SQL. #484

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -135,25 +135,7 @@ private static int currentVersion(Connection connection, Dialect dialect) throws
private static void createVersionTableIfNotExists(Connection connection, Dialect dialect)
throws SQLException {
try (Statement s = connection.createStatement()) {
switch (dialect) {
case ORACLE:
try {
s.execute("CREATE TABLE TXNO_VERSION (version NUMBER)");
} catch (SQLException e) {
// oracle code for name already used by an existing object
if (!e.getMessage().contains("955")) {
throw e;
}
}
break;
case MY_SQL_5:
case H2:
case MY_SQL_8:
case POSTGRESQL_9:
default:
s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)");
break;
}
dialect.createVersionTableIfNotExists(s);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class DefaultPersistor implements Persistor, Validatable {
/**
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write
* lock. There's no point making this long; it's always better to just back off as quickly as
* possible and try another record. Generally these lock timeouts only kick in if {@link
* Dialect#isSupportsSkipLock()} is false.
* possible and try another record. Generally these lock timeouts only kick in if the
* implementation does not support skip locking.
*/
@SuppressWarnings("JavaDoc")
@Builder.Default
Expand All @@ -62,7 +62,7 @@ public class DefaultPersistor implements Persistor, Validatable {
private final String tableName = "TXNO_OUTBOX";

/**
* @param migrate Set to false to disable automatic database migrations. This may be preferred if
* @param migrate Set too false to disable automatic database migrations. This may be preferred if
* the default migration behaviour interferes with your existing toolset, and you prefer to
* manage the migrations explicitly (e.g. using FlyWay or Liquibase), or your do not give the
* application DDL permissions at runtime.
Expand Down Expand Up @@ -190,18 +190,7 @@ public void update(Transaction tx, TransactionOutboxEntry entry) throws Exceptio
@Override
public boolean lock(Transaction tx, TransactionOutboxEntry entry) throws Exception {
//noinspection resource
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
dialect.isSupportsSkipLock()
// language=MySQL
? "SELECT id, invocation FROM "
+ tableName
+ " WHERE id = ? AND version = ? FOR UPDATE SKIP LOCKED"
// language=MySQL
: "SELECT id, invocation FROM "
+ tableName
+ " WHERE id = ? AND version = ? FOR UPDATE")) {
try (PreparedStatement stmt = tx.connection().prepareStatement(dialect.lock(tableName))) {
stmt.setString(1, entry.getId());
stmt.setInt(2, entry.getVersion());
stmt.setQueryTimeout(writeLockTimeoutSeconds);
Expand All @@ -228,44 +217,25 @@ public boolean lock(Transaction tx, TransactionOutboxEntry entry) throws Excepti
@Override
public boolean unblock(Transaction tx, String entryId) throws Exception {
@SuppressWarnings("resource")
PreparedStatement stmt =
tx.prepareBatchStatement(
"UPDATE "
+ tableName
+ " SET attempts = 0, blocked = "
+ dialect.booleanValue(false)
+ " "
+ "WHERE blocked = "
+ dialect.booleanValue(true)
+ " AND processed = "
+ dialect.booleanValue(false)
+ " AND id = ?");
stmt.setString(1, entryId);
PreparedStatement stmt = tx.prepareBatchStatement(dialect.unblock(tableName));
stmt.setInt(1, 0);
stmt.setBoolean(2, false);
stmt.setBoolean(3, true);
stmt.setBoolean(4, false);
stmt.setString(5, entryId);
stmt.setQueryTimeout(writeLockTimeoutSeconds);
return stmt.executeUpdate() != 0;
}

@Override
public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant now)
throws Exception {
String forUpdate = dialect.isSupportsSkipLock() ? " FOR UPDATE SKIP LOCKED" : "";
//noinspection resource
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
// language=MySQL
"SELECT "
+ ALL_FIELDS
+ " FROM "
+ tableName
+ " WHERE nextAttemptTime < ? AND blocked = "
+ dialect.booleanValue(false)
+ " AND processed = "
+ dialect.booleanValue(false)
+ dialect.getLimitCriteria()
+ forUpdate)) {
tx.connection().prepareStatement(dialect.selectBatch(tableName, ALL_FIELDS, batchSize))) {
stmt.setTimestamp(1, Timestamp.from(now));
stmt.setInt(2, batchSize);
stmt.setBoolean(2, false);
stmt.setBoolean(3, false);
return gatherResults(batchSize, stmt);
}
}
Expand All @@ -275,10 +245,10 @@ public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now)
throws Exception {
//noinspection resource
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(dialect.getDeleteExpired().replace("{{table}}", tableName))) {
tx.connection().prepareStatement(dialect.deleteExpired(tableName, batchSize))) {
stmt.setTimestamp(1, Timestamp.from(now));
stmt.setInt(2, batchSize);
stmt.setBoolean(2, true);
stmt.setBoolean(3, false);
return stmt.executeUpdate();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,60 +1,29 @@
package com.gruelbox.transactionoutbox;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* The SQL dialects supported by {@link DefaultPersistor}. Currently this is only used to determine
* whether {@code SKIP LOCKED} is available, so using the wrong dialect may work for unsupported
* database platforms. However, in future this is likely to extend to other SQL features and
* possibly be expanded to an interface to allow easier extension.
*/
@AllArgsConstructor
@Getter
@Beta
public enum Dialect {
MY_SQL_5(false, Constants.DEFAULT_DELETE_EXPIRED_STMT, Constants.DEFAULT_LIMIT_CRITERIA), //
MY_SQL_8(true, Constants.DEFAULT_DELETE_EXPIRED_STMT, Constants.DEFAULT_LIMIT_CRITERIA), //
POSTGRESQL_9(
true,
"DELETE FROM {{table}} WHERE id IN (SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)",
Constants.DEFAULT_LIMIT_CRITERIA), //
H2(false, Constants.DEFAULT_DELETE_EXPIRED_STMT, Constants.DEFAULT_LIMIT_CRITERIA), //
ORACLE(
true,
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 AND ROWNUM <= ?",
Constants.ORACLE_LIMIT_CRITERIA);

/**
* @return True if hot row support ({@code SKIP LOCKED}) is available, increasing performance when
* there are multiple instances of the application potentially competing to process the same
* task.
*/
@SuppressWarnings("JavaDoc")
private final boolean supportsSkipLock;

/**
* @return Format string for the SQL required to delete expired retained records.
*/
@SuppressWarnings("JavaDoc")
private final String deleteExpired;

private final String limitCriteria;

private static class Constants {
static final String DEFAULT_DELETE_EXPIRED_STMT =
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?";

static final String DEFAULT_LIMIT_CRITERIA = " LIMIT ?";

static final String ORACLE_LIMIT_CRITERIA = " AND ROWNUM <= ?";
}

public String booleanValue(boolean criteriaValue) {
String valueToReturn;
if (this == ORACLE) valueToReturn = criteriaValue ? "1" : "0";
else valueToReturn = criteriaValue ? Boolean.TRUE.toString() : Boolean.FALSE.toString();

return valueToReturn;
}
import java.sql.SQLException;
import java.sql.Statement;

/** The SQL dialects supported by {@link DefaultPersistor}. */
public interface Dialect {
Dialect MY_SQL_5 = new DialectMySQL5Impl();
Dialect MY_SQL_8 = new DialectMySQL8Impl();
Dialect POSTGRESQL_9 = new DialectPostgres9Impl();
Dialect H2 = new DialectH2Impl();
Dialect ORACLE = new DialectOracleImpl();

String lock(String tableName);

String unblock(String tableName);

String selectBatch(String tableName, String allFields, int batchSize);

String deleteExpired(String tableName, int batchSize);

boolean isSupportsSkipLock();

void createVersionTableIfNotExists(Statement s) throws SQLException;
// Required so the dialects can be used as keys in a Map.
boolean equals(Object o);

int hashCode();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.gruelbox.transactionoutbox;

import lombok.EqualsAndHashCode;

/** Dialect SQL implementation for H2. */
@EqualsAndHashCode
public class DialectH2Impl extends DialectMySQL5Impl {}
wynan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.gruelbox.transactionoutbox;

import java.sql.SQLException;
import java.sql.Statement;
import lombok.EqualsAndHashCode;

/** SQL implementation for MySQL 5. */
@EqualsAndHashCode
public class DialectMySQL5Impl implements Dialect {

@Override
public String lock(String tableName) {
return "SELECT id, invocation FROM " + tableName + " WHERE id = ? AND version = ? FOR UPDATE";
}

@Override
public String unblock(String tableName) {
return "UPDATE "
+ tableName
+ " SET attempts = ?, blocked = ? "
+ "WHERE blocked = ? AND processed = ? AND id = ?";
}

@Override
public String selectBatch(String tableName, String allFields, int batchSize) {
return "SELECT "
+ allFields
+ " FROM "
+ tableName
+ " WHERE nextAttemptTime < ? AND blocked = ? AND processed = ? "
+ "LIMIT "
+ batchSize;
}

@Override
public String deleteExpired(String tableName, int batchSize) {
return "DELETE FROM "
+ tableName
+ " WHERE nextAttemptTime < ? AND processed = ? AND "
+ "blocked = ? LIMIT "
+ batchSize;
}

@Override
public boolean isSupportsSkipLock() {
return false;
}

@Override
public void createVersionTableIfNotExists(Statement s) throws SQLException {
s.execute("CREATE TABLE IF NOT EXISTS TXNO_VERSION (version INT)");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.gruelbox.transactionoutbox;

import lombok.EqualsAndHashCode;

/** Dialect SQL implementation for MySQL 8. */
@EqualsAndHashCode
public class DialectMySQL8Impl extends DialectMySQL5Impl {
@Override
public String lock(String tableName) {
return "SELECT id, invocation FROM "
+ tableName
+ " WHERE id = ? AND version = ? FOR UPDATE SKIP LOCKED";
}

@Override
public String selectBatch(String tableName, String allFields, int batchSize) {
return super.selectBatch(tableName, allFields, batchSize) + " FOR UPDATE SKIP LOCKED";
}

@Override
public boolean isSupportsSkipLock() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.gruelbox.transactionoutbox;

import java.sql.SQLException;
import java.sql.Statement;
import lombok.EqualsAndHashCode;

/** Dialect SQL implementation for Oracle. */
@EqualsAndHashCode
public class DialectOracleImpl extends DialectMySQL8Impl {

@Override
public String selectBatch(String tableName, String allFields, int batchSize) {
return "SELECT "
+ allFields
+ " FROM "
+ tableName
+ " WHERE nextAttemptTime < ? AND blocked = ? AND processed = ? "
+ " AND ROWNUM <= "
+ batchSize
+ " FOR UPDATE SKIP LOCKED";
}

@Override
public String deleteExpired(String tableName, int batchSize) {
return "DELETE FROM "
+ tableName
+ " WHERE nextAttemptTime < ? AND processed = ? AND blocked = ?"
+ " AND ROWNUM <= "
+ batchSize;
}

@Override
public void createVersionTableIfNotExists(Statement s) throws SQLException {
try {
s.execute("CREATE TABLE TXNO_VERSION (version NUMBER)");
} catch (SQLException e) {
// oracle code for name already used by an existing object
if (!e.getMessage().contains("955")) {
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.gruelbox.transactionoutbox;

import lombok.EqualsAndHashCode;

/** Dialect SQL implementation for Postgres 9+. */
@EqualsAndHashCode
public class DialectPostgres9Impl extends DialectMySQL8Impl {

@Override
public String deleteExpired(String tableName, int batchSize) {
return "DELETE FROM "
+ tableName
+ " WHERE id IN (SELECT id FROM "
+ tableName
+ " WHERE "
+ "nextAttemptTime < ? AND processed = ? AND blocked = ? LIMIT "
+ batchSize
+ ")";
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.gruelbox.transactionoutbox.acceptance;

import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;

import com.gruelbox.transactionoutbox.*;
Expand Down