From 1c3789e47998ea2ebbe445ddbb447f95a2b18c4a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 25 Nov 2024 12:02:25 +0800 Subject: [PATCH] [fix](local shuffle) Fix bucket local shuffle (#44459) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Data in different buckets should be distributed into all tasks after bucket-hash local exchange. Before: ``` ┌─────────────────────────────────────────┐ │ ┌─────────┐ ┌───────────────────────┐ │ │ │Bucket 0 │ │ │ │ │ └─────────┘ │ │ │ │ │ LOCAL EXCHANGE SOURCE │ │ ┌──────► ┌─────────┐ │ (BUCKET HASH) │ │ │ │ │Bucket 1 │ │ │ │ │ │ └─────────┘ └───────────────────────┘ │ │ └─────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────┐ │ │ ┌─────────┐ ┌───────┐ ┌─────────────────────┐ │ │ │ │Bucket 0 │ │ │ │ │ │ │ │ └─────────┘ │ │ │ │ │ │ │ │ SCAN │ │ LOCAL EXCHANGE SINK │ ├────────┤ │ ┌─────────┐ │ │ │ (BUCKET HASH) │ │ │ │ │Bucket 1 │ │ │ │ │ │ │ │ └─────────┘ └───────┘ └─────────────────────┘ │ │ └─────────────────────────────────────────────────────┘ │ ┌─────────────────────────────────────────┐ │ │ ┌───────────────────────┐ │ │ │ │ │ │ │ │ │ │ │ │ │ │ LOCAL EXCHANGE SOURCE │ │ └──────► │ (BUCKET HASH) │ │ │ │ │ │ │ └───────────────────────┘ │ └─────────────────────────────────────────┘ ``` After ``` ┌─────────────────────────────────────────┐ │ ┌─────────┐ ┌───────────────────────┐ │ │ │Bucket 0 │ │ │ │ │ └─────────┘ │ │ │ │ │ LOCAL EXCHANGE SOURCE │ │ ┌──────► │ (BUCKET HASH) │ │ │ │ │ │ │ │ │ └───────────────────────┘ │ │ └─────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────┐ │ │ ┌─────────┐ ┌───────┐ ┌─────────────────────┐ │ │ │ │Bucket 0 │ │ │ │ │ │ │ │ └─────────┘ │ │ │ │ │ │ │ │ SCAN │ │ LOCAL EXCHANGE SINK │ ├────────┤ │ ┌─────────┐ │ │ │ (BUCKET HASH) │ │ │ │ │Bucket 1 │ │ │ │ │ │ │ │ └─────────┘ └───────┘ └─────────────────────┘ │ │ └─────────────────────────────────────────────────────┘ │ ┌─────────────────────────────────────────┐ │ │ ┌───────────────────────┐ │ │ │ │ │ │ │ │ ┌─────────┐ │ │ │ │ │ │Bucket 1 │ │ LOCAL EXCHANGE SOURCE │ │ └──────► └─────────┘ │ (BUCKET HASH) │ │ │ │ │ │ │ └───────────────────────┘ │ └─────────────────────────────────────────┘ ``` --- be/src/exprs/runtime_filter.cpp | 7 ++++--- .../java/org/apache/doris/qe/Coordinator.java | 14 ++++++-------- .../doris/qe/runtime/ThriftPlansBuilder.java | 15 ++++----------- gensrc/thrift/PaloInternalService.thrift | 4 ++-- 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 24333360ff6254..bac14b616b2ce6 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1273,7 +1273,8 @@ void IRuntimeFilter::update_state() { // In pipelineX, runtime filters will be ready or timeout before open phase. if (expected == RuntimeFilterState::NOT_READY) { DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms); - COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_); + COUNTER_SET(_wait_timer, + int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS)); _rf_state_atomic = RuntimeFilterState::TIME_OUT; } } @@ -1292,7 +1293,7 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); - COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_); + COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS)); _rf_state_atomic.store(RuntimeFilterState::READY); if (!_filter_timer.empty()) { for (auto& timer : _filter_timer) { @@ -1539,7 +1540,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_time) { _profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type())); _profile->add_info_string("LocalMergeTime", - std::to_string(local_merge_time / 1000000000.0) + " s"); + std::to_string((double)local_merge_time / NANOS_PER_SEC) + " s"); } std::string IRuntimeFilter::debug_string() const { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index acd0fbe0daeaaf..e508efde42dbf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1935,7 +1935,6 @@ protected void computeFragmentHosts() throws Exception { FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); - instanceParam.perNodeSharedScans.put(planNodeId, sharedScan); params.instanceExecParams.add(instanceParam); } params.ignoreDataDistribution = sharedScan; @@ -2757,13 +2756,11 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc null, addressScanRange.getKey(), 0, params); for (Pair>> nodeScanRangeMap : scanRange) { - instanceParam.addBucketSeq(nodeScanRangeMap.first); for (Map.Entry> nodeScanRange : nodeScanRangeMap.second.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { range.put(nodeScanRange.getKey(), Lists.newArrayList()); instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); - instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true); } range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()) @@ -2775,6 +2772,12 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc params.instanceExecParams.add(new FInstanceExecParam( null, addressScanRange.getKey(), 0, params)); } + int index = 0; + for (Pair>> nodeScanRangeMap : scanRange) { + params.instanceExecParams.get(index % params.instanceExecParams.size()) + .addBucketSeq(nodeScanRangeMap.first); + index++; + } } else { int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { @@ -3131,10 +3134,8 @@ Map toThrift(int backendNum) { for (int i = 0; i < instanceExecParams.size(); ++i) { final FInstanceExecParam instanceExecParam = instanceExecParams.get(i); Map> scanRanges = instanceExecParam.perNodeScanRanges; - Map perNodeSharedScans = instanceExecParam.perNodeSharedScans; if (scanRanges == null) { scanRanges = Maps.newHashMap(); - perNodeSharedScans = Maps.newHashMap(); } if (!res.containsKey(instanceExecParam.host)) { TPipelineFragmentParams params = new TPipelineFragmentParams(); @@ -3162,7 +3163,6 @@ Map toThrift(int backendNum) { params.setFileScanParams(fileScanRangeParamsMap); params.setNumBuckets(fragment.getBucketNum()); - params.setPerNodeSharedScans(perNodeSharedScans); params.setTotalInstances(instanceExecParams.size()); if (ignoreDataDistribution) { params.setParallelInstances(parallelTasksNum); @@ -3187,7 +3187,6 @@ Map toThrift(int backendNum) { localParams.setFragmentInstanceId(instanceExecParam.instanceId); localParams.setPerNodeScanRanges(scanRanges); - localParams.setPerNodeSharedScans(perNodeSharedScans); localParams.setSenderId(i); localParams.setBackendNum(backendNum++); localParams.setRuntimeFilterParams(new TRuntimeFilterParams()); @@ -3335,7 +3334,6 @@ static class FInstanceExecParam { TUniqueId instanceId; TNetworkAddress host; Map> perNodeScanRanges = Maps.newHashMap(); - Map perNodeSharedScans = Maps.newHashMap(); int perFragmentInstanceIdx; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index f0e3febe192854..a02ee90e901cd5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -423,7 +423,7 @@ private static void setScanSourceParam( boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob; if (isLocalShuffle && ((LocalShuffleAssignedJob) instance).receiveDataFromLocal) { - // save thrift rpc message size, don't need perNodeScanRanges and perNodeSharedScans, + // save thrift rpc message size, don't need perNodeScanRanges, // but the perNodeScanRanges is required rpc field instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap()); return; @@ -459,19 +459,16 @@ private static void ignoreDataDistribution(TPipelineFragmentParams currentFragme private static PerNodeScanParams computeDefaultScanSourceParam(DefaultScanSource defaultScanSource) { Map> perNodeScanRanges = Maps.newLinkedHashMap(); - Map perNodeSharedScans = Maps.newLinkedHashMap(); for (Entry kv : defaultScanSource.scanNodeToScanRanges.entrySet()) { int scanNodeId = kv.getKey().getId().asInt(); perNodeScanRanges.put(scanNodeId, kv.getValue().params); - perNodeSharedScans.put(scanNodeId, true); } - return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans); + return new PerNodeScanParams(perNodeScanRanges); } private static PerNodeScanParams computeBucketScanSourceParam(BucketScanSource bucketScanSource) { Map> perNodeScanRanges = Maps.newLinkedHashMap(); - Map perNodeSharedScans = Maps.newLinkedHashMap(); for (Entry> kv : bucketScanSource.bucketIndexToScanNodeToTablets.entrySet()) { Map scanNodeToRanges = kv.getValue(); @@ -479,10 +476,9 @@ private static PerNodeScanParams computeBucketScanSourceParam(BucketScanSource b int scanNodeId = kv2.getKey().getId().asInt(); List scanRanges = perNodeScanRanges.computeIfAbsent(scanNodeId, ArrayList::new); scanRanges.addAll(kv2.getValue().params); - perNodeSharedScans.put(scanNodeId, true); } } - return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans); + return new PerNodeScanParams(perNodeScanRanges); } private static Map computeBucketIdToInstanceId( @@ -562,12 +558,9 @@ private static void filterInstancesWhichReceiveDataFromRemote( private static class PerNodeScanParams { Map> perNodeScanRanges; - Map perNodeSharedScans; - public PerNodeScanParams(Map> perNodeScanRanges, - Map perNodeSharedScans) { + public PerNodeScanParams(Map> perNodeScanRanges) { this.perNodeScanRanges = perNodeScanRanges; - this.perNodeSharedScans = perNodeSharedScans; } } } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 392aa8658df1d2..9a0fd910d94387 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -776,7 +776,7 @@ struct TPipelineInstanceParams { 4: optional i32 sender_id 5: optional TRuntimeFilterParams runtime_filter_params 6: optional i32 backend_num - 7: optional map per_node_shared_scans + 7: optional map per_node_shared_scans // deprecated 8: optional list topn_filter_source_node_ids // deprecated after we set topn_filter_descs 9: optional list topn_filter_descs } @@ -820,7 +820,7 @@ struct TPipelineFragmentParams { 33: optional i32 num_local_sink 34: optional i32 num_buckets 35: optional map bucket_seq_to_instance_idx - 36: optional map per_node_shared_scans + 36: optional map per_node_shared_scans // deprecated 37: optional i32 parallel_instances 38: optional i32 total_instances 39: optional map shuffle_idx_to_instance_idx