From 9f59bfd1e3aee73c332a81193b416e5868f88801 Mon Sep 17 00:00:00 2001 From: "zhongjian.xzj" Date: Mon, 4 Sep 2023 19:10:32 +0800 Subject: [PATCH] [opt](nereids) enable two phase partition topn opt --- ...lPartitionTopNToPhysicalPartitionTopN.java | 59 ++++++++++++++++--- .../LogicalWindowToPhysicalWindow.java | 36 +++++++---- .../plans/physical/PhysicalPartitionTopN.java | 52 ++++++++++++---- .../trees/plans/physical/PhysicalWindow.java | 5 ++ .../org/apache/doris/qe/SessionVariable.java | 9 +++ 5 files changed, 129 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java index b7975e7ca6be44..676741b84318b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java @@ -17,11 +17,16 @@ package org.apache.doris.nereids.rules.implementation; +import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.properties.RequireProperties; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableList; @@ -40,14 +45,52 @@ public Rule build() { .collect(ImmutableList.toImmutableList()) : ImmutableList.of(); - return new PhysicalPartitionTopN<>( - partitionTopN.getFunction(), - partitionTopN.getPartitionKeys(), - orderKeys, - partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), - partitionTopN.getLogicalProperties(), - partitionTopN.child()); + RequireProperties requireAny = RequireProperties.of(PhysicalProperties.ANY); + RequireProperties requireGather = RequireProperties.of(PhysicalProperties.GATHER); + + if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable() == null + || !ConnectContext.get().getSessionVariable().isEnableTwoPhasePartitionTopn()) { + PhysicalPartitionTopN localPartitionTopN = new PhysicalPartitionTopN<>( + partitionTopN.getFunction(), + partitionTopN.getPartitionKeys(), + orderKeys, + partitionTopN.hasGlobalLimit(), + partitionTopN.getPartitionLimit(), + partitionTopN.getLogicalProperties(), + requireAny, + partitionTopN.child()); + + return localPartitionTopN; + } else { + PhysicalPartitionTopN anyLocalPartitionTopN = new PhysicalPartitionTopN<>( + partitionTopN.getFunction(), + partitionTopN.getPartitionKeys(), + orderKeys, + partitionTopN.hasGlobalLimit(), + partitionTopN.getPartitionLimit(), + partitionTopN.getLogicalProperties(), + requireAny, + partitionTopN.child()); + + PhysicalPartitionTopN anyLocalGatherGlobalPartitionTopN = new PhysicalPartitionTopN<>( + partitionTopN.getFunction(), + partitionTopN.getPartitionKeys(), + orderKeys, + partitionTopN.hasGlobalLimit(), + partitionTopN.getPartitionLimit(), + anyLocalPartitionTopN.getLogicalProperties(), + requireGather, + anyLocalPartitionTopN); + + RequireProperties requireHash = RequireProperties.of( + PhysicalProperties.createHash(partitionTopN.getPartitionKeys(), ShuffleType.REQUIRE)); + + PhysicalPartitionTopN anyLocalHashGlobalPartitionTopN = anyLocalGatherGlobalPartitionTopN + .withRequire(requireHash) + .withPartitionExpressions(partitionTopN.getPartitionKeys()); + + return anyLocalHashGlobalPartitionTopN; + } }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java index 020fc6754d3dce..7a632ef4fd6270 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java @@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -165,23 +166,32 @@ private PhysicalWindow createPhysicalWindow(Plan root, WindowFrameGroup wi tempLogicalWindow.getLogicalProperties(), root); + boolean isEnableTwoPhasePartitionTopn = ConnectContext.get() != null + && ConnectContext.get().getSessionVariable() != null + && ConnectContext.get().getSessionVariable().isEnableTwoPhasePartitionTopn(); + if (windowFrameGroup.partitionKeys.isEmpty() && requiredOrderKeys.isEmpty()) { return physicalWindow.withRequirePropertiesAndChild(RequireProperties.of(PhysicalProperties.GATHER), root); - } - - // todo: WFGs in the same OKG only need same RequiredProperties - PhysicalProperties properties; - if (windowFrameGroup.partitionKeys.isEmpty()) { - properties = PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(requiredOrderKeys)); + } else if (isEnableTwoPhasePartitionTopn && !windowFrameGroup.partitionKeys.isEmpty() + && !requiredOrderKeys.isEmpty()) { + PhysicalProperties properties = PhysicalProperties.createHash( + windowFrameGroup.partitionKeys, ShuffleType.REQUIRE); + RequireProperties requireProperties = RequireProperties.of(properties); + return physicalWindow.withRequirePropertiesAndChild(requireProperties, root); } else { - properties = PhysicalProperties.createHash( - windowFrameGroup.partitionKeys, ShuffleType.REQUIRE); - // requiredOrderKeys contain partitionKeys, so there is no need to check if requiredOrderKeys.isEmpty() - properties = properties.withOrderSpec(new OrderSpec(requiredOrderKeys)); + // todo: WFGs in the same OKG only need same RequiredProperties + PhysicalProperties properties; + if (windowFrameGroup.partitionKeys.isEmpty()) { + properties = PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(requiredOrderKeys)); + } else { + properties = PhysicalProperties.createHash( + windowFrameGroup.partitionKeys, ShuffleType.REQUIRE); + // requiredOrderKeys contain partitionKeys, so there is no need to check if requiredOrderKeys.isEmpty() + properties = properties.withOrderSpec(new OrderSpec(requiredOrderKeys)); + } + RequireProperties requireProperties = RequireProperties.of(properties); + return physicalWindow.withRequirePropertiesAndChild(requireProperties, root); } - - RequireProperties requireProperties = RequireProperties.of(properties); - return physicalWindow.withRequirePropertiesAndChild(requireProperties, root); } /* ******************************************************************************************** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java index 4166e7d9037131..54b326729831a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java @@ -21,6 +21,8 @@ import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.properties.RequireProperties; +import org.apache.doris.nereids.properties.RequirePropertiesSupplier; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; @@ -42,18 +44,20 @@ /** * Physical partition-top-N plan. */ -public class PhysicalPartitionTopN extends PhysicalUnary implements PartitionTopN { +public class PhysicalPartitionTopN extends PhysicalUnary + implements PartitionTopN, RequirePropertiesSupplier> { private final WindowFuncType function; private final List partitionKeys; private final List orderKeys; private final Boolean hasGlobalLimit; private final long partitionLimit; + private final RequireProperties requireProperties; public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, - Boolean hasGlobalLimit, long partitionLimit, - LogicalProperties logicalProperties, CHILD_TYPE child) { + Boolean hasGlobalLimit, long partitionLimit, LogicalProperties logicalProperties, + RequireProperties requireProperties, CHILD_TYPE child) { this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - Optional.empty(), logicalProperties, child); + Optional.empty(), logicalProperties, requireProperties, child); } /** @@ -62,13 +66,14 @@ public PhysicalPartitionTopN(WindowFuncType function, List partition public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, Boolean hasGlobalLimit, long partitionLimit, Optional groupExpression, LogicalProperties logicalProperties, - CHILD_TYPE child) { + RequireProperties requireProperties, CHILD_TYPE child) { super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child); this.function = function; this.partitionKeys = ImmutableList.copyOf(partitionKeys); this.orderKeys = ImmutableList.copyOf(orderKeys); this.hasGlobalLimit = hasGlobalLimit; this.partitionLimit = partitionLimit; + this.requireProperties = Objects.requireNonNull(requireProperties, "requireProperties cannot be null"); } /** @@ -77,7 +82,8 @@ public PhysicalPartitionTopN(WindowFuncType function, List partition public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, Boolean hasGlobalLimit, long partitionLimit, Optional groupExpression, LogicalProperties logicalProperties, - PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + PhysicalProperties physicalProperties, Statistics statistics, + RequireProperties requireProperties, CHILD_TYPE child) { super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, physicalProperties, statistics, child); this.function = function; @@ -85,6 +91,7 @@ public PhysicalPartitionTopN(WindowFuncType function, List partition this.orderKeys = orderKeys; this.hasGlobalLimit = hasGlobalLimit; this.partitionLimit = partitionLimit; + this.requireProperties = requireProperties; } public WindowFuncType getFunction() { @@ -153,27 +160,32 @@ public PhysicalPartitionTopN withChildren(List children) { Preconditions.checkArgument(children.size() == 1); return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, groupExpression, getLogicalProperties(), physicalProperties, - statistics, children.get(0)); + statistics, requireProperties, children.get(0)); } @Override public PhysicalPartitionTopN withGroupExpression(Optional groupExpression) { return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - groupExpression, getLogicalProperties(), child()); + groupExpression, getLogicalProperties(), getRequireProperties(), child()); + } + + public PhysicalPartitionTopN withPartitionExpressions(List partitionExpressions) { + return new PhysicalPartitionTopN<>(function, partitionExpressions, orderKeys, hasGlobalLimit, partitionLimit, + groupExpression, getLogicalProperties(), getRequireProperties(), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - groupExpression, logicalProperties.get(), children.get(0)); + groupExpression, logicalProperties.get(), requireProperties, children.get(0)); } @Override public PhysicalPartitionTopN withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + groupExpression, getLogicalProperties(), physicalProperties, statistics, requireProperties, child()); } @Override @@ -196,6 +208,24 @@ public List computeOutput() { @Override public PhysicalPartitionTopN resetLogicalProperties() { return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - groupExpression, null, physicalProperties, statistics, child()); + groupExpression, null, physicalProperties, statistics, requireProperties, child()); + } + + @Override + public RequireProperties getRequireProperties() { + return requireProperties; + } + + @Override + public PhysicalPartitionTopN withRequireAndChildren( + RequireProperties requireProperties, List children) { + Preconditions.checkArgument(children.size() == 1); + return withRequirePropertiesAndChild(requireProperties, children.get(0)); + } + + public PhysicalPartitionTopN withRequirePropertiesAndChild( + RequireProperties requireProperties, C newChild) { + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + groupExpression, null, physicalProperties, statistics, requireProperties, newChild); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java index b1703f47496706..f24a37dc0f18dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java @@ -173,6 +173,11 @@ public PhysicalWindow withRequirePropertiesAndChild(RequireP getLogicalProperties(), physicalProperties, statistics, newChild); } + public PhysicalWindow withoutRequirePropertiesAndChild(C newChild) { + return new PhysicalWindow<>(windowFrameGroup, requireProperties, windowExpressions, Optional.empty(), + getLogicalProperties(), physicalProperties, statistics, newChild); + } + @Override public List computeOutput() { return new ImmutableList.Builder() diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 88c9d2fd725c0c..62c95490c1da0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -318,6 +318,8 @@ public class SessionVariable implements Serializable, Writable { public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits"; public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt"; + + public static final String ENABLE_TWO_PHASE_PARTITION_TOPN = "enable_two_phase_partition_topn"; public static final String TOPN_OPT_LIMIT_THRESHOLD = "topn_opt_limit_threshold"; public static final String ENABLE_FILE_CACHE = "enable_file_cache"; @@ -946,6 +948,9 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) { @VariableMgr.VarAttr(name = ENABLE_ELIMINATE_SORT_NODE) public boolean enableEliminateSortNode = true; + @VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_PARTITION_TOPN) + public boolean isEnableTwoPhasePartitionTopn = true; + @VariableMgr.VarAttr(name = INTERNAL_SESSION) public boolean internalSession = false; @@ -1324,6 +1329,10 @@ public void setEnableTwoPhaseReadOpt(boolean enable) { enableTwoPhaseReadOpt = enable; } + public boolean isEnableTwoPhasePartitionTopn() { + return isEnableTwoPhasePartitionTopn; + } + public int getMaxExecutionTimeMS() { return maxExecutionTimeMS; }