Skip to content

Commit

Permalink
release version 1.0 for GeoTP
Browse files Browse the repository at this point in the history
  • Loading branch information
QiYuZhuang committed Jan 28, 2024
1 parent 3497085 commit 6f8615e
Show file tree
Hide file tree
Showing 25 changed files with 359 additions and 101 deletions.
3 changes: 2 additions & 1 deletion default_table_size
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
usertable 1000001
usertable 1000001
warehouse 401
10 changes: 7 additions & 3 deletions distribution/proxy/src/main/resources/bin/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,15 @@ if [[ $1 == -a ]] || [[ $1 == -p ]] || [[ $1 == -c ]] || [[ $1 == -f ]] || [[ $1
done

elif [ "$1" == "--harp" ]; then
HARP="alg=harp"
HARP="--alg=harp"
elif [ "$1" == "--aharp" ]; then
HARP="alg=aharp"
HARP="--alg=aharp"
elif [ "$1" == "--aharp_lppa" ]; then
HARP="--alg=aharp_lppa"
elif [ "$1" == "--aharp_pa" ]; then
HARP="--alg=aharp_pa"
elif [ "$1" == "--a" ]; then
HARP="alg=a"
HARP="--alg=a"

elif [ $# == 1 ]; then
PORT=$1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public void registerTable(String tableName) {
}
}

public boolean isRegisterTable(String tableName) {
for (String each : DefaultTableNameToSize.keySet()) {
if (each.equals(tableName)) {
return true;
}
}
return false;
}

private int getDefaultTableSize(String tableName) {
if (DefaultTableNameToSize.get(tableName) != null) {
return DefaultTableNameToSize.get(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@Setter
public final class LockMetaData {

private static final int countThreshold = 128;
private static final int countThreshold = 32;
private static final double alpha = 0.75;

private int readCount;
Expand Down Expand Up @@ -62,7 +62,7 @@ public LockMetaData(double networkLatency) {
writeLatency = 0.01;
latency = 0.01;
startTime = System.nanoTime();
networkThreshold = networkLatency * 2; // hyper-parameter, 1RTT
networkThreshold = networkLatency * 2; // hyper-parameter, 2 * RTT
}

public synchronized void incCount() {
Expand Down Expand Up @@ -91,7 +91,7 @@ public synchronized double nonBlockProbability() {
}
// TODO:
// System.out.println("successCount: " + successCount + "; count: " + count + "; processing: " + processing);
return Math.pow(successCount * 1.0 / count, processing);
return Math.pow(successCount * 1.0 / count, Math.max(processing - 1, 0));
}

public synchronized void updateLatency(double newLatency) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public void SetDataSourceIp(String src, String ip) {
}
}

public boolean IsPostgreSQLNode(String resource_name) {
for (Map.Entry<String, String> each : srcToIp.entrySet()) {
if (resource_name.contains(each.getKey())) {
// find the ip of postgreSQL node
if (each.getValue().equals("10.77.70.89") || each.getValue().equals("10.77.70.90") || each.getValue().equals("127.0.0.1")) {
return true;
}
}
}
return false;
}

public double GetLatency(String src) {
if (latencies.containsKey(src)) {
return latencies.get(src)[windowSize];
Expand Down Expand Up @@ -103,7 +115,10 @@ public void SetAlgorithm(String alg) {
}

public boolean NeedDelay() {
return algorithm.equals("harp") || algorithm.equals("aharp");
return algorithm.equals("harp") || algorithm.equals("aharp") ||
algorithm.equals("harp_lp") || algorithm.equals("aharp_lp") ||
algorithm.equals("harp_pa") || algorithm.equals("aharp_pa") ||
algorithm.equals("harp_lppa") || algorithm.equals("aharp_lppa");
}

public boolean NeedLatencyPredict() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class AgentAsyncXAManager {
Expand All @@ -34,6 +34,7 @@ private class XATransactionInfo {

XATransactionState state;
String errorInfo;
boolean preAbort = false;

public XATransactionInfo(XATransactionState state) {
this.state = state;
Expand All @@ -44,7 +45,7 @@ public static AgentAsyncXAManager getInstance() {
return Instance;
}

private final HashMap<CustomXID, XATransactionInfo> XAStates = new HashMap<>();
private final ConcurrentHashMap<CustomXID, XATransactionInfo> XAStates = new ConcurrentHashMap<>(2048, 0.75f, 256);
private long globalTid = 0;

public synchronized long fetchAndAddGlobalTid() {
Expand All @@ -66,6 +67,7 @@ public void setStateByXid(CustomXID xid, XATransactionState state) {
XAStates.get(xid).setState(state);
} else {
assert state == XATransactionState.ROLLBACK_ONLY;
XAStates.get(xid).setState(state);
log.info("rollback only" + xid);
}
} else {
Expand All @@ -82,8 +84,13 @@ public void setErrorInfoByXid(CustomXID xid, String errorInfo) {

public XATransactionState getStateByXid(CustomXID xid) {
if (XAStates.get(xid) == null) {
System.out.println(xid.toString());
System.out.println("XA manager can not find" + xid.toString());
}
return XAStates.get(xid).getState();
}

public void clearStateByXid(CustomXID xid) {
System.out.println("XA state size: " + XAStates.size());
XAStates.remove(xid);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Arrays;
import java.util.List;

import com.mysql.jdbc.jdbc2.optional.MysqlXid;
import com.mysql.cj.jdbc.MysqlXid;
import lombok.ToString;

import javax.transaction.xa.Xid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public void SetDelayTime(long delayTime) {
this.delayTime = delayTime;
}

public void clearAnalysis() {
networkLatency = 0;
localExecuteLatency = 0;
abortProbability = 0.0;
delayTime = 0;
}

public void SetFinishTime(long finishTime) {
this.finishTime = finishTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void CombineSQLUnit(SQLUnit other) {

public void setLastQueryComment(boolean onePhase) {
if (onePhase) {
sql = "/*last one phase query*/" + sql;
// sql = "/*last one phase query*/" + sql;
} else {
sql = "/*last query*/" + sql;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread) throws SQLException {
// TODO It is better to judge whether need sane result before execute, can avoid exception thrown
Collection<T> result = new LinkedList<>();
long startTime = System.nanoTime();
for (JDBCExecutionUnit each : executionUnits) {
T executeResult = execute(each, isTrunkThread);
if (null != executeResult) {
result.add(executeResult);
}
}
System.out.println("JDBCExecutorCallback execution time: " + (System.nanoTime() - startTime) / 1000000 + " ms");
return result;
}

Expand All @@ -76,6 +78,7 @@ public final Collection<T> execute(final Collection<JDBCExecutionUnit> execution
*/
@SneakyThrows
private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread) throws SQLException {
long start = System.nanoTime();
SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
DatabaseType storageType = storageTypes.get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName());
DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(jdbcExecutionUnit.getStorageResource().getConnection().getMetaData(), storageType);
Expand All @@ -102,6 +105,7 @@ private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTru
executionUnit.setRealExecuteLatency((int) (executeTime / 1000000));
sqlExecutionHook.finishSuccess();
finishReport(jdbcExecutionUnit);
System.out.println("execute time: " + (System.nanoTime() - start) / 1000000 + " ms");
return result;
} catch (final SQLException ex) {
if (!storageType.equals(protocolType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ private SQLExecutionUnitBuilder getCachedSqlExecutionUnitBuilder(final String ty
@Override
protected List<ExecutionGroup<T>> group(final String dataSourceName, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException {
List<ExecutionGroup<T>> result = new LinkedList<>();
long startTime = System.nanoTime();
List<C> connections = connectionManager.getConnections(dataSourceName, sqlUnitGroups.size(), connectionMode);
System.out.println("Get connection time: " + (System.nanoTime() - startTime) / 1000000 + " ms;");
int count = 0;
for (List<SQLUnit> each : sqlUnitGroups) {
result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shardingsphere.transaction.exception.TransactionTimeoutException;
import org.apache.shardingsphere.transaction.spi.ShardingSphereTransactionManager;
import org.apache.shardingsphere.transaction.xa.jta.datasource.XATransactionDataSource;
import org.apache.shardingsphere.transaction.xa.harp.manager.CustomTransactionImp;
import org.apache.shardingsphere.transaction.xa.jta.datasource.checker.DataSourcePrivilegeChecker;
import org.apache.shardingsphere.transaction.xa.spi.XATransactionManagerProvider;

Expand Down Expand Up @@ -53,6 +54,14 @@ public final class XAShardingSphereTransactionManager implements ShardingSphereT

private XATransactionManagerProvider xaTransactionManagerProvider;

@SneakyThrows
public void setXATransactionPreAbort(boolean preAbort) {
// System.out.println("set pre abort: " + preAbort);
if (xaTransactionManagerProvider.getTransactionManager().getTransaction() instanceof CustomTransactionImp) {
((CustomTransactionImp) xaTransactionManagerProvider.getTransactionManager().getTransaction()).setPreAbort(preAbort);
}
}

@Override
public void init(final Map<String, DatabaseType> databaseTypes, final Map<String, DataSource> dataSources, final String providerType) {
// dataSources.forEach((key, value) -> TypedSPILoader.getService(DataSourcePrivilegeChecker.class, databaseTypes.get(key).getType()).checkPrivilege(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.transaction.xa.jta.connection.dialect;

import com.mysql.jdbc.jdbc2.optional.JDBC4MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
Expand Down Expand Up @@ -45,7 +45,7 @@ class MySQLXAConnectionWrapperTest {
@Test
void assertWrap() throws SQLException {
XAConnection actual = TypedSPILoader.getService(XAConnectionWrapper.class, databaseType.getType()).wrap(createXADataSource(), mockConnection());
assertThat(actual.getXAResource(), instanceOf(JDBC4MysqlXAConnection.class));
assertThat(actual.getXAResource(), instanceOf(MysqlXAConnection.class));
}

private XADataSource createXADataSource() {
Expand All @@ -55,7 +55,7 @@ private XADataSource createXADataSource() {

private Connection mockConnection() throws SQLException {
Connection result = mock(Connection.class);
when(result.unwrap(com.mysql.jdbc.Connection.class)).thenReturn(mock(com.mysql.jdbc.Connection.class));
when(result.unwrap(com.mysql.cj.MysqlConnection.class)).thenReturn(mock(com.mysql.cj.MysqlConnection.class));
return result;
}
}
Loading

0 comments on commit 6f8615e

Please sign in to comment.