Skip to content

Commit

Permalink
Merge branch 'main' into ir_metric_tool
Browse files Browse the repository at this point in the history
  • Loading branch information
shirly121 committed Nov 21, 2024
2 parents b272f5a + cd7879c commit 79f4ccc
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.GraphConfig;
import com.alibaba.graphscope.common.config.PlannerConfig;
import com.alibaba.graphscope.common.ir.meta.GraphId;
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.meta.IrMetaStats;
import com.alibaba.graphscope.common.ir.meta.IrMetaTracker;
Expand All @@ -46,26 +47,27 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable
private volatile IrMetaStats currentState;
// To manage the state changes of statistics resulting from different update operations.
private volatile StatsState statsState;
private final boolean fetchStats;
private volatile Boolean statsEnabled = null;

public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) {
super(dataReader, tracker);
this.scheduler = new ScheduledThreadPoolExecutor(1);
this.scheduler.scheduleAtFixedRate(
() -> syncMeta(),
0,
GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
this.fetchStats =
long schemaIntervalMS = GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs);
if (schemaIntervalMS > 0) {
logger.info("start to schedule the schema sync task per {} ms", schemaIntervalMS);
this.scheduler.scheduleAtFixedRate(
() -> syncMeta(), schemaIntervalMS, schemaIntervalMS, TimeUnit.MILLISECONDS);
}
boolean isCBOMode =
PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs)
&& PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO");
if (this.fetchStats) {
logger.info("start to schedule statistics fetch task");
&& PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equalsIgnoreCase("CBO");
long statsIntervalMS = GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs);
if (!isCBOMode || statsIntervalMS <= 0) {
this.statsEnabled = false;
} else {
logger.info("start to schedule the stats sync task per {} ms", statsIntervalMS);
this.scheduler.scheduleAtFixedRate(
() -> syncStats(),
2000,
GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs),
TimeUnit.MILLISECONDS);
() -> syncStats(), statsIntervalMS, statsIntervalMS, TimeUnit.MILLISECONDS);
}
}

Expand All @@ -80,7 +82,6 @@ private synchronized void syncMeta() {
logger.debug(
"schema from remote: {}",
(meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON));
GraphStatistics curStats;
// if the graph id or schema version is changed, we need to update the statistics
if (this.currentState == null
|| !this.currentState.getGraphId().equals(meta.getGraphId())
Expand All @@ -89,58 +90,74 @@ private synchronized void syncMeta() {
.getVersion()
.equals(meta.getSchema().getVersion())) {
this.statsState = StatsState.INITIALIZED;
curStats = null;
} else {
curStats = this.currentState.getStatistics();
this.currentState =
new IrMetaStats(
meta.getGraphId(),
meta.getSnapshotId(),
meta.getSchema(),
meta.getStoredProcedures(),
null);
}
this.currentState =
new IrMetaStats(
meta.getGraphId(),
meta.getSnapshotId(),
meta.getSchema(),
meta.getStoredProcedures(),
curStats);
if (this.fetchStats && this.statsState != StatsState.SYNCED) {
logger.info("start to schedule statistics fetch task");
boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId());
if (statsEnabled && this.statsState != StatsState.SYNCED
|| (!statsEnabled && this.statsState != StatsState.MOCKED)) {
logger.debug("start to sync stats");
syncStats();
}
} catch (Throwable e) {
logger.warn("failed to read meta data", e);
logger.warn("failed to read meta data, error is {}", e);
}
}

private boolean getStatsEnabled(GraphId graphId) {
try {
this.statsEnabled =
(this.statsEnabled == null)
? this.reader.syncStatsEnabled(graphId)
: this.statsEnabled;
return this.statsEnabled;
} catch (
Throwable e) { // if errors happen when reading stats enabled, we assume it is false
logger.warn("failed to read stats enabled, error is {}", e);
return false;
}
}

private synchronized void syncStats() {
try {
if (this.currentState != null) {
GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId());
logger.debug("statistics from remote: {}", stats);
if (stats != null && stats.getVertexCount() != 0) {
this.currentState =
new IrMetaStats(
this.currentState.getSnapshotId(),
this.currentState.getSchema(),
this.currentState.getStoredProcedures(),
stats);
if (tracker != null) {
logger.debug("start to update the glogue");
tracker.onChanged(this.currentState);
boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId());
if (statsEnabled) {
GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId());
logger.debug("statistics from remote: {}", stats);
if (stats != null && stats.getVertexCount() != 0) {
this.currentState =
new IrMetaStats(
this.currentState.getSnapshotId(),
this.currentState.getSchema(),
this.currentState.getStoredProcedures(),
stats);
if (tracker != null) {
logger.info("start to update the glogue");
tracker.onChanged(this.currentState);
}
this.statsState = StatsState.SYNCED;
}
this.statsState = StatsState.SYNCED;
}
}
} catch (Throwable e) {
logger.warn("failed to read graph statistics, error is", e);
logger.warn("failed to read graph statistics, error is {}", e);
} finally {
try {
if (this.currentState != null
&& tracker != null
&& this.statsState == StatsState.INITIALIZED) {
logger.debug("start to mock the glogue");
logger.info("start to mock the glogue");
tracker.onChanged(this.currentState);
this.statsState = StatsState.MOCKED;
}
} catch (Throwable t) {
logger.warn("failed to mock the glogue, error is", t);
logger.warn("failed to mock the glogue, error is {}", t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ public void onMatch(RelOptRuleCall relOptRuleCall) {
// specific optimization for relational DB scenario.
// 3. `JoinByEdge`: Split the pattern by edge, convert a triangle pattern to `JoinByEdge` to
// support optimizations in Neo4j.
if (getMaxEdgeNum(graphPattern.getPattern()) > 2) {
(new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity))
.addDecompositions();
}
(new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity)).addDecompositions();
if (config.getForeignKeyMeta() != null) {
(new JoinByForeignKey(graphPattern, mq, decompositionQueue, queueCapacity))
.addDecompositions();
Expand Down Expand Up @@ -311,10 +308,13 @@ public JoinByVertex(

@Override
public void addDecompositions() {
List<GraphJoinDecomposition> queues = initDecompositions();
while (!queues.isEmpty()) {
List<GraphJoinDecomposition> nextCompositions = getDecompositions(queues.remove(0));
queues.addAll(nextCompositions);
if (getMaxEdgeNum(graphPattern.getPattern()) > 2) {
List<GraphJoinDecomposition> queues = initDecompositions();
while (!queues.isEmpty()) {
List<GraphJoinDecomposition> nextCompositions =
getDecompositions(queues.remove(0));
queues.addAll(nextCompositions);
}
}
addPxdInnerVDecompositions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;
Expand Down Expand Up @@ -59,4 +60,9 @@ protected RelOptCost upperBoundForInputs(RelNode mExpr, RelOptCost upperBound) {
if (relCost == null) return null;
return cost.plus(relCost);
}

@Override
public void registerSchema(RelOptSchema schema) {
// do nothing
}
}

0 comments on commit 79f4ccc

Please sign in to comment.