Skip to content

Commit

Permalink
change machine
Browse files Browse the repository at this point in the history
  • Loading branch information
QiYuZhuang committed Nov 6, 2023
1 parent 2fcb654 commit 3497085
Show file tree
Hide file tree
Showing 29 changed files with 826 additions and 516 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private static Collection<SchemaMetaData> load(final SchemaMetaDataLoaderMateria
private static Collection<SchemaMetaData> loadByDefault(final SchemaMetaDataLoaderMaterial material) throws SQLException {
Collection<TableMetaData> tableMetaData = new LinkedList<>();
for (String each : material.getActualTableNames()) {
// TableMetaDataLoader.load(material.getDataSource(), each, material.getStorageType()).ifPresent(tableMetaData::add);
// TableMetaDataLoader.load(material.getDataSource(), each, material.getStorageType()).ifPresent(tableMetaData::add);
}
return Collections.singletonList(new SchemaMetaData(material.getDefaultSchemaName(), tableMetaData));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public final class Latency {

private static final Latency INSTANCE = new Latency(0.8, 20);
@Getter
private String algorithm;
private final HashMap<String, double[]> latencies;
@Getter
Expand All @@ -46,11 +47,11 @@ public Latency(double alpha, int windowSize) {
public void AddDataSource(String src) {
System.out.println("Add Source " + src);
latencies.put(src, new double[windowSize + 1]);
// if (src.contains("ds_1")) {
// latencies.get(src)[windowSize] = 150;
// } else {
// latencies.get(src)[windowSize] = 20;
// }
// if (src.contains("ds_1")) {
// latencies.get(src)[windowSize] = 150;
// } else {
// latencies.get(src)[windowSize] = 20;
// }
}

public void SetDataSourceIp(String src, String ip) {
Expand Down Expand Up @@ -104,13 +105,19 @@ public void SetAlgorithm(String alg) {
public boolean NeedDelay() {
return algorithm.equals("harp") || algorithm.equals("aharp");
}

public boolean NeedLatencyPredict() { return algorithm.equals("harp_lp") || algorithm.equals("aharp_lp"); }

public boolean NeedPreAbort() { return algorithm.equals("harp_pa") || algorithm.equals("aharp_pa"); }

public boolean NeedLatencyPredictionAndPreAbort() { return algorithm.equals("harp_lppa") || algorithm.equals("aharp_lppa"); }


public boolean NeedLatencyPredict() {
return algorithm.equals("harp_lp") || algorithm.equals("aharp_lp");
}

public boolean NeedPreAbort() {
return algorithm.equals("harp_pa") || algorithm.equals("aharp_pa");
}

public boolean NeedLatencyPredictionAndPreAbort() {
return algorithm.equals("harp_lppa") || algorithm.equals("aharp_lppa");
}

public boolean asyncPreparation() {
return algorithm.equals("aharp") || algorithm.equals("aharp_lp") || algorithm.equals("aharp_pa") ||
algorithm.equals("aharp_lppa") || algorithm.equals("a");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.transactions;

import lombok.Getter;
Expand All @@ -8,33 +25,35 @@

@Slf4j
public class AgentAsyncXAManager {

static AgentAsyncXAManager Instance = new AgentAsyncXAManager();

@Getter
@Setter
private class XATransactionInfo {

XATransactionState state;
String errorInfo;

public XATransactionInfo(XATransactionState state) {
this.state = state;
}
}

public static AgentAsyncXAManager getInstance() {
return Instance;
}

private final HashMap<CustomXID, XATransactionInfo> XAStates = new HashMap<>();
private long globalTid = 0;

public synchronized long fetchAndAddGlobalTid() {
return globalTid++;
}

public void setStateByXid(CustomXID xid, XATransactionState state) {
if (XAStates.containsKey(xid)) {
assert(XAStates.get(xid).getState() == XATransactionState.ACTIVE);
assert (XAStates.get(xid).getState() == XATransactionState.ACTIVE);
if (state == XATransactionState.IDLE) {
// one phase commit
XAStates.get(xid).setState(state);
Expand All @@ -55,12 +74,12 @@ public void setStateByXid(CustomXID xid, XATransactionState state) {
XAStates.put(xid, info);
}
}

public void setErrorInfoByXid(CustomXID xid, String errorInfo) {
assert XAStates.containsKey(xid);
XAStates.get(xid).setErrorInfo(errorInfo);
}

public XATransactionState getStateByXid(CustomXID xid) {
if (XAStates.get(xid) == null) {
System.out.println(xid.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.transactions;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -10,14 +27,15 @@
@Getter
@RequiredArgsConstructor
public class AsyncMessageFromAgent implements Serializable {

private final String Xid;

private final XATransactionState state;

private final long currentTimeStamp;

private final String SQLExceptionString;

@JsonCreator
public AsyncMessageFromAgent(@JsonProperty("state") String state, @JsonProperty("currentTimeStamp") long currentTimeStamp,
@JsonProperty("xid") String xid, @JsonProperty("sqlexceptionString") String sqlexceptionString) {
Expand All @@ -26,7 +44,7 @@ public AsyncMessageFromAgent(@JsonProperty("state") String state, @JsonProperty(
this.Xid = xid;
this.SQLExceptionString = sqlexceptionString;
}

@Override
public String toString() {
return "message{" +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.transactions;

import java.util.Arrays;
Expand All @@ -10,29 +27,30 @@

@ToString
public class CustomXID {

byte[] myBqual;
int myFormatId;
byte[] myGtrid;
String originStr;

public CustomXID(byte[] gtrid, byte[] bqual, int formatId) {
this.myGtrid = gtrid;
this.myBqual = bqual;
this.myFormatId = formatId;
this.originStr = Arrays.toString(gtrid) + "," + Arrays.toString(bqual) + "," + formatId;
}

public CustomXID(byte[] gtrid, byte[] bqual) {
this(gtrid, bqual, 1);
}

public CustomXID(String gtrid, String bqual) {
this(gtrid.getBytes(), bqual.getBytes());
}

public CustomXID(String str) {
List<String> list = Arrays.asList(str.split(","));

if (list.size() == 1) {
this.myGtrid = list.get(0).getBytes();
this.myBqual = "".getBytes();
Expand All @@ -49,11 +67,12 @@ public CustomXID(String str) {
this.myFormatId = Integer.parseInt(list.get(2).substring(2), 16);
} else {
this.myFormatId = Integer.parseInt(list.get(2));
} }

}
}

originStr = str;
}

public CustomXID(Xid xid) {
if (xid instanceof MysqlXid) {
this.myGtrid = xid.getGlobalTransactionId();
Expand All @@ -62,22 +81,22 @@ public CustomXID(Xid xid) {
this.originStr = Arrays.toString(myGtrid) + "," + Arrays.toString(myBqual) + "," + myFormatId;
}
}

@Override
public boolean equals(Object obj) {
boolean ret = false;
if (obj instanceof CustomXID) {
CustomXID other = (CustomXID)obj;
CustomXID other = (CustomXID) obj;
ret = originStr.equals(other.originStr);
}

return ret;
}

public int hashCode() {
return (this.originStr.hashCode() / 100) * 100 + Integer.parseInt(String.valueOf(myFormatId)) % 100;
}

public String toString() {
return this.originStr;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.transactions;

public enum XATransactionState {
NO_TRANSACTION,

ACTIVE,

IDLE,

PREPARED,

COMMITTED,

FAILED,

ABORTED,

ROLLBACK_ONLY,

NUM_OF_STATES
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class ExecutionUnit {
private int networkLatency;

private int realExecuteLatency;

private long finishTime;

private boolean isHarp = false;
Expand Down Expand Up @@ -101,8 +101,10 @@ public long GetDelayTime() {
public void SetDelayTime(long delayTime) {
this.delayTime = delayTime;
}

public void SetFinishTime(long finishTime) { this.finishTime = finishTime; }

public void SetFinishTime(long finishTime) {
this.finishTime = finishTime;
}

public void CombineExecutionUnit(ExecutionUnit other) {
sqlUnit.CombineSQLUnit(other.getSqlUnit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void CombineSQLUnit(SQLUnit other) {
parameters.addAll(other.getParameters());
tableRouteMappers.addAll(other.getTableRouteMappers());
}

public void setLastQueryComment(boolean onePhase) {
if (onePhase) {
sql = "/*last one phase query*/" + sql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class XAShardingSphereTransactionManager implements ShardingSphereT

@Override
public void init(final Map<String, DatabaseType> databaseTypes, final Map<String, DataSource> dataSources, final String providerType) {
// dataSources.forEach((key, value) -> TypedSPILoader.getService(DataSourcePrivilegeChecker.class, databaseTypes.get(key).getType()).checkPrivilege(value));
// dataSources.forEach((key, value) -> TypedSPILoader.getService(DataSourcePrivilegeChecker.class, databaseTypes.get(key).getType()).checkPrivilege(value));
xaTransactionManagerProvider = TypedSPILoader.getService(XATransactionManagerProvider.class, providerType);
xaTransactionManagerProvider.init();
Map<String, ResourceDataSource> resourceDataSources = getResourceDataSources(dataSources);
Expand Down
Loading

0 comments on commit 3497085

Please sign in to comment.