Skip to content

Commit

Permalink
[opt](nereids) enable two phase partition topn opt apache#23870
Browse files Browse the repository at this point in the history
Enable the two-phase partition topn optimization, replacing the original full sort in the second phase.
E.g., the partial plan of tpcds q67 is as follows; the full sort after the exchange has a performance impact, especially if the window column's ndv is very high and the number of windows is huge.

------PhysicalTopN
--------filter((rk <= 100))
----------PhysicalWindow
------------PhysicalQuickSort
--------------PhysicalDistribute
----------------PhysicalPartitionTopN
------------------PhysicalProject

In this scenario, the second-phase full sort can be replaced by a global PhysicalPartitionTopN, which avoids the cost of the full sort. The plan is then optimized to the following:

------PhysicalTopN
--------filter((rk <= 100))
----------PhysicalWindow
------------PhysicalPartitionTopN
--------------PhysicalDistribute
----------------PhysicalPartitionTopN
------------------PhysicalProject
  • Loading branch information
xzj7019 authored Sep 15, 2023
1 parent 23f01dd commit 00bb32c
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,18 @@ public PhysicalProperties visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep
// Derives the output physical properties of a partition topn from the output
// properties of its single child; the result depends on the node's phase.
public PhysicalProperties visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
PlanContext context) {
// a partition topn has exactly one child, so exactly one child property is expected
Preconditions.checkState(childrenOutputProperties.size() == 1);
// NOTE(review): the next two lines appear to be the pre-change side of this diff hunk;
// the phase-aware logic below replaces them.
PhysicalProperties childOutputProperty = childrenOutputProperties.get(0);
return new PhysicalProperties(childOutputProperty.getDistributionSpec());
DistributionSpec childDistSpec = childrenOutputProperties.get(0).getDistributionSpec();

// two phase local and one phase global: only the child's distribution is passed through
if (partitionTopN.getPhase().isTwoPhaseLocal() || partitionTopN.getPhase().isOnePhaseGlobal()) {
return new PhysicalProperties(childDistSpec);
} else {
// the only remaining phase is two phase global, and its child must be hash distributed
Preconditions.checkState(partitionTopN.getPhase().isTwoPhaseGlobal(),
"partition topn phase is not two phase global");
Preconditions.checkState(childDistSpec instanceof DistributionSpecHash,
"child dist spec is not hash spec");

// two phase global additionally reports an order spec built from the node's order keys
return new PhysicalProperties(childDistSpec, new OrderSpec(partitionTopN.getOrderKeys()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down Expand Up @@ -153,6 +154,23 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan>
return true;
}

/**
 * Decides whether a partition topn candidate is kept, based on its phase and on whether
 * its first child is a {@code PhysicalDistribute}.
 *
 * @param partitionTopN the candidate being checked
 * @param context unused
 * @return {@code false} when the candidate is discarded, otherwise {@code true} after
 *         delegating to {@code visit}
 */
@Override
public Boolean visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, Void context) {
    boolean keepCandidate;
    if (partitionTopN.getPhase().isOnePhaseGlobal()) {
        // one phase partition topn on top of an enforced distribution is dropped,
        // so that the two phase candidate is used instead.
        keepCandidate = !(children.get(0).getPlan() instanceof PhysicalDistribute);
    } else if (partitionTopN.getPhase().isTwoPhaseGlobal()) {
        // two phase global whose child is not a distribution means the local
        // distribution already met the final requirement, so it is dropped.
        keepCandidate = children.get(0).getPlan() instanceof PhysicalDistribute;
    } else {
        // remaining phases are always kept; the child is not inspected
        keepCandidate = true;
    }

    if (!keepCandidate) {
        return false;
    }
    visit(partitionTopN, context);
    return true;
}

@Override
public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void context) {
// do not process must shuffle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

Expand Down Expand Up @@ -250,6 +252,21 @@ public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort,
return null;
}

/**
 * Registers the property requested from the child of a partition topn.
 * A two phase local node accepts any child property; every other phase must be one of the
 * two global phases and requests a hash distribution on the partition keys.
 *
 * @param partitionTopN the partition topn whose child request is being derived
 * @param context unused
 * @return always {@code null}
 */
@Override
public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
    if (!partitionTopN.getPhase().isTwoPhaseLocal()) {
        // anything that is not the local phase has to be a global phase
        boolean isGlobalPhase = partitionTopN.getPhase().isTwoPhaseGlobal()
                || partitionTopN.getPhase().isOnePhaseGlobal();
        Preconditions.checkState(isGlobalPhase,
                "partition topn phase is not two phase global or one phase global");
        addRequestPropertyToChildren(
                PhysicalProperties.createHash(partitionTopN.getPartitionKeys(), ShuffleType.REQUIRE));
        return null;
    }

    addRequestPropertyToChildren(PhysicalProperties.ANY);
    return null;
}

@Override
public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;

import com.google.common.collect.ImmutableList;
Expand All @@ -33,21 +36,85 @@
public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory {
/**
 * Builds the implementation rule for a logical partition topn. The post-change version
 * uses {@code thenApplyMulti}, so a single logical node can yield the list of physical
 * candidates returned by {@code generatePhysicalPartitionTopn}.
 */
@Override
public Rule build() {
// NOTE(review): the following five lines appear to be the pre-change side of this diff
// hunk (the former single-result lambda); they are superseded by the statement after them.
return logicalPartitionTopN().then(partitionTopN -> {
List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
? partitionTopN.getOrderKeys().stream()
.map(OrderExpression::getOrderKey)
.collect(ImmutableList.toImmutableList()) :
// post-change rule: hand the matched root node to the candidate generator
return logicalPartitionTopN().thenApplyMulti(ctx -> generatePhysicalPartitionTopn(ctx.root))
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
}

/**
 * Generates the physical candidates for one logical partition topn.
 * Without partition keys a single node tagged TWO_PHASE_LOCAL_PTOPN is returned.
 * With partition keys two candidates are returned: a one phase global node, and a
 * two phase global node stacked on a two phase local node.
 *
 * @param logicalPartitionTopN the logical node being implemented
 * @return the list of physical candidates
 */
private List<PhysicalPartitionTopN<? extends Plan>> generatePhysicalPartitionTopn(
LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
if (logicalPartitionTopN.getPartitionKeys().isEmpty()) {
// no partition-by keys: emit only a local partition topn; per the original author,
// it is meant to be combined with a further full sort.
List<OrderKey> orderKeys = !logicalPartitionTopN.getOrderKeys().isEmpty()
? logicalPartitionTopN.getOrderKeys().stream()
.map(OrderExpression::getOrderKey)
.collect(ImmutableList.toImmutableList()) :
ImmutableList.of();

// NOTE(review): the next three lines appear to be the pre-change side of this diff hunk.
return new PhysicalPartitionTopN<>(
partitionTopN.getFunction(),
partitionTopN.getPartitionKeys(),
// NOTE(review): the local is named "onePhaseLocal" but is built with
// TWO_PHASE_LOCAL_PTOPN below -- confirm which name is intended.
PhysicalPartitionTopN<Plan> onePhaseLocalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
orderKeys,
// NOTE(review): the next five lines appear to be the pre-change side of this diff hunk.
partitionTopN.hasGlobalLimit(),
partitionTopN.getPartitionLimit(),
partitionTopN.getLogicalProperties(),
partitionTopN.child());
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
logicalPartitionTopN.child(0));

return ImmutableList.of(onePhaseLocalPartitionTopN);
} else {
// partition-by keys exist: the order keys become the partition keys followed by the
// order-by keys, to meet the upper window operator's order requirement.
ImmutableList<OrderKey> fullOrderKeys = getAllOrderKeys(logicalPartitionTopN);
// candidate 1: a single global node placed directly over the logical node's child
PhysicalPartitionTopN<Plan> onePhaseGlobalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
fullOrderKeys,
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.ONE_PHASE_GLOBAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
logicalPartitionTopN.child(0));

// candidate 2, lower half: the local node over the logical node's child
PhysicalPartitionTopN<Plan> twoPhaseLocalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
fullOrderKeys,
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
logicalPartitionTopN.child(0));

// candidate 2, upper half: the global node whose child is the local node above
PhysicalPartitionTopN<Plan> twoPhaseGlobalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
fullOrderKeys,
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.TWO_PHASE_GLOBAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
twoPhaseLocalPartitionTopN);

// the local node is not returned on its own; it is reachable as the global node's child
return ImmutableList.of(onePhaseGlobalPartitionTopN, twoPhaseGlobalPartitionTopN);
}
}

/**
 * Builds the combined ordering of a logical partition topn: one order key per partition
 * key first, followed by the node's own order-by keys in their original order.
 *
 * @param logicalPartitionTopN the node whose partition keys and order keys are read
 * @return an immutable list holding the partition-key order keys, then the order-by keys
 */
private ImmutableList<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
    ImmutableList.Builder<OrderKey> allKeys = ImmutableList.builder();

    // each partition key is wrapped as OrderKey(key, true, false)
    // (presumably ascending / nulls-last -- confirm against OrderKey's constructor)
    logicalPartitionTopN.getPartitionKeys()
            .forEach(partitionKey -> allKeys.add(new OrderKey(partitionKey, true, false)));

    // the order-by expressions already carry their order keys
    logicalPartitionTopN.getOrderKeys().stream()
            .map(OrderExpression::getOrderKey)
            .forEach(allKeys::add);

    return allKeys.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans;

/**
 * Phase tag of a partition topn operator: a single global phase, or the local / global
 * halves of a two phase plan. Each constant carries a descriptive label; the label is
 * stored but not read anywhere inside this enum (presumably it mirrors the phase naming
 * of the legacy optimizer -- confirm before relying on it).
 */
public enum PartitionTopnPhase {
    ONE_PHASE_GLOBAL_PTOPN("OnePhaseGlobalPartitionTopn"),
    TWO_PHASE_LOCAL_PTOPN("TwoPhaseLocalPartitionTopn"),
    TWO_PHASE_GLOBAL_PTOPN("TwoPhaseGlobalPartitionTopn");

    /** Descriptive label given to the constant at declaration. */
    private final String phaseLabel;

    PartitionTopnPhase(String phaseLabel) {
        this.phaseLabel = phaseLabel;
    }

    /** Returns {@code true} only for {@link #ONE_PHASE_GLOBAL_PTOPN}. */
    public boolean isOnePhaseGlobal() {
        return ONE_PHASE_GLOBAL_PTOPN.equals(this);
    }

    /** Returns {@code true} only for {@link #TWO_PHASE_LOCAL_PTOPN}. */
    public boolean isTwoPhaseLocal() {
        return TWO_PHASE_LOCAL_PTOPN.equals(this);
    }

    /** Returns {@code true} only for {@link #TWO_PHASE_GLOBAL_PTOPN}. */
    public boolean isTwoPhaseGlobal() {
        return TWO_PHASE_GLOBAL_PTOPN.equals(this);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.doris.nereids.trees.plans;

/**
* Represents different phase of agg and map it to the
* enum of agg phase definition of stale optimizer.
* Represents different phase of sort and map it to the
* enum of sort phase definition of stale optimizer.
*/
public enum SortPhase {
MERGE_SORT("MergeSort"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.WindowFuncType;
Expand All @@ -48,19 +49,20 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
private final List<OrderKey> orderKeys;
private final Boolean hasGlobalLimit;
private final long partitionLimit;
private final PartitionTopnPhase phase;

/**
 * Convenience constructor without a group expression: delegates to the constructor that
 * takes one, passing {@code Optional.empty()}.
 * NOTE(review): the parameter line and the {@code this(...)} line that lack {@code phase}
 * appear to be the pre-change side of this diff hunk.
 */
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase,
LogicalProperties logicalProperties, CHILD_TYPE child) {
this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
Optional.empty(), logicalProperties, child);
}

/**
* Constructor of PhysicalPartitionTopN.
*/
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child);
Expand All @@ -69,22 +71,25 @@ public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partition
this.orderKeys = ImmutableList.copyOf(orderKeys);
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.phase = phase;
}

/**
 * Constructor of PhysicalPartitionTopN that also takes physical properties and statistics.
 * All arguments are forwarded to the superclass or stored in the matching field.
 * NOTE(review): the parameter lines that lack {@code phase}, and the one-line
 * {@code statistics, CHILD_TYPE child} variant, appear to be the pre-change side of this
 * diff hunk.
 */
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
PhysicalProperties physicalProperties, Statistics statistics,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, physicalProperties,
statistics, child);
this.function = function;
// NOTE(review): partitionKeys and orderKeys are stored as passed in here, while another
// constructor of this class wraps orderKeys in ImmutableList.copyOf -- confirm whether
// the difference is intentional.
this.partitionKeys = partitionKeys;
this.orderKeys = orderKeys;
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.phase = phase;
}

public WindowFuncType getFunction() {
Expand All @@ -110,6 +115,10 @@ public long getPartitionLimit() {
return partitionLimit;
}

/**
 * Returns the phase this partition topn was constructed with.
 */
public PartitionTopnPhase getPhase() {
    return this.phase;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -125,7 +134,8 @@ public boolean equals(Object o) {
return Objects.equals(this.function, that.function)
&& Objects.equals(this.partitionKeys, that.partitionKeys)
&& Objects.equals(this.orderKeys, that.orderKeys) && this.hasGlobalLimit == that.hasGlobalLimit
&& this.partitionLimit == that.partitionLimit;
&& this.partitionLimit == that.partitionLimit
&& this.phase == that.phase;
}

@Override
Expand All @@ -152,28 +162,28 @@ public List<? extends Expression> getExpressions() {
// Returns a copy of this node over the single replacement child, keeping every other
// field, the physical properties and the statistics.
public PhysicalPartitionTopN<Plan> withChildren(List<Plan> children) {
// exactly one child is accepted
Preconditions.checkArgument(children.size() == 1);
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit,
// NOTE(review): the next line (without phase) appears to be the pre-change side of the diff
partitionLimit, groupExpression, getLogicalProperties(), physicalProperties,
partitionLimit, phase, groupExpression, getLogicalProperties(), physicalProperties,
statistics, children.get(0));
}

/**
 * Returns a copy of this node bound to the given group expression. Physical properties
 * and statistics are not passed to the constructor used here.
 * NOTE(review): the first {@code return}/argument pair (without {@code phase}) appears to
 * be the pre-change side of this diff hunk.
 */
@Override
public PhysicalPartitionTopN<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, getLogicalProperties(), child());
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, getLogicalProperties(), child());
}

/**
 * Returns a copy of this node with the given group expression, logical properties and
 * first child. {@code logicalProperties.get()} is called without a presence check, so the
 * optional must be non-empty.
 * NOTE(review): the first {@code return} line (without {@code phase}) appears to be the
 * pre-change side of this diff hunk.
 */
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, logicalProperties.get(), children.get(0));
}

/**
 * Returns a copy of this node carrying the given physical properties and statistics;
 * all other fields, the group expression and the child are kept.
 * NOTE(review): the first {@code return}/argument pair (without {@code phase}) appears to
 * be the pre-change side of this diff hunk.
 */
@Override
public PhysicalPartitionTopN<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
Statistics statistics) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}

@Override
Expand All @@ -184,7 +194,8 @@ public String toString() {
"orderKeys", orderKeys,
"hasGlobalLimit", hasGlobalLimit,
"partitionLimit", partitionLimit,
"stats", statistics
"stats", statistics,
"phase", phase
);
}

Expand All @@ -195,7 +206,7 @@ public List<Slot> computeOutput() {

/**
 * Returns a copy of this node constructed with {@code null} in the logical-properties
 * position; every other field, the physical properties, statistics and child are kept.
 * NOTE(review): the first {@code return}/argument pair (without {@code phase}) appears to
 * be the pre-change side of this diff hunk.
 */
@Override
public PhysicalPartitionTopN<CHILD_TYPE> resetLogicalProperties() {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, null, physicalProperties, statistics, child());
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, null, physicalProperties, statistics, child());
}
}
Loading

0 comments on commit 00bb32c

Please sign in to comment.