Skip to content

Commit

Permalink
Pick some pr to branch 21 #42279 #44164 (#44369)
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde authored Nov 22, 2024
1 parent 92a0919 commit 5e9bda6
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,25 +244,7 @@ private LogicalPlan makeOlapScan(TableIf table, UnboundRelation unboundRelation,
} else {
// it's a duplicate, unique or hash distribution agg table
// add delete sign filter on olap scan if needed
if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
&& !ConnectContext.get().getSessionVariable().skipDeleteSign()) {
// table qualifier is catalog.db.table, we make db.table.column
Slot deleteSlot = null;
for (Slot slot : scan.getOutput()) {
if (slot.getName().equals(Column.DELETE_SIGN)) {
deleteSlot = slot;
break;
}
}
Preconditions.checkArgument(deleteSlot != null);
Expression conjunct = new EqualTo(new TinyIntLiteral((byte) 0), deleteSlot);
if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
scan = scan.withPreAggStatus(
PreAggStatus.off(Column.DELETE_SIGN + " is used as conjuncts."));
}
return new LogicalFilter<>(Sets.newHashSet(conjunct), scan);
}
return scan;
return checkAndAddDeleteSignFilter(scan, ConnectContext.get(), (OlapTable) table);
}
}

Expand Down Expand Up @@ -370,6 +352,32 @@ private Optional<LogicalPlan> handleMetaTable(TableIf table, UnboundRelation unb
return Optional.empty();
}

/**
* Add delete sign filter on olap scan if need.
*/
public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan scan, ConnectContext connectContext,
OlapTable olapTable) {
if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
&& !connectContext.getSessionVariable().skipDeleteSign()) {
// table qualifier is catalog.db.table, we make db.table.column
Slot deleteSlot = null;
for (Slot slot : scan.getOutput()) {
if (slot.getName().equals(Column.DELETE_SIGN)) {
deleteSlot = slot;
break;
}
}
Preconditions.checkArgument(deleteSlot != null);
Expression conjunct = new EqualTo(new TinyIntLiteral((byte) 0), deleteSlot);
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
scan = scan.withPreAggStatus(PreAggStatus.off(
Column.DELETE_SIGN + " is used as conjuncts."));
}
return new LogicalFilter<>(Sets.newHashSet(conjunct), scan);
}
return scan;
}

private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelation,
List<String> qualifiedTableName, CascadesContext cascadesContext) {
// for create view stmt replace tableName to ctl.db.tableName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
continue;
}
Plan rewrittenPlan;
Plan mvScan = materializationContext.getScanPlan();
Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext);
Plan queryPlan = queryStructInfo.getTopPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewrittenPlan = mvScan;
Expand All @@ -254,15 +254,14 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
rewrittenPlan = new LogicalFilter<>(Sets.newLinkedHashSet(rewriteCompensatePredicates), mvScan);
}
boolean checkResult = rewriteQueryByViewPreCheck(matchMode, queryStructInfo,
viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext);
if (!checkResult) {
continue;
}
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
// If rewrite successfully, try to get mv read lock to avoid data inconsistent,
// try to get lock which should added before RBO
if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) {
cascadesContext.getStatementContext()
.addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv());
}
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
Expand Down Expand Up @@ -374,9 +373,9 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
// If rewrite successfully, try to clear mv scan currently because it maybe used again
materializationContext.clearScanPlan(cascadesContext);
}
return rewriteResults;
}
Expand Down Expand Up @@ -527,6 +526,16 @@ protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
return Pair.of(mvPartitionNeedRemoveNameMap, baseTablePartitionNeedUnionNameMap);
}

/**
* Query rewrite result may output origin plan , this will cause loop.
* if return origin plan, need add check hear.
*/
protected boolean rewriteQueryByViewPreCheck(MatchMode matchMode, StructInfo queryStructInfo,
StructInfo viewStructInfo, SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan,
MaterializationContext materializationContext) {
return true;
}

/**
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
Expand Down Expand Up @@ -56,8 +57,7 @@ public class AsyncMaterializationContext extends MaterializationContext {
*/
public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List<Table> baseTables,
List<Table> baseViews, CascadesContext cascadesContext, StructInfo structInfo) {
super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, cascadesContext),
cascadesContext, structInfo);
super(mvPlan, mvOriginalPlan, cascadesContext, structInfo);
this.mtmv = mtmv;
}

Expand All @@ -67,7 +67,8 @@ public MTMV getMtmv() {

@Override
Plan doGenerateScanPlan(CascadesContext cascadesContext) {
return MaterializedViewUtils.generateMvScanPlan(this.mtmv, cascadesContext);
return MaterializedViewUtils.generateMvScanPlan(this.mtmv, this.mtmv.getBaseIndexId(),
this.mtmv.getPartitionIds(), PreAggStatus.on(), cascadesContext);
}

@Override
Expand Down Expand Up @@ -107,7 +108,8 @@ public Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascades
return Optional.empty();
}
RelationId relationId = null;
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan().collectFirst(LogicalOlapScan.class::isInstance);
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null, cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (logicalOlapScan.isPresent()) {
relationId = logicalOlapScan.get().getRelationId();
}
Expand All @@ -127,7 +129,14 @@ boolean isFinalChosen(Relation relation) {
);
}

public Plan getScanPlan() {
@Override
public Plan getScanPlan(StructInfo queryInfo, CascadesContext cascadesContext) {
// If try to get scan plan or rewrite successfully, try to get mv read lock to avoid meta data inconsistent,
// try to get lock which should added before RBO
if (!this.isSuccess()) {
cascadesContext.getStatementContext().addTableReadLock(this.getMtmv());
}
super.getScanPlan(queryInfo, cascadesContext);
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,33 +105,20 @@ public abstract class MaterializationContext {
/**
* MaterializationContext, this contains necessary info for query rewriting by materialization
*/
public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
public MaterializationContext(Plan plan, Plan originalPlan,
CascadesContext cascadesContext, StructInfo structInfo) {
this.plan = plan;
this.originalPlan = originalPlan;
this.scanPlan = scanPlan;

StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement();
this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain()
&& ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
}
}
// Construct materialization struct info, catch exception which may cause planner roll back
this.structInfo = structInfo == null
? constructStructInfo(plan, originalPlan, cascadesContext, new BitSet()).orElseGet(() -> null)
: structInfo;
this.available = this.structInfo != null;
if (available) {
this.planOutputShuttledExpressions = this.structInfo.getPlanOutputShuttledExpressions();
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
scanPlanOutput);
}
}

Expand Down Expand Up @@ -176,17 +163,19 @@ public void addMatchedGroup(GroupId groupId, boolean rewriteSuccess) {
* if MaterializationContext is already rewritten successfully, then should generate new scan plan in later
* query rewrite, because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
* This method should be called when query rewrite successfully
*/
public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
public void tryGenerateScanPlan(CascadesContext cascadesContext) {
if (!this.isAvailable()) {
return;
}
this.scanPlan = doGenerateScanPlan(cascadesContext);
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
// Materialization output expression shuttle, this will be used to expression rewrite
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(this.planOutputShuttledExpressions,
scanPlanOutput);
// This is used by normalize statistics column expression
Map<Expression, Expression> regeneratedMapping = new HashMap<>();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
Expand All @@ -195,6 +184,17 @@ public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
this.exprToScanExprMapping = regeneratedMapping;
}

/**
* Should clear scan plan after materializationContext is already rewritten successfully,
* Because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
*/
public void clearScanPlan(CascadesContext cascadesContext) {
this.scanPlan = null;
this.shuttledExprToScanExprMapping = null;
this.exprToScanExprMapping = null;
}

public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) {
queryToMaterializationSlotMappingCache.put(relationMapping, slotMapping);
}
Expand Down Expand Up @@ -275,7 +275,11 @@ public Plan getOriginalPlan() {
return originalPlan;
}

public Plan getScanPlan() {
public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) {
if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null
|| this.exprToScanExprMapping == null) {
tryGenerateScanPlan(cascadesContext);
}
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.TableIdentifier;
Expand All @@ -30,6 +30,7 @@
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.StructInfoMap;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.Alias;
Expand Down Expand Up @@ -212,19 +213,24 @@ public static List<StructInfo> extractStructInfo(Plan plan, Plan originalPlan, C
* when query rewrite, because one plan may hit the materialized view repeatedly and the mv scan output
* should be different
*/
public static Plan generateMvScanPlan(MTMV materializedView, CascadesContext cascadesContext) {
return new LogicalOlapScan(
public static Plan generateMvScanPlan(OlapTable table, long indexId,
List<Long> partitionIds,
PreAggStatus preAggStatus,
CascadesContext cascadesContext) {
LogicalOlapScan olapScan = new LogicalOlapScan(
cascadesContext.getStatementContext().getNextRelationId(),
materializedView,
materializedView.getFullQualifiers(),
table,
ImmutableList.of(table.getQualifiedDbName()),
ImmutableList.of(),
materializedView.getPartitionIds(),
materializedView.getBaseIndexId(),
PreAggStatus.on(),
partitionIds,
indexId,
preAggStatus,
ImmutableList.of(),
// this must be empty, or it will be used to sample
ImmutableList.of(),
Optional.empty());
return BindRelation.checkAndAddDeleteSignFilter(olapScan, cascadesContext.getConnectContext(),
olapScan.getTable());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean isMVPar
.analyze()
.rewrite();
// scan plan output will be refreshed after mv rewrite successfully, so need tmp store
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0).getScanPlan().getOutputSet();
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0)
.getScanPlan(null, c1).getOutputSet();
tmpPlanChecker
.optimize()
.printlnBestPlanTree();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,49 @@
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query12_0_before --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query12_0_after --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

Loading

0 comments on commit 5e9bda6

Please sign in to comment.