From 6f8615e03e92effbc85aff73e87c73e0b16b2337 Mon Sep 17 00:00:00 2001 From: QiYu Zhuang Date: Sun, 28 Jan 2024 21:37:23 +0800 Subject: [PATCH] release version 1.0 for GeoTP --- default_table_size | 3 +- .../proxy/src/main/resources/bin/start.sh | 10 +- .../statistics/monitor/LocalLockTable.java | 9 + .../statistics/monitor/LockMetaData.java | 6 +- .../infra/statistics/network/Latency.java | 17 +- .../transactions/AgentAsyncXAManager.java | 13 +- .../infra/transactions/CustomXID.java | 2 +- .../executor/sql/context/ExecutionUnit.java | 7 + .../infra/executor/sql/context/SQLUnit.java | 2 +- .../driver/jdbc/JDBCExecutorCallback.java | 4 + .../driver/DriverExecutionPrepareEngine.java | 2 + .../XAShardingSphereTransactionManager.java | 9 + .../dialect/MySQLXAConnectionWrapperTest.java | 6 +- .../xa/harp/manager/CustomTransactionImp.java | 89 +++++++++- .../harp/manager/XAResourceTransaction.java | 7 +- pom.xml | 2 +- proxy/backend/core/pom.xml | 5 + .../backend/connector/BackendConnection.java | 12 ++ .../backend/connector/ProxySQLExecutor.java | 24 +-- .../BackendTransactionManager.java | 7 + .../transaction/TransactionSetHandler.java | 8 +- .../backend/session/ConnectionSession.java | 4 + .../main/resources/conf/config-sharding.yaml | 166 ++++++++++++------ .../proxy/frontend/ShardingSphereProxy.java | 10 +- .../query/MySQLMultiStatementsHandler.java | 36 +++- 25 files changed, 359 insertions(+), 101 deletions(-) diff --git a/default_table_size b/default_table_size index 08e63931777..a2f1ac9b4a0 100644 --- a/default_table_size +++ b/default_table_size @@ -1 +1,2 @@ -usertable 1000001 \ No newline at end of file +usertable 1000001 +warehouse 401 diff --git a/distribution/proxy/src/main/resources/bin/start.sh b/distribution/proxy/src/main/resources/bin/start.sh index 001f38bcd0e..897fddf8b02 100644 --- a/distribution/proxy/src/main/resources/bin/start.sh +++ b/distribution/proxy/src/main/resources/bin/start.sh @@ -198,11 +198,15 @@ if [[ $1 == -a ]] || [[ $1 == -p ]] || [[ $1 == -c ]] || [[ $1 == -f ]] || [[ $1 done elif [ "$1" == "--harp" ]; then - HARP="alg=harp" + HARP="--alg=harp" elif [ "$1" == "--aharp" ]; then - HARP="alg=aharp" + HARP="--alg=aharp" +elif [ "$1" == "--aharp_lppa" ]; then + HARP="--alg=aharp_lppa" +elif [ "$1" == "--aharp_pa" ]; then + HARP="--alg=aharp_pa" elif [ "$1" == "--a" ]; then - HARP="alg=a" + HARP="--alg=a" elif [ $# == 1 ]; then PORT=$1 diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LocalLockTable.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LocalLockTable.java index fc92785fcba..15987dc6339 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LocalLockTable.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LocalLockTable.java @@ -85,6 +85,15 @@ public void registerTable(String tableName) { } } + public boolean isRegisterTable(String tableName) { + for (String each : DefaultTableNameToSize.keySet()) { + if (each.equals(tableName)) { + return true; + } + } + return false; + } + private int getDefaultTableSize(String tableName) { if (DefaultTableNameToSize.get(tableName) != null) { return DefaultTableNameToSize.get(tableName); diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LockMetaData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LockMetaData.java index 197def24628..e968de0c726 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LockMetaData.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/monitor/LockMetaData.java @@ -24,7 +24,7 @@ @Setter public final class LockMetaData { - private static final int countThreshold = 128; + private static final int countThreshold = 32; private static final double alpha = 0.75; private int readCount; @@ -62,7 +62,7 @@ public LockMetaData(double networkLatency) { writeLatency = 0.01; latency = 0.01; startTime = System.nanoTime(); - networkThreshold = networkLatency * 2; // hyper-parameter, 1RTT + networkThreshold = networkLatency * 2; // hyper-parameter, 2 * RTT } public synchronized void incCount() { @@ -91,7 +91,7 @@ public synchronized double nonBlockProbability() { } // TODO: // System.out.println("successCount: " + successCount + "; count: " + count + "; processing: " + processing); - return Math.pow(successCount * 1.0 / count, processing); + return Math.pow(successCount * 1.0 / count, Math.max(processing - 1, 0)); } public synchronized void updateLatency(double newLatency) { diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java index 85fef7d579d..66b623cb5dc 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java @@ -60,6 +60,18 @@ public void SetDataSourceIp(String src, String ip) { } } + public boolean IsPostgreSQLNode(String resource_name) { + for (Map.Entry each : srcToIp.entrySet()) { + if (resource_name.contains(each.getKey())) { + // find the ip of postgreSQL node + if (each.getValue().equals("10.77.70.89") || each.getValue().equals("10.77.70.90") || each.getValue().equals("127.0.0.1")) { + return true; + } + } + } + return false; + } + public double GetLatency(String src) { if (latencies.containsKey(src)) { return latencies.get(src)[windowSize]; @@ -103,7 +115,10 @@ public void SetAlgorithm(String alg) { } public boolean NeedDelay() { - return algorithm.equals("harp") || algorithm.equals("aharp"); + return algorithm.equals("harp") || algorithm.equals("aharp") || + algorithm.equals("harp_lp") || algorithm.equals("aharp_lp") || + algorithm.equals("harp_pa") || algorithm.equals("aharp_pa") || + algorithm.equals("harp_lppa") || algorithm.equals("aharp_lppa"); } public boolean NeedLatencyPredict() { diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java index bfdb3caf1b9..e90066d87f0 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java @@ -21,7 +21,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; @Slf4j public class AgentAsyncXAManager { @@ -34,6 +34,7 @@ private class XATransactionInfo { XATransactionState state; String errorInfo; + boolean preAbort = false; public XATransactionInfo(XATransactionState state) { this.state = state; @@ -44,7 +45,7 @@ public static AgentAsyncXAManager getInstance() { return Instance; } - private final HashMap XAStates = new HashMap<>(); + private final ConcurrentHashMap XAStates = new ConcurrentHashMap<>(2048, 0.75f, 256); private long globalTid = 0; public synchronized long fetchAndAddGlobalTid() { @@ -66,6 +67,7 @@ public void setStateByXid(CustomXID xid, XATransactionState state) { XAStates.get(xid).setState(state); } else { assert state == XATransactionState.ROLLBACK_ONLY; + XAStates.get(xid).setState(state); log.info("rollback only" + xid); } } else { @@ -82,8 +84,13 @@ public void setErrorInfoByXid(CustomXID xid, String errorInfo) { public XATransactionState getStateByXid(CustomXID xid) { if (XAStates.get(xid) == null) { - System.out.println(xid.toString()); + System.out.println("XA manager can not find" + xid.toString()); } return XAStates.get(xid).getState(); } + + public void clearStateByXid(CustomXID xid) { + System.out.println("XA state size: " + XAStates.size()); + XAStates.remove(xid); + } } diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java index 812e7d34ec4..429abff2d9f 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java @@ -20,7 +20,7 @@ import java.util.Arrays; import java.util.List; -import com.mysql.jdbc.jdbc2.optional.MysqlXid; +import com.mysql.cj.jdbc.MysqlXid; import lombok.ToString; import javax.transaction.xa.Xid; diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java index 1514ba335df..a6cd7a40a6c 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java @@ -102,6 +102,13 @@ public void SetDelayTime(long delayTime) { this.delayTime = delayTime; } + public void clearAnalysis() { + networkLatency = 0; + localExecuteLatency = 0; + abortProbability = 0.0; + delayTime = 0; + } + public void SetFinishTime(long finishTime) { this.finishTime = finishTime; } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java index ef76a815241..8f9f8ff0852 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java @@ -63,7 +63,7 @@ public void CombineSQLUnit(SQLUnit other) { public void setLastQueryComment(boolean onePhase) { if (onePhase) { - sql = "/*last one phase query*/" + sql; + // sql = "/*last one phase query*/" + sql; } else { sql = "/*last query*/" + sql; } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java index ff75d20b9e7..0418732a7e1 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java @@ -60,12 +60,14 @@ public abstract class JDBCExecutorCallback implements ExecutorCallback execute(final Collection executionUnits, final boolean isTrunkThread) throws SQLException { // TODO It is better to judge whether need sane result before execute, can avoid exception thrown Collection result = new LinkedList<>(); + long startTime = System.nanoTime(); for (JDBCExecutionUnit each : executionUnits) { T executeResult = execute(each, isTrunkThread); if (null != executeResult) { result.add(executeResult); } } + System.out.println("JDBCExecutorCallback execution time: " + (System.nanoTime() - startTime) / 1000000 + " ms"); return result; } @@ -76,6 +78,7 @@ public final Collection execute(final Collection execution */ @SneakyThrows private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread) throws SQLException { + long start = System.nanoTime(); SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); DatabaseType storageType = storageTypes.get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()); DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(jdbcExecutionUnit.getStorageResource().getConnection().getMetaData(), storageType); @@ -102,6 +105,7 @@ private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTru executionUnit.setRealExecuteLatency((int) (executeTime / 1000000)); sqlExecutionHook.finishSuccess(); finishReport(jdbcExecutionUnit); + System.out.println("execute time: " + (System.nanoTime() - start) / 1000000 + " ms"); return result; } catch (final SQLException ex) { if (!storageType.equals(protocolType)) { diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java index 4481d9188b3..bc7293ddb9a 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java @@ -85,7 +85,9 @@ private SQLExecutionUnitBuilder getCachedSqlExecutionUnitBuilder(final String ty @Override protected List> group(final String dataSourceName, final List> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException { List> result = new LinkedList<>(); + long startTime = System.nanoTime(); List connections = connectionManager.getConnections(dataSourceName, sqlUnitGroups.size(), connectionMode); + System.out.println("Get connection time: " + (System.nanoTime() - startTime) / 1000000 + " ms;"); int count = 0; for (List each : sqlUnitGroups) { result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode)); diff --git a/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java b/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java index 625e852c3e5..ea5bd379f67 100644 --- a/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java +++ b/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java @@ -26,6 +26,7 @@ import org.apache.shardingsphere.transaction.exception.TransactionTimeoutException; import org.apache.shardingsphere.transaction.spi.ShardingSphereTransactionManager; import org.apache.shardingsphere.transaction.xa.jta.datasource.XATransactionDataSource; +import org.apache.shardingsphere.transaction.xa.harp.manager.CustomTransactionImp; import org.apache.shardingsphere.transaction.xa.jta.datasource.checker.DataSourcePrivilegeChecker; import org.apache.shardingsphere.transaction.xa.spi.XATransactionManagerProvider; @@ -53,6 +54,14 @@ public final class XAShardingSphereTransactionManager implements ShardingSphereT private XATransactionManagerProvider xaTransactionManagerProvider; + @SneakyThrows + public void setXATransactionPreAbort(boolean preAbort) { + // System.out.println("set pre abort: " + preAbort); + if (xaTransactionManagerProvider.getTransactionManager().getTransaction() instanceof CustomTransactionImp) { + ((CustomTransactionImp) xaTransactionManagerProvider.getTransactionManager().getTransaction()).setPreAbort(preAbort); + } + } + @Override public void init(final Map databaseTypes, final Map dataSources, final String providerType) { // dataSources.forEach((key, value) -> TypedSPILoader.getService(DataSourcePrivilegeChecker.class, databaseTypes.get(key).getType()).checkPrivilege(value)); diff --git a/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/connection/dialect/MySQLXAConnectionWrapperTest.java b/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/connection/dialect/MySQLXAConnectionWrapperTest.java index b8af059a221..f0db9a25ed2 100644 --- a/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/connection/dialect/MySQLXAConnectionWrapperTest.java +++ b/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/connection/dialect/MySQLXAConnectionWrapperTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.transaction.xa.jta.connection.dialect; -import com.mysql.jdbc.jdbc2.optional.JDBC4MysqlXAConnection; +import com.mysql.cj.jdbc.MysqlXAConnection; import com.zaxxer.hikari.HikariDataSource; import org.apache.shardingsphere.infra.database.type.DatabaseType; import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader; @@ -45,7 +45,7 @@ class MySQLXAConnectionWrapperTest { @Test void assertWrap() throws SQLException { XAConnection actual = TypedSPILoader.getService(XAConnectionWrapper.class, databaseType.getType()).wrap(createXADataSource(), mockConnection()); - assertThat(actual.getXAResource(), instanceOf(JDBC4MysqlXAConnection.class)); + assertThat(actual.getXAResource(), instanceOf(MysqlXAConnection.class)); } private XADataSource createXADataSource() { @@ -55,7 +55,7 @@ private XADataSource createXADataSource() { private Connection mockConnection() throws SQLException { Connection result = mock(Connection.class); - when(result.unwrap(com.mysql.jdbc.Connection.class)).thenReturn(mock(com.mysql.jdbc.Connection.class)); + when(result.unwrap(com.mysql.cj.MysqlConnection.class)).thenReturn(mock(com.mysql.cj.MysqlConnection.class)); return result; } } diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java index 306db5ffa71..4a618dc92ad 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java @@ -24,7 +24,6 @@ import com.atomikos.datasource.xa.XATransactionalResource; import com.atomikos.icatch.config.Configuration; import com.atomikos.icatch.jta.ExtendedSystemException; -import com.mysql.jdbc.StringUtils; import lombok.Getter; import lombok.Setter; import lombok.SneakyThrows; @@ -39,6 +38,7 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.sql.SQLException; import java.util.*; @Slf4j @@ -57,6 +57,9 @@ public class CustomTransactionImp implements Transaction { private volatile int status = 6; private volatile boolean timeout = false; + @Setter + private boolean preAbort = false; + CustomTransactionImp(String tid) { this.xaResourceToResourceTransactionMap_ = new HashMap<>(); this.tid = tid; @@ -92,7 +95,7 @@ private static void rethrowAsJtaHeuristicRollbackException(String msg, Throwable public void commit() throws RollbackException, SecurityException, IllegalStateException { // TODO boolean onePhase = xaResourceToResourceTransactionMap_.size() == 1; - if (Latency.getInstance().asyncPreparation()) { + if (Latency.getInstance().asyncPreparation() && !preAbort && !onePhase) { try { asyncCommit(onePhase); } catch (InterruptedException e) { @@ -112,12 +115,21 @@ public void commit() throws RollbackException, SecurityException, IllegalStateEx public void syncCommit(boolean onePhase) throws XAException { boolean prepareSuccess = true; long enterTime = System.nanoTime(); - System.out.println("Enter in commit: " + enterTime); + System.out.println("Txn " + tid + " Enter in commit: " + enterTime); // xa end try { for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); + if (AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.IDLE) { + continue; + } + if (txn.isInPostgreSQL()) { + AgentAsyncXAManager.getInstance().setStateByXid(xid, XATransactionState.IDLE); + continue; + } each.getXares().end(txn.getXid(), 67108864); + AgentAsyncXAManager.getInstance().setStateByXid(xid, XATransactionState.IDLE); } } catch (XAException e) { log.error("xa end failed."); @@ -130,10 +142,19 @@ public void syncCommit(boolean onePhase) throws XAException { if (!onePhase) { for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); try { + if (AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.PREPARED || + AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.ROLLBACK_ONLY || + AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.FAILED) { + continue; + } each.getXares().prepare(txn.getXid()); + AgentAsyncXAManager.getInstance().setStateByXid(xid, XATransactionState.PREPARED); } catch (XAException ex) { prepareSuccess = false; + AgentAsyncXAManager.getInstance().setStateByXid(xid, XATransactionState.FAILED); + log.error("xa prepare failed."); } } } @@ -150,6 +171,7 @@ public void syncCommit(boolean onePhase) throws XAException { } else { each.getXares().commit(txn.getXid(), false); } + } } else { for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { @@ -158,6 +180,15 @@ public void syncCommit(boolean onePhase) throws XAException { } } + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); + AgentAsyncXAManager.getInstance().clearStateByXid(xid); + } + + if (!prepareSuccess) { + throw new XAException(tid + " - XA PREPARE FAILED."); + } + long xaCommitTime = System.nanoTime(); System.out.println("XA Commit Time: " + (xaCommitTime - xaPrepareTime) / 1000); } @@ -167,14 +198,16 @@ public void asyncCommit(boolean onePhase) throws InterruptedException, XAExcepti if (onePhase) { long enterTime = System.nanoTime(); - System.out.println("Enter in commit: " + enterTime); + System.out.println("Txn " + tid + " Enter in one phase async commit: " + enterTime); for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); while (AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.IDLE && + AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.FAILED && AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ROLLBACK_ONLY) { Thread.sleep(1); } - if (AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.ROLLBACK_ONLY) { + if (AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.ROLLBACK_ONLY || + AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.FAILED) { prepareSuccess = false; } } @@ -199,14 +232,24 @@ public void asyncCommit(boolean onePhase) throws InterruptedException, XAExcepti } long endTime = System.nanoTime(); System.out.println("Txn " + tid + " Finish Commit Time: " + (endTime - finishAsyncTime) / 1000 + "us"); + + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); + AgentAsyncXAManager.getInstance().clearStateByXid(xid); + } + + if (!prepareSuccess) { + throw new XAException(tid + " - XA PREPARE FAILED."); + } } else { long enterTime = System.nanoTime(); - System.out.println("Enter in commit: " + enterTime); + System.out.println("Txn " + tid + " Enter in async commit: " + enterTime); for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); while (AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.PREPARED && AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.FAILED && AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ROLLBACK_ONLY) { + // System.out.println("tid - " + tid + ", xid - " + xid.toString() + ", state - " + AgentAsyncXAManager.getInstance().getStateByXid(xid)); Thread.sleep(1); } if (AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.ROLLBACK_ONLY || @@ -245,6 +288,15 @@ public void asyncCommit(boolean onePhase) throws InterruptedException, XAExcepti } long endTime = System.nanoTime(); System.out.println("Txn " + tid + " Finish Commit Time: " + (endTime - finishAsyncTime) / 1000 + "us"); + + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); + AgentAsyncXAManager.getInstance().clearStateByXid(xid); + } + + if (!prepareSuccess) { + throw new XAException(tid + " - XA PREPARE FAILED."); + } } } @@ -434,7 +486,8 @@ public void registerSynchronization(Synchronization synchronization) throws Roll @SneakyThrows @Override public void rollback() throws IllegalStateException, SystemException { - if (Latency.getInstance().asyncPreparation()) { + boolean onePhase = xaResourceToResourceTransactionMap_.size() == 1; + if (Latency.getInstance().asyncPreparation() && !preAbort && !onePhase) { asyncRollback(); } else { syncRollback(); @@ -443,10 +496,20 @@ public void rollback() throws IllegalStateException, SystemException { public void syncRollback() throws XAException { // xa end + System.out.println("Txn " + tid + " sync rollback."); try { for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); + if (AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ACTIVE) { + continue; + } + if (txn.isInPostgreSQL()) { + AgentAsyncXAManager.getInstance().setStateByXid(xid, XATransactionState.IDLE); + continue; + } each.getXares().end(txn.getXid(), 67108864); + AgentAsyncXAManager.getInstance().setStateByXid(xid, XATransactionState.IDLE); } } catch (XAException e) { log.error("xa end failed."); @@ -458,9 +521,15 @@ public void syncRollback() throws XAException { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); each.getXares().rollback(txn.getXid()); } + + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); + AgentAsyncXAManager.getInstance().clearStateByXid(xid); + } } public void asyncRollback() throws InterruptedException, XAException { + System.out.println("Txn " + tid + " async rollback"); for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); while (AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.PREPARED && @@ -474,6 +543,7 @@ public void asyncRollback() throws InterruptedException, XAException { for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { threadList.add(new Thread(() -> { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); + log.info("XA ROLLBACK " + SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); try { each.getXares().rollback(txn.getXid()); } catch (XAException ex) { @@ -488,6 +558,11 @@ public void asyncRollback() throws InterruptedException, XAException { for (Thread each : threadList) { each.join(); } + + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { + CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); + AgentAsyncXAManager.getInstance().clearStateByXid(xid); + } } @Override diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java index e191837d20b..8f16acfb43b 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java @@ -25,8 +25,9 @@ import com.atomikos.icatch.SysException; import com.atomikos.recovery.TxState; import com.atomikos.util.Assert; -import com.mysql.jdbc.jdbc2.optional.MysqlXid; +import com.mysql.cj.jdbc.MysqlXid; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.infra.statistics.network.Latency; import org.apache.shardingsphere.infra.transactions.CustomXID; import javax.transaction.xa.XAException; @@ -83,6 +84,10 @@ public String toString() { return this.toString; } + public boolean isInPostgreSQL() { + return Latency.getInstance().IsPostgreSQLNode(resourcename); + } + void setState(TxState state) { if (state.isHeuristic()) { log.warn("Heuristic termination of " + this.toString() + " with state " + state); diff --git a/pom.xml b/pom.xml index 3ecab7eb055..2d1faaa05c8 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ 42.4.1 3.1.0-og - 5.1.47 + 8.0.31 2.4.2 2.1.214 6.1.7.jre8-preview diff --git a/proxy/backend/core/pom.xml b/proxy/backend/core/pom.xml index 1688ffe53e2..4ae8700050a 100644 --- a/proxy/backend/core/pom.xml +++ b/proxy/backend/core/pom.xml @@ -138,6 +138,11 @@ shardingsphere-transaction-xa-core ${project.parent.version} + + + + + org.apache.shardingsphere shardingsphere-global-clock-core diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/BackendConnection.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/BackendConnection.java index e901734b74e..f008a5f69f6 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/BackendConnection.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/BackendConnection.java @@ -22,6 +22,7 @@ import com.google.common.collect.Multimap; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCConnectionManager; import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader; @@ -69,6 +70,9 @@ public final class BackendConnection implements ExecutorJDBCConnectionManager { private final Collection transactionHooks = ShardingSphereServiceLoader.getServiceInstances(TransactionHook.class); + @Setter + private boolean preAbort = false; + @Override public List getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException { Preconditions.checkNotNull(connectionSession.getDatabaseName(), "Current database name is null."); @@ -163,6 +167,7 @@ private void replayTransactionOption(final Connection connection) throws SQLExce connection.setReadOnly(true); } if (null != connectionSession.getIsolationLevel()) { + System.out.println("replayTransactionOption: " + TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel())); connection.setTransactionIsolation(TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel())); } } @@ -239,9 +244,11 @@ public void closeExecutionResources() throws BackendConnectionException { if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction()) { result.addAll(closeHandlers(true)); result.addAll(closeConnections(false)); + preAbort = false; } else if (closed.get()) { result.addAll(closeHandlers(true)); result.addAll(closeConnections(true)); + preAbort = false; } if (result.isEmpty()) { return; @@ -258,6 +265,7 @@ public void closeAllResources() { closed.set(true); closeHandlers(true); closeConnections(true); + preAbort = false; } } @@ -337,4 +345,8 @@ private void resetSessionVariablesIfNecessary(final Collection value } connectionSession.getRequiredSessionVariableRecorder().removeVariablesWithDefaultValue(); } + + public boolean getPreAbort() { + return preAbort; + } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java index a9f796bc433..8fe488e1ffa 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java @@ -209,6 +209,7 @@ type, maxConnectionsSizePerQuery, backendConnection, statementManager, new State boolean needStat = LocalLockTable.getInstance().needStatistic(); boolean op = false; + boolean isHot = false; String tableName = ""; int idx = -1; long startTime = 0; @@ -217,32 +218,33 @@ type, maxConnectionsSizePerQuery, backendConnection, statementManager, new State if (sqlStatement instanceof SelectStatement) { if (((SelectStatement) sqlStatement).getFrom() != null && ((SelectStatement) sqlStatement).getFrom() instanceof SimpleTableSegment) { tableName = ((SimpleTableSegment) ((SelectStatement) sqlStatement).getFrom()).getTableName().getIdentifier().getValue(); - if (tableName.contains("usertable")) { + if (LocalLockTable.getInstance().isRegisterTable(tableName)) { op = true; + isHot = true; } } - if (((SelectStatement) sqlStatement).getWhere().isPresent() && + if (isHot && ((SelectStatement) sqlStatement).getWhere().isPresent() && ((SelectStatement) sqlStatement).getWhere().get().getExpr() instanceof BinaryOperationExpression) { idx = (int) ((LiteralExpressionSegment) ((BinaryOperationExpression) ((SelectStatement) sqlStatement).getWhere().get().getExpr()).getRight()).getLiterals(); } } else if (sqlStatement instanceof UpdateStatement) { if (((MySQLUpdateStatement) sqlStatement).getTable() != null && ((MySQLUpdateStatement) sqlStatement).getTable() instanceof SimpleTableSegment) { tableName = ((SimpleTableSegment) ((MySQLUpdateStatement) sqlStatement).getTable()).getTableName().getIdentifier().getValue(); - if (tableName.contains("usertable")) { + if (LocalLockTable.getInstance().isRegisterTable(tableName)) { op = false; } } - if (((UpdateStatement) sqlStatement).getWhere().isPresent() && - (((UpdateStatement) sqlStatement).getWhere().get().getExpr()) instanceof BinaryOperationExpression) { - idx = (int) ((LiteralExpressionSegment) ((BinaryOperationExpression) ((UpdateStatement) sqlStatement).getWhere().get().getExpr()).getRight()).getLiterals(); - } + // if (((UpdateStatement) sqlStatement).getWhere().isPresent() && + // (((UpdateStatement) sqlStatement).getWhere().get().getExpr()) instanceof BinaryOperationExpression) { + // idx = (int) ((LiteralExpressionSegment) ((BinaryOperationExpression) ((UpdateStatement) sqlStatement).getWhere().get().getExpr()).getRight()).getLiterals(); + // } } LockMetaData lockMetaData = LocalLockTable.getInstance().getLockMetaData(tableName, idx); - boolean needPreAbort = analyseSingleSQL(tableName, idx); - if (!needPreAbort) { - throw new SQLException("this transaction is most likely to timeout, pre-abort in harp"); - } + // boolean needPreAbort = analyseSingleSQL(tableName, idx); + // if (!needPreAbort) { + // throw new SQLException("this transaction is most likely to timeout, pre-abort in harp"); + // } executeTransactionHooksBeforeExecuteSQL(backendConnection.getConnectionSession()); if (needStat && idx >= 0) diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java index 34cccc72ac7..98f18261acc 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java @@ -28,6 +28,7 @@ import org.apache.shardingsphere.transaction.rule.TransactionRule; import org.apache.shardingsphere.transaction.spi.ShardingSphereTransactionManager; import org.apache.shardingsphere.transaction.spi.TransactionHook; +import org.apache.shardingsphere.transaction.xa.XAShardingSphereTransactionManager; import java.sql.Connection; import java.sql.SQLException; @@ -95,6 +96,9 @@ public void commit() throws SQLException { if (TransactionType.LOCAL == transactionType || null == shardingSphereTransactionManager) { localTransactionManager.commit(); } else { + if (shardingSphereTransactionManager instanceof XAShardingSphereTransactionManager) { + ((XAShardingSphereTransactionManager) shardingSphereTransactionManager).setXATransactionPreAbort(connection.getPreAbort()); + } shardingSphereTransactionManager.commit(connection.getConnectionSession().getTransactionStatus().isRollbackOnly()); } long endTime = System.nanoTime(); @@ -121,6 +125,9 @@ public void rollback() throws SQLException { if (TransactionType.LOCAL == transactionType || null == shardingSphereTransactionManager) { localTransactionManager.rollback(); } else { + if (shardingSphereTransactionManager instanceof XAShardingSphereTransactionManager) { + ((XAShardingSphereTransactionManager) shardingSphereTransactionManager).setXATransactionPreAbort(connection.getPreAbort()); + } shardingSphereTransactionManager.rollback(); } } finally { diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionSetHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionSetHandler.java index 65952c339a6..6403b7e2ae9 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionSetHandler.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/transaction/TransactionSetHandler.java @@ -25,6 +25,7 @@ import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; import org.apache.shardingsphere.proxy.backend.util.TransactionUtils; import org.apache.shardingsphere.sql.parser.sql.common.enums.TransactionAccessType; +import org.apache.shardingsphere.sql.parser.sql.common.enums.TransactionIsolationLevel; import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SetTransactionStatement; import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement; import org.apache.shardingsphere.transaction.exception.SwitchTypeInTransactionException; @@ -68,9 +69,10 @@ private void setTransactionIsolationLevel() { if (!sqlStatement.getIsolationLevel().isPresent()) { return; } - connectionSession.setDefaultIsolationLevel(sqlStatement instanceof MySQLStatement - ? TransactionUtils.getTransactionIsolationLevel(Connection.TRANSACTION_REPEATABLE_READ) - : TransactionUtils.getTransactionIsolationLevel(Connection.TRANSACTION_READ_COMMITTED)); + connectionSession.setDefaultIsolationLevel(TransactionUtils.getTransactionIsolationLevel(Connection.TRANSACTION_SERIALIZABLE)); + // connectionSession.setDefaultIsolationLevel(sqlStatement instanceof MySQLStatement + // ? TransactionUtils.getTransactionIsolationLevel(Connection.TRANSACTION_REPEATABLE_READ) + // : TransactionUtils.getTransactionIsolationLevel(Connection.TRANSACTION_READ_COMMITTED)); connectionSession.setIsolationLevel(sqlStatement.getIsolationLevel().get()); } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java index c3efe823421..f738dfe4608 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java @@ -75,6 +75,10 @@ public final class ConnectionSession { private QueryContext queryContext; + public void setPreAbort(boolean preAbort) { + backendConnection.setPreAbort(preAbort); + } + public ConnectionSession(final DatabaseType protocolType, final TransactionType initialTransactionType, final AttributeMap attributeMap) { this.protocolType = protocolType; transactionStatus = new TransactionStatus(initialTransactionType); diff --git a/proxy/bootstrap/src/main/resources/conf/config-sharding.yaml b/proxy/bootstrap/src/main/resources/conf/config-sharding.yaml index 9352eb76f1e..bd7d2280b36 100644 --- a/proxy/bootstrap/src/main/resources/conf/config-sharding.yaml +++ b/proxy/bootstrap/src/main/resources/conf/config-sharding.yaml @@ -113,48 +113,112 @@ databaseName: sharding_db dataSources: -# ds_0: -# url: jdbc:mysql://9.134.213.233:3306/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true -# username: shardingsphere -# password: Ss123!@# -# connectionTimeoutMilliseconds: 30000 -# idleTimeoutMilliseconds: 60000 -# maxLifetimeMilliseconds: 1800000 -# maxPoolSize: 200 -# minPoolSize: 1 -# ds_1: -# url: jdbc:mysql://9.134.123.45:3306/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true -# username: shardingsphere -# password: Ss123!@# -# connectionTimeoutMilliseconds: 30000 -# idleTimeoutMilliseconds: 60000 -# maxLifetimeMilliseconds: 1800000 -# maxPoolSize: 200 -# minPoolSize: 1 + # ds_0: + # url: jdbc:mysql://10.77.70.85:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + # username: sharding + # password: sharding + # connectionTimeoutMilliseconds: 30000 + # idleTimeoutMilliseconds: 60000 + # maxLifetimeMilliseconds: 1800000 + # maxPoolSize: 256 + # minPoolSize: 32 + # ds_1: + # url: jdbc:mysql://10.77.70.86:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + # username: sharding + # password: sharding + # connectionTimeoutMilliseconds: 30000 + # idleTimeoutMilliseconds: 60000 + # maxLifetimeMilliseconds: 1800000 + # maxPoolSize: 256 + # minPoolSize: 32 + # ds_2: + # url: jdbc:mysql://10.77.70.87:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + # username: sharding + # password: sharding + # connectionTimeoutMilliseconds: 30000 + # idleTimeoutMilliseconds: 60000 + # maxLifetimeMilliseconds: 1800000 + # maxPoolSize: 256 + # minPoolSize: 32 + # ds_3: + # url: jdbc:mysql://10.77.70.88:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + # username: sharding + # password: sharding + # connectionTimeoutMilliseconds: 30000 + # idleTimeoutMilliseconds: 60000 + # maxLifetimeMilliseconds: 1800000 + # maxPoolSize: 2100 + # minPoolSize: 32 + # ds_0: + # url: jdbc:mysql://127.0.0.1:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + # username: sharding + # password: sharding + # connectionTimeoutMilliseconds: 30000 + # idleTimeoutMilliseconds: 60000 + # maxLifetimeMilliseconds: 1800000 + # maxPoolSize: 2100 + # minPoolSize: 32 ds_0: - url: jdbc:mysql://9.134.213.233:6603/sharding_db?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + url: jdbc:mysql://11.99.55.8:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true username: sharding password: sharding connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 - maxPoolSize: 200 - minPoolSize: 1 + maxPoolSize: 256 + minPoolSize: 32 ds_1: - url: jdbc:mysql://9.134.123.45:6603/sharding_db?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + url: jdbc:mysql://11.99.55.9:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + username: sharding + password: sharding + connectionTimeoutMilliseconds: 30000 + idleTimeoutMilliseconds: 60000 + maxLifetimeMilliseconds: 1800000 + maxPoolSize: 256 + minPoolSize: 32 + ds_2: + url: jdbc:mysql://11.99.55.12:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true + username: sharding + password: sharding + connectionTimeoutMilliseconds: 30000 + idleTimeoutMilliseconds: 60000 + maxLifetimeMilliseconds: 1800000 + maxPoolSize: 256 + minPoolSize: 32 + ds_3: + url: jdbc:mysql://11.129.111.12:6603/harp?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true username: sharding password: sharding connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 - maxPoolSize: 200 - minPoolSize: 1 + maxPoolSize: 2100 + minPoolSize: 32 +# ds_0: +# url: jdbc:mysql://9.134.213.233:6603/sharding_db?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true +# username: sharding +# password: sharding +# connectionTimeoutMilliseconds: 30000 +# idleTimeoutMilliseconds: 60000 +# maxLifetimeMilliseconds: 1800000 +# maxPoolSize: 200 +# minPoolSize: 1 +# ds_1: +# url: jdbc:mysql://9.134.123.45:6603/sharding_db?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&pinGlobalTxToPhysicalConnection=true +# username: sharding +# password: sharding +# connectionTimeoutMilliseconds: 30000 +# idleTimeoutMilliseconds: 60000 +# maxLifetimeMilliseconds: 1800000 +# maxPoolSize: 200 +# minPoolSize: 1 rules: - !SHARDING bindingTables: - - warehouse, customer - - stock, district, order_line + - warehouse + - customer, item, oorder, history + - stock, district, order_line, new_order - usertable defaultDatabaseStrategy: none: @@ -164,79 +228,79 @@ rules: snowflake: type: SNOWFLAKE tables: -# ycsb + # ycsb usertable: - actualDataNodes: ds_${0..1}.usertable + actualDataNodes: ds_${0..3}.usertable databaseStrategy: standard: shardingColumn: ycsb_key - shardingAlgorithmName: mod_2 -# tpcc + shardingAlgorithmName: mod_4 + # tpcc config: actualDataNodes: ds_0.config warehouse: - actualDataNodes: ds_${0..1}.warehouse - databaseStrategy: - standard: - shardingColumn: w_id - shardingAlgorithmName: mod_2 + actualDataNodes: ds_0.warehouse + # databaseStrategy: + # standard: + # shardingColumn: w_id + # shardingAlgorithmName: mod_4 district: - actualDataNodes: ds_${0..1}.district + actualDataNodes: ds_${0..3}.district databaseStrategy: standard: shardingColumn: d_w_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 customer: - actualDataNodes: ds_${0..1}.customer + actualDataNodes: ds_${0..3}.customer databaseStrategy: standard: shardingColumn: c_w_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 item: - actualDataNodes: ds_${0..1}.item + actualDataNodes: ds_${0..3}.item databaseStrategy: standard: shardingColumn: i_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 history: - actualDataNodes: ds_${0..1}.history + actualDataNodes: ds_${0..3}.history databaseStrategy: standard: shardingColumn: h_w_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 oorder: - actualDataNodes: ds_${0..1}.oorder + actualDataNodes: ds_${0..3}.oorder databaseStrategy: standard: shardingColumn: o_w_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 stock: - actualDataNodes: ds_${0..1}.stock + actualDataNodes: ds_${0..3}.stock databaseStrategy: standard: shardingColumn: s_w_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 new_order: - actualDataNodes: ds_${0..1}.new_order + actualDataNodes: ds_${0..3}.new_order databaseStrategy: standard: shardingColumn: no_w_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 order_line: - actualDataNodes: ds_${0..1}.order_line + actualDataNodes: ds_${0..3}.order_line databaseStrategy: standard: shardingColumn: ol_w_id - shardingAlgorithmName: mod_2 + shardingAlgorithmName: mod_4 shardingAlgorithms: mod_4: diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java index 13f29978400..e6c3195b60a 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java @@ -111,9 +111,11 @@ private List startInternal(final int port, final List add futures.add(bootstrap.bind(address, port).sync()); } - Map ips = Latency.getInstance().getSrcToIp(); - for (Map.Entry each : ips.entrySet()) { - startAsyncMessageInternal(3308, each.getValue()); + if (Latency.getInstance().asyncPreparation()) { + Map ips = Latency.getInstance().getSrcToIp(); + for (Map.Entry each : ips.entrySet()) { + startAsyncMessageInternal(3308, each.getValue()); + } } return futures; @@ -150,7 +152,7 @@ private void accept(final List futures) throws InterruptedExcepti private void createEventLoopGroup() { bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1); workerGroup = getWorkerGroup(); - asyncGroup = new NioEventLoopGroup(); + asyncGroup = new NioEventLoopGroup(8); } private EventLoopGroup getWorkerGroup() { diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java index 18dcddfca2b..61e68da6fbe 100644 --- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java +++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java @@ -222,10 +222,21 @@ public List execute() throws SQLException { } } - if (Latency.getInstance().NeedDelay()) { - boolean needPreAbort = analysisLatency((List>) executionGroupContext.getInputGroups()); - if (!needPreAbort) { - throw new SQLException("this transaction is most likely to timeout, pre-abort in harp"); + int blockCnt = 0; + while (Latency.getInstance().NeedDelay()) { + boolean normalExecute = analysisLatency((List>) executionGroupContext.getInputGroups()); + if (blockCnt > 10) { + connectionSession.setPreAbort(true); + throw new SQLException("this transaction is likely to timeout, rollback after 10 tries"); + } + if (normalExecute) { + break; + } + blockCnt++; + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @@ -244,6 +255,11 @@ private boolean analysisLatency(List> groupUni return true; } + for (ExecutionGroup each : groupUnits) { + ExecutionUnit executionUnit = each.getInputs().get(0).getExecutionUnit(); + executionUnit.clearAnalysis(); + } + int maxLatency = 0; for (ExecutionGroup each : groupUnits) { @@ -257,6 +273,9 @@ private boolean analysisLatency(List> groupUni for (QueryContext queryContext : dataSourcesToQueryContext.get(dataSourceName)) { String tableName = getTableNameFromSQLStatementContext(queryContext.getSqlStatementContext()); + if (!LocalLockTable.getInstance().isRegisterTable(tableName)) { + continue; + } int key = getKeyFromSQLStatementContext(queryContext.getSqlStatementContext()); if (Latency.getInstance().NeedPreAbort() || Latency.getInstance().NeedLatencyPredictionAndPreAbort()) { executionUnit.updateProbability(Objects.requireNonNull(LocalLockTable.getInstance().getLockMetaData(tableName, key)).nonBlockProbability()); @@ -286,6 +305,9 @@ private boolean analysisLatency(List> groupUni for (QueryContext queryContext : dataSourcesToQueryContext.get(dataSourceName)) { String tableName = getTableNameFromSQLStatementContext(queryContext.getSqlStatementContext()); + if (!LocalLockTable.getInstance().isRegisterTable(tableName)) { + continue; + } int key = getKeyFromSQLStatementContext(queryContext.getSqlStatementContext()); Objects.requireNonNull(LocalLockTable.getInstance().getLockMetaData(tableName, key)).incProcessing(); } @@ -358,6 +380,7 @@ private QueryHeader generateQueryHeader(QueryResultMetaData meta, int colIndex) private List executeMultiStatements(final ExecutionGroupContext executionGroupContext) throws SQLException { boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown(); + long start = System.nanoTime(); List result = new LinkedList<>(); Map storageTypes = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes(); if (isBatchInsert) { @@ -373,9 +396,8 @@ private List executeMultiStatements(final ExecutionGroupContext< } else { JDBCExecutorCallback> callback = new BatchedJDBCExecutorCallback(storageTypes, sqlStatementSample, isExceptionThrown); try { - long start = System.nanoTime() / 1000000; List> executeResults = jdbcExecutor.execute(executionGroupContext, callback); - // System.out.println("exectution time: " + (System.nanoTime() / 1000000 - start) + "ms; " + System.nanoTime() / 1000000 + "ms"); + System.out.println("JDBC execution time: " + (System.nanoTime() - start) / 1000000 + "ms;"); feedback((List>) executionGroupContext.getInputGroups(), true); boolean first = false; @@ -475,7 +497,6 @@ protected List executeSQL(final String sql, final Statement state try { long start = System.nanoTime(); resultsAvailable = statement.execute(sql); - System.out.println("True execute time: " + ((System.nanoTime() - start) / 1000) + "us"); List list = new ArrayList<>(); while (true) { @@ -492,6 +513,7 @@ protected List executeSQL(final String sql, final Statement state } resultsAvailable = statement.getMoreResults(); + System.out.println("True execute time: " + ((System.nanoTime() - start) / 1000000) + "ms"); } return list;