Skip to content

Commit

Permalink
[opt](nereids) enable two phase partition topn opt
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjian.xzj authored and zhongjian.xzj committed Sep 4, 2023
1 parent 21aea76 commit 9f59bfd
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

package org.apache.doris.nereids.rules.implementation;

import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.properties.RequireProperties;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

Expand All @@ -40,14 +45,52 @@ public Rule build() {
.collect(ImmutableList.toImmutableList()) :
ImmutableList.of();

return new PhysicalPartitionTopN<>(
partitionTopN.getFunction(),
partitionTopN.getPartitionKeys(),
orderKeys,
partitionTopN.hasGlobalLimit(),
partitionTopN.getPartitionLimit(),
partitionTopN.getLogicalProperties(),
partitionTopN.child());
RequireProperties requireAny = RequireProperties.of(PhysicalProperties.ANY);
RequireProperties requireGather = RequireProperties.of(PhysicalProperties.GATHER);

if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable() == null
|| !ConnectContext.get().getSessionVariable().isEnableTwoPhasePartitionTopn()) {
PhysicalPartitionTopN<Plan> localPartitionTopN = new PhysicalPartitionTopN<>(
partitionTopN.getFunction(),
partitionTopN.getPartitionKeys(),
orderKeys,
partitionTopN.hasGlobalLimit(),
partitionTopN.getPartitionLimit(),
partitionTopN.getLogicalProperties(),
requireAny,
partitionTopN.child());

return localPartitionTopN;
} else {
PhysicalPartitionTopN<Plan> anyLocalPartitionTopN = new PhysicalPartitionTopN<>(
partitionTopN.getFunction(),
partitionTopN.getPartitionKeys(),
orderKeys,
partitionTopN.hasGlobalLimit(),
partitionTopN.getPartitionLimit(),
partitionTopN.getLogicalProperties(),
requireAny,
partitionTopN.child());

PhysicalPartitionTopN<Plan> anyLocalGatherGlobalPartitionTopN = new PhysicalPartitionTopN<>(
partitionTopN.getFunction(),
partitionTopN.getPartitionKeys(),
orderKeys,
partitionTopN.hasGlobalLimit(),
partitionTopN.getPartitionLimit(),
anyLocalPartitionTopN.getLogicalProperties(),
requireGather,
anyLocalPartitionTopN);

RequireProperties requireHash = RequireProperties.of(
PhysicalProperties.createHash(partitionTopN.getPartitionKeys(), ShuffleType.REQUIRE));

PhysicalPartitionTopN<Plan> anyLocalHashGlobalPartitionTopN = anyLocalGatherGlobalPartitionTopN
.withRequire(requireHash)
.withPartitionExpressions(partitionTopN.getPartitionKeys());

return anyLocalHashGlobalPartitionTopN;
}
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -165,23 +166,32 @@ private PhysicalWindow<Plan> createPhysicalWindow(Plan root, WindowFrameGroup wi
tempLogicalWindow.getLogicalProperties(),
root);

boolean isEnableTwoPhasePartitionTopn = ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable() != null
&& ConnectContext.get().getSessionVariable().isEnableTwoPhasePartitionTopn();

if (windowFrameGroup.partitionKeys.isEmpty() && requiredOrderKeys.isEmpty()) {
return physicalWindow.withRequirePropertiesAndChild(RequireProperties.of(PhysicalProperties.GATHER), root);
}

// todo: WFGs in the same OKG only need same RequiredProperties
PhysicalProperties properties;
if (windowFrameGroup.partitionKeys.isEmpty()) {
properties = PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(requiredOrderKeys));
} else if (isEnableTwoPhasePartitionTopn && !windowFrameGroup.partitionKeys.isEmpty()
&& !requiredOrderKeys.isEmpty()) {
PhysicalProperties properties = PhysicalProperties.createHash(
windowFrameGroup.partitionKeys, ShuffleType.REQUIRE);
RequireProperties requireProperties = RequireProperties.of(properties);
return physicalWindow.withRequirePropertiesAndChild(requireProperties, root);
} else {
properties = PhysicalProperties.createHash(
windowFrameGroup.partitionKeys, ShuffleType.REQUIRE);
// requiredOrderKeys contain partitionKeys, so there is no need to check if requiredOrderKeys.isEmpty()
properties = properties.withOrderSpec(new OrderSpec(requiredOrderKeys));
// todo: WFGs in the same OKG only need same RequiredProperties
PhysicalProperties properties;
if (windowFrameGroup.partitionKeys.isEmpty()) {
properties = PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(requiredOrderKeys));
} else {
properties = PhysicalProperties.createHash(
windowFrameGroup.partitionKeys, ShuffleType.REQUIRE);
// requiredOrderKeys contain partitionKeys, so there is no need to check if requiredOrderKeys.isEmpty()
properties = properties.withOrderSpec(new OrderSpec(requiredOrderKeys));
}
RequireProperties requireProperties = RequireProperties.of(properties);
return physicalWindow.withRequirePropertiesAndChild(requireProperties, root);
}

RequireProperties requireProperties = RequireProperties.of(properties);
return physicalWindow.withRequirePropertiesAndChild(requireProperties, root);
}

/* ********************************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.properties.RequireProperties;
import org.apache.doris.nereids.properties.RequirePropertiesSupplier;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
Expand All @@ -42,18 +44,20 @@
/**
* Physical partition-top-N plan.
*/
public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> implements PartitionTopN {
public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE>
implements PartitionTopN, RequirePropertiesSupplier<PhysicalPartitionTopN<CHILD_TYPE>> {
private final WindowFuncType function;
private final List<Expression> partitionKeys;
private final List<OrderKey> orderKeys;
private final Boolean hasGlobalLimit;
private final long partitionLimit;
private final RequireProperties requireProperties;

public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
LogicalProperties logicalProperties, CHILD_TYPE child) {
Boolean hasGlobalLimit, long partitionLimit, LogicalProperties logicalProperties,
RequireProperties requireProperties, CHILD_TYPE child) {
this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
Optional.empty(), logicalProperties, child);
Optional.empty(), logicalProperties, requireProperties, child);
}

/**
Expand All @@ -62,13 +66,14 @@ public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partition
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
RequireProperties requireProperties, CHILD_TYPE child) {
super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child);
this.function = function;
this.partitionKeys = ImmutableList.copyOf(partitionKeys);
this.orderKeys = ImmutableList.copyOf(orderKeys);
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.requireProperties = Objects.requireNonNull(requireProperties, "requireProperties cannot be null");
}

/**
Expand All @@ -77,14 +82,16 @@ public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partition
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
PhysicalProperties physicalProperties, Statistics statistics,
RequireProperties requireProperties, CHILD_TYPE child) {
super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, physicalProperties,
statistics, child);
this.function = function;
this.partitionKeys = partitionKeys;
this.orderKeys = orderKeys;
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.requireProperties = requireProperties;
}

public WindowFuncType getFunction() {
Expand Down Expand Up @@ -153,27 +160,32 @@ public PhysicalPartitionTopN<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit,
partitionLimit, groupExpression, getLogicalProperties(), physicalProperties,
statistics, children.get(0));
statistics, requireProperties, children.get(0));
}

@Override
public PhysicalPartitionTopN<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, getLogicalProperties(), child());
groupExpression, getLogicalProperties(), getRequireProperties(), child());
}

/**
 * Creates a copy of this node whose partition keys are replaced by {@code partitionExpressions}.
 *
 * <p>The function, order keys, global-limit flag, partition limit, group expression, logical
 * properties, require properties and child are carried over from this node. Physical properties
 * and statistics are not passed to the constructor used here.
 *
 * @param partitionExpressions the expressions to use as the partition keys of the copy
 * @return a new partition-top-N node over the same child
 */
public PhysicalPartitionTopN<CHILD_TYPE> withPartitionExpressions(List<Expression> partitionExpressions) {
    // Read the inherited state first, in the same order the constructor call consumes it.
    final LogicalProperties currentLogicalProperties = getLogicalProperties();
    final RequireProperties currentRequireProperties = getRequireProperties();
    final CHILD_TYPE currentChild = child();
    return new PhysicalPartitionTopN<>(
            function,
            partitionExpressions,
            orderKeys,
            hasGlobalLimit,
            partitionLimit,
            groupExpression,
            currentLogicalProperties,
            currentRequireProperties,
            currentChild);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, logicalProperties.get(), children.get(0));
groupExpression, logicalProperties.get(), requireProperties, children.get(0));
}

@Override
public PhysicalPartitionTopN<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
Statistics statistics) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
groupExpression, getLogicalProperties(), physicalProperties, statistics, requireProperties, child());
}

@Override
Expand All @@ -196,6 +208,24 @@ public List<Slot> computeOutput() {
@Override
public PhysicalPartitionTopN<CHILD_TYPE> resetLogicalProperties() {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, null, physicalProperties, statistics, child());
groupExpression, null, physicalProperties, statistics, requireProperties, child());
}

/**
 * Returns the require properties this node was constructed with.
 *
 * @return the stored {@link RequireProperties}
 */
@Override
public RequireProperties getRequireProperties() {
    return this.requireProperties;
}

/**
 * Rebuilds this node with new require properties and a new child list.
 *
 * @param requireProperties the require properties for the rebuilt node
 * @param children the replacement children; must contain exactly one plan
 * @return the node produced by {@link #withRequirePropertiesAndChild} for the single child
 * @throws IllegalArgumentException if {@code children} does not have exactly one element
 */
@Override
public PhysicalPartitionTopN<Plan> withRequireAndChildren(
        RequireProperties requireProperties, List<Plan> children) {
    // This is a unary node, so anything other than one child is rejected up front.
    Preconditions.checkArgument(children.size() == 1);
    final Plan onlyChild = children.get(0);
    return withRequirePropertiesAndChild(requireProperties, onlyChild);
}

/**
 * Creates a copy of this node with the given require properties and child.
 *
 * <p>The function, partition keys, order keys, limits, group expression, physical properties and
 * statistics are carried over from this node. Logical properties are passed as {@code null}.
 *
 * <p>NOTE(review): passing {@code null} logical properties differs from {@code withChildren},
 * which forwards {@code getLogicalProperties()}. Presumably they are meant to be recomputed for
 * the new child. Confirm this is intended.
 *
 * @param requireProperties the require properties for the copy
 * @param newChild the child plan of the copy
 * @param <C> the type of the new child
 * @return a new partition-top-N node over {@code newChild}
 */
public <C extends Plan> PhysicalPartitionTopN<C> withRequirePropertiesAndChild(
        RequireProperties requireProperties, C newChild) {
    // Named so the null argument is not mistaken for an omission at the call site.
    final LogicalProperties unsetLogicalProperties = null;
    return new PhysicalPartitionTopN<>(
            function,
            partitionKeys,
            orderKeys,
            hasGlobalLimit,
            partitionLimit,
            groupExpression,
            unsetLogicalProperties,
            physicalProperties,
            statistics,
            requireProperties,
            newChild);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ public <C extends Plan> PhysicalWindow<C> withRequirePropertiesAndChild(RequireP
getLogicalProperties(), physicalProperties, statistics, newChild);
}

/**
 * Creates a copy of this window node over {@code newChild}.
 *
 * <p>The window frame group, window expressions, logical properties, physical properties and
 * statistics are carried over from this node. Despite the method name, this node's current
 * {@code requireProperties} is also passed through unchanged; the arguments that differ from
 * this node are {@code Optional.empty()} and the child.
 *
 * <p>NOTE(review): the {@code Optional.empty()} argument is presumably the group expression
 * slot of the constructor, which would mean this drops the group expression rather than the
 * require properties. Confirm against the constructor and consider renaming.
 *
 * @param newChild the child plan of the copy
 * @param <C> the type of the new child
 * @return a new window node over {@code newChild}
 */
public <C extends Plan> PhysicalWindow<C> withoutRequirePropertiesAndChild(C newChild) {
return new PhysicalWindow<>(windowFrameGroup, requireProperties, windowExpressions, Optional.empty(),
getLogicalProperties(), physicalProperties, statistics, newChild);
}

@Override
public List<Slot> computeOutput() {
return new ImmutableList.Builder<Slot>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits";

public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt";

public static final String ENABLE_TWO_PHASE_PARTITION_TOPN = "enable_two_phase_partition_topn";
public static final String TOPN_OPT_LIMIT_THRESHOLD = "topn_opt_limit_threshold";

public static final String ENABLE_FILE_CACHE = "enable_file_cache";
Expand Down Expand Up @@ -946,6 +948,9 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) {
@VariableMgr.VarAttr(name = ENABLE_ELIMINATE_SORT_NODE)
public boolean enableEliminateSortNode = true;

// Session switch registered under the name held in ENABLE_TWO_PHASE_PARTITION_TOPN.
// Enabled (true) by default.
@VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_PARTITION_TOPN)
public boolean isEnableTwoPhasePartitionTopn = true;

@VariableMgr.VarAttr(name = INTERNAL_SESSION)
public boolean internalSession = false;

Expand Down Expand Up @@ -1324,6 +1329,10 @@ public void setEnableTwoPhaseReadOpt(boolean enable) {
enableTwoPhaseReadOpt = enable;
}

/**
 * Returns the current value of the {@code isEnableTwoPhasePartitionTopn} session field.
 *
 * @return {@code true} if the two-phase partition top-N switch is on for this session
 */
public boolean isEnableTwoPhasePartitionTopn() {
    return this.isEnableTwoPhasePartitionTopn;
}

public int getMaxExecutionTimeMS() {
return maxExecutionTimeMS;
}
Expand Down

0 comments on commit 9f59bfd

Please sign in to comment.