From d22802b69a9ff1bfc711c87836ee7d254cc83d48 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Fri, 1 Nov 2024 18:50:44 +0800 Subject: [PATCH] fix(interactive): The replay doesn't need to fill the kafka again. (#4296) --- .../common/config/CoordinatorConfig.java | 2 +- .../common/client/ExecutionClient.java | 4 +- .../common/client/HttpExecutionClient.java | 4 +- .../common/client/RpcExecutionClient.java | 7 +- .../ir/meta/fetcher/DynamicIrMetaFetcher.java | 54 +++---- .../cypher/executor/GraphQueryExecutor.java | 6 +- .../processor/IrTestOpProcessor.java | 3 +- .../gremlin/plugin/QueryStatusCallback.java | 2 - .../plugin/processor/LifeCycleSupplier.java | 3 +- .../processor/AbstractResultProcessor.java | 5 +- .../resultx/GremlinResultProcessor.java | 5 +- .../store/mcsr/src/graph_partitioner.rs | 50 ++----- .../graphscope/groot/sdk/GrootClient.java | 10 ++ .../groot/frontend/ClientService.java | 45 ++++++ .../groot/frontend/FrontendStoreClient.java | 21 +++ .../groot/frontend/write/KafkaAppender.java | 8 +- .../groot/store/FrontendStoreService.java | 22 ++- .../groot/store/KafkaProcessor.java | 133 +++++++++++++++--- .../groot/store/SnapshotSortQueue.java | 18 ++- .../graphscope/groot/store/StoreService.java | 6 +- .../graphscope/groot/store/WriterAgent.java | 39 ++--- .../graphscope/groot/wal/LogEntry.java | 6 +- .../graphscope/groot/servers/Store.java | 8 +- proto/groot/frontend_store_service.proto | 1 + proto/groot/sdk/client_service.proto | 1 + proto/groot/sdk/model.proto | 9 ++ python/graphscope/client/connection.py | 9 ++ 27 files changed, 338 insertions(+), 143 deletions(-) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index 2dcbbaf704f2..f3d11169dfa4 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -17,7 +17,7 @@ public class CoordinatorConfig { public static final Config SNAPSHOT_INCREASE_INTERVAL_MS = - Config.longConfig("snapshot.increase.interval.ms", 1000L); + Config.longConfig("snapshot.increase.interval.ms", 2000L); public static final Config OFFSETS_PERSIST_INTERVAL_MS = Config.longConfig("offsets.persist.interval.ms", 1000L); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java index 2fe515709b8d..089726d7fe0f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; /** * client to submit request to remote engine service @@ -37,7 +38,8 @@ public ExecutionClient(ChannelFetcher channelFetcher) { public abstract void submit( ExecutionRequest request, ExecutionResponseListener listener, - QueryTimeoutConfig timeoutConfig) + QueryTimeoutConfig timeoutConfig, + QueryLogger queryLogger) throws Exception; public abstract void close() throws Exception; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java index b8971bba6335..bb5157653f85 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java @@ -25,6 +25,7 @@ import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gaia.proto.StoredProcedure; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.graphscope.interactive.client.Session; import com.alibaba.graphscope.interactive.client.common.Result; import com.alibaba.graphscope.interactive.client.impl.DefaultSession; @@ -56,7 +57,8 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher channelFetch public void submit( ExecutionRequest request, ExecutionResponseListener listener, - QueryTimeoutConfig timeoutConfig) + QueryTimeoutConfig timeoutConfig, + QueryLogger queryLogger) throws Exception { List responseFutures = Lists.newArrayList(); for (URI httpURI : channelFetcher.fetch()) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java index 2920d2d5accd..1ef68e17a1a0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java @@ -23,6 +23,7 @@ import com.alibaba.graphscope.common.config.PegasusConfig; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; import com.alibaba.graphscope.gaia.proto.IrResult; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.pegasus.RpcChannel; import com.alibaba.pegasus.RpcClient; import com.alibaba.pegasus.intf.ResultProcessor; @@ -54,7 +55,8 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher channe public void submit( ExecutionRequest request, ExecutionResponseListener listener, - QueryTimeoutConfig timeoutConfig) + QueryTimeoutConfig timeoutConfig, + QueryLogger queryLogger) throws Exception { if (rpcClientRef.get() == null) { rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch())); @@ -97,12 +99,13 @@ public void process(PegasusClient.JobResponse jobResponse) { @Override public void finish() { listener.onCompleted(); - logger.info("[compile]: received results from engine"); + queryLogger.info("[compile]: received results from engine"); } @Override public void error(Status status) { listener.onError(status.asException()); + queryLogger.error("[compile]: fail to receive results from engine"); } }, timeoutConfig.getChannelTimeoutMS()); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index 931e78aae887..992895c4fdaa 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -20,7 +20,7 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.GraphConfig; -import com.alibaba.graphscope.common.ir.meta.GraphId; +import com.alibaba.graphscope.common.config.PlannerConfig; import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.IrMetaStats; import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; @@ -46,21 +46,27 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable private volatile IrMetaStats currentState; // To manage the state changes of statistics resulting from different update operations. private volatile StatsState statsState; - private volatile Boolean statsEnabled = null; + private final boolean fetchStats; public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) { super(dataReader, tracker); - this.scheduler = new ScheduledThreadPoolExecutor(2); + this.scheduler = new ScheduledThreadPoolExecutor(1); this.scheduler.scheduleAtFixedRate( () -> syncMeta(), - 2000, + 0, GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs), TimeUnit.MILLISECONDS); - this.scheduler.scheduleAtFixedRate( - () -> syncStats(statsEnabled == null ? false : statsEnabled), - 2000, - GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs), - TimeUnit.MILLISECONDS); + this.fetchStats = + PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs) + && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO"); + if (this.fetchStats) { + logger.info("start to schedule statistics fetch task"); + this.scheduler.scheduleAtFixedRate( + () -> syncStats(), + 2000, + GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs), + TimeUnit.MILLISECONDS); + } } @Override @@ -94,32 +100,18 @@ private synchronized void syncMeta() { meta.getSchema(), meta.getStoredProcedures(), curStats); - boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); - if (statsEnabled && this.statsState != StatsState.SYNCED - || (!statsEnabled && this.statsState != StatsState.MOCKED)) { - logger.debug("start to sync stats"); - syncStats(statsEnabled); + if (this.fetchStats && this.statsState != StatsState.SYNCED) { + logger.info("start to schedule statistics fetch task"); + syncStats(); } } catch (Throwable e) { - logger.warn("failed to read meta data, error is {}", e); - } - } - - private boolean getStatsEnabled(GraphId graphId) { - try { - return this.statsEnabled == null - ? this.reader.syncStatsEnabled(graphId) - : this.statsEnabled; - } catch ( - Throwable e) { // if errors happen when reading stats enabled, we assume it is false - logger.warn("failed to read stats enabled, error is {}", e); - return false; + logger.warn("failed to read meta data", e); } } - private synchronized void syncStats(boolean statsEnabled) { + private synchronized void syncStats() { try { - if (this.currentState != null && statsEnabled) { + if (this.currentState != null) { GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId()); logger.debug("statistics from remote: {}", stats); if (stats != null && stats.getVertexCount() != 0) { @@ -137,7 +129,7 @@ private synchronized void syncStats(boolean statsEnabled) { } } } catch (Throwable e) { - logger.warn("failed to read graph statistics, error is {}", e); + logger.warn("failed to read graph statistics, error is", e); } finally { try { if (this.currentState != null @@ -148,7 +140,7 @@ private synchronized void syncStats(boolean statsEnabled) { this.statsState = StatsState.MOCKED; } } catch (Throwable t) { - logger.warn("failed to mock the glogue, error is {}", t); + logger.warn("failed to mock the glogue, error is", t); } } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java index 4771cc39d3a0..1d6eeaaacc25 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java @@ -185,7 +185,11 @@ public void execute( jobName, summary.getLogicalPlan(), summary.getPhysicalPlan()); - client.submit(request, listener, timeoutConfig); + client.submit( + request, + listener, + timeoutConfig, + statusCallback.getQueryLogger()); } }; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java index 6a3eee75c803..da476cdf85c4 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/processor/IrTestOpProcessor.java @@ -187,7 +187,8 @@ public ThrowingConsumer select(Context ctx) { summary.getLogicalPlan(), summary.getPhysicalPlan()), listener, - timeoutConfig); + timeoutConfig, + statusCallback.getQueryLogger()); } // request results from remote engine in a blocking way listener.request(); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java index 2f45930159a1..ba63ffdedcd9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java @@ -114,8 +114,6 @@ private JsonObject buildSimpleLog(boolean isSucceed, long elapsedMillis) { private void fillLogDetail(JsonObject logJson, String errorMsg) { try { if (this.metricsCollector.getElapsedMillis() > this.printThreshold) { - // todo(siyuan): the invocation of the function can cause Exception when serializing - // a gremlin vertex to json format fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis()); } } catch (Throwable t) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java index 2b21ef46d244..1289c9caaef9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java @@ -118,7 +118,8 @@ public GremlinExecutor.LifeCycle get() { summary.getLogicalPlan(), summary.getPhysicalPlan()), listener, - timeoutConfig); + timeoutConfig, + statusCallback.getQueryLogger()); } // request results from remote engine in a blocking way listener.request(); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index 419286a5532d..b75f83d89166 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ -73,9 +73,8 @@ protected AbstractResultProcessor( msg.optionalArgs(Tokens.ARGS_BATCH_SIZE) .orElse(settings.resultIterationBatchSize); this.resultCollectors = new ArrayList<>(this.resultCollectorsBatchSize); - this.responseStreamIterator = - new StreamIterator<>( - FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs)); + int capacity = FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs); + this.responseStreamIterator = new StreamIterator<>(capacity); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java index b1fe008adced..0f6524cab81c 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java @@ -59,9 +59,8 @@ public GremlinResultProcessor( this.statusCallback = statusCallback; this.timeoutConfig = timeoutConfig; this.reducer = Maps.newLinkedHashMap(); - this.recordStreamIterator = - new StreamIterator<>( - FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs)); + int capacity = FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs); + this.recordStreamIterator = new StreamIterator<>(capacity); } @Override diff --git a/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs b/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs index 7d6519193767..3e1b39ce51f9 100644 --- a/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs +++ b/interactive_engine/executor/store/mcsr/src/graph_partitioner.rs @@ -212,21 +212,11 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv") { - let input_path = vertex_file - .as_os_str() - .clone() - .to_str() - .unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .clone() - .to_str() - .unwrap(); + let input_path = vertex_file.as_os_str().to_str().unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) @@ -258,23 +248,13 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv.gz") { - let input_path = vertex_file - .as_os_str() - .clone() - .to_str() - .unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .clone() - .to_str() - .unwrap(); + let input_path = vertex_file.as_os_str().to_str().unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let gz_loc = input_path.find(".gz").unwrap(); let input_path = input_path.split_at(gz_loc).0; let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) @@ -327,18 +307,12 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv") { - info!("{}", edge_file.as_os_str().clone().to_str().unwrap()); - let input_path = edge_file.as_os_str().clone().to_str().unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .clone() - .to_str() - .unwrap(); + info!("{}", edge_file.as_os_str().to_str().unwrap()); + let input_path = edge_file.as_os_str().to_str().unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) @@ -373,19 +347,13 @@ impl GraphPartitioner { .unwrap() .ends_with(".csv.gz") { - let input_path = edge_file.as_os_str().clone().to_str().unwrap(); - let input_dir_path = self - .input_dir - .as_os_str() - .clone() - .to_str() - .unwrap(); + let input_path = edge_file.as_os_str().to_str().unwrap(); + let input_dir_path = self.input_dir.as_os_str().to_str().unwrap(); let gz_loc = input_path.find(".gz").unwrap(); let input_path = input_path.split_at(gz_loc).0; let output_path = if let Some(pos) = input_path.find(input_dir_path) { self.partition_dir.join( input_path - .clone() .split_at(pos + input_dir_path.len() + 1) .1, ) diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index 971dea16d561..dbec657b1174 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -101,6 +101,16 @@ public List replayRecords(long offset, long timestamp) { return resp.getSnapshotIdList(); } + public List replayRecordsV2(long offset, long timestamp) { + ReplayRecordsRequestV2 req = + ReplayRecordsRequestV2.newBuilder() + .setOffset(offset) + .setTimestamp(timestamp) + .build(); + ReplayRecordsResponseV2 resp = this.clientStub.replayRecordsV2(req); + return resp.getSnapshotIdList(); + } + private long modifyVertex(Vertex vertex, WriteTypePb writeType) { WriteRequestPb request = vertex.toWriteRequest(writeType); return submit(request); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 48f64d721751..b616ddc64df5 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -535,4 +535,49 @@ private void finish(Throwable t) { }); } } + + @Override + public void replayRecordsV2( + ReplayRecordsRequestV2 request, + StreamObserver responseObserver) { + ReplayRecordsResponseV2.Builder response = ReplayRecordsResponseV2.newBuilder(); + logger.info("replay records v2"); + int storeCount = this.metaService.getStoreCount(); + AtomicInteger counter = new AtomicInteger(storeCount); + AtomicBoolean finished = new AtomicBoolean(false); + + for (int i = 0; i < storeCount; i++) { + this.frontendStoreClients + .getClient(i) + .replayRecordsV2( + request, + new CompletionCallback() { + @Override + public void onCompleted(ReplayRecordsResponseV2 res) { + response.mergeFrom(res); + if (!finished.get() && counter.decrementAndGet() == 0) { + finish(null); + } + } + + @Override + public void onError(Throwable t) { + logger.error("failed to replay records", t); + finish(t); + } + + private void finish(Throwable t) { + if (finished.getAndSet(true)) { + return; + } + if (t != null) { + responseObserver.onError(t); + } else { + responseObserver.onNext(response.build()); + responseObserver.onCompleted(); + } + } + }); + } + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java index 0fa6aad2ca80..8d5e3fd75990 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/FrontendStoreClient.java @@ -135,4 +135,25 @@ public void onError(Throwable t) { public void onCompleted() {} }); } + + public void replayRecordsV2( + ReplayRecordsRequestV2 request, CompletionCallback callback) { + getStub() + .replayRecordsV2( + request, + new StreamObserver() { + @Override + public void onNext(ReplayRecordsResponseV2 value) { + callback.onCompleted(value); + } + + @Override + public void onError(Throwable t) { + callback.onError(t); + } + + @Override + public void onCompleted() {} + }); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java index bb2f848843b5..24a6c708ed3f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java @@ -240,13 +240,14 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc types.add(OperationType.ADD_EDGE_TYPE_PROPERTIES); logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp); - long batchSnapshotId = 0; + List ids = new ArrayList<>(); int replayCount = 0; try (LogWriter logWriter = this.logService.createWriter()) { for (int storeId = 0; storeId < storeCount; ++storeId) { try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) { + long batchSnapshotId = ingestSnapshotId.get(); ReadLogEntry readLogEntry; while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { LogEntry logEntry = readLogEntry.getLogEntry(); @@ -255,15 +256,16 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc if (batch.getOperationCount() == 0) { continue; } - logWriter.append(storeId, new LogEntry(ingestSnapshotId.get(), batch)); + logWriter.append(storeId, new LogEntry(batchSnapshotId, batch)); replayCount++; } + ids.add(batchSnapshotId + 1); } } } logger.info("replay DML records finished. total replayed [{}] records", replayCount); - return List.of(batchSnapshotId); + return ids; } /** diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java index 621ad6faccdb..3f1005e0e31b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/FrontendStoreService.java @@ -20,14 +20,17 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.List; import java.util.Map; public class FrontendStoreService extends FrontendStoreServiceGrpc.FrontendStoreServiceImplBase { private final StoreService storeService; + private final KafkaProcessor processor; - public FrontendStoreService(StoreService storeService) { + public FrontendStoreService(StoreService storeService, KafkaProcessor processor) { this.storeService = storeService; + this.processor = processor; } @Override @@ -127,4 +130,21 @@ public void getState( responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } + + @Override + public void replayRecordsV2( + ReplayRecordsRequestV2 request, + StreamObserver responseObserver) { + try { + long offset = request.getOffset(); + long ts = request.getTimestamp(); + List si = this.processor.replayDMLRecordsFrom(offset, ts); + responseObserver.onNext( + ReplayRecordsResponseV2.newBuilder().addAllSnapshotId(si).build()); + responseObserver.onCompleted(); + } catch (IOException e) { + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()); + } + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index d99e97c2a31b..6394628f6bc4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -16,9 +16,7 @@ import com.alibaba.graphscope.groot.operation.OperationBlob; import com.alibaba.graphscope.groot.operation.OperationType; import com.alibaba.graphscope.groot.operation.StoreDataBatch; -import com.alibaba.graphscope.groot.wal.LogEntry; -import com.alibaba.graphscope.groot.wal.LogReader; -import com.alibaba.graphscope.groot.wal.LogService; +import com.alibaba.graphscope.groot.wal.*; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -31,6 +29,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class KafkaProcessor { @@ -56,7 +56,11 @@ public class KafkaProcessor { private volatile boolean shouldStop = true; List typesDDL; - BlockingQueue> queue; + BlockingQueue> writeQueue; + BlockingQueue replayQueue; + + private AtomicBoolean replayInProgress; + private AtomicLong latestSnapshotId; public KafkaProcessor( Configs configs, @@ -76,7 +80,10 @@ public KafkaProcessor( offsetsPersistIntervalMs = CoordinatorConfig.OFFSETS_PERSIST_INTERVAL_MS.get(configs); int queueSize = StoreConfig.STORE_QUEUE_BUFFER_SIZE.get(configs); - queue = new ArrayBlockingQueue<>(queueSize); + writeQueue = new ArrayBlockingQueue<>(queueSize); + replayQueue = new ArrayBlockingQueue<>(queueSize); + latestSnapshotId = new AtomicLong(-1); + replayInProgress = new AtomicBoolean(false); } public void start() { @@ -203,20 +210,24 @@ public void pollBatches() { // -1 stands for poll from latest try (LogReader reader = logService.createReader(storeId, -1)) { while (!shouldStop) { - ConsumerRecords records = reader.getLatestUpdates(); - for (ConsumerRecord record : records) { - queue.add(record); + try { + ConsumerRecords records = reader.getLatestUpdates(); + for (ConsumerRecord record : records) { + writeQueue.put(record); + } + } catch (InterruptedException e) { + throw new InternalException(e); + } catch (Exception ex) { + ex.printStackTrace(); } } - } catch (IOException e) { - throw new InternalException(e); + } catch (IOException ex) { + ex.printStackTrace(); } } - private void processRecord(ConsumerRecord record) { + private void processRecord(long offset, LogEntry logEntry) { int partitionCount = metaService.getPartitionCount(); - long offset = record.offset(); - LogEntry logEntry = record.value(); OperationBatch operationBatch = logEntry.getOperationBatch(); if (isSecondary) { // only catch up the schema updates operationBatch = Utils.extractOperations(operationBatch, typesDDL); @@ -228,7 +239,7 @@ private void processRecord(ConsumerRecord record) { StoreDataBatch.Builder builder = StoreDataBatch.newBuilder() .requestId("") - .queueId(storeId) + .queueId(0) .snapshotId(snapshotId) .traceId(operationBatch.getTraceId()) .offset(offset); @@ -263,11 +274,14 @@ public void replayWAL() throws IOException { logger.warn("It may not be expected to replay from the 0 offset, skipped"); return; } + int replayCount = 0; try (LogReader logReader = this.logService.createReader(storeId, replayFrom)) { + // replayInProgress.set(true); ConsumerRecord record; while ((record = logReader.readNextRecord()) != null) { - queue.put(record); + // writeQueue.put(new ReadLogEntry(record.offset(), record.value())); + writeQueue.put(record); replayCount++; if (replayCount % 10000 == 0) { logger.info("replayed {} records", replayCount); @@ -276,17 +290,44 @@ public void replayWAL() throws IOException { } catch (InterruptedException e) { throw new RuntimeException(e); } + + // } finally { + // replayInProgress.set(false); + // } logger.info("replayWAL finished. total replayed [{}] records", replayCount); } private void processRecords() { - while (true) { - try { - ConsumerRecord record = queue.take(); - processRecord(record); - } catch (InterruptedException e) { - e.printStackTrace(); + try { + while (true) { + long offset; + LogEntry logEntry; + if (replayInProgress.get() || !replayQueue.isEmpty()) { + if (replayQueue.isEmpty()) { + Thread.sleep(10); + continue; + } + ReadLogEntry readLogEntry = replayQueue.take(); + offset = readLogEntry.getOffset(); + logEntry = readLogEntry.getLogEntry(); + logEntry.setSnapshotId(latestSnapshotId.get()); + // logger.info("polled from replay queue, offset {}, id {}", + // offset, logEntry.getSnapshotId()); + + } else { + ConsumerRecord record = writeQueue.take(); + offset = record.offset(); + logEntry = record.value(); + latestSnapshotId.set(logEntry.getSnapshotId()); + // logger.info("polled from write queue, offset {}, id {}", + // offset, latestSnapshotId.get()); + + } + processRecord(offset, logEntry); } + } catch (InterruptedException ex) { + ex.printStackTrace(); + throw new RuntimeException(ex); } } @@ -305,4 +346,54 @@ private List prepareDDLTypes() { types.add(OperationType.MARKER); // For advance ID return types; } + + private List prepareDMLTypes() { + List types = new ArrayList<>(); + types.add(OperationType.OVERWRITE_VERTEX); + types.add(OperationType.UPDATE_VERTEX); + types.add(OperationType.DELETE_VERTEX); + types.add(OperationType.OVERWRITE_EDGE); + types.add(OperationType.UPDATE_EDGE); + types.add(OperationType.DELETE_EDGE); + types.add(OperationType.CLEAR_VERTEX_PROPERTIES); + types.add(OperationType.CLEAR_EDGE_PROPERTIES); + types.add(OperationType.ADD_VERTEX_TYPE_PROPERTIES); + types.add(OperationType.ADD_EDGE_TYPE_PROPERTIES); + return types; + } + + public List replayDMLRecordsFrom(long offset, long timestamp) throws IOException { + List types = prepareDMLTypes(); + logger.info("replay DML records of from offset [{}], ts [{}]", offset, timestamp); + // Note this clear is necessary, as those records would be a subset of record range in + // new reader + replayInProgress.set(true); + writeQueue.clear(); + long batchSnapshotId; + int replayCount = 0; + try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) { + ReadLogEntry readLogEntry; + batchSnapshotId = latestSnapshotId.get(); + while (!shouldStop && (readLogEntry = logReader.readNext()) != null) { + LogEntry logEntry = readLogEntry.getLogEntry(); + OperationBatch batch = Utils.extractOperations(logEntry.getOperationBatch(), types); + if (batch.getOperationCount() == 0) { + continue; + } + ReadLogEntry entry = + new ReadLogEntry(readLogEntry.getOffset(), batchSnapshotId, batch); + replayQueue.put(entry); + replayCount++; + if (replayCount % 10000 == 0) { + logger.info("replayed {} records", replayCount); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + replayInProgress.set(false); + } + logger.info("replay DML records finished. total replayed [{}] records", replayCount); + return List.of(batchSnapshotId + 1); + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java index cb86eaf5ec11..4f9547d5ce7e 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java @@ -68,11 +68,14 @@ public boolean offerQueue(int queueId, StoreDataBatch entry) throws InterruptedE if (innerQueue == null) { throw new InvalidArgumentException("invalid queueId [" + queueId + "]"); } - boolean res = innerQueue.offer(entry, this.queueWaitMs, TimeUnit.MILLISECONDS); - if (res) { - this.size.incrementAndGet(); - } - return res; + // boolean res = innerQueue.offer(entry, this.queueWaitMs, TimeUnit.MILLISECONDS); + // if (res) {x + // this.size.incrementAndGet(); + // } + // return res; + innerQueue.put(entry); + this.size.incrementAndGet(); + return true; } public StoreDataBatch poll() throws InterruptedException { @@ -108,7 +111,10 @@ public StoreDataBatch poll() throws InterruptedException { } long snapshotId = entry.getSnapshotId(); - if (snapshotId == this.currentPollSnapshotId) { + // allow for a short duration inconsistent, due to different frontend may have minor + // difference in timing + if (snapshotId == this.currentPollSnapshotId + || currentPollSnapshotId - snapshotId < 10) { this.size.decrementAndGet(); return entry; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 136d0639ae61..861d0a4790ed 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -17,6 +17,7 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.config.StoreConfig; +import com.alibaba.graphscope.groot.common.exception.ExternalStorageErrorException; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.common.exception.IllegalStateException; import com.alibaba.graphscope.groot.common.exception.InternalException; @@ -223,13 +224,16 @@ public boolean batchWrite(StoreDataBatch storeDataBatch) long snapshotId = storeDataBatch.getSnapshotId(); List> dataBatch = storeDataBatch.getDataBatch(); AtomicBoolean hasDdl = new AtomicBoolean(false); - int maxRetry = 10; + int maxRetry = 5; for (Map partitionToBatch : dataBatch) { while (!shouldStop && partitionToBatch.size() != 0 && maxRetry > 0) { partitionToBatch = writeStore(snapshotId, partitionToBatch, hasDdl); maxRetry--; } } + if (maxRetry == 0) { + throw new ExternalStorageErrorException("batchWrite failed after 5 attempts"); + } return hasDdl.get(); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index edde700c84d9..aad4ffa45112 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -15,6 +15,7 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; +import com.alibaba.graphscope.groot.common.config.StoreConfig; import com.alibaba.graphscope.groot.common.util.ThreadFactoryUtils; import com.alibaba.graphscope.groot.coordinator.SnapshotInfo; import com.alibaba.graphscope.groot.meta.MetaService; @@ -47,7 +48,8 @@ public class WriterAgent { private final RoleClients snapshotCommitter; private volatile boolean shouldStop = true; - private SnapshotSortQueue bufferQueue; + // private SnapshotSortQueue bufferQueue; + private BlockingQueue bufferQueue; private volatile long lastCommitSI; private volatile long consumeSI; private volatile long consumeDdlSnapshotId; @@ -68,6 +70,9 @@ public WriterAgent( this.metaService = metaService; this.snapshotCommitter = snapshotCommitter; this.availSnapshotInfoRef = new AtomicReference<>(); + // this.bufferQueue = new SnapshotSortQueue(this.configs, this.metaService); + int queueSize = StoreConfig.STORE_QUEUE_BUFFER_SIZE.get(configs); + this.bufferQueue = new ArrayBlockingQueue<>(queueSize); initMetrics(); } @@ -78,7 +83,6 @@ public void start() { this.availSnapshotInfoRef.set(new SnapshotInfo(0, 0)); this.shouldStop = false; - this.bufferQueue = new SnapshotSortQueue(this.configs, this.metaService); this.consumedQueueOffsets = new ArrayList<>(this.queueCount); for (int i = 0; i < this.queueCount; i++) { this.consumedQueueOffsets.add(-1L); @@ -135,15 +139,15 @@ public void stop() { public boolean writeStore(StoreDataBatch storeDataBatch) throws InterruptedException { // logger.info("writeStore {}", storeDataBatch.toProto()); // int queueId = storeDataBatch.getQueueId(); - boolean suc = this.bufferQueue.offerQueue(0, storeDataBatch); + // boolean suc = this.bufferQueue.offerQueue(0, storeDataBatch); // logger.debug("Buffer queue: {}, {}", suc, this.bufferQueue.innerQueueSizes()); - return suc; + this.bufferQueue.put(storeDataBatch); + return true; } public boolean writeStore2(List storeDataBatches) throws InterruptedException { for (StoreDataBatch storeDataBatch : storeDataBatches) { - int queueId = storeDataBatch.getQueueId(); - if (!this.bufferQueue.offerQueue(queueId, storeDataBatch)) { + if (!writeStore(storeDataBatch)) { return false; } } @@ -158,7 +162,7 @@ private void processBatches() { continue; } long batchSI = batch.getSnapshotId(); - logger.debug("polled one batch [" + batchSI + "]"); + // logger.debug("polled one batch [" + batchSI + "]"); boolean hasDdl = writeEngineWithRetry(batch); if (this.consumeSI < batchSI) { SnapshotInfo availSInfo = this.availSnapshotInfoRef.get(); @@ -167,16 +171,15 @@ private void processBatches() { this.consumeSI = batchSI; this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI)); this.commitExecutor.execute(this::asyncCommit); - } else { // a flurry of batches with same snapshot ID - logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); } + // else { // a flurry of batches with same snapshot ID + // logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); + // } if (hasDdl) { this.consumeDdlSnapshotId = batchSI; } // this.consumedQueueOffsets.set(batch.getQueueId(), batch.getOffset()); this.consumedQueueOffsets.set(0, batch.getOffset()); - } catch (InterruptedException e) { - logger.error("processBatches interrupted"); } catch (Exception e) { logger.error("error in processBatches, ignore", e); } @@ -198,19 +201,17 @@ private void asyncCommit() { } catch (Exception e) { logger.warn("commit failed. SI {}, offset {}. ignored", curSI, queueOffsets, e); } - } else { - logger.warn("curSI {} <= lastCommitSI {}, ignored", curSI, lastCommitSI); } } private boolean writeEngineWithRetry(StoreDataBatch storeDataBatch) { - while (!shouldStop) { - try { - return this.storeService.batchWrite(storeDataBatch); - } catch (Exception e) { - logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e); - } + // while (!shouldStop) { + try { + return this.storeService.batchWrite(storeDataBatch); + } catch (Exception e) { + logger.error("writeEngine failed: batch {}.", storeDataBatch.toProto(), e); } + // } return false; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java index a34bfdff5891..10a064f1e00d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogEntry.java @@ -19,7 +19,7 @@ import java.util.Objects; public class LogEntry { - private final long snapshotId; + private long snapshotId; private final OperationBatch operationBatch; public LogEntry(long snapshotId, OperationBatch operationBatch) { @@ -48,6 +48,10 @@ public LogEntryPb toProto() { .build(); } + public void setSnapshotId(long snapshotId) { + this.snapshotId = snapshotId; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java index ae86914a8aac..5882be772147 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Store.java @@ -65,10 +65,13 @@ public Store(Configs configs) { this.writerAgent = new WriterAgent(configs, this.storeService, this.metaService, snapshotCommitter); + this.processor = new KafkaProcessor(configs, metaService, writerAgent, logService); + this.backupAgent = new BackupAgent(configs, this.storeService); StoreBackupService storeBackupService = new StoreBackupService(this.backupAgent); StoreSchemaService storeSchemaService = new StoreSchemaService(this.storeService); - FrontendStoreService storeIngestService = new FrontendStoreService(this.storeService); + FrontendStoreService frontendStoreService = + new FrontendStoreService(this.storeService, this.processor); StoreSnapshotService storeSnapshotService = new StoreSnapshotService(this.storeService); this.rpcServer = new RpcServer( @@ -76,13 +79,12 @@ public Store(Configs configs) { localNodeProvider, storeBackupService, storeSchemaService, - storeIngestService, + frontendStoreService, storeSnapshotService); IrServiceProducer serviceProducer = new IrServiceProducer(configs); this.executorService = serviceProducer.makeExecutorService(storeService, metaService, discoveryFactory); this.partitionService = new PartitionService(configs, storeService); - this.processor = new KafkaProcessor(configs, metaService, writerAgent, logService); } @Override diff --git a/proto/groot/frontend_store_service.proto b/proto/groot/frontend_store_service.proto index 0280a60d9b00..0f07fd4982d9 100644 --- a/proto/groot/frontend_store_service.proto +++ b/proto/groot/frontend_store_service.proto @@ -27,4 +27,5 @@ service FrontendStoreService { rpc compactDB(CompactDBRequest) returns(CompactDBResponse); rpc reopenSecondary(ReopenSecondaryRequest) returns (ReopenSecondaryResponse); rpc GetState(GetStoreStateRequest) returns (GetStoreStateResponse); + rpc replayRecordsV2(ReplayRecordsRequestV2) returns(ReplayRecordsResponseV2); } diff --git a/proto/groot/sdk/client_service.proto b/proto/groot/sdk/client_service.proto index 3591054a2282..ed9d2a276e15 100644 --- a/proto/groot/sdk/client_service.proto +++ b/proto/groot/sdk/client_service.proto @@ -35,6 +35,7 @@ service Client { rpc getStoreState(GetStoreStateRequest) returns (GetStoreStateResponse); rpc compactDB(CompactDBRequest) returns (CompactDBResponse); rpc reopenSecondary(ReopenSecondaryRequest) returns (ReopenSecondaryResponse); + rpc replayRecordsV2(ReplayRecordsRequestV2) returns(ReplayRecordsResponseV2); } message GetSchemaRequest { diff --git a/proto/groot/sdk/model.proto b/proto/groot/sdk/model.proto index 81ddf100c245..8c6eb31a4e02 100644 --- a/proto/groot/sdk/model.proto +++ b/proto/groot/sdk/model.proto @@ -250,3 +250,12 @@ message ReopenSecondaryRequest { message ReopenSecondaryResponse { bool success = 1; } + +message ReplayRecordsRequestV2 { + int64 offset = 1; + int64 timestamp = 2; +} + +message ReplayRecordsResponseV2 { + repeated int64 snapshot_id = 1; +} \ No newline at end of file diff --git a/python/graphscope/client/connection.py b/python/graphscope/client/connection.py index 751540c6f9ef..814789571073 100644 --- a/python/graphscope/client/connection.py +++ b/python/graphscope/client/connection.py @@ -213,6 +213,15 @@ def reopen_secondary(self): ) return response.success + def replay_records_v2(self, offset: int, timestamp: int): + request = model_pb2.ReplayRecordsRequestV2() + request.offset = offset + request.timestamp = timestamp + response = self._client_service_stub.replayRecordsV2( + request, metadata=self._metadata + ) + return response.snapshot_id + def _encode_metadata(self, username, password): if not (username and password): return None