Skip to content

Commit

Permalink
[opt](mtmv) Optimize plan generate when create mtmv and use mtmv cach…
Browse files Browse the repository at this point in the history
…e when collect table of mtmv (apache#44786)

Optimize plan generate when create mtmv and use mtmv cache when collect
table of mtmv
1. Reuse plans when creating materialized views to minimize plan
generation overhead.
2. During recursive base table resolution for MTMVs, prioritize MTMV
cache lookup. Fall back to real-time generation only when cache miss
occurs.
  • Loading branch information
seawinde committed Dec 6, 2024
1 parent df43cdd commit fe58833
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 120 deletions.
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ public class MTMVCache {
// The materialized view plan which should be optimized by the same rules to query
// and will remove top sink and unused sort
private final Plan logicalPlan;
// The original plan of mv def sql
// The original rewritten plan of mv def sql
private final Plan originalPlan;
// The analyzed plan of mv def sql, which is used by tableCollector,should not be optimized by rbo
private final Plan analyzedPlan;
private final Statistics statistics;
private final StructInfo structInfo;

public MTMVCache(Plan logicalPlan, Plan originalPlan, Statistics statistics, StructInfo structInfo) {
public MTMVCache(Plan logicalPlan, Plan originalPlan, Plan analyzedPlan,
Statistics statistics, StructInfo structInfo) {
this.logicalPlan = logicalPlan;
this.originalPlan = originalPlan;
this.analyzedPlan = analyzedPlan;
this.statistics = statistics;
this.structInfo = structInfo;
}
Expand All @@ -71,6 +75,10 @@ public Plan getOriginalPlan() {
return originalPlan;
}

public Plan getAnalyzedPlan() {
return analyzedPlan;
}

public Statistics getStatistics() {
return statistics;
}
Expand Down Expand Up @@ -117,7 +125,7 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
Optional<StructInfo> structInfoOptional = MaterializationContext.constructStructInfo(mvPlan, originPlan,
planner.getCascadesContext(),
new BitSet());
return new MTMVCache(mvPlan, originPlan, needCost
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(), needCost
? planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
structInfoOptional.orElseGet(() -> null));
}
Expand Down
31 changes: 9 additions & 22 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -104,31 +102,20 @@ private static void setCatalogAndDb(ConnectContext ctx, MTMV mtmv) {
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = getPlanBySql(mtmv.getQuerySql(), ctx);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
return generateMTMVRelation(plan);
Plan plan = getAnalyzePlanBySql(mtmv.getQuerySql(), ctx);
return generateMTMVRelation(plan, ctx);
}

public static MTMVRelation generateMTMVRelation(Plan plan) {
return new MTMVRelation(getBaseTables(plan, true), getBaseTables(plan, false), getBaseViews(plan));
public static MTMVRelation generateMTMVRelation(Plan plan, ConnectContext connectContext) {
return new MTMVRelation(getBaseTables(plan, true, connectContext),
getBaseTables(plan, false, connectContext), getBaseViews(plan));
}

private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand) {
private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand, ConnectContext connectContext) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets
.newHashSet(TableType.values()), expand);
.newHashSet(TableType.values()), expand, connectContext);
plan.accept(TableCollector.INSTANCE, collectorContext);
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
Expand All @@ -146,7 +133,7 @@ private static Set<BaseTableInfo> transferTableIfToInfo(Set<TableIf> tables) {
return result;
}

private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
private static Plan getAnalyzePlanBySql(String querySql, ConnectContext ctx) {
List<StatementBase> statements;
try {
statements = new NereidsParser().parseSQL(querySql);
Expand All @@ -159,7 +146,7 @@ private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
ctx.setStatementContext(new StatementContext());
try {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
} finally {
ctx.setStatementContext(original);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ protected void doInitMaterializationContext(CascadesContext cascadesContext) {
return;
}
// Only collect the table or mv which query use directly, to avoid useless mv partition in rewrite
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), false);
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), false,
cascadesContext.getConnectContext());
try {
Plan rewritePlan = cascadesContext.getRewritePlan();
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
collectorContext.setConnectContext(cascadesContext.getConnectContext());
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init table collect fail, current queryId is %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
return new MTMVCache(mvPlan, originPlan,
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(),
planner.getCascadesContext().getMemo().getRoot().getStatistics(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,21 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
NereidsPlanner planner = new NereidsPlanner(statementContext);
// this is for expression column name infer when not use alias
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
Set<String> tempDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames();
ctx.getSessionVariable().setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
Plan plan;
try {
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} finally {
// after operate, roll back the disable rules
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
// can not contain Random function
Expand All @@ -268,8 +280,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
throw new AnalysisException("can not contain invalid expression");
}
getRelation(planner);
this.mvPartitionInfo = mvPartitionDefinition
.analyzeAndTransferToMTMVPartitionInfo(planner, ctx, logicalQuery);
this.mvPartitionInfo = mvPartitionDefinition.analyzeAndTransferToMTMVPartitionInfo(planner, ctx);
this.partitionDesc = generatePartitionDesc(ctx);
getColumns(plan, ctx, mvPartitionInfo.getPartitionCol(), distribution);
analyzeKeys();
Expand Down Expand Up @@ -314,24 +325,9 @@ private void analyzeKeys() {
}
}

// Should use analyzed plan for collect views and tables
private void getRelation(NereidsPlanner planner) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
ConnectContext ctx = planner.getCascadesContext().getConnectContext();
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
this.relation = MTMVPlanUtil.generateMTMVRelation(planner.getAnalyzedPlan(), planner.getConnectContext());
}

private PartitionDesc generatePartitionDesc(ConnectContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,14 @@
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.Sets;

Expand All @@ -72,11 +67,9 @@ public class MTMVPartitionDefinition {
*
* @param planner planner
* @param ctx ctx
* @param logicalQuery logicalQuery
* @return MTMVPartitionInfo
*/
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx,
LogicalPlan logicalQuery) {
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx) {
MTMVPartitionInfo mtmvPartitionInfo = new MTMVPartitionInfo(partitionType);
if (this.partitionType == MTMVPartitionType.SELF_MANAGE) {
return mtmvPartitionInfo;
Expand All @@ -100,7 +93,7 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
timeUnit = null;
}
mtmvPartitionInfo.setPartitionCol(partitionColName);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName, timeUnit);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, partitionColName, timeUnit);
mtmvPartitionInfo.setRelatedCol(relatedTableInfo.getColumn());
mtmvPartitionInfo.setRelatedTable(relatedTableInfo.getTableInfo());
if (relatedTableInfo.getPartitionExpression().isPresent()) {
Expand All @@ -125,47 +118,33 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
return mtmvPartitionInfo;
}

private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx, LogicalPlan
logicalQuery,
String partitionColName,
String timeUnit) {
// Should use rewritten plan without view and subQuery to get related partition table
private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx,
String partitionColName, String timeUnit) {
CascadesContext cascadesContext = planner.getCascadesContext();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);

RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, planner.getRewrittenPlan(), cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
Plan mvRewrittenPlan =
planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, mvRewrittenPlan, cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty()));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty()));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}

if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
}

private static List<Expr> convertToLegacyArguments(List<Expression> children) {
Expand Down
Loading

0 comments on commit fe58833

Please sign in to comment.