From d77bfa09d85d9759965c109cc50ee89e8fbde499 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 28 Nov 2024 16:33:16 +0800 Subject: [PATCH] =?UTF-8?q?[Improvement](shuffle)=20Use=20a=20knob=20to=20?= =?UTF-8?q?decide=20whether=20a=20serial=20exchange=E2=80=A6=20(#44676)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … should be used This improvement was completed in #43199 and reverted by #44075 due to performance fallback. After fixing it, this improvement is re-submited. A new knob to control a exchange node should be serial or not. For example, a partitioned hash join should be executed like below: ``` ┌────────────────────────────┐ ┌────────────────────────────┐ │ │ │ │ │Exchange(HASH PARTITIONED N)│ │Exchange(HASH PARTITIONED N)│ │ │ │ │ └────────────────────────────┴─────────┬────────┴────────────────────────────┘ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ HASH JOIN │ │ │ └─────────────┘ ``` After turning on this knob, the real plan should be: ``` ┌──────────────────────────────┐ ┌──────────────────────────────┐ │ │ │ │ │ Exchange (HASH PARTITIONED 1)│ │ Exchange (HASH PARTITIONED 1)│ │ │ │ │ └────────────┬─────────────────┘ └────────────┬─────────────────┘ │ │ │ │ │ │ │ │ │ │ ┌──────────────▼─────────────────────┐ ┌──────────────▼─────────────────────┐ │ │ │ │ │ Local Exchange(HASH PARTITIONED N)│ │ Local Exchange(HASH PARTITIONED N)│ │ 1 -> N │ │ 1 -> N │ └────────────────────────────────────┴─────────┬────────┴────────────────────────────────────┴ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ HASH JOIN │ │ │ └─────────────┘ ``` For large cluster, X (mappers) * Y (reducers) rpc channels can be reduced to X (mappers) * Z (BEs). --- .../org/apache/doris/planner/ExchangeNode.java | 13 ++++++++++++- .../org/apache/doris/planner/PlanFragment.java | 14 ++++++-------- .../java/org/apache/doris/planner/PlanNode.java | 7 +++++++ .../java/org/apache/doris/planner/ScanNode.java | 5 +++++ .../main/java/org/apache/doris/qe/Coordinator.java | 10 ++-------- .../java/org/apache/doris/qe/SessionVariable.java | 11 +++++++++++ 6 files changed, 43 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 1ca1db56bfc7c6..cb6628b01c556b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; @@ -169,6 +170,10 @@ public void setMergeInfo(SortInfo info) { @Override protected void toThrift(TPlanNode msg) { + // If this fragment has another scan node, this exchange node is serial or not should be decided by the scan + // node. + msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode()) + && fragment.useSerialSource(ConnectContext.get())); msg.node_type = TPlanNodeType.EXCHANGE_NODE; msg.exchange_node = new TExchangeNode(); for (TupleId tid : tupleIds) { @@ -228,11 +233,17 @@ public void setRightChildOfBroadcastHashJoin(boolean value) { */ @Override public boolean isSerialOperator() { - return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null; + return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange() + || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null; } @Override public boolean hasSerialChildren() { return isSerialOperator(); } + + @Override + public boolean hasSerialScanChildren() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index fef3de9b696e9e..ab5307c07e904a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -515,15 +515,13 @@ public boolean useSerialSource(ConnectContext context) { && !hasNullAwareLeftAntiJoin() // If planRoot is not a serial operator and has serial children, we can use serial source and improve // parallelism of non-serial operators. - && sink instanceof DataStreamSink && !planRoot.isSerialOperator() - && planRoot.hasSerialChildren(); + // For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are + // serial. + && (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator() + && planRoot.hasSerialChildren())); } - public int getNumBackends() { - return numBackends; - } - - public void setNumBackends(int numBackends) { - this.numBackends = numBackends; + public boolean hasSerialScanNode() { + return planRoot.hasSerialScanChildren(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 14bd34e93e1f43..73768435154b76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1388,4 +1388,11 @@ public boolean hasSerialChildren() { } return children.stream().allMatch(PlanNode::hasSerialChildren); } + + public boolean hasSerialScanChildren() { + if (children.isEmpty()) { + return false; + } + return children.stream().anyMatch(PlanNode::hasSerialScanChildren); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index a2583868346704..b4033a0535ef3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -861,4 +861,9 @@ public boolean isSerialOperator() { < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends() || (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle()); } + + @Override + public boolean hasSerialScanChildren() { + return isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3a6f6e4f84069d..262b5836689ab0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1886,17 +1886,11 @@ protected void computeFragmentHosts() throws Exception { return scanNode.getId().asInt() == planNodeId; }).findFirst(); - /** - * Ignore storage data distribution iff: - * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. - * 2. Use Nereids planner. - */ boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - boolean ignoreStorageDataDistribution = node.isPresent() - && fragment.useSerialSource(context); - if (node.isPresent() && ignoreStorageDataDistribution) { + boolean ignoreStorageDataDistribution = fragment.useSerialSource(context); + if (ignoreStorageDataDistribution) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and no conjuncts, only need 1 instance to save cpu and // mem resource diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 71b746c7907262..115614a41873a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -268,6 +268,8 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; + public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange"; + public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan"; // Limit the max count of scanners to prevent generate too many scanners. @@ -1112,6 +1114,10 @@ public enum IgnoreSplitType { varType = VariableAnnotation.EXPERIMENTAL, needForward = true) private boolean ignoreStorageDataDistribution = true; + @VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private boolean useSerialExchange = false; + @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {"是否在pipelineX引擎上开启local shuffle优化", @@ -2353,6 +2359,7 @@ public void initFuzzyModeVariables() { this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); + this.useSerialExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); @@ -4563,6 +4570,10 @@ public boolean isEnableCooldownReplicaAffinity() { return enableCooldownReplicaAffinity; } + public boolean isUseSerialExchange() { + return useSerialExchange && getEnableLocalExchange(); + } + public void setDisableInvertedIndexV1ForVaraint(boolean disableInvertedIndexV1ForVaraint) { this.disableInvertedIndexV1ForVaraint = disableInvertedIndexV1ForVaraint; }