Skip to content

Commit

Permalink
[opt](mtmv) Optimize plan generate when create mtmv and use mtmv cach…
Browse files Browse the repository at this point in the history
…e when collect table of mtmv
  • Loading branch information
seawinde committed Nov 29, 2024
1 parent 08b187b commit 3bee5b5
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 120 deletions.
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ public class MTMVCache {
// The materialized view plan which should be optimized by the same rules to query
// and will remove top sink and unused sort
private final Plan logicalPlan;
// The original plan of mv def sql
// The original rewritten plan of mv def sql
private final Plan originalPlan;
// The analyzed plan of the mv def sql; it is used by TableCollector and should not be optimized by RBO
private final Plan analyzedPlan;
private final Statistics statistics;
private final StructInfo structInfo;

public MTMVCache(Plan logicalPlan, Plan originalPlan, Statistics statistics, StructInfo structInfo) {
public MTMVCache(Plan logicalPlan, Plan originalPlan, Plan analyzedPlan,
Statistics statistics, StructInfo structInfo) {
this.logicalPlan = logicalPlan;
this.originalPlan = originalPlan;
this.analyzedPlan = analyzedPlan;
this.statistics = statistics;
this.structInfo = structInfo;
}
Expand All @@ -71,6 +75,10 @@ public Plan getOriginalPlan() {
return originalPlan;
}

/**
 * Returns the analyzed plan of the materialized view definition SQL
 * that was supplied when this cache was constructed.
 *
 * @return the analyzed plan stored in this cache
 */
public Plan getAnalyzedPlan() {
    return this.analyzedPlan;
}

/**
 * Returns the statistics that were supplied when this cache was constructed.
 *
 * @return the stored statistics, or {@code null} if none were passed to the constructor
 */
public Statistics getStatistics() {
    return this.statistics;
}
Expand Down Expand Up @@ -117,7 +125,7 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
Optional<StructInfo> structInfoOptional = MaterializationContext.constructStructInfo(mvPlan, originPlan,
planner.getCascadesContext(),
new BitSet());
return new MTMVCache(mvPlan, originPlan, needCost
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(), needCost
? planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
structInfoOptional.orElseGet(() -> null));
}
Expand Down
31 changes: 9 additions & 22 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -98,31 +96,20 @@ private static void setCatalogAndDb(ConnectContext ctx, MTMV mtmv) {
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = getPlanBySql(mtmv.getQuerySql(), ctx);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
return generateMTMVRelation(plan);
Plan plan = getAnalyzePlanBySql(mtmv.getQuerySql(), ctx);
return generateMTMVRelation(plan, ctx);
}

public static MTMVRelation generateMTMVRelation(Plan plan) {
return new MTMVRelation(getBaseTables(plan, true), getBaseTables(plan, false), getBaseViews(plan));
public static MTMVRelation generateMTMVRelation(Plan plan, ConnectContext connectContext) {
return new MTMVRelation(getBaseTables(plan, true, connectContext),
getBaseTables(plan, false, connectContext), getBaseViews(plan));
}

private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand) {
private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand, ConnectContext connectContext) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets
.newHashSet(TableType.values()), expand);
.newHashSet(TableType.values()), expand, connectContext);
plan.accept(TableCollector.INSTANCE, collectorContext);
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
Expand All @@ -140,7 +127,7 @@ private static Set<BaseTableInfo> transferTableIfToInfo(Set<TableIf> tables) {
return result;
}

private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
private static Plan getAnalyzePlanBySql(String querySql, ConnectContext ctx) {
List<StatementBase> statements;
try {
statements = new NereidsParser().parseSQL(querySql);
Expand All @@ -153,7 +140,7 @@ private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
ctx.setStatementContext(new StatementContext());
try {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
} finally {
ctx.setStatementContext(original);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
*/
protected void doInitMaterializationContext(CascadesContext cascadesContext) {
// Only collect the tables or mvs that the query uses directly, to avoid useless mv partitions in rewrite
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), false);
// Keep using the same connect context throughout the query; if a new connect context is used,
// the result of ConnectContext.get() will change
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), false,
cascadesContext.getConnectContext());
try {
Plan rewritePlan = cascadesContext.getRewritePlan();
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
collectorContext.setConnectContext(cascadesContext.getConnectContext());
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init table collect fail, current queryId is %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
return new MTMVCache(mvPlan, originPlan,
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(),
planner.getCascadesContext().getMemo().getRoot().getStatistics(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,21 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
NereidsPlanner planner = new NereidsPlanner(statementContext);
// this is for expression column name infer when not use alias
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
// A table without data should not be turned into an empty relation when analyzing the related table,
// so add the disable rules
Set<String> tempDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames();
ctx.getSessionVariable().setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
Plan plan;
try {
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} finally {
// after planning, roll back the disable rules to their previous value
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
// can not contain Random function
Expand All @@ -265,8 +277,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
throw new AnalysisException("can not contain invalid expression");
}
getRelation(planner);
this.mvPartitionInfo = mvPartitionDefinition
.analyzeAndTransferToMTMVPartitionInfo(planner, ctx, logicalQuery);
this.mvPartitionInfo = mvPartitionDefinition.analyzeAndTransferToMTMVPartitionInfo(planner, ctx);
this.partitionDesc = generatePartitionDesc(ctx);
getColumns(plan, ctx, mvPartitionInfo.getPartitionCol(), distribution);
analyzeKeys();
Expand Down Expand Up @@ -311,24 +322,9 @@ private void analyzeKeys() {
}
}

// Should use the analyzed plan to collect views and tables
private void getRelation(NereidsPlanner planner) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
ConnectContext ctx = planner.getCascadesContext().getConnectContext();
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
this.relation = MTMVPlanUtil.generateMTMVRelation(planner.getAnalyzedPlan(), planner.getConnectContext());
}

private PartitionDesc generatePartitionDesc(ConnectContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,14 @@
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.Sets;

Expand All @@ -72,11 +67,9 @@ public class MTMVPartitionDefinition {
*
* @param planner planner
* @param ctx ctx
* @param logicalQuery logicalQuery
* @return MTMVPartitionInfo
*/
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx,
LogicalPlan logicalQuery) {
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx) {
MTMVPartitionInfo mtmvPartitionInfo = new MTMVPartitionInfo(partitionType);
if (this.partitionType == MTMVPartitionType.SELF_MANAGE) {
return mtmvPartitionInfo;
Expand All @@ -100,7 +93,7 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
timeUnit = null;
}
mtmvPartitionInfo.setPartitionCol(partitionColName);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName, timeUnit);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, partitionColName, timeUnit);
mtmvPartitionInfo.setRelatedCol(relatedTableInfo.getColumn());
mtmvPartitionInfo.setRelatedTable(relatedTableInfo.getTableInfo());
if (relatedTableInfo.getPartitionExpression().isPresent()) {
Expand All @@ -125,47 +118,33 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
return mtmvPartitionInfo;
}

private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx, LogicalPlan
logicalQuery,
String partitionColName,
String timeUnit) {
// Should use rewritten plan without view and subQuery to get related partition table
private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx,
String partitionColName, String timeUnit) {
CascadesContext cascadesContext = planner.getCascadesContext();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);

RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, planner.getRewrittenPlan(), cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
Plan mvRewrittenPlan =
planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, mvRewrittenPlan, cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty()));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty()));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}

if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
}

private static List<Expr> convertToLegacyArguments(List<Expression> children) {
Expand Down
Loading

0 comments on commit 3bee5b5

Please sign in to comment.