Skip to content

Commit

Permalink
[fix](local shuffle) Fix bucket local shuffle (#44459)
Browse files Browse the repository at this point in the history
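After a bucket-hash local exchange, data in different buckets should be
distributed across all tasks. Previously every bucket was assigned to the
first task, leaving the other tasks with no buckets (see the diagrams below).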

Before:
```
                                                                      ┌─────────────────────────────────────────┐
                                                                      │ ┌─────────┐   ┌───────────────────────┐ │
                                                                      │ │Bucket 0 │   │                       │ │
                                                                      │ └─────────┘   │                       │ │
                                                                      │               │ LOCAL EXCHANGE SOURCE │ │
                                                               ┌──────► ┌─────────┐   │    (BUCKET HASH)      │ │
                                                               │      │ │Bucket 1 │   │                       │ │
                                                               │      │ └─────────┘   └───────────────────────┘ │
                                                               │      └─────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐        │                                                 
│ ┌─────────┐  ┌───────┐     ┌─────────────────────┐  │        │                                                 
│ │Bucket 0 │  │       │     │                     │  │        │                                                 
│ └─────────┘  │       │     │                     │  │        │                                                 
│              │ SCAN  │     │ LOCAL EXCHANGE SINK │  ├────────┤                                                 
│ ┌─────────┐  │       │     │    (BUCKET HASH)    │  │        │                                                 
│ │Bucket 1 │  │       │     │                     │  │        │                                                 
│ └─────────┘  └───────┘     └─────────────────────┘  │        │                                                 
└─────────────────────────────────────────────────────┘        │      ┌─────────────────────────────────────────┐
                                                               │      │               ┌───────────────────────┐ │
                                                               │      │               │                       │ │
                                                               │      │               │                       │ │
                                                               │      │               │ LOCAL EXCHANGE SOURCE │ │
                                                               └──────►               │    (BUCKET HASH)      │ │
                                                                      │               │                       │ │
                                                                      │               └───────────────────────┘ │
                                                                      └─────────────────────────────────────────┘
```

After:
```
                                                                                                                                   
                                                                                                                                   
                                                                                                                                   
                                                                             ┌─────────────────────────────────────────┐           
                                                                             │ ┌─────────┐   ┌───────────────────────┐ │           
                                                                             │ │Bucket 0 │   │                       │ │           
                                                                             │ └─────────┘   │                       │ │           
                                                                             │               │ LOCAL EXCHANGE SOURCE │ │           
                                                                      ┌──────►               │    (BUCKET HASH)      │ │           
                                                                      │      │               │                       │ │           
                                                                      │      │               └───────────────────────┘ │           
                                                                      │      └─────────────────────────────────────────┘           
       ┌─────────────────────────────────────────────────────┐        │                                                            
       │ ┌─────────┐  ┌───────┐     ┌─────────────────────┐  │        │                                                            
       │ │Bucket 0 │  │       │     │                     │  │        │                                                            
       │ └─────────┘  │       │     │                     │  │        │                                                            
       │              │ SCAN  │     │ LOCAL EXCHANGE SINK │  ├────────┤                                                            
       │ ┌─────────┐  │       │     │    (BUCKET HASH)    │  │        │                                                            
       │ │Bucket 1 │  │       │     │                     │  │        │                                                            
       │ └─────────┘  └───────┘     └─────────────────────┘  │        │                                                            
       └─────────────────────────────────────────────────────┘        │      ┌─────────────────────────────────────────┐           
                                                                      │      │               ┌───────────────────────┐ │           
                                                                      │      │               │                       │ │           
                                                                      │      │  ┌─────────┐  │                       │ │           
                                                                      │      │  │Bucket 1 │  │ LOCAL EXCHANGE SOURCE │ │           
                                                                      └──────►  └─────────┘  │    (BUCKET HASH)      │ │           
                                                                             │               │                       │ │           
                                                                             │               └───────────────────────┘ │           
                                                                             └─────────────────────────────────────────┘           
                                                                                                                                   
                                                                                                                                   
                                                                                                                                   
```
  • Loading branch information
Gabriel39 authored Nov 25, 2024
1 parent 7f436ae commit 1c3789e
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 24 deletions.
7 changes: 4 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,8 @@ void IRuntimeFilter::update_state() {
// In pipelineX, runtime filters will be ready or timeout before open phase.
if (expected == RuntimeFilterState::NOT_READY) {
DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
COUNTER_SET(_wait_timer,
int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS));
_rf_state_atomic = RuntimeFilterState::TIME_OUT;
}
}
Expand All @@ -1292,7 +1293,7 @@ PrimitiveType IRuntimeFilter::column_type() const {

void IRuntimeFilter::signal() {
DCHECK(is_consumer());
COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS));
_rf_state_atomic.store(RuntimeFilterState::READY);
if (!_filter_timer.empty()) {
for (auto& timer : _filter_timer) {
Expand Down Expand Up @@ -1539,7 +1540,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_time) {
_profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type()));
_profile->add_info_string("LocalMergeTime",
std::to_string(local_merge_time / 1000000000.0) + " s");
std::to_string((double)local_merge_time / NANOS_PER_SEC) + " s");
}

std::string IRuntimeFilter::debug_string() const {
Expand Down
14 changes: 6 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1935,7 +1935,6 @@ protected void computeFragmentHosts() throws Exception {

FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
instanceParam.perNodeSharedScans.put(planNodeId, sharedScan);
params.instanceExecParams.add(instanceParam);
}
params.ignoreDataDistribution = sharedScan;
Expand Down Expand Up @@ -2757,13 +2756,11 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
null, addressScanRange.getKey(), 0, params);

for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true);
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
Expand All @@ -2775,6 +2772,12 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
params.instanceExecParams.add(new FInstanceExecParam(
null, addressScanRange.getKey(), 0, params));
}
int index = 0;
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
params.instanceExecParams.get(index % params.instanceExecParams.size())
.addBucketSeq(nodeScanRangeMap.first);
index++;
}
} else {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
Expand Down Expand Up @@ -3131,10 +3134,8 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
perNodeSharedScans = Maps.newHashMap();
}
if (!res.containsKey(instanceExecParam.host)) {
TPipelineFragmentParams params = new TPipelineFragmentParams();
Expand Down Expand Up @@ -3162,7 +3163,6 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {

params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setPerNodeSharedScans(perNodeSharedScans);
params.setTotalInstances(instanceExecParams.size());
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
Expand All @@ -3187,7 +3187,6 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {

localParams.setFragmentInstanceId(instanceExecParam.instanceId);
localParams.setPerNodeScanRanges(scanRanges);
localParams.setPerNodeSharedScans(perNodeSharedScans);
localParams.setSenderId(i);
localParams.setBackendNum(backendNum++);
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
Expand Down Expand Up @@ -3335,7 +3334,6 @@ static class FInstanceExecParam {
TUniqueId instanceId;
TNetworkAddress host;
Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newHashMap();
Map<Integer, Boolean> perNodeSharedScans = Maps.newHashMap();

int perFragmentInstanceIdx;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ private static void setScanSourceParam(

boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
if (isLocalShuffle && ((LocalShuffleAssignedJob) instance).receiveDataFromLocal) {
// save thrift rpc message size, don't need perNodeScanRanges and perNodeSharedScans,
// save thrift rpc message size, don't need perNodeScanRanges,
// but the perNodeScanRanges is required rpc field
instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap());
return;
Expand Down Expand Up @@ -459,30 +459,26 @@ private static void ignoreDataDistribution(TPipelineFragmentParams currentFragme

private static PerNodeScanParams computeDefaultScanSourceParam(DefaultScanSource defaultScanSource) {
Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newLinkedHashMap();
Map<Integer, Boolean> perNodeSharedScans = Maps.newLinkedHashMap();
for (Entry<ScanNode, ScanRanges> kv : defaultScanSource.scanNodeToScanRanges.entrySet()) {
int scanNodeId = kv.getKey().getId().asInt();
perNodeScanRanges.put(scanNodeId, kv.getValue().params);
perNodeSharedScans.put(scanNodeId, true);
}

return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans);
return new PerNodeScanParams(perNodeScanRanges);
}

private static PerNodeScanParams computeBucketScanSourceParam(BucketScanSource bucketScanSource) {
Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newLinkedHashMap();
Map<Integer, Boolean> perNodeSharedScans = Maps.newLinkedHashMap();
for (Entry<Integer, Map<ScanNode, ScanRanges>> kv :
bucketScanSource.bucketIndexToScanNodeToTablets.entrySet()) {
Map<ScanNode, ScanRanges> scanNodeToRanges = kv.getValue();
for (Entry<ScanNode, ScanRanges> kv2 : scanNodeToRanges.entrySet()) {
int scanNodeId = kv2.getKey().getId().asInt();
List<TScanRangeParams> scanRanges = perNodeScanRanges.computeIfAbsent(scanNodeId, ArrayList::new);
scanRanges.addAll(kv2.getValue().params);
perNodeSharedScans.put(scanNodeId, true);
}
}
return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans);
return new PerNodeScanParams(perNodeScanRanges);
}

private static Map<Integer, Integer> computeBucketIdToInstanceId(
Expand Down Expand Up @@ -562,12 +558,9 @@ private static void filterInstancesWhichReceiveDataFromRemote(

private static class PerNodeScanParams {
Map<Integer, List<TScanRangeParams>> perNodeScanRanges;
Map<Integer, Boolean> perNodeSharedScans;

public PerNodeScanParams(Map<Integer, List<TScanRangeParams>> perNodeScanRanges,
Map<Integer, Boolean> perNodeSharedScans) {
public PerNodeScanParams(Map<Integer, List<TScanRangeParams>> perNodeScanRanges) {
this.perNodeScanRanges = perNodeScanRanges;
this.perNodeSharedScans = perNodeSharedScans;
}
}
}
4 changes: 2 additions & 2 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ struct TPipelineInstanceParams {
4: optional i32 sender_id
5: optional TRuntimeFilterParams runtime_filter_params
6: optional i32 backend_num
7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
8: optional list<i32> topn_filter_source_node_ids // deprecated after we set topn_filter_descs
9: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs
}
Expand Down Expand Up @@ -820,7 +820,7 @@ struct TPipelineFragmentParams {
33: optional i32 num_local_sink
34: optional i32 num_buckets
35: optional map<i32, i32> bucket_seq_to_instance_idx
36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
37: optional i32 parallel_instances
38: optional i32 total_instances
39: optional map<i32, i32> shuffle_idx_to_instance_idx
Expand Down

0 comments on commit 1c3789e

Please sign in to comment.