Skip to content

Commit

Permalink
[Improvement](shuffle) Use a knob to decide whether a serial exchange… (
Browse files Browse the repository at this point in the history
apache#44676)

… should be used

This improvement was completed in apache#43199 and reverted by apache#44075 due to performance fallback. After fixing it, this improvement is re-submited.

A new knob to control a exchange node should be serial or not.
For example, a partitioned hash join should be executed like below:
```
┌────────────────────────────┐                  ┌────────────────────────────┐
│                            │                  │                            │
│Exchange(HASH PARTITIONED N)│                  │Exchange(HASH PARTITIONED N)│
│                            │                  │                            │
└────────────────────────────┴─────────┬────────┴────────────────────────────┘
                                       │                                      
                                       │                                      
                                       │                                      
                                       │                                      
                                       │                                      
                                       │                                      
                                ┌──────▼──────┐                               
                                │             │                               
                                │ HASH  JOIN  │                               
                                │             │                               
                                └─────────────┘                               
```          

After turning on this knob, the real plan should be:
```
  ┌──────────────────────────────┐                        ┌──────────────────────────────┐    
  │                              │                        │                              │    
  │ Exchange (HASH PARTITIONED 1)│                        │ Exchange (HASH PARTITIONED 1)│    
  │                              │                        │                              │    
  └────────────┬─────────────────┘                        └────────────┬─────────────────┘    
               │                                                       │                      
               │                                                       │                      
               │                                                       │                      
               │                                                       │                      
               │                                                       │                      
┌──────────────▼─────────────────────┐                  ┌──────────────▼─────────────────────┐
│                                    │                  │                                    │
│ Local  Exchange(HASH PARTITIONED N)│                  │ Local  Exchange(HASH PARTITIONED N)│
│              1 -> N                │                  │              1 -> N                │
└────────────────────────────────────┴─────────┬────────┴────────────────────────────────────┴
                                               │                                              
                                               │                                              
                                               │                                              
                                               │                                              
                                               │                                              
                                               │                                              
                                        ┌──────▼──────┐                                       
                                        │             │                                       
                                        │ HASH  JOIN  │                                       
                                        │             │                                       
                                        └─────────────┘                                       
```

For large cluster, X (mappers) * Y (reducers) rpc channels can be reduced to X (mappers) * Z (BEs).
  • Loading branch information
Gabriel39 authored Nov 28, 2024
1 parent 1219273 commit d77bfa0
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExchangeNode;
Expand Down Expand Up @@ -169,6 +170,10 @@ public void setMergeInfo(SortInfo info) {

@Override
protected void toThrift(TPlanNode msg) {
// If this fragment has another scan node, this exchange node is serial or not should be decided by the scan
// node.
msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode())
&& fragment.useSerialSource(ConnectContext.get()));
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
msg.exchange_node = new TExchangeNode();
for (TupleId tid : tupleIds) {
Expand Down Expand Up @@ -228,11 +233,17 @@ public void setRightChildOfBroadcastHashJoin(boolean value) {
*/
@Override
public boolean isSerialOperator() {
return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null;
return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange()
|| partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null;
}

@Override
public boolean hasSerialChildren() {
return isSerialOperator();
}

@Override
public boolean hasSerialScanChildren() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,15 +515,13 @@ public boolean useSerialSource(ConnectContext context) {
&& !hasNullAwareLeftAntiJoin()
// If planRoot is not a serial operator and has serial children, we can use serial source and improve
// parallelism of non-serial operators.
&& sink instanceof DataStreamSink && !planRoot.isSerialOperator()
&& planRoot.hasSerialChildren();
// For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are
// serial.
&& (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator()
&& planRoot.hasSerialChildren()));
}

public int getNumBackends() {
return numBackends;
}

public void setNumBackends(int numBackends) {
this.numBackends = numBackends;
public boolean hasSerialScanNode() {
return planRoot.hasSerialScanChildren();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1388,4 +1388,11 @@ public boolean hasSerialChildren() {
}
return children.stream().allMatch(PlanNode::hasSerialChildren);
}

public boolean hasSerialScanChildren() {
if (children.isEmpty()) {
return false;
}
return children.stream().anyMatch(PlanNode::hasSerialScanChildren);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -861,4 +861,9 @@ public boolean isSerialOperator() {
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends()
|| (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
}

@Override
public boolean hasSerialScanChildren() {
return isSerialOperator();
}
}
10 changes: 2 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1886,17 +1886,11 @@ protected void computeFragmentHosts() throws Exception {
return scanNode.getId().asInt() == planNodeId;
}).findFirst();

/**
* Ignore storage data distribution iff:
* 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
* 2. Use Nereids planner.
*/
boolean sharedScan = true;
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean ignoreStorageDataDistribution = node.isPresent()
&& fragment.useSerialSource(context);
if (node.isPresent() && ignoreStorageDataDistribution) {
boolean ignoreStorageDataDistribution = fragment.useSerialSource(context);
if (ignoreStorageDataDistribution) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if have limit and no conjuncts, only need 1 instance to save cpu and
// mem resource
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution";

public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";

public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan";

// Limit the max count of scanners to prevent generate too many scanners.
Expand Down Expand Up @@ -1112,6 +1114,10 @@ public enum IgnoreSplitType {
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = true;

@VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean useSerialExchange = false;

@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
Expand Down Expand Up @@ -2353,6 +2359,7 @@ public void initFuzzyModeVariables() {
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.useSerialExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
Expand Down Expand Up @@ -4563,6 +4570,10 @@ public boolean isEnableCooldownReplicaAffinity() {
return enableCooldownReplicaAffinity;
}

public boolean isUseSerialExchange() {
return useSerialExchange && getEnableLocalExchange();
}

public void setDisableInvertedIndexV1ForVaraint(boolean disableInvertedIndexV1ForVaraint) {
this.disableInvertedIndexV1ForVaraint = disableInvertedIndexV1ForVaraint;
}
Expand Down

0 comments on commit d77bfa0

Please sign in to comment.