Skip to content

Commit

Permalink
[opt](mtmv) Support rewritten by mv when query has limit or topN whic…
Browse files Browse the repository at this point in the history
…h could be pushed but mv doesn't have
  • Loading branch information
seawinde committed Dec 18, 2024
1 parent 0c97e04 commit c6f7ccf
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
Expand Down Expand Up @@ -428,6 +429,10 @@ private Pair<BitSet, Long> buildForMv(Plan plan) {
return Pair.of(child.first, child.second);
}

if (StructInfo.isValidLimit(plan)) {
return this.buildForMv(plan.child(0));
}

// process Other Node
int idx = this.addStructInfoNode(plan);
return Pair.of(new BitSet(), LongBitmap.newBitmap(idx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
Expand All @@ -55,11 +56,14 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer;
Expand Down Expand Up @@ -653,6 +657,15 @@ public static PlanCheckContext of(Set<JoinType> supportJoinTypes) {
}
}

/**
* Limit, topN or partitionTopN which is pushed down are valid
* */
public static boolean isValidLimit(Plan plan) {
return (plan instanceof LogicalLimit && ((LogicalLimit<?>) plan).getPhase() == LimitPhase.LOCAL)
|| (plan instanceof LogicalPartitionTopN && ((LogicalPartitionTopN<?>) plan).isPushed())
|| (plan instanceof LogicalTopN && ((LogicalTopN<?>) plan).isPushed());
}

/**
* PlanPatternChecker, this is used to check the plan pattern is valid or not
*/
Expand Down Expand Up @@ -692,7 +705,7 @@ public Boolean visit(Plan plan, PlanCheckContext checkContext) {
|| plan instanceof LogicalSort
|| plan instanceof LogicalAggregate
|| plan instanceof GroupPlan
|| plan instanceof LogicalRepeat) {
|| plan instanceof LogicalRepeat || isValidLimit(plan)) {
return doVisit(plan, checkContext);
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public List<Rule> buildRules() {
return null;
}
List<OrderKey> orderKeys = Lists.newArrayList(orderKeysOpt.get());
return new LogicalTopN<>(orderKeys, limit.getLimit(), limit.getOffset(), agg);
return new LogicalTopN<>(orderKeys, limit.getLimit(), limit.getOffset(), agg,
false);
}).toRule(RuleType.LIMIT_AGG_TO_TOPN_AGG),
//limit->project->agg to topn->project->agg
logicalLimit(logicalProject(logicalAggregate()))
Expand All @@ -82,7 +83,7 @@ public List<Rule> buildRules() {

if (outputAllGroupKeys(limit, agg)) {
result = new LogicalTopN<>(orderKeys, limit.getLimit(),
limit.getOffset(), project);
limit.getOffset(), project, false);
} else {
// add the first group by key to topn, and prune this key by upper project
// topn order keys are prefix of group by keys
Expand All @@ -100,7 +101,7 @@ public List<Rule> buildRules() {
project = project.withProjects(bottomProjections);
}
LogicalTopN topn = new LogicalTopN<>(orderKeys, limit.getLimit(),
limit.getOffset(), project);
limit.getOffset(), project, false);
if (shouldPruneFirstGroupByKey) {
List<NamedExpression> limitOutput = limit.getOutput().stream()
.map(e -> (NamedExpression) e).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public List<Rule> buildRules() {
return new LogicalTopN<>(sort.getOrderKeys(),
limit.getLimit(),
limit.getOffset(),
sort.child(0));
sort.child(0), false);
}).toRule(RuleType.LIMIT_SORT_TO_TOP_N),
// limit -> proj -> sort ==> proj -> topN
logicalLimit(logicalProject(logicalSort()))
Expand All @@ -53,7 +53,7 @@ public List<Rule> buildRules() {
LogicalTopN<Plan> topN = new LogicalTopN<>(sort.getOrderKeys(),
limit.getLimit(),
limit.getOffset(),
sort.child(0));
sort.child(0), false);
return project.withChildren(Lists.newArrayList(topN));
}).toRule(RuleType.LIMIT_SORT_TO_TOP_N)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Rule build() {
long newOffset = offset + childOffset;
// choose min limit
long newLimit = Math.min(limit, childLimit);
return topN.withLimitChild(newLimit, newOffset, childTopN.child());
return topN.withLimitChild(newLimit, newOffset, childTopN.child(), false);
}).toRule(RuleType.MERGE_TOP_N);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private Plan pushTopNThroughJoin(LogicalTopN<? extends Plan> topN, LogicalJoin<P
if (!pushedOrderKeys.isEmpty()) {
LogicalTopN<Plan> left = topN.withLimitOrderKeyAndChild(
topN.getLimit() + topN.getOffset(), 0, pushedOrderKeys,
PlanUtils.distinct(join.left()));
PlanUtils.distinct(join.left()), true);
return join.withChildren(left, join.right());
}
return null;
Expand All @@ -109,7 +109,7 @@ private Plan pushTopNThroughJoin(LogicalTopN<? extends Plan> topN, LogicalJoin<P
if (!pushedOrderKeys.isEmpty()) {
LogicalTopN<Plan> right = topN.withLimitOrderKeyAndChild(
topN.getLimit() + topN.getOffset(), 0, pushedOrderKeys,
PlanUtils.distinct(join.right()));
PlanUtils.distinct(join.right()), true);
return join.withChildren(join.left(), right);
}
return null;
Expand All @@ -122,14 +122,14 @@ private Plan pushTopNThroughJoin(LogicalTopN<? extends Plan> topN, LogicalJoin<P
if (!leftPushedOrderKeys.isEmpty()) {
leftChild = topN.withLimitOrderKeyAndChild(
topN.getLimit() + topN.getOffset(), 0, leftPushedOrderKeys,
PlanUtils.distinct(join.left()));
PlanUtils.distinct(join.left()), true);
}
List<OrderKey> rightPushedOrderKeys = getPushedOrderKeys(groupBySlots,
join.right().getOutputSet(), topN.getOrderKeys());
if (!rightPushedOrderKeys.isEmpty()) {
rightChild = topN.withLimitOrderKeyAndChild(
topN.getLimit() + topN.getOffset(), 0, rightPushedOrderKeys,
PlanUtils.distinct(join.right()));
PlanUtils.distinct(join.right()), true);
}
if (leftChild == join.left() && rightChild == join.right()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public List<Rule> buildRules() {
ExpressionUtils.replace(orderKey.getExpr(), replaceMap)))
.collect(ImmutableList.toImmutableList());
newChildren.add(new LogicalTopN<>(orderKeys, topN.getLimit() + topN.getOffset(), 0,
PlanUtils.distinct(child)));
PlanUtils.distinct(child), true));
}
if (union.children().equals(newChildren)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,27 @@ private Plan pushLimitThroughJoin(LogicalTopN<? extends Plan> topN, LogicalJoin<
case LEFT_OUTER_JOIN:
if (join.left().getOutputSet().containsAll(orderbySlots)) {
return join.withChildren(
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.left()),
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.left(), true),
join.right());
}
return null;
case RIGHT_OUTER_JOIN:
if (join.right().getOutputSet().containsAll(orderbySlots)) {
return join.withChildren(
join.left(),
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.right()));
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.right(), true));
}
return null;
case CROSS_JOIN:

if (join.left().getOutputSet().containsAll(orderbySlots)) {
return join.withChildren(
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.left()),
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.left(), true),
join.right());
} else if (join.right().getOutputSet().containsAll(orderbySlots)) {
return join.withChildren(
join.left(),
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.right()));
topN.withLimitChild(topN.getLimit() + topN.getOffset(), 0, join.right(), true));
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public List<Rule> buildRules() {
ExpressionUtils.replace(orderKey.getExpr(), replaceMap)))
.collect(ImmutableList.toImmutableList());
newChildren.add(
new LogicalTopN<>(orderKeys, topN.getLimit() + topN.getOffset(), 0, child));
new LogicalTopN<>(orderKeys, topN.getLimit() + topN.getOffset(), 0,
child, true));
}
if (union.children().equals(newChildren)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public Plan visitLogicalTopN(LogicalTopN<? extends Plan> topN, DeepCopierContext
.map(o -> new OrderKey(ExpressionDeepCopier.INSTANCE.deepCopy(o.getExpr(), context),
o.isAsc(), o.isNullFirst()))
.collect(ImmutableList.toImmutableList());
return new LogicalTopN<>(orderKeys, topN.getLimit(), topN.getOffset(), child);
return new LogicalTopN<>(orderKeys, topN.getLimit(), topN.getOffset(), child, false);
}

@Override
Expand All @@ -241,7 +241,7 @@ public Plan visitLogicalPartitionTopN(LogicalPartitionTopN<? extends Plan> parti
.map(o -> (OrderExpression) ExpressionDeepCopier.INSTANCE.deepCopy(o, context))
.collect(ImmutableList.toImmutableList());
return new LogicalPartitionTopN<>(partitionTopN.getFunction(), partitionKeys, orderKeys,
partitionTopN.hasGlobalLimit(), partitionTopN.getPartitionLimit(), child);
partitionTopN.hasGlobalLimit(), partitionTopN.getPartitionLimit(), child, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,20 @@ public class LogicalPartitionTopN<CHILD_TYPE extends Plan> extends LogicalUnary<
private final List<OrderExpression> orderKeys;
private final boolean hasGlobalLimit;
private final long partitionLimit;
private final boolean pushed;

public LogicalPartitionTopN(WindowExpression windowExpr, boolean hasGlobalLimit, long partitionLimit,
CHILD_TYPE child) {
CHILD_TYPE child, boolean pushed) {
this(windowExpr.getFunction(), windowExpr.getPartitionKeys(), windowExpr.getOrderKeys(),
hasGlobalLimit, partitionLimit, Optional.empty(),
Optional.empty(), child);
Optional.empty(), child, pushed);
}

public LogicalPartitionTopN(WindowFuncType windowFuncType, List<Expression> partitionKeys,
List<OrderExpression> orderKeys, boolean hasGlobalLimit, long partitionLimit,
CHILD_TYPE child) {
CHILD_TYPE child, boolean pushed) {
this(windowFuncType, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
Optional.empty(), Optional.empty(), child);
Optional.empty(), Optional.empty(), child, pushed);
}

/**
Expand All @@ -72,7 +73,8 @@ public LogicalPartitionTopN(WindowFuncType windowFuncType, List<Expression> part
public LogicalPartitionTopN(WindowFuncType windowFuncType, List<Expression> partitionKeys,
List<OrderExpression> orderKeys, boolean hasGlobalLimit,
long partitionLimit, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child,
boolean pushed) {
super(PlanType.LOGICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child);
this.function = windowFuncType;
this.partitionKeys = ImmutableList.copyOf(Objects.requireNonNull(partitionKeys,
Expand All @@ -81,14 +83,15 @@ public LogicalPartitionTopN(WindowFuncType windowFuncType, List<Expression> part
"orderKeys can not be null"));
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.pushed = pushed;
}

/**
* Constructor for LogicalPartitionTopN.
*/
public LogicalPartitionTopN(Expression expr, List<Expression> partitionKeys, List<OrderExpression> orderKeys,
boolean hasGlobalLimit, long partitionLimit, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child, boolean pushed) {
super(PlanType.LOGICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child);
if (expr instanceof RowNumber) {
this.function = WindowFuncType.ROW_NUMBER;
Expand All @@ -104,6 +107,7 @@ public LogicalPartitionTopN(Expression expr, List<Expression> partitionKeys, Lis
Objects.requireNonNull(orderKeys, "orderKeys can not be null"));
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.pushed = pushed;
}

@Override
Expand Down Expand Up @@ -134,6 +138,10 @@ public long getPartitionLimit() {
return partitionLimit;
}

public boolean isPushed() {
return pushed;
}

@Override
public String toString() {
return Utils.toSqlString("LogicalPartitionTopN",
Expand Down Expand Up @@ -180,27 +188,27 @@ public List<? extends Expression> getExpressions() {
public LogicalPartitionTopN<Plan> withPartitionKeysAndOrderKeys(
List<Expression> partitionKeys, List<OrderExpression> orderKeys) {
return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
Optional.empty(), Optional.of(getLogicalProperties()), child());
Optional.empty(), Optional.of(getLogicalProperties()), child(), this.pushed);
}

@Override
public LogicalPartitionTopN<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
children.get(0));
children.get(0), this.pushed);
}

@Override
public LogicalPartitionTopN<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, Optional.of(getLogicalProperties()), child());
groupExpression, Optional.of(getLogicalProperties()), child(), this.pushed);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, logicalProperties, children.get(0));
groupExpression, logicalProperties, children.get(0), this.pushed);
}
}
Loading

0 comments on commit c6f7ccf

Please sign in to comment.