diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 1ca1db56bfc7c6..cb6628b01c556b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; @@ -169,6 +170,10 @@ public void setMergeInfo(SortInfo info) { @Override protected void toThrift(TPlanNode msg) { + // If this fragment has another scan node, this exchange node is serial or not should be decided by the scan + // node. + msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode()) + && fragment.useSerialSource(ConnectContext.get())); msg.node_type = TPlanNodeType.EXCHANGE_NODE; msg.exchange_node = new TExchangeNode(); for (TupleId tid : tupleIds) { @@ -228,11 +233,17 @@ public void setRightChildOfBroadcastHashJoin(boolean value) { */ @Override public boolean isSerialOperator() { - return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null; + return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange() + || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null; } @Override public boolean hasSerialChildren() { return isSerialOperator(); } + + @Override + public boolean hasSerialScanChildren() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index fef3de9b696e9e..ab5307c07e904a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -515,15 +515,13 @@ public boolean useSerialSource(ConnectContext context) { && !hasNullAwareLeftAntiJoin() // If planRoot is not a serial operator and has serial children, we can use serial source and improve // parallelism of non-serial operators. - && sink instanceof DataStreamSink && !planRoot.isSerialOperator() - && planRoot.hasSerialChildren(); + // For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are + // serial. + && (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator() + && planRoot.hasSerialChildren())); } - public int getNumBackends() { - return numBackends; - } - - public void setNumBackends(int numBackends) { - this.numBackends = numBackends; + public boolean hasSerialScanNode() { + return planRoot.hasSerialScanChildren(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 14bd34e93e1f43..73768435154b76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1388,4 +1388,11 @@ public boolean hasSerialChildren() { } return children.stream().allMatch(PlanNode::hasSerialChildren); } + + public boolean hasSerialScanChildren() { + if (children.isEmpty()) { + return false; + } + return children.stream().anyMatch(PlanNode::hasSerialScanChildren); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index a2583868346704..b4033a0535ef3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -861,4 +861,9 @@ public boolean isSerialOperator() { < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends() || (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle()); } + + @Override + public boolean hasSerialScanChildren() { + return isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3a6f6e4f84069d..262b5836689ab0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1886,17 +1886,11 @@ protected void computeFragmentHosts() throws Exception { return scanNode.getId().asInt() == planNodeId; }).findFirst(); - /** - * Ignore storage data distribution iff: - * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. - * 2. Use Nereids planner. - */ boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - boolean ignoreStorageDataDistribution = node.isPresent() - && fragment.useSerialSource(context); - if (node.isPresent() && ignoreStorageDataDistribution) { + boolean ignoreStorageDataDistribution = fragment.useSerialSource(context); + if (ignoreStorageDataDistribution) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and no conjuncts, only need 1 instance to save cpu and // mem resource diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 71b746c7907262..115614a41873a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -268,6 +268,8 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; + public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange"; + public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan"; // Limit the max count of scanners to prevent generate too many scanners. @@ -1112,6 +1114,10 @@ public enum IgnoreSplitType { varType = VariableAnnotation.EXPERIMENTAL, needForward = true) private boolean ignoreStorageDataDistribution = true; + @VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private boolean useSerialExchange = false; + @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {"是否在pipelineX引擎上开启local shuffle优化", @@ -2353,6 +2359,7 @@ public void initFuzzyModeVariables() { this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); + this.useSerialExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); @@ -4563,6 +4570,10 @@ public boolean isEnableCooldownReplicaAffinity() { return enableCooldownReplicaAffinity; } + public boolean isUseSerialExchange() { + return useSerialExchange && getEnableLocalExchange(); + } + public void setDisableInvertedIndexV1ForVaraint(boolean disableInvertedIndexV1ForVaraint) { this.disableInvertedIndexV1ForVaraint = disableInvertedIndexV1ForVaraint; }