From 34970850572c8ffa4f294bf2aaabe9a178b95b57 Mon Sep 17 00:00:00 2001 From: QiYuZhuang Date: Mon, 6 Nov 2023 15:28:01 +0800 Subject: [PATCH] change machine --- .../metadata/SchemaMetaDataLoaderEngine.java | 2 +- .../infra/statistics/network/Latency.java | 31 +- .../transactions/AgentAsyncXAManager.java | 37 ++- .../transactions/AsyncMessageFromAgent.java | 28 +- .../infra/transactions/CustomXID.java | 45 ++- .../transactions/XATransactionState.java | 33 ++- .../executor/sql/context/ExecutionUnit.java | 8 +- .../infra/executor/sql/context/SQLUnit.java | 2 +- .../XAShardingSphereTransactionManager.java | 2 +- .../transaction/type/xa/provider/harp/pom.xml | 27 +- .../xa/harp/manager/CustomTransactionImp.java | 191 ++++++------ .../manager/CustomTransactionManager.java | 137 +++++---- .../transaction/xa/harp/manager/Decoder.java | 276 ++++++++++-------- .../harp/manager/HarpTransactionManager.java | 67 +++-- .../HarpTransactionManagerProvider.java | 32 +- .../manager/HarpXARecoverableResource.java | 22 +- .../xa/harp/manager/Scheduler.java | 108 ++++--- .../manager/ThreadTransactionContext.java | 32 +- .../xa/harp/manager/XAResourceKey.java | 32 +- .../harp/manager/XAResourceTransaction.java | 78 +++-- .../backend/connector/DatabaseConnector.java | 2 +- .../shardingsphere/proxy/Bootstrap.java | 1 + .../proxy/frontend/ShardingSphereProxy.java | 18 +- .../AsyncMessageChannelInboundHandler.java | 36 ++- .../frontend/netty/AsyncMessageDecoder.java | 29 +- .../netty/AsyncMessageHandlerInitializer.java | 18 ++ .../query/MySQLComQueryPacketExecutor.java | 2 +- .../query/MySQLMultiStatementsHandler.java | 18 +- .../sql/parser/sql/common/util/SQLUtils.java | 28 +- 29 files changed, 826 insertions(+), 516 deletions(-) diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/loader/metadata/SchemaMetaDataLoaderEngine.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/loader/metadata/SchemaMetaDataLoaderEngine.java index 1ef7bd79e82..34169aa826b 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/loader/metadata/SchemaMetaDataLoaderEngine.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/loader/metadata/SchemaMetaDataLoaderEngine.java @@ -92,7 +92,7 @@ private static Collection load(final SchemaMetaDataLoaderMateria private static Collection loadByDefault(final SchemaMetaDataLoaderMaterial material) throws SQLException { Collection tableMetaData = new LinkedList<>(); for (String each : material.getActualTableNames()) { -// TableMetaDataLoader.load(material.getDataSource(), each, material.getStorageType()).ifPresent(tableMetaData::add); + // TableMetaDataLoader.load(material.getDataSource(), each, material.getStorageType()).ifPresent(tableMetaData::add); } return Collections.singletonList(new SchemaMetaData(material.getDefaultSchemaName(), tableMetaData)); } diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java index 36cc7ad8744..85fef7d579d 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/statistics/network/Latency.java @@ -25,6 +25,7 @@ public final class Latency { private static final Latency INSTANCE = new Latency(0.8, 20); + @Getter private String algorithm; private final HashMap latencies; @Getter @@ -46,11 +47,11 @@ public Latency(double alpha, int windowSize) { public void AddDataSource(String src) { System.out.println("Add Source " + src); latencies.put(src, new double[windowSize + 1]); -// if (src.contains("ds_1")) { -// latencies.get(src)[windowSize] = 150; -// } else { -// latencies.get(src)[windowSize] = 20; -// } + // if (src.contains("ds_1")) { + // latencies.get(src)[windowSize] = 150; + // } else { + // latencies.get(src)[windowSize] = 20; + // } } public void SetDataSourceIp(String src, String ip) { @@ -104,13 +105,19 @@ public void SetAlgorithm(String alg) { public boolean NeedDelay() { return algorithm.equals("harp") || algorithm.equals("aharp"); } - - public boolean NeedLatencyPredict() { return algorithm.equals("harp_lp") || algorithm.equals("aharp_lp"); } - - public boolean NeedPreAbort() { return algorithm.equals("harp_pa") || algorithm.equals("aharp_pa"); } - - public boolean NeedLatencyPredictionAndPreAbort() { return algorithm.equals("harp_lppa") || algorithm.equals("aharp_lppa"); } - + + public boolean NeedLatencyPredict() { + return algorithm.equals("harp_lp") || algorithm.equals("aharp_lp"); + } + + public boolean NeedPreAbort() { + return algorithm.equals("harp_pa") || algorithm.equals("aharp_pa"); + } + + public boolean NeedLatencyPredictionAndPreAbort() { + return algorithm.equals("harp_lppa") || algorithm.equals("aharp_lppa"); + } + public boolean asyncPreparation() { return algorithm.equals("aharp") || algorithm.equals("aharp_lp") || algorithm.equals("aharp_pa") || algorithm.equals("aharp_lppa") || algorithm.equals("a"); diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java index 1b0d90c66e9..bfdb3caf1b9 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AgentAsyncXAManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.infra.transactions; import lombok.Getter; @@ -8,33 +25,35 @@ @Slf4j public class AgentAsyncXAManager { + static AgentAsyncXAManager Instance = new AgentAsyncXAManager(); - + @Getter @Setter private class XATransactionInfo { + XATransactionState state; String errorInfo; - + public XATransactionInfo(XATransactionState state) { this.state = state; } } - + public static AgentAsyncXAManager getInstance() { return Instance; } - + private final HashMap XAStates = new HashMap<>(); private long globalTid = 0; - + public synchronized long fetchAndAddGlobalTid() { return globalTid++; } - + public void setStateByXid(CustomXID xid, XATransactionState state) { if (XAStates.containsKey(xid)) { - assert(XAStates.get(xid).getState() == XATransactionState.ACTIVE); + assert (XAStates.get(xid).getState() == XATransactionState.ACTIVE); if (state == XATransactionState.IDLE) { // one phase commit XAStates.get(xid).setState(state); @@ -55,12 +74,12 @@ public void setStateByXid(CustomXID xid, XATransactionState state) { XAStates.put(xid, info); } } - + public void setErrorInfoByXid(CustomXID xid, String errorInfo) { assert XAStates.containsKey(xid); XAStates.get(xid).setErrorInfo(errorInfo); } - + public XATransactionState getStateByXid(CustomXID xid) { if (XAStates.get(xid) == null) { System.out.println(xid.toString()); diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AsyncMessageFromAgent.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AsyncMessageFromAgent.java index 20f9a042b27..074f295a838 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AsyncMessageFromAgent.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/AsyncMessageFromAgent.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.infra.transactions; import com.fasterxml.jackson.annotation.JsonCreator; @@ -10,14 +27,15 @@ @Getter @RequiredArgsConstructor public class AsyncMessageFromAgent implements Serializable { + private final String Xid; - + private final XATransactionState state; - + private final long currentTimeStamp; - + private final String SQLExceptionString; - + @JsonCreator public AsyncMessageFromAgent(@JsonProperty("state") String state, @JsonProperty("currentTimeStamp") long currentTimeStamp, @JsonProperty("xid") String xid, @JsonProperty("sqlexceptionString") String sqlexceptionString) { @@ -26,7 +44,7 @@ public AsyncMessageFromAgent(@JsonProperty("state") String state, @JsonProperty( this.Xid = xid; this.SQLExceptionString = sqlexceptionString; } - + @Override public String toString() { return "message{" + diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java index 165d79e082f..812e7d34ec4 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/CustomXID.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.infra.transactions; import java.util.Arrays; @@ -10,29 +27,30 @@ @ToString public class CustomXID { + byte[] myBqual; int myFormatId; byte[] myGtrid; String originStr; - + public CustomXID(byte[] gtrid, byte[] bqual, int formatId) { this.myGtrid = gtrid; this.myBqual = bqual; this.myFormatId = formatId; this.originStr = Arrays.toString(gtrid) + "," + Arrays.toString(bqual) + "," + formatId; } - + public CustomXID(byte[] gtrid, byte[] bqual) { this(gtrid, bqual, 1); } - + public CustomXID(String gtrid, String bqual) { this(gtrid.getBytes(), bqual.getBytes()); } - + public CustomXID(String str) { List list = Arrays.asList(str.split(",")); - + if (list.size() == 1) { this.myGtrid = list.get(0).getBytes(); this.myBqual = "".getBytes(); @@ -49,11 +67,12 @@ public CustomXID(String str) { this.myFormatId = Integer.parseInt(list.get(2).substring(2), 16); } else { this.myFormatId = Integer.parseInt(list.get(2)); - } } - + } + } + originStr = str; } - + public CustomXID(Xid xid) { if (xid instanceof MysqlXid) { this.myGtrid = xid.getGlobalTransactionId(); @@ -62,22 +81,22 @@ public CustomXID(Xid xid) { this.originStr = Arrays.toString(myGtrid) + "," + Arrays.toString(myBqual) + "," + myFormatId; } } - + @Override public boolean equals(Object obj) { boolean ret = false; if (obj instanceof CustomXID) { - CustomXID other = (CustomXID)obj; + CustomXID other = (CustomXID) obj; ret = originStr.equals(other.originStr); } - + return ret; } - + public int hashCode() { return (this.originStr.hashCode() / 100) * 100 + Integer.parseInt(String.valueOf(myFormatId)) % 100; } - + public String toString() { return this.originStr; } diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/XATransactionState.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/XATransactionState.java index ccb42eab19e..0901181d3f7 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/XATransactionState.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/transactions/XATransactionState.java @@ -1,21 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.infra.transactions; public enum XATransactionState { NO_TRANSACTION, - + ACTIVE, - + IDLE, - + PREPARED, - + COMMITTED, - + FAILED, - + ABORTED, - + ROLLBACK_ONLY, - + NUM_OF_STATES } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java index 0b7f1526d19..1514ba335df 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/ExecutionUnit.java @@ -48,7 +48,7 @@ public final class ExecutionUnit { private int networkLatency; private int realExecuteLatency; - + private long finishTime; private boolean isHarp = false; @@ -101,8 +101,10 @@ public long GetDelayTime() { public void SetDelayTime(long delayTime) { this.delayTime = delayTime; } - - public void SetFinishTime(long finishTime) { this.finishTime = finishTime; } + + public void SetFinishTime(long finishTime) { + this.finishTime = finishTime; + } public void CombineExecutionUnit(ExecutionUnit other) { sqlUnit.CombineSQLUnit(other.getSqlUnit()); diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java index 8a2340d07f4..ef76a815241 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/context/SQLUnit.java @@ -60,7 +60,7 @@ public void CombineSQLUnit(SQLUnit other) { parameters.addAll(other.getParameters()); tableRouteMappers.addAll(other.getTableRouteMappers()); } - + public void setLastQueryComment(boolean onePhase) { if (onePhase) { sql = "/*last one phase query*/" + sql; diff --git a/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java b/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java index b5f63678323..625e852c3e5 100644 --- a/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java +++ b/kernel/transaction/type/xa/core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java @@ -55,7 +55,7 @@ public final class XAShardingSphereTransactionManager implements ShardingSphereT @Override public void init(final Map databaseTypes, final Map dataSources, final String providerType) { -// dataSources.forEach((key, value) -> TypedSPILoader.getService(DataSourcePrivilegeChecker.class, databaseTypes.get(key).getType()).checkPrivilege(value)); + // dataSources.forEach((key, value) -> TypedSPILoader.getService(DataSourcePrivilegeChecker.class, databaseTypes.get(key).getType()).checkPrivilege(value)); xaTransactionManagerProvider = TypedSPILoader.getService(XATransactionManagerProvider.class, providerType); xaTransactionManagerProvider.init(); Map resourceDataSources = getResourceDataSources(dataSources); diff --git a/kernel/transaction/type/xa/provider/harp/pom.xml b/kernel/transaction/type/xa/provider/harp/pom.xml index ed2cb8013e2..9e22a06b266 100644 --- a/kernel/transaction/type/xa/provider/harp/pom.xml +++ b/kernel/transaction/type/xa/provider/harp/pom.xml @@ -1,17 +1,22 @@ - + 4.0.0 - shardingsphere-transaction-xa-provider org.apache.shardingsphere + shardingsphere-transaction-xa-provider 5.3.3-SNAPSHOT - 4.0.0 - + shardingsphere-transaction-xa-harp ${project.artifactId} - + + + 17 + 17 + UTF-8 + + org.apache.shardingsphere @@ -43,11 +48,5 @@ compile - - - 17 - 17 - UTF-8 - - - \ No newline at end of file + + diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java index c0be1b7b7e1..306db5ffa71 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionImp.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import com.atomikos.datasource.RecoverableResource; @@ -26,50 +43,51 @@ @Slf4j public class CustomTransactionImp implements Transaction { + @Getter private final String tid; private final Map xaResourceToResourceTransactionMap_; @Getter private final Scheduler synchronizationScheduler = new Scheduler(); - + private volatile Date startDate; private volatile Date timeoutDate; - + @Setter private volatile int status = 6; private volatile boolean timeout = false; - + CustomTransactionImp(String tid) { this.xaResourceToResourceTransactionMap_ = new HashMap<>(); this.tid = tid; } - + private synchronized void addXAResourceTransaction(XAResourceTransaction restx, XAResource xares) { this.xaResourceToResourceTransactionMap_.put(new XAResourceKey(xares), restx); } - + private synchronized void removeXAResourceTransaction(XAResource xares) { this.xaResourceToResourceTransactionMap_.remove(new XAResourceKey(xares)); } - + private static void rethrowAsJtaRollbackException(String msg, Throwable cause) throws RollbackException { RollbackException ret = new RollbackException(msg); ret.initCause(cause); throw ret; } - + private static void rethrowAsJtaHeuristicMixedException(String msg, Throwable cause) throws HeuristicMixedException { HeuristicMixedException ret = new HeuristicMixedException(msg); ret.initCause(cause); throw ret; } - + private static void rethrowAsJtaHeuristicRollbackException(String msg, Throwable cause) throws HeuristicRollbackException { HeuristicRollbackException ret = new HeuristicRollbackException(msg); ret.initCause(cause); throw ret; } - + @Override public void commit() throws RollbackException, SecurityException, IllegalStateException { // TODO @@ -90,14 +108,14 @@ public void commit() throws RollbackException, SecurityException, IllegalStateEx } } } - + public void syncCommit(boolean onePhase) throws XAException { boolean prepareSuccess = true; long enterTime = System.nanoTime(); System.out.println("Enter in commit: " + enterTime); // xa end try { - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); each.getXares().end(txn.getXid(), 67108864); } @@ -107,10 +125,10 @@ public void syncCommit(boolean onePhase) throws XAException { } long xaEndTime = System.nanoTime(); System.out.println("XA End Time: " + (xaEndTime - enterTime) / 1000); - + // xa prepare if (!onePhase) { - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); try { each.getXares().prepare(txn.getXid()); @@ -119,12 +137,12 @@ public void syncCommit(boolean onePhase) throws XAException { } } } - + long xaPrepareTime = System.nanoTime(); System.out.println("XA Prepare Time: " + (xaPrepareTime - xaEndTime) / 1000); - + if (prepareSuccess) { - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); if (onePhase) { assert xaResourceToResourceTransactionMap_.size() == 1; @@ -134,27 +152,26 @@ public void syncCommit(boolean onePhase) throws XAException { } } } else { - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); each.getXares().rollback(txn.getXid()); } } - + long xaCommitTime = System.nanoTime(); System.out.println("XA Commit Time: " + (xaCommitTime - xaPrepareTime) / 1000); } - + public void asyncCommit(boolean onePhase) throws InterruptedException, XAException { boolean prepareSuccess = true; - + if (onePhase) { long enterTime = System.nanoTime(); System.out.println("Enter in commit: " + enterTime); - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); while (AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.IDLE && - AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ROLLBACK_ONLY - ) { + AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ROLLBACK_ONLY) { Thread.sleep(1); } if (AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.ROLLBACK_ONLY) { @@ -164,7 +181,7 @@ public void asyncCommit(boolean onePhase) throws InterruptedException, XAExcepti long finishAsyncTime = System.nanoTime(); System.out.println("Txn " + tid + " Wait Async Commit Time: " + (finishAsyncTime - enterTime) / 1000 + "us"); try { - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); if (prepareSuccess) { each.getXares().commit(txn.getXid(), true); @@ -185,24 +202,22 @@ public void asyncCommit(boolean onePhase) throws InterruptedException, XAExcepti } else { long enterTime = System.nanoTime(); System.out.println("Enter in commit: " + enterTime); - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); while (AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.PREPARED && AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.FAILED && - AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ROLLBACK_ONLY - ) { + AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ROLLBACK_ONLY) { Thread.sleep(1); } if (AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.ROLLBACK_ONLY || - AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.FAILED - ) { + AgentAsyncXAManager.getInstance().getStateByXid(xid) == XATransactionState.FAILED) { prepareSuccess = false; } } long finishAsyncTime = System.nanoTime(); System.out.println("Txn " + tid + " Wait Async Commit Time: " + (finishAsyncTime - enterTime) / 1000 + "us"); List threadList = new LinkedList<>(); - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { boolean finalPrepareSuccess = prepareSuccess; threadList.add(new Thread(() -> { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); @@ -222,22 +237,22 @@ public void asyncCommit(boolean onePhase) throws InterruptedException, XAExcepti } })); } - for (Thread each: threadList) { + for (Thread each : threadList) { each.start(); } - for (Thread each: threadList) { + for (Thread each : threadList) { each.join(); } long endTime = System.nanoTime(); System.out.println("Txn " + tid + " Finish Commit Time: " + (endTime - finishAsyncTime) / 1000 + "us"); } - + } - + @Override public boolean delistResource(XAResource xares, int flag) throws IllegalStateException, SystemException { log.debug("delistResource ( " + xares + " ) with transaction " + this.toString()); - + XAResourceTransaction active = this.findXAResourceTransaction(xares); String msg; if (active == null) { @@ -251,7 +266,7 @@ public boolean delistResource(XAResource xares, int flag) throws IllegalStateExc log.warn(msg); throw new SystemException(msg); } - + try { active.xaSuspend(); } catch (XAException var5) { @@ -263,17 +278,17 @@ public boolean delistResource(XAResource xares, int flag) throws IllegalStateExc } catch (ResourceException var6) { throw new ExtendedSystemException("Error in delisting the given XAResource", var6); } - + this.removeXAResourceTransaction(xares); if (flag == 536870912) { this.setRollbackOnly(); } } - + return true; } } - + @Override public boolean enlistResource(XAResource xares) throws RollbackException, IllegalStateException, SystemException { TransactionalResource res = null; @@ -304,7 +319,7 @@ public boolean enlistResource(XAResource xares) throws RollbackException, Illega log.warn(msg); throw new IllegalStateException(msg); } - + try { suspendedXAResourceTransaction.setXAResource(xares); suspendedXAResourceTransaction.xaResume(); @@ -312,18 +327,18 @@ public boolean enlistResource(XAResource xares) throws RollbackException, Illega if (100 <= var9.errorCode && var9.errorCode <= 107) { rethrowAsJtaRollbackException("Transaction was already rolled back inside the back-end resource. Further enlists are useless.", var9); } - + throw new ExtendedSystemException("Unexpected error during enlist", var9); } } else { res = this.findRecoverableResourceForXaResource(xares); - + if (res == null) { msg = "There is no registered resource that can recover the given XAResource instance. \nPlease register a corresponding resource first."; log.warn(msg); throw new SystemException(msg); } - + try { restx = new XAResourceTransaction((XATransactionalResource) res, tid, tid); restx.setXAResource(xares); @@ -333,44 +348,44 @@ public boolean enlistResource(XAResource xares) throws RollbackException, Illega } catch (RuntimeException var8) { throw var8; } - + this.addXAResourceTransaction(restx, xares); } - + assert restx != null; registerIntoAsyncXAManager(restx.getXid()); return true; } } - + private void registerIntoAsyncXAManager(Xid xid) { CustomXID test = new CustomXID(SQLUtils.xidToHex(xid)); - + AgentAsyncXAManager.getInstance().setStateByXid(test, XATransactionState.ACTIVE); } - + private synchronized XAResourceTransaction findXAResourceTransaction(XAResource xares) { XAResourceTransaction ret = null; - ret = (XAResourceTransaction)this.xaResourceToResourceTransactionMap_.get(new XAResourceKey(xares)); + ret = (XAResourceTransaction) this.xaResourceToResourceTransactionMap_.get(new XAResourceKey(xares)); if (ret != null) { this.assertActiveOrSuspended(ret); } - + return ret; } - + private void assertActiveOrSuspended(XAResourceTransaction restx) { if (!restx.isActive() && !restx.isXaSuspended()) { log.warn("Unexpected resource transaction state for " + restx); } - + } - + private TransactionalResource findRecoverableResourceForXaResource(XAResource xares) { TransactionalResource ret = null; - synchronized(Configuration.class) { + synchronized (Configuration.class) { Collection resources = Configuration.getResources(); - + for (RecoverableResource rres : resources) { if (rres instanceof XATransactionalResource) { XATransactionalResource xatxres = (XATransactionalResource) rres; @@ -379,26 +394,26 @@ private TransactionalResource findRecoverableResourceForXaResource(XAResource xa } } } - + return ret; } } - + @Override public int getStatus() throws SystemException { return this.status; } - + public void setActive(int timeout) throws IllegalStateException, SystemException { if (this.status != 6) { throw new IllegalStateException("transaction has already started"); } else { this.setStatus(0); this.startDate = new Date(System.currentTimeMillis()); - this.timeoutDate = new Date(System.currentTimeMillis() + (long)timeout * 1000L); + this.timeoutDate = new Date(System.currentTimeMillis() + (long) timeout * 1000L); } } - + @Override public void registerSynchronization(Synchronization synchronization) throws RollbackException, IllegalStateException, SystemException { if (this.status == 6) { @@ -411,11 +426,11 @@ public void registerSynchronization(Synchronization synchronization) throws Roll if (log.isDebugEnabled()) { log.debug("registering synchronization " + synchronization); } - + this.synchronizationScheduler.add(synchronization, Scheduler.DEFAULT_POSITION); } } - + @SneakyThrows @Override public void rollback() throws IllegalStateException, SystemException { @@ -425,11 +440,11 @@ public void rollback() throws IllegalStateException, SystemException { syncRollback(); } } - + public void syncRollback() throws XAException { // xa end try { - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); each.getXares().end(txn.getXid(), 67108864); } @@ -437,27 +452,26 @@ public void syncRollback() throws XAException { log.error("xa end failed."); throw e; } - + // xa rollback - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); each.getXares().rollback(txn.getXid()); } } - + public void asyncRollback() throws InterruptedException, XAException { - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { CustomXID xid = new CustomXID(SQLUtils.xidToHex(xaResourceToResourceTransactionMap_.get(each).getXid())); while (AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.PREPARED && AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.FAILED && AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.ROLLBACK_ONLY && - AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.IDLE - ) { + AgentAsyncXAManager.getInstance().getStateByXid(xid) != XATransactionState.IDLE) { Thread.sleep(1); } } List threadList = new LinkedList<>(); - for(XAResourceKey each: xaResourceToResourceTransactionMap_.keySet()) { + for (XAResourceKey each : xaResourceToResourceTransactionMap_.keySet()) { threadList.add(new Thread(() -> { XAResourceTransaction txn = xaResourceToResourceTransactionMap_.get(each); try { @@ -468,15 +482,14 @@ public void asyncRollback() throws InterruptedException, XAException { } })); } - for (Thread each: threadList) { + for (Thread each : threadList) { each.start(); } - for (Thread each: threadList) { + for (Thread each : threadList) { each.join(); } } - - + @Override public void setRollbackOnly() throws IllegalStateException, SystemException { if (this.status == 6) { @@ -487,27 +500,26 @@ public void setRollbackOnly() throws IllegalStateException, SystemException { this.setStatus(1); } } - + public boolean equals(Object o) { if (o instanceof CustomTransactionImp) { - CustomTransactionImp other = (CustomTransactionImp)o; + CustomTransactionImp other = (CustomTransactionImp) o; return this.tid.equals(other.tid); } else { return false; } } - + public int hashCode() { return this.tid.hashCode(); } - + public String toString() { return "a Custom Transaction with Tid [" + this.tid + "], status=" + Decoder.decodeStatus(this.status); } - - + void suspendEnlistedXaResources() throws ExtendedSystemException { - + for (XAResourceTransaction resTx : this.xaResourceToResourceTransactionMap_.values()) { try { resTx.xaSuspend(); @@ -515,16 +527,15 @@ void suspendEnlistedXaResources() throws ExtendedSystemException { throw new ExtendedSystemException("Error in suspending the given XAResource", var4); } } - + } - - + void resumeEnlistedXaReources() throws ExtendedSystemException { Iterator xaResourceTransactions = this.xaResourceToResourceTransactionMap_.values().iterator(); - - while(xaResourceTransactions.hasNext()) { + + while (xaResourceTransactions.hasNext()) { XAResourceTransaction resTx = xaResourceTransactions.next(); - + try { resTx.xaResume(); xaResourceTransactions.remove(); @@ -533,7 +544,7 @@ void resumeEnlistedXaReources() throws ExtendedSystemException { } } } - + private boolean isDone() { switch (this.status) { case 2: @@ -549,7 +560,7 @@ private boolean isDone() { return false; } } - + private boolean isWorking() { switch (this.status) { case 2: diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionManager.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionManager.java index 97f46369fb7..47c2464b474 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionManager.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/CustomTransactionManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import com.atomikos.icatch.SysException; @@ -17,10 +34,11 @@ @Slf4j public class CustomTransactionManager implements TransactionManager, Referenceable, UserTransaction { + private static final CustomTransactionManager singleton = new CustomTransactionManager(); private final Map jtaTransactionToCoreTransactionMap; // Tid -> CustomTransactionImp private final Map contexts; - + private static void raiseNoTransaction() { StringBuilder msg = new StringBuilder(); msg.append("This method needs a transaction for the calling thread and none exists.\n"); @@ -33,72 +51,72 @@ private static void raiseNoTransaction() { log.warn(msg.toString()); throw new IllegalStateException(msg.toString()); } - + public static TransactionManager getTransactionManager() { return singleton; } - + private CustomTransactionManager() { this.jtaTransactionToCoreTransactionMap = new ConcurrentHashMap<>(128, 0.75F, 128); this.contexts = new ConcurrentHashMap<>(128, 0.75F, 128); } - + private void addToMap(String tid, CustomTransactionImp tx) { - synchronized(this.jtaTransactionToCoreTransactionMap) { + synchronized (this.jtaTransactionToCoreTransactionMap) { this.jtaTransactionToCoreTransactionMap.put(tid, tx); } } - + private void removeFromMap(String tid) { - synchronized(this.jtaTransactionToCoreTransactionMap) { + synchronized (this.jtaTransactionToCoreTransactionMap) { this.jtaTransactionToCoreTransactionMap.remove(tid); } } - + CustomTransactionImp getTransactionWithId(String tid) { - synchronized(this.jtaTransactionToCoreTransactionMap) { + synchronized (this.jtaTransactionToCoreTransactionMap) { return this.jtaTransactionToCoreTransactionMap.get(tid); } } - + public Transaction getTransaction(String tid) { return this.getTransactionWithId(tid); } - + public Transaction getTransaction() { return this.getCurrentTransaction(); } - + public CustomTransactionImp getCurrentTransaction() { return this.contexts.get(Thread.currentThread()) == null ? null : this.getOrCreateCurrentContext().getTransaction(); } - + private ThreadTransactionContext getOrCreateCurrentContext() { ThreadTransactionContext threadContext = this.contexts.get(Thread.currentThread()); if (threadContext == null) { if (log.isDebugEnabled()) { log.debug("creating new thread context"); } - + threadContext = new ThreadTransactionContext(); this.setCurrentContext(threadContext); } - + return threadContext; } - + private void setCurrentContext(ThreadTransactionContext context) { if (log.isDebugEnabled()) { log.debug("changing current thread context to " + context); } - + if (context == null) { throw new IllegalArgumentException("setCurrentContext() should not be called with a null context, clearCurrentContextForSuspension() should be used instead"); } else { this.contexts.put(Thread.currentThread(), context); } } - + private CustomTransactionImp establishTransaction(String tid) { CustomTransactionImp transaction = this.getTransactionWithId(tid); if (transaction == null) { @@ -106,38 +124,38 @@ private CustomTransactionImp establishTransaction(String tid) { } return transaction; } - + private CustomTransactionImp createTransaction(String tid) { CustomTransactionImp ret = new CustomTransactionImp(tid); this.addToMap(tid, ret); return ret; } - + public void committed(String tid) { this.removeFromMap(tid); } - + public void rolledback(String tid) { this.removeFromMap(tid); } - + @Override public Reference getReference() { return new Reference(this.getClass().getName(), new StringRefAddr("name", "TransactionManager"), TransactionManagerFactory.class.getName(), null); } - + @Override public void begin() throws SystemException, NotSupportedException { CustomTransactionImp currentTx = this.getCurrentTransaction(); - if (currentTx != null) { + if (currentTx != null) { throw new NotSupportedException("nested transactions not supported"); } String tid = String.valueOf(AgentAsyncXAManager.getInstance().fetchAndAddGlobalTid()); currentTx = this.establishTransaction(tid); this.getOrCreateCurrentContext().setTransaction(currentTx); - + ClearContextSynchronization clearContextSynchronization = new ClearContextSynchronization(currentTx); - + try { currentTx.getSynchronizationScheduler().add(clearContextSynchronization, Scheduler.ALWAYS_LAST_POSITION - 1); currentTx.setActive(this.getOrCreateCurrentContext().getTimeout()); @@ -146,27 +164,27 @@ public void begin() throws SystemException, NotSupportedException { throw ex; } } - + @Override public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException { Transaction tx = this.getTransaction(); if (tx == null) { raiseNoTransaction(); } - + assert tx != null; - ClearContextSynchronization clearContextSynchronization = new ClearContextSynchronization((CustomTransactionImp)tx); + ClearContextSynchronization clearContextSynchronization = new ClearContextSynchronization((CustomTransactionImp) tx); try { tx.commit(); clearContextSynchronization.afterCompletion(3); - committed(((CustomTransactionImp)tx).getTid()); + committed(((CustomTransactionImp) tx).getTid()); } catch (RollbackException | HeuristicMixedException | HeuristicRollbackException | SecurityException | IllegalStateException | SystemException ex) { clearContextSynchronization.afterCompletion(4); - rolledback(((CustomTransactionImp)tx).getTid()); + rolledback(((CustomTransactionImp) tx).getTid()); throw ex; } } - + @Override public int getStatus() throws SystemException { Transaction tx = this.getTransaction(); @@ -176,23 +194,23 @@ public int getStatus() throws SystemException { } else { ret = tx.getStatus(); } - + return ret; } - + @Override public void resume(Transaction transaction) throws InvalidTransactionException, IllegalStateException, SystemException { if (transaction instanceof CustomTransactionImp) { - CustomTransactionImp ret = (CustomTransactionImp)transaction; - + CustomTransactionImp ret = (CustomTransactionImp) transaction; + try { -// this.compositeTransactionManager.resume(tximp.getCT()); + // this.compositeTransactionManager.resume(tximp.getCT()); } catch (SysException var5) { String msg = "Unexpected error while resuming the transaction in the calling thread"; log.error(msg, var5); throw new ExtendedSystemException(msg, var5); } - + ret.resumeEnlistedXaReources(); } else { String msg = "The specified transaction object is invalid for this configuration: " + transaction; @@ -200,34 +218,34 @@ public void resume(Transaction transaction) throws InvalidTransactionException, throw new InvalidTransactionException(msg); } } - + @Override public void rollback() throws IllegalStateException, SecurityException, SystemException { Transaction tx = this.getTransaction(); if (tx == null) { raiseNoTransaction(); } - + assert tx != null; - ClearContextSynchronization clearContextSynchronization = new ClearContextSynchronization((CustomTransactionImp)tx); - + ClearContextSynchronization clearContextSynchronization = new ClearContextSynchronization((CustomTransactionImp) tx); + try { tx.rollback(); - rolledback(((CustomTransactionImp)tx).getTid()); + rolledback(((CustomTransactionImp) tx).getTid()); } catch (SecurityException | IllegalStateException | SystemException ex) { throw ex; } finally { clearContextSynchronization.afterCompletion(4); } } - + @Override public void setRollbackOnly() throws IllegalStateException, SystemException { Transaction tx = this.getTransaction(); if (tx == null) { raiseNoTransaction(); } - + try { assert tx != null; tx.setRollbackOnly(); @@ -237,7 +255,7 @@ public void setRollbackOnly() throws IllegalStateException, SystemException { throw new ExtendedSystemException(msg, var4); } } - + @Override public void setTransactionTimeout(int seconds) throws SystemException { if (seconds >= 0) { @@ -246,56 +264,57 @@ public void setTransactionTimeout(int seconds) throws SystemException { assert false; } } - + @Override public Transaction suspend() throws SystemException { - CustomTransactionImp ret = (CustomTransactionImp)this.getTransaction(); + CustomTransactionImp ret = (CustomTransactionImp) this.getTransaction(); if (ret != null) { ret.suspendEnlistedXaResources(); this.clearCurrentContextForSuspension(); } - + return ret; } - + private void clearCurrentContextForSuspension() { if (log.isDebugEnabled()) { log.debug("clearing current thread context: " + this.getOrCreateCurrentContext()); } - + this.contexts.remove(Thread.currentThread()); if (log.isDebugEnabled()) { log.debug("cleared current thread context: " + this.getOrCreateCurrentContext()); } } - + private class ClearContextSynchronization implements Synchronization { + private final CustomTransactionImp currentTx; - + public ClearContextSynchronization(CustomTransactionImp currentTx) { this.currentTx = currentTx; } - + public void beforeCompletion() { } - + public void afterCompletion(int status) { Iterator> it = CustomTransactionManager.this.contexts.entrySet().iterator(); - - while(it.hasNext()) { + + while (it.hasNext()) { Map.Entry entry = it.next(); ThreadTransactionContext context = entry.getValue(); if (context.getTransaction() == this.currentTx) { if (log.isDebugEnabled()) { log.debug("clearing thread context: " + context); } - + it.remove(); break; } } } - + public String toString() { return "a ClearContextSynchronization for " + this.currentTx; } diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Decoder.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Decoder.java index 2f7ac7bbad1..2f0b09d7554 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Decoder.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Decoder.java @@ -1,141 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import javax.transaction.xa.XAException; public class Decoder { - public static String decodeXAExceptionErrorCode(XAException ex) { - switch (ex.errorCode) { - case -9: - return "XAER_OUTSIDE"; - case -8: - return "XAER_DUPID"; - case -7: - return "XAER_RMFAIL"; - case -6: - return "XAER_PROTO"; - case -5: - return "XAER_INVAL"; - case -4: - return "XAER_NOTA"; - case -3: - return "XAER_RMERR"; - case -2: - return "XAER_ASYNC"; - case 5: - return "XA_HEURMIX"; - case 6: - return "XA_HEURRB"; - case 7: - return "XA_HEURCOM"; - case 8: - return "XA_HEURHAZ"; - case 100: - return "XA_RBROLLBACK"; - case 101: - return "XA_RBCOMMFAIL"; - case 102: - return "XA_RBDEADLOCK"; - case 103: - return "XA_RBINTEGRITY"; - case 104: - return "XA_RBOTHER"; - case 105: - return "XA_RBPROTO"; - case 106: - return "XA_RBTIMEOUT"; - case 107: - return "XA_RBTRANSIENT"; - default: - return "!invalid error code (" + ex.errorCode + ")!"; - } + + public static String decodeXAExceptionErrorCode(XAException ex) { + switch (ex.errorCode) { + case -9: + return "XAER_OUTSIDE"; + case -8: + return "XAER_DUPID"; + case -7: + return "XAER_RMFAIL"; + case -6: + return "XAER_PROTO"; + case -5: + return "XAER_INVAL"; + case -4: + return "XAER_NOTA"; + case -3: + return "XAER_RMERR"; + case -2: + return "XAER_ASYNC"; + case 5: + return "XA_HEURMIX"; + case 6: + return "XA_HEURRB"; + case 7: + return "XA_HEURCOM"; + case 8: + return "XA_HEURHAZ"; + case 100: + return "XA_RBROLLBACK"; + case 101: + return "XA_RBCOMMFAIL"; + case 102: + return "XA_RBDEADLOCK"; + case 103: + return "XA_RBINTEGRITY"; + case 104: + return "XA_RBOTHER"; + case 105: + return "XA_RBPROTO"; + case 106: + return "XA_RBTIMEOUT"; + case 107: + return "XA_RBTRANSIENT"; + default: + return "!invalid error code (" + ex.errorCode + ")!"; } - - public static String decodeStatus(int status) { - switch (status) { - case 0: - return "ACTIVE"; - case 1: - return "MARKED_ROLLBACK"; - case 2: - return "PREPARED"; - case 3: - return "COMMITTED"; - case 4: - return "ROLLEDBACK"; - case 5: - return "UNKNOWN"; - case 6: - return "NO_TRANSACTION"; - case 7: - return "PREPARING"; - case 8: - return "COMMITTING"; - case 9: - return "ROLLING_BACK"; - default: - return "!incorrect status (" + status + ")!"; - } + } + + public static String decodeStatus(int status) { + switch (status) { + case 0: + return "ACTIVE"; + case 1: + return "MARKED_ROLLBACK"; + case 2: + return "PREPARED"; + case 3: + return "COMMITTED"; + case 4: + return "ROLLEDBACK"; + case 5: + return "UNKNOWN"; + case 6: + return "NO_TRANSACTION"; + case 7: + return "PREPARING"; + case 8: + return "COMMITTING"; + case 9: + return "ROLLING_BACK"; + default: + return "!incorrect status (" + status + ")!"; } - - public static String decodeXAResourceFlag(int flag) { - switch (flag) { - case 0: - return "NOFLAGS"; - case 2097152: - return "JOIN"; - case 8388608: - return "ENDRSCAN"; - case 16777216: - return "STARTRSCAN"; - case 33554432: - return "SUSPEND"; - case 67108864: - return "SUCCESS"; - case 134217728: - return "RESUME"; - case 536870912: - return "FAIL"; - case 1073741824: - return "ONEPHASE"; - default: - return "!invalid flag (" + flag + ")!"; - } + } + + public static String decodeXAResourceFlag(int flag) { + switch (flag) { + case 0: + return "NOFLAGS"; + case 2097152: + return "JOIN"; + case 8388608: + return "ENDRSCAN"; + case 16777216: + return "STARTRSCAN"; + case 33554432: + return "SUSPEND"; + case 67108864: + return "SUCCESS"; + case 134217728: + return "RESUME"; + case 536870912: + return "FAIL"; + case 1073741824: + return "ONEPHASE"; + default: + return "!invalid flag (" + flag + ")!"; } - - public static String decodePrepareVote(int vote) { - switch (vote) { - case 0: - return "XA_OK"; - case 3: - return "XA_RDONLY"; - default: - return "!invalid return code (" + vote + ")!"; - } + } + + public static String decodePrepareVote(int vote) { + switch (vote) { + case 0: + return "XA_OK"; + case 3: + return "XA_RDONLY"; + default: + return "!invalid return code (" + vote + ")!"; } - - public static String decodeHeaderState(byte state) { - switch (state) { - case -1: - return "UNCLEAN_LOG_STATE"; - case 0: - return "CLEAN_LOG_STATE"; - default: - return "!invalid state (" + state + ")!"; - } + } + + public static String decodeHeaderState(byte state) { + switch (state) { + case -1: + return "UNCLEAN_LOG_STATE"; + case 0: + return "CLEAN_LOG_STATE"; + default: + return "!invalid state (" + state + ")!"; } - - public static String decodeXAStatefulHolderState(int state) { - switch (state) { - case 0: - return "CLOSED"; - case 1: - return "IN_POOL"; - case 2: - return "ACCESSIBLE"; - case 3: - return "NOT_ACCESSIBLE"; - default: - return "!invalid state (" + state + ")!"; - } + } + + public static String decodeXAStatefulHolderState(int state) { + switch (state) { + case 0: + return "CLOSED"; + case 1: + return "IN_POOL"; + case 2: + return "ACCESSIBLE"; + case 3: + return "NOT_ACCESSIBLE"; + default: + return "!invalid state (" + state + ")!"; } + } } diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManager.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManager.java index fe115c38c34..c2ecd06afef 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManager.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import com.atomikos.icatch.OrderedLifecycleComponent; @@ -16,49 +33,49 @@ @Getter @Setter public class HarpTransactionManager implements TransactionManager, Serializable, Referenceable, UserTransaction, OrderedLifecycleComponent { + boolean rollbackOnly; -// private transient TransactionManagerImp tm; + // private transient TransactionManagerImp tm; private CustomTransactionManager tm; private boolean forceShutdown; private boolean startupTransactionService = true; private boolean closed = false; private boolean coreStartedHere; int transactionTimeout; - + private void checkSetup() throws SystemException { if (!this.closed) { this.initializeTransactionManagerSingleton(); } } - + private void initializeTransactionManagerSingleton() throws SystemException { - this.tm = (CustomTransactionManager)CustomTransactionManager.getTransactionManager(); + this.tm = (CustomTransactionManager) CustomTransactionManager.getTransactionManager(); if (this.tm == null) { if (!this.getStartupTransactionService()) { throw new SystemException("Transaction service not running"); } - + this.startupTransactionService(); - this.tm = (CustomTransactionManager)CustomTransactionManager.getTransactionManager(); + this.tm = (CustomTransactionManager) CustomTransactionManager.getTransactionManager(); } } - + private void startupTransactionService() { this.coreStartedHere = Configuration.init(); } - + public HarpTransactionManager() { } - + public void setStartupTransactionService(boolean startup) { this.startupTransactionService = startup; } - + public boolean getStartupTransactionService() { return this.startupTransactionService; } - - + @Override public void begin() throws NotSupportedException, SystemException { if (this.closed) { @@ -69,7 +86,7 @@ public void begin() throws NotSupportedException, SystemException { this.tm.begin(); } } - + @Override public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException { if (this.closed) { @@ -80,70 +97,70 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi this.tm.commit(); } } - + @Override public int getStatus() throws SystemException { this.checkSetup(); return this.tm.getStatus(); } - + @Override public Transaction getTransaction() throws SystemException { this.checkSetup(); return this.tm.getTransaction(); } - + @Override public void resume(Transaction transaction) throws InvalidTransactionException, IllegalStateException, SystemException { this.checkSetup(); this.tm.resume(transaction); } - + @Override public void rollback() throws IllegalStateException, SecurityException, SystemException { // TODO: this.tm.rollback(); } - + @Override public void setRollbackOnly() throws IllegalStateException, SystemException { this.tm.setRollbackOnly(); } - + @Override public Transaction suspend() throws SystemException { assert false; this.checkSetup(); return this.tm.suspend(); } - + @Override public Reference getReference() throws NamingException { return SerializableObjectFactory.createReference(this); } - + @Override public void init() throws SystemException { this.closed = false; this.checkSetup(); } - + @Override public void close() throws Exception { this.shutdownTransactionService(); this.closed = true; } - + public void setTransactionTimeout(int secs) throws SystemException { this.checkSetup(); this.tm.setTransactionTimeout(secs); } - + private void shutdownTransactionService() { if (this.coreStartedHere) { Configuration.shutdown(this.forceShutdown); this.coreStartedHere = false; } - + } } diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManagerProvider.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManagerProvider.java index 0fb8093ef49..38cd81be045 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManagerProvider.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpTransactionManagerProvider.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import com.atomikos.icatch.config.UserTransactionService; @@ -12,43 +29,44 @@ import javax.transaction.RollbackException; public class HarpTransactionManagerProvider implements XATransactionManagerProvider { + @Getter HarpTransactionManager transactionManager; private UserTransactionService userTransactionService; - + @Override public void init() { transactionManager = new HarpTransactionManager(); userTransactionService = new UserTransactionServiceImp(); userTransactionService.init(); } - + @Override public void registerRecoveryResource(String dataSourceName, XADataSource xaDataSource) { userTransactionService.registerResource(new HarpXARecoverableResource(dataSourceName, xaDataSource)); } - + @Override public void removeRecoveryResource(String dataSourceName, XADataSource xaDataSource) { userTransactionService.removeResource(new HarpXARecoverableResource(dataSourceName, xaDataSource)); } - + @SneakyThrows({SystemException.class, RollbackException.class}) @Override public void enlistResource(SingleXAResource singleXAResource) { transactionManager.getTransaction().enlistResource(singleXAResource); } - + @Override public void close() throws Exception { userTransactionService.shutdown(true); } - + @Override public String getType() { return "Harp"; } - + @Override public boolean isDefault() { return true; diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpXARecoverableResource.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpXARecoverableResource.java index 42b0906d14c..23561b46fea 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpXARecoverableResource.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/HarpXARecoverableResource.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import com.atomikos.datasource.xa.jdbc.JdbcTransactionalResource; @@ -7,13 +24,14 @@ import javax.transaction.xa.XAResource; public class HarpXARecoverableResource extends JdbcTransactionalResource { + private final String resourceName; - + HarpXARecoverableResource(final String serverName, final XADataSource xaDataSource) { super(serverName, xaDataSource); resourceName = serverName; } - + @Override public boolean usesXAResource(final XAResource xaResource) { return resourceName.equals(((SingleXAResource) xaResource).getResourceName()); diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Scheduler.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Scheduler.java index 0589205c07c..fd963c16f25 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Scheduler.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/Scheduler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import java.util.ArrayList; @@ -11,118 +28,120 @@ import java.util.TreeSet; public class Scheduler implements Iterable { + public static final Integer DEFAULT_POSITION = 0; public static final Integer ALWAYS_FIRST_POSITION = Integer.MIN_VALUE; public static final Integer ALWAYS_LAST_POSITION = Integer.MAX_VALUE; private List keys = new ArrayList(); private Map> objects = new TreeMap(); private int size = 0; - + public Scheduler() { } - + public synchronized void add(T obj, Integer position) { - List list = (List)this.objects.get(position); + List list = (List) this.objects.get(position); if (list == null) { if (!this.keys.contains(position)) { this.keys.add(position); Collections.sort(this.keys); } - + list = new ArrayList(); this.objects.put(position, list); } - - ((List)list).add(obj); + + ((List) list).add(obj); ++this.size; } - + public synchronized void remove(T obj) { Iterator it = this.iterator(); - + Object o; do { if (!it.hasNext()) { throw new NoSuchElementException("no such element: " + obj); } - + o = it.next(); - } while(o != obj); - + } while (o != obj); + it.remove(); } - + public synchronized SortedSet getNaturalOrderPositions() { return new TreeSet(this.objects.keySet()); } - + public synchronized SortedSet getReverseOrderPositions() { TreeSet result = new TreeSet(Collections.reverseOrder()); result.addAll(this.getNaturalOrderPositions()); return result; } - + public synchronized List getByNaturalOrderForPosition(Integer position) { - return (List)this.objects.get(position); + return (List) this.objects.get(position); } - + public synchronized List getByReverseOrderForPosition(Integer position) { List result = new ArrayList(this.getByNaturalOrderForPosition(position)); Collections.reverse(result); return result; } - + public synchronized int size() { return this.size; } - + public Iterator iterator() { return new SchedulerNaturalOrderIterator(); } - + public Iterator reverseIterator() { return new SchedulerReverseOrderIterator(); } - + public String toString() { return "a Scheduler with " + this.size() + " object(s) in " + this.getNaturalOrderPositions().size() + " position(s)"; } - + private final class SchedulerReverseOrderIterator implements Iterator { + private int nextKeyIndex; private List objectsOfCurrentKey; private int objectsOfCurrentKeyIndex; - + private SchedulerReverseOrderIterator() { this.nextKeyIndex = Scheduler.this.keys.size() - 1; } - + public void remove() { - synchronized(Scheduler.this) { + synchronized (Scheduler.this) { if (this.objectsOfCurrentKey == null) { throw new NoSuchElementException("iterator not yet placed on an element"); } else { --this.objectsOfCurrentKeyIndex; this.objectsOfCurrentKey.remove(this.objectsOfCurrentKeyIndex); if (this.objectsOfCurrentKey.size() == 0) { - Integer key = (Integer)Scheduler.this.keys.get(this.nextKeyIndex + 1); + Integer key = (Integer) Scheduler.this.keys.get(this.nextKeyIndex + 1); Scheduler.this.keys.remove(this.nextKeyIndex + 1); Scheduler.this.objects.remove(key); this.objectsOfCurrentKey = null; } - + Scheduler.this.size--; } } } - + public boolean hasNext() { - synchronized(Scheduler.this) { + synchronized (Scheduler.this) { if (this.objectsOfCurrentKey != null && this.objectsOfCurrentKeyIndex < this.objectsOfCurrentKey.size()) { return true; } else if (this.nextKeyIndex >= 0) { - Integer currentKey = (Integer)Scheduler.this.keys.get(this.nextKeyIndex--); - this.objectsOfCurrentKey = (List)Scheduler.this.objects.get(currentKey); + Integer currentKey = (Integer) Scheduler.this.keys.get(this.nextKeyIndex--); + this.objectsOfCurrentKey = (List) Scheduler.this.objects.get(currentKey); this.objectsOfCurrentKeyIndex = 0; return true; } else { @@ -130,9 +149,9 @@ public boolean hasNext() { } } } - + public T next() { - synchronized(Scheduler.this) { + synchronized (Scheduler.this) { if (!this.hasNext()) { throw new NoSuchElementException("iterator bounds reached"); } else { @@ -141,18 +160,19 @@ public T next() { } } } - + private final class SchedulerNaturalOrderIterator implements Iterator { + private int nextKeyIndex; private List objectsOfCurrentKey; private int objectsOfCurrentKeyIndex; - + private SchedulerNaturalOrderIterator() { this.nextKeyIndex = 0; } - + public void remove() { - synchronized(Scheduler.this) { + synchronized (Scheduler.this) { if (this.objectsOfCurrentKey == null) { throw new NoSuchElementException("iterator not yet placed on an element"); } else { @@ -160,24 +180,24 @@ public void remove() { this.objectsOfCurrentKey.remove(this.objectsOfCurrentKeyIndex); if (this.objectsOfCurrentKey.size() == 0) { --this.nextKeyIndex; - Integer key = (Integer)Scheduler.this.keys.get(this.nextKeyIndex); + Integer key = (Integer) Scheduler.this.keys.get(this.nextKeyIndex); Scheduler.this.keys.remove(this.nextKeyIndex); Scheduler.this.objects.remove(key); this.objectsOfCurrentKey = null; } - + Scheduler.this.size--; } } } - + public boolean hasNext() { - synchronized(Scheduler.this) { + synchronized (Scheduler.this) { if (this.objectsOfCurrentKey != null && this.objectsOfCurrentKeyIndex < this.objectsOfCurrentKey.size()) { return true; } else if (this.nextKeyIndex < Scheduler.this.keys.size()) { - Integer currentKey = (Integer)Scheduler.this.keys.get(this.nextKeyIndex++); - this.objectsOfCurrentKey = (List)Scheduler.this.objects.get(currentKey); + Integer currentKey = (Integer) Scheduler.this.keys.get(this.nextKeyIndex++); + this.objectsOfCurrentKey = (List) Scheduler.this.objects.get(currentKey); this.objectsOfCurrentKeyIndex = 0; return true; } else { @@ -185,9 +205,9 @@ public boolean hasNext() { } } } - + public T next() { - synchronized(Scheduler.this) { + synchronized (Scheduler.this) { if (!this.hasNext()) { throw new NoSuchElementException("iterator bounds reached"); } else { diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/ThreadTransactionContext.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/ThreadTransactionContext.java index 03edbb2a31c..1739e52a46f 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/ThreadTransactionContext.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/ThreadTransactionContext.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import lombok.Getter; @@ -8,20 +25,21 @@ @Setter @Slf4j public class ThreadTransactionContext { + private volatile CustomTransactionImp transaction; private volatile int timeout = 0; @Getter XATransactionState state; - + public ThreadTransactionContext() { state = XATransactionState.NO_TRANSACTION; transaction = null; } - + public CustomTransactionImp getTransaction() { return this.transaction; } - + public void setTransaction(CustomTransactionImp transaction) { if (transaction == null) { throw new IllegalArgumentException("transaction parameter cannot be null"); @@ -29,19 +47,19 @@ public void setTransaction(CustomTransactionImp transaction) { if (log.isDebugEnabled()) { log.debug("assigning <" + transaction + "> to <" + this + ">"); } - + this.transaction = transaction; } } - + public int getTimeout() { return this.timeout; } - + public void setTimeout(int timeout) { this.timeout = timeout; } - + public String toString() { return "a ThreadContext with transaction " + this.transaction; } diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceKey.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceKey.java index 23292fc1ca7..b9e6e4b708c 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceKey.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceKey.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import lombok.Getter; @@ -6,31 +23,32 @@ import javax.transaction.xa.XAResource; public class XAResourceKey { + @Getter private XAResource xares; - + public XAResourceKey(XAResource xares) { this.xares = xares; } - + public boolean equals(Object o) { boolean ret = false; if (o instanceof XAResourceKey) { - XAResourceKey other = (XAResourceKey)o; - + XAResourceKey other = (XAResourceKey) o; + try { ret = other.xares == this.xares || other.xares.isSameRM(this.xares); } catch (XAException var5) { } } - + return ret; } - + public int hashCode() { return this.xares.getClass().getName().toString().hashCode(); } - + public String toString() { return this.xares.toString(); } diff --git a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java index 7469de50857..e191837d20b 100644 --- a/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java +++ b/kernel/transaction/type/xa/provider/harp/src/main/java/org/apache/shardingsphere/transaction/xa/harp/manager/XAResourceTransaction.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.transaction.xa.harp.manager; import com.atomikos.datasource.ResourceException; @@ -20,6 +37,7 @@ @Slf4j public class XAResourceTransaction implements ResourceTransaction { + static final long serialVersionUID = -8227293322090019196L; private String tid; private String root; @@ -32,17 +50,17 @@ public class XAResourceTransaction implements ResourceTransaction { private transient XAResource xaresource; private transient boolean knownInResource; private transient int timeout; - + private static String interpretErrorCode(String resourceName, String opCode, Xid xid, int errorCode) { String msg = XAExceptionHelper.convertErrorCodeToVerboseMessage(errorCode); return "XA resource '" + resourceName + "': " + opCode + " for XID '" + xid + "' raised " + errorCode + ": " + msg; } - + private void setXid(Xid xid) { this.xid = xid; this.toString = "XAResourceTransaction: " + xid; } - + XAResourceTransaction(XATransactionalResource resource, String tid, String root) { assert resource != null; this.resource = resource; @@ -56,47 +74,47 @@ private void setXid(Xid xid) { this.isXaSuspended = false; this.knownInResource = false; } - + public int hashCode() { return this.xid.hashCode(); } - + public String toString() { return this.toString; } - + void setState(TxState state) { if (state.isHeuristic()) { log.warn("Heuristic termination of " + this.toString() + " with state " + state); } - + this.state = state; } - + public Xid getXid() { return xid; } - + public void setXAResource(XAResource xaresource) { if (log.isTraceEnabled()) { log.trace(this + ": about to switch to XAResource " + xaresource); } - + this.xaresource = xaresource; - + try { this.xaresource.setTransactionTimeout(this.timeout); } catch (XAException var4) { String msg = interpretErrorCode(this.resourcename, "setTransactionTimeout", this.xid, var4.errorCode); log.warn(msg, var4); } - + if (log.isTraceEnabled()) { log.trace("XAResourceTransaction " + this.getXid() + ": switched to XAResource " + xaresource); } - + } - + public synchronized void suspend() throws ResourceException { if (this.state.equals(TxState.ACTIVE)) { try { @@ -106,11 +124,11 @@ public synchronized void suspend() throws ResourceException { String msg = interpretErrorCode(this.resourcename, "end", this.xid, var3.errorCode); log.trace(msg, var3); } - + this.setState(TxState.LOCALLY_DONE); } } - + @Override public synchronized void resume() throws IllegalStateException, ResourceException { int flag; @@ -122,34 +140,34 @@ public synchronized void resume() throws IllegalStateException, ResourceExceptio if (this.knownInResource) { throw new IllegalStateException("Wrong state for resume: " + this.state); } - + flag = 0; logFlag = "XAResource.TMNOFLAGS"; } - + try { if (log.isDebugEnabled()) { log.debug("XAResource.start ( " + this.xid + " , " + logFlag + " ) on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } - + this.xaresource.start(this.xid, flag); } catch (XAException var5) { String msg = interpretErrorCode(this.resourcename, "resume", this.xid, var5.errorCode); log.warn(msg, var5); throw new ResourceException(msg, var5); } - + this.setState(TxState.ACTIVE); this.knownInResource = true; } - + public void xaSuspend() throws XAException { if (!this.isXaSuspended) { try { if (log.isDebugEnabled()) { log.debug("XAResource.suspend ( " + this.xid + " , XAResource.TMSUSPEND ) on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } - + this.xaresource.end(this.xid, 33554432); this.isXaSuspended = true; } catch (XAException var3) { @@ -158,15 +176,15 @@ public void xaSuspend() throws XAException { throw var3; } } - + } - + public void xaResume() throws XAException { try { if (log.isDebugEnabled()) { log.debug("XAResource.start ( " + this.xid + " , XAResource.TMRESUME ) on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource); } - + this.xaresource.start(this.xid, 134217728); this.isXaSuspended = false; } catch (XAException var3) { @@ -175,23 +193,23 @@ public void xaResume() throws XAException { throw var3; } } - + public boolean isXaSuspended() { return this.isXaSuspended; } - + public boolean isActive() { return this.state.equals(TxState.ACTIVE); } - + public String getURI() { return Arrays.toString(this.xid.getBranchQualifier()); } - + public String getResourceName() { return this.resourcename; } - + private void assertConnectionIsStillAlive() throws XAException { this.xaresource.isSameRM(this.xaresource); } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java index fed580a6e84..9119a41fad1 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java @@ -124,7 +124,7 @@ public final class DatabaseConnector implements DatabaseBackendHandler { private List queryHeaders; private MergedResult mergedResult; - + @Setter private boolean isLastQuery; diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java index 928e17bb06d..ddffa28552a 100644 --- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java +++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java @@ -105,6 +105,7 @@ private static String[] preProcessingArgs(final String[] args) { } else if (each.contains("--alg")) { String[] split = each.split("="); Latency.getInstance().SetAlgorithm(split[split.length - 1]); + System.out.println("algorithm is: " + Latency.getInstance().getAlgorithm()); } else { result.add(each); } diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java index f17e7e3a34a..13f29978400 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java @@ -58,9 +58,9 @@ public final class ShardingSphereProxy { private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; - + private EventLoopGroup asyncGroup; - + public ShardingSphereProxy() { createEventLoopGroup(); Runtime.getRuntime().addShutdownHook(new Thread(this::close)); @@ -110,15 +110,15 @@ private List startInternal(final int port, final List add for (String address : addresses) { futures.add(bootstrap.bind(address, port).sync()); } - + Map ips = Latency.getInstance().getSrcToIp(); - for (Map.Entry each: ips.entrySet()) { + for (Map.Entry each : ips.entrySet()) { startAsyncMessageInternal(3308, each.getValue()); } - + return futures; } - + @SneakyThrows(InterruptedException.class) public void startAsyncMessageInternal(final int port, final String dst_address) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); @@ -128,12 +128,12 @@ public void startAsyncMessageInternal(final int port, final String dst_address) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_RCVBUF, 16 * 1024) .option(ChannelOption.SO_SNDBUF, 16 * 1024); - + // connect to agent bootstrap.connect(dst_address, port).sync(); System.out.println("connect to agent success, ip: " + dst_address); } - + private ChannelFuture startDomainSocket(final String socketPath) { ServerBootstrap bootstrap = new ServerBootstrap(); initServerBootstrap(bootstrap, new DomainSocketAddress(socketPath)); @@ -171,7 +171,7 @@ private void initServerBootstrap(final ServerBootstrap bootstrap) { .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerHandlerInitializer(FrontDatabaseProtocolTypeFactory.getDatabaseType())); } - + private void initServerBootstrap(final ServerBootstrap bootstrap, final DomainSocketAddress localDomainSocketAddress) { bootstrap.group(bossGroup, workerGroup) .channel(EpollServerDomainSocketChannel.class) diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageChannelInboundHandler.java index 3b641fb03d3..5147b9967f5 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageChannelInboundHandler.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageChannelInboundHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.proxy.frontend.netty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -19,14 +36,15 @@ */ @Slf4j public final class AsyncMessageChannelInboundHandler extends ChannelInboundHandlerAdapter { + private static ChannelHandlerContext context; private AsyncMessageFromAgent asyncMessageFromAgent = null; - + @Override public void channelActive(ChannelHandlerContext ctx) { context = ctx; } - + public static void sendMessage(String message) throws InterruptedException { if (context != null) { context.writeAndFlush(message).sync(); @@ -34,7 +52,7 @@ public static void sendMessage(String message) throws InterruptedException { System.err.println("ChannelHandlerContext is not initialized."); } } - + public static void sendMessage(byte[] message) throws InterruptedException { if (context != null) { context.writeAndFlush(Unpooled.wrappedBuffer(message)).sync(); @@ -42,7 +60,7 @@ public static void sendMessage(byte[] message) throws InterruptedException { System.err.println("ChannelHandlerContext is not initialized."); } } - + public static void sendMessage(ByteBuf message) throws InterruptedException { if (context != null) { context.writeAndFlush(message).sync(); @@ -50,7 +68,7 @@ public static void sendMessage(ByteBuf message) throws InterruptedException { System.err.println("ChannelHandlerContext is not initialized."); } } - + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof AsyncMessageFromAgent) { @@ -63,16 +81,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } } else if (msg instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) msg; - + byte[] out = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(byteBuf.readableBytes()); // read one message String content = new String(out, StandardCharsets.UTF_8); - + try { ObjectMapper mapper = new ObjectMapper(); AsyncMessageFromAgent message = mapper.readValue(content, AsyncMessageFromAgent.class); System.out.println("receive message: " + message.toString()); - + CustomXID xidFromMessage = new CustomXID(message.getXid()); AgentAsyncXAManager.getInstance().setStateByXid(xidFromMessage, message.getState()); if (!message.getSQLExceptionString().equals("")) { @@ -83,7 +101,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } } } - + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageDecoder.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageDecoder.java index ab1734403d7..0f53470fb8f 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageDecoder.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageDecoder.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.proxy.frontend.netty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -10,28 +27,28 @@ import java.util.List; public class AsyncMessageDecoder extends ByteToMessageDecoder { + @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List out) throws Exception { if (byteBuf.readableBytes() < 4) { return; // Insufficient message length information, waiting for more data } - + byteBuf.markReaderIndex(); // mark current position - + int contentLength = byteBuf.readInt(); // read message length if (byteBuf.readableBytes() < contentLength) { byteBuf.resetReaderIndex(); return; // wait for more message } - + byte[] contentBytes = new byte[contentLength]; byteBuf.readBytes(contentBytes); // read one message String content = new String(contentBytes, StandardCharsets.UTF_8); - + ObjectMapper mapper = new ObjectMapper(); AsyncMessageFromAgent message = mapper.readValue(content, AsyncMessageFromAgent.class); - + out.add(message); } } - diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageHandlerInitializer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageHandlerInitializer.java index 266a962b6b0..6a554089e0c 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageHandlerInitializer.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/AsyncMessageHandlerInitializer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.proxy.frontend.netty; import io.netty.channel.Channel; @@ -10,6 +27,7 @@ @RequiredArgsConstructor @Slf4j public class AsyncMessageHandlerInitializer extends ChannelInitializer { + @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java index 3fa22d3e225..b629c6c41bf 100644 --- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java +++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java @@ -70,7 +70,7 @@ public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final Conne this.connectionSession = connectionSession; DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL"); SQLStatement sqlStatement = parseSql1(packet.getSql(), databaseType); - + if (sqlStatement instanceof InsertStatement) { proxyBackendHandler = ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSql(), sqlStatement, connectionSession, packet.getHintValueContext()); } else { diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java index d7b929bd057..18dcddfca2b 100644 --- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java +++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java @@ -144,7 +144,7 @@ public MySQLMultiStatementsHandler(final ConnectionSession connectionSession, fi // Pattern pattern = sqlStatementSample instanceof UpdateStatement ? MULTI_UPDATE_STATEMENTS : MULTI_DELETE_STATEMENTS; List sqls = SQLUtils.splitMultiSQL(sql); isLastQuery = SQLUtils.isLastQuery(sql); - + assert (sqlStatements.size() == sqls.size()); Map> groupExecuteUnits = new HashMap<>(); @@ -162,7 +162,7 @@ public MySQLMultiStatementsHandler(final ConnectionSession connectionSession, fi dataSourcesToQueryContext.computeIfAbsent(dataSourceName, unused -> new LinkedList<>()).add(executionContext.getQueryContext()); } - + for (Map.Entry entry : connectionSession.getBackendConnection().getCachedConnections().entries()) { String dataSourceName = entry.getKey().split("\\.")[1]; ExecutionUnit unit = new ExecutionUnit(dataSourceName, new SQLUnit()); @@ -228,9 +228,9 @@ public List execute() throws SQLException { throw new SQLException("this transaction is most likely to timeout, pre-abort in harp"); } } - + boolean onePhase = executionGroupContext.getInputGroups().size() == 1; - for (ExecutionGroup each: executionGroupContext.getInputGroups()) { + for (ExecutionGroup each : executionGroupContext.getInputGroups()) { ExecutionUnit executionUnit = each.getInputs().get(0).getExecutionUnit(); executionUnit.getSqlUnit().setLastQueryComment(onePhase); } @@ -261,7 +261,7 @@ private boolean analysisLatency(List> groupUni if (Latency.getInstance().NeedPreAbort() || Latency.getInstance().NeedLatencyPredictionAndPreAbort()) { executionUnit.updateProbability(Objects.requireNonNull(LocalLockTable.getInstance().getLockMetaData(tableName, key)).nonBlockProbability()); } - if (Latency.getInstance().NeedLatencyPredict() || Latency.getInstance().NeedLatencyPredictionAndPreAbort()){ + if (Latency.getInstance().NeedLatencyPredict() || Latency.getInstance().NeedLatencyPredictionAndPreAbort()) { executionUnit.updateLocalExecuteLatency((int) Objects.requireNonNull(LocalLockTable.getInstance().getLockMetaData(tableName, key)).getLatency()); } executionUnit.addKeys(tableName, key); @@ -375,8 +375,8 @@ private List executeMultiStatements(final ExecutionGroupContext< try { long start = System.nanoTime() / 1000000; List> executeResults = jdbcExecutor.execute(executionGroupContext, callback); -// System.out.println("exectution time: " + (System.nanoTime() / 1000000 - start) + "ms; " + System.nanoTime() / 1000000 + "ms"); - + // System.out.println("exectution time: " + (System.nanoTime() / 1000000 - start) + "ms; " + System.nanoTime() / 1000000 + "ms"); + feedback((List>) executionGroupContext.getInputGroups(), true); boolean first = false; for (List each : executeResults) { @@ -425,7 +425,7 @@ private void feedback(List> groupUnits, boolea String dataSourceName = executionUnit.getDataSourceName(); double localExecuteTime = Math.max(0, executionUnit.getRealExecuteLatency() - Latency.getInstance().GetLatency(dataSourceName)); - + if (isFinish && localExecuteTime < 1e-5) { for (Map.Entry> tableToKeys : executionUnit.getKeys().entrySet()) { for (Integer key : tableToKeys.getValue()) { @@ -452,7 +452,7 @@ private void feedback(List> groupUnits, boolea lockMetaData.incCount(); if (isFinish) { double singleLatency = localExecuteTime * lockMetaData.getLatency() / Math.max(totalWeight, 0.0001); - + if (singleLatency < networkThreshold) { lockMetaData.incSuccessCount(); Objects.requireNonNull(LocalLockTable.getInstance().getLockMetaData(tableToKeys.getKey(), key)).updateLatency(singleLatency); diff --git a/sql-parser/statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/util/SQLUtils.java b/sql-parser/statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/util/SQLUtils.java index 74b7228e435..81e6d43016f 100644 --- a/sql-parser/statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/util/SQLUtils.java +++ b/sql-parser/statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/util/SQLUtils.java @@ -63,7 +63,7 @@ public final class SQLUtils { private static final Pattern ANY_CHARACTER_PATTERN = Pattern.compile("^%|([^\\\\])%"); private static final Pattern ANY_CHARACTER_ESCAPE_PATTERN = Pattern.compile("\\\\%"); - + private static final char[] HEX_DIGITS = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; /** @@ -277,21 +277,21 @@ public static List splitMultiSQL(final String sql) { return result; } - + private static String getComment(final String sql) { String result = ""; if (sql.startsWith(COMMENT_PREFIX)) { result = sql.substring(2, sql.indexOf(COMMENT_SUFFIX)); } - + return result; } - + public static boolean isLastQuery(final String sql) { String comment = getComment(sql); return comment.contains("last query"); } - + public static void appendAsHex(StringBuilder builder, int value) { if (value == 0) { builder.append("0x0"); @@ -299,28 +299,28 @@ public static void appendAsHex(StringBuilder builder, int value) { int shift = 32; boolean nonZeroFound = false; builder.append("0x"); - + do { shift -= 4; - byte nibble = (byte)(value >>> shift & 15); + byte nibble = (byte) (value >>> shift & 15); if (nonZeroFound) { builder.append(HEX_DIGITS[nibble]); } else if (nibble != 0) { builder.append(HEX_DIGITS[nibble]); nonZeroFound = true; } - } while(shift != 0); + } while (shift != 0); } } - + public static void appendAsHex(StringBuilder builder, byte[] bytes) { builder.append("0x"); - + for (byte b : bytes) { builder.append(HEX_DIGITS[b >>> 4 & 15]).append(HEX_DIGITS[b & 15]); } } - + public static String xidToHex(Xid xid) { StringBuilder builder = new StringBuilder(); byte[] gtrid = xid.getGlobalTransactionId(); @@ -328,15 +328,15 @@ public static String xidToHex(Xid xid) { if (gtrid != null) { SQLUtils.appendAsHex(builder, gtrid); } - + builder.append(','); if (btrid != null) { SQLUtils.appendAsHex(builder, btrid); } - + builder.append(','); SQLUtils.appendAsHex(builder, xid.getFormatId()); - + return builder.toString(); } }