From b7d3173692b3b1e9887d4b92bb861901d0580b57 Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Thu, 14 Mar 2024 22:05:08 +0800 Subject: [PATCH] [opt](Nereids) opt distinct agg without group by key plan #32236 --- .../ChildrenPropertiesRegulator.java | 22 ++++++++-- .../rules/analysis/NormalizeAggregate.java | 44 +++++++++---------- .../nereids_clickbench_shape_p0/query6.out | 11 +++-- 3 files changed, 47 insertions(+), 30 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index a70d72c565c707..7c5374ebd212ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -107,12 +107,23 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate if (!agg.getAggregateParam().canBeBanned) { return true; } - // forbid one phase agg on distribute and three or four stage distinct agg inter by distribute - if ((agg.getAggMode() == AggMode.INPUT_TO_RESULT || agg.getAggMode() == AggMode.BUFFER_TO_BUFFER) - && children.get(0).getPlan() instanceof PhysicalDistribute) { + // forbid one phase agg on distribute + if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && children.get(0).getPlan() instanceof PhysicalDistribute) { // this means one stage gather agg, usually bad pattern return false; } + // forbid three or four stage distinct agg inter by distribute + if (agg.getAggMode() == AggMode.BUFFER_TO_BUFFER && children.get(0).getPlan() instanceof PhysicalDistribute) { + // if distinct without group by key, we prefer three or four stage distinct agg + // because the second phase of multi-distinct only have one instance, and it is slow generally. + if (agg.getGroupByExpressions().size() == 1 + && agg.getOutputExpressions().size() == 1) { + return true; + } + return false; + + } + // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle // TODO: this is forbid good plan after cte reuse by mistake if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER @@ -160,6 +171,11 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate return false; } } + // if distinct without group by key, we prefer three or four stage distinct agg + // because the second phase of multi-distinct only have one instance, and it is slow generally. + if (agg.getOutputExpressions().size() == 1 && agg.getGroupByExpressions().isEmpty()) { + return false; + } } } // process must shuffle diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java index a8c3445261da30..1105fe2da72be1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java @@ -30,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.SubqueryExpr; import org.apache.doris.nereids.trees.expressions.WindowExpression; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; -import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalHaving; @@ -113,27 +112,27 @@ public List buildRules() { private LogicalPlan normalizeAgg(LogicalAggregate aggregate, Optional> having) { // The LogicalAggregate node may contain window agg functions and usual agg functions - // we call window agg functions as window-agg and usual agg functions as trival-agg for short + // we call window agg functions as window-agg and usual agg functions as trivial-agg for short // This rule simplify LogicalAggregate node by: // 1. Push down some exprs from old LogicalAggregate node to a new child LogicalProject Node, - // 2. create a new LogicalAggregate with normalized group by exprs and trival-aggs + // 2. create a new LogicalAggregate with normalized group by exprs and trivial-aggs // 3. Pull up normalized old LogicalAggregate's output exprs to a new parent LogicalProject Node // Push down exprs: // 1. all group by exprs - // 2. child contains subquery expr in trival-agg - // 3. child contains window expr in trival-agg - // 4. all input slots of trival-agg - // 5. expr(including subquery) in distinct trival-agg + // 2. child contains subquery expr in trivial-agg + // 3. child contains window expr in trivial-agg + // 4. all input slots of trivial-agg + // 5. expr(including subquery) in distinct trivial-agg // Normalize LogicalAggregate's output. // 1. normalize group by exprs by outputs of bottom LogicalProject - // 2. normalize trival-aggs by outputs of bottom LogicalProject + // 2. normalize trivial-aggs by outputs of bottom LogicalProject // 3. build normalized agg outputs // Pull up exprs: // normalize all output exprs in old LogicalAggregate to build a parent project node, typically includes: // 1. simple slots // 2. aliases // a. alias with no aggs child - // b. alias with trival-agg child + // b. alias with trivial-agg child // c. alias with window-agg // Push down exprs: @@ -141,13 +140,13 @@ private LogicalPlan normalizeAgg(LogicalAggregate aggregate, Optional groupingByExprs = ImmutableSet.copyOf(aggregate.getGroupByExpressions()); - // collect all trival-agg + // collect all trivial-agg List aggregateOutput = aggregate.getOutputExpressions(); List aggFuncs = CollectNonWindowedAggFuncs.collect(aggregateOutput); // split non-distinct agg child as two part - // TRUE part 1: need push down itself, if it contains subqury or window expression - // FALSE part 2: need push down its input slots, if it DOES NOT contain subqury or window expression + // TRUE part 1: need push down itself, if it contains subquery or window expression + // FALSE part 2: need push down its input slots, if it DOES NOT contain subquery or window expression Map> categorizedNoDistinctAggsChildren = aggFuncs.stream() .filter(aggFunc -> !aggFunc.isDistinct()) .flatMap(agg -> agg.children().stream()) @@ -159,10 +158,9 @@ private LogicalPlan normalizeAgg(LogicalAggregate aggregate, Optional> categorizedDistinctAggsChildren = aggFuncs.stream() - .filter(aggFunc -> aggFunc.isDistinct()).flatMap(agg -> agg.children().stream()) - .collect(Collectors.groupingBy( - child -> !(child instanceof SlotReference || child instanceof Literal), - Collectors.toSet())); + .filter(AggregateFunction::isDistinct) + .flatMap(agg -> agg.children().stream()) + .collect(Collectors.groupingBy(child -> !(child instanceof SlotReference), Collectors.toSet())); Set needPushSelf = Sets.union( categorizedNoDistinctAggsChildren.getOrDefault(true, new HashSet<>()), @@ -176,20 +174,20 @@ private LogicalPlan normalizeAgg(LogicalAggregate aggregate, Optional allPushDownExprs = Sets.union(groupingByExprs, Sets.union(needPushSelf, needPushInputSlots)); NormalizeToSlotContext bottomSlotContext = NormalizeToSlotContext.buildContext(existsAlias, allPushDownExprs); Set pushedGroupByExprs = bottomSlotContext.pushDownToNamedExpression(groupingByExprs); - Set pushedTrivalAggChildren = + Set pushedTrivialAggChildren = bottomSlotContext.pushDownToNamedExpression(needPushSelf); - Set pushedTrivalAggInputSlots = + Set pushedTrivialAggInputSlots = bottomSlotContext.pushDownToNamedExpression(needPushInputSlots); Set bottomProjects = Sets.union(pushedGroupByExprs, - Sets.union(pushedTrivalAggChildren, pushedTrivalAggInputSlots)); + Sets.union(pushedTrivialAggChildren, pushedTrivialAggInputSlots)); // create bottom project Plan bottomPlan; @@ -215,7 +213,7 @@ private LogicalPlan normalizeAgg(LogicalAggregate aggregate, Optional normalizedGroupExprs = bottomSlotContext.normalizeToUseSlotRef(groupingByExprs); - // normalize trival-aggs by bottomProjects + // normalize trivial-aggs by bottomProjects List normalizedAggFuncs = bottomSlotContext.normalizeToUseSlotRef(aggFuncs); if (normalizedAggFuncs.stream().anyMatch(agg -> !agg.children().isEmpty() @@ -237,7 +235,7 @@ private LogicalPlan normalizeAgg(LogicalAggregate aggregate, Optional newAggregate = aggregate.withNormalized(normalizedGroupExprs, normalizedAggOutput, bottomPlan); // create upper projects by normalize all output exprs in old LogicalAggregate diff --git a/regression-test/data/nereids_clickbench_shape_p0/query6.out b/regression-test/data/nereids_clickbench_shape_p0/query6.out index 904169f68cb62a..75ba24ac143f06 100644 --- a/regression-test/data/nereids_clickbench_shape_p0/query6.out +++ b/regression-test/data/nereids_clickbench_shape_p0/query6.out @@ -1,9 +1,12 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !ckbench_shape_6 -- PhysicalResultSink ---hashAgg[GLOBAL] +--hashAgg[DISTINCT_GLOBAL] ----PhysicalDistribute[DistributionSpecGather] -------hashAgg[LOCAL] ---------PhysicalProject -----------PhysicalOlapScan[hits] +------hashAgg[DISTINCT_LOCAL] +--------hashAgg[GLOBAL] +----------PhysicalDistribute[DistributionSpecHash] +------------hashAgg[LOCAL] +--------------PhysicalProject +----------------PhysicalOlapScan[hits]