Skip to content

Commit

Permalink
fix(interactive): The replay doesn't need to fill the kafka again. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 authored Nov 1, 2024
1 parent fa07f86 commit d22802b
Show file tree
Hide file tree
Showing 27 changed files with 338 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

public class CoordinatorConfig {
public static final Config<Long> SNAPSHOT_INCREASE_INTERVAL_MS =
Config.longConfig("snapshot.increase.interval.ms", 1000L);
Config.longConfig("snapshot.increase.interval.ms", 2000L);

public static final Config<Long> OFFSETS_PERSIST_INTERVAL_MS =
Config.longConfig("offsets.persist.interval.ms", 1000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;

/**
* client to submit request to remote engine service
Expand All @@ -37,7 +38,8 @@ public ExecutionClient(ChannelFetcher<C> channelFetcher) {
public abstract void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
QueryTimeoutConfig timeoutConfig,
QueryLogger queryLogger)
throws Exception;

public abstract void close() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.graphscope.gaia.proto.StoredProcedure;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.client.impl.DefaultSession;
Expand Down Expand Up @@ -56,7 +57,8 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher<URI> channelFetch
public void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
QueryTimeoutConfig timeoutConfig,
QueryLogger queryLogger)
throws Exception {
List<CompletableFuture> responseFutures = Lists.newArrayList();
for (URI httpURI : channelFetcher.fetch()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
import com.alibaba.pegasus.RpcChannel;
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
Expand Down Expand Up @@ -54,7 +55,8 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channe
public void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
QueryTimeoutConfig timeoutConfig,
QueryLogger queryLogger)
throws Exception {
if (rpcClientRef.get() == null) {
rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch()));
Expand Down Expand Up @@ -97,12 +99,13 @@ public void process(PegasusClient.JobResponse jobResponse) {
@Override
public void finish() {
listener.onCompleted();
logger.info("[compile]: received results from engine");
queryLogger.info("[compile]: received results from engine");
}

@Override
public void error(Status status) {
listener.onError(status.asException());
queryLogger.error("[compile]: fail to receive results from engine");
}
},
timeoutConfig.getChannelTimeoutMS());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.GraphConfig;
import com.alibaba.graphscope.common.ir.meta.GraphId;
import com.alibaba.graphscope.common.config.PlannerConfig;
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.IrMetaStats;
import com.alibaba.graphscope.common.ir.meta.IrMetaTracker;
Expand All @@ -46,21 +46,27 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable
private volatile IrMetaStats currentState;
// To manage the state changes of statistics resulting from different update operations.
private volatile StatsState statsState;
private volatile Boolean statsEnabled = null;
private final boolean fetchStats;

public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) {
super(dataReader, tracker);
this.scheduler = new ScheduledThreadPoolExecutor(2);
this.scheduler = new ScheduledThreadPoolExecutor(1);
this.scheduler.scheduleAtFixedRate(
() -> syncMeta(),
2000,
0,
GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
this.scheduler.scheduleAtFixedRate(
() -> syncStats(statsEnabled == null ? false : statsEnabled),
2000,
GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
this.fetchStats =
PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs)
&& PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO");
if (this.fetchStats) {
logger.info("start to schedule statistics fetch task");
this.scheduler.scheduleAtFixedRate(
() -> syncStats(),
2000,
GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
}
}

@Override
Expand Down Expand Up @@ -94,32 +100,18 @@ private synchronized void syncMeta() {
meta.getSchema(),
meta.getStoredProcedures(),
curStats);
boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId());
if (statsEnabled && this.statsState != StatsState.SYNCED
|| (!statsEnabled && this.statsState != StatsState.MOCKED)) {
logger.debug("start to sync stats");
syncStats(statsEnabled);
if (this.fetchStats && this.statsState != StatsState.SYNCED) {
logger.info("start to schedule statistics fetch task");
syncStats();
}
} catch (Throwable e) {
logger.warn("failed to read meta data, error is {}", e);
}
}

private boolean getStatsEnabled(GraphId graphId) {
try {
return this.statsEnabled == null
? this.reader.syncStatsEnabled(graphId)
: this.statsEnabled;
} catch (
Throwable e) { // if errors happen when reading stats enabled, we assume it is false
logger.warn("failed to read stats enabled, error is {}", e);
return false;
logger.warn("failed to read meta data", e);
}
}

private synchronized void syncStats(boolean statsEnabled) {
private synchronized void syncStats() {
try {
if (this.currentState != null && statsEnabled) {
if (this.currentState != null) {
GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId());
logger.debug("statistics from remote: {}", stats);
if (stats != null && stats.getVertexCount() != 0) {
Expand All @@ -137,7 +129,7 @@ private synchronized void syncStats(boolean statsEnabled) {
}
}
} catch (Throwable e) {
logger.warn("failed to read graph statistics, error is {}", e);
logger.warn("failed to read graph statistics, error is", e);
} finally {
try {
if (this.currentState != null
Expand All @@ -148,7 +140,7 @@ private synchronized void syncStats(boolean statsEnabled) {
this.statsState = StatsState.MOCKED;
}
} catch (Throwable t) {
logger.warn("failed to mock the glogue, error is {}", t);
logger.warn("failed to mock the glogue, error is", t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,11 @@ public void execute(
jobName,
summary.getLogicalPlan(),
summary.getPhysicalPlan());
client.submit(request, listener, timeoutConfig);
client.submit(
request,
listener,
timeoutConfig,
statusCallback.getQueryLogger());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public ThrowingConsumer<Context> select(Context ctx) {
summary.getLogicalPlan(),
summary.getPhysicalPlan()),
listener,
timeoutConfig);
timeoutConfig,
statusCallback.getQueryLogger());
}
// request results from remote engine in a blocking way
listener.request();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ private JsonObject buildSimpleLog(boolean isSucceed, long elapsedMillis) {
private void fillLogDetail(JsonObject logJson, String errorMsg) {
try {
if (this.metricsCollector.getElapsedMillis() > this.printThreshold) {
// todo(siyuan): the invocation of the function can cause Exception when serializing
// a gremlin vertex to json format
fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis());
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public GremlinExecutor.LifeCycle get() {
summary.getLogicalPlan(),
summary.getPhysicalPlan()),
listener,
timeoutConfig);
timeoutConfig,
statusCallback.getQueryLogger());
}
// request results from remote engine in a blocking way
listener.request();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ protected AbstractResultProcessor(
msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
.orElse(settings.resultIterationBatchSize);
this.resultCollectors = new ArrayList<>(this.resultCollectorsBatchSize);
this.responseStreamIterator =
new StreamIterator<>(
FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs));
int capacity = FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs);
this.responseStreamIterator = new StreamIterator<>(capacity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ public GremlinResultProcessor(
this.statusCallback = statusCallback;
this.timeoutConfig = timeoutConfig;
this.reducer = Maps.newLinkedHashMap();
this.recordStreamIterator =
new StreamIterator<>(
FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs));
int capacity = FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs);
this.recordStreamIterator = new StreamIterator<>(capacity);
}

@Override
Expand Down
50 changes: 9 additions & 41 deletions interactive_engine/executor/store/mcsr/src/graph_partitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,11 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
.unwrap()
.ends_with(".csv")
{
let input_path = vertex_file
.as_os_str()
.clone()
.to_str()
.unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
let input_path = vertex_file.as_os_str().to_str().unwrap();
let input_dir_path = self.input_dir.as_os_str().to_str().unwrap();
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down Expand Up @@ -258,23 +248,13 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
.unwrap()
.ends_with(".csv.gz")
{
let input_path = vertex_file
.as_os_str()
.clone()
.to_str()
.unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
let input_path = vertex_file.as_os_str().to_str().unwrap();
let input_dir_path = self.input_dir.as_os_str().to_str().unwrap();
let gz_loc = input_path.find(".gz").unwrap();
let input_path = input_path.split_at(gz_loc).0;
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down Expand Up @@ -327,18 +307,12 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
.unwrap()
.ends_with(".csv")
{
info!("{}", edge_file.as_os_str().clone().to_str().unwrap());
let input_path = edge_file.as_os_str().clone().to_str().unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
info!("{}", edge_file.as_os_str().to_str().unwrap());
let input_path = edge_file.as_os_str().to_str().unwrap();
let input_dir_path = self.input_dir.as_os_str().to_str().unwrap();
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down Expand Up @@ -373,19 +347,13 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
.unwrap()
.ends_with(".csv.gz")
{
let input_path = edge_file.as_os_str().clone().to_str().unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
let input_path = edge_file.as_os_str().to_str().unwrap();
let input_dir_path = self.input_dir.as_os_str().to_str().unwrap();
let gz_loc = input_path.find(".gz").unwrap();
let input_path = input_path.split_at(gz_loc).0;
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ public List<Long> replayRecords(long offset, long timestamp) {
return resp.getSnapshotIdList();
}

public List<Long> replayRecordsV2(long offset, long timestamp) {
ReplayRecordsRequestV2 req =
ReplayRecordsRequestV2.newBuilder()
.setOffset(offset)
.setTimestamp(timestamp)
.build();
ReplayRecordsResponseV2 resp = this.clientStub.replayRecordsV2(req);
return resp.getSnapshotIdList();
}

private long modifyVertex(Vertex vertex, WriteTypePb writeType) {
WriteRequestPb request = vertex.toWriteRequest(writeType);
return submit(request);
Expand Down
Loading

0 comments on commit d22802b

Please sign in to comment.