From a89926a41f05e86a2460529a74f7878c89c54fac Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 25 Nov 2024 20:54:12 +0800 Subject: [PATCH] =?UTF-8?q?[fix](coordinator)=20Fix=20wrong=20bucket=20ass?= =?UTF-8?q?ginment=20in=20old-version=20coordin=E2=80=A6=20(#44539)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …ator Follow-up for : #44459 --- .../java/org/apache/doris/qe/Coordinator.java | 95 +++++++++---------- 1 file changed, 47 insertions(+), 48 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e508efde42dbf6..61ecaf7fc8b359 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1433,31 +1433,8 @@ protected void computeFragmentExecParams() throws Exception { destParams.instanceExecParams.get(0).bucketSeqSet.add(0); } // process bucket shuffle join on fragment without scan node - TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); while (bucketSeq < bucketNum) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - - dest.fragment_instance_id = new TUniqueId(-1, -1); - dest.server = dummyServer; - dest.setBrpcServer(dummyServer); - - Set hostSet = new HashSet<>(); - for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { - FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); - if (destParams.ignoreDataDistribution - && hostSet.contains(instanceExecParams.host)) { - continue; - } - hostSet.add(instanceExecParams.host); - if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { - dest.fragment_instance_id = instanceExecParams.instanceId; - dest.server = toRpcHost(instanceExecParams.host); - dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); - instanceExecParams.recvrId = params.destinations.size(); - break; - } - } - + TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(), bucketSeq); bucketSeq++; params.destinations.add(dest); } @@ -1504,6 +1481,50 @@ protected void computeFragmentExecParams() throws Exception { } } + private TPlanFragmentDestination setDestination(FragmentExecParams destParams, int recvrId, int bucketSeq) + throws Exception { + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); + dest.fragment_instance_id = new TUniqueId(-1, -1); + dest.server = dummyServer; + dest.setBrpcServer(dummyServer); + + if (destParams.ignoreDataDistribution) { + Map>> hostToInstanceIdAndBucketSeq + = new HashMap<>(); + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + hostToInstanceIdAndBucketSeq.putIfAbsent(instanceExecParams.host, + Pair.of(instanceExecParams.instanceId, new HashSet<>())); + hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.addAll( + instanceExecParams.bucketSeqSet); + } + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.contains(bucketSeq)) { + dest.fragment_instance_id = hostToInstanceIdAndBucketSeq.get(instanceExecParams.host) + .first; + dest.server = toRpcHost(instanceExecParams.host); + dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); + instanceExecParams.recvrId = recvrId; + break; + } + } + } else { + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { + dest.fragment_instance_id = instanceExecParams.instanceId; + dest.server = toRpcHost(instanceExecParams.host); + dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); + instanceExecParams.recvrId = recvrId; + break; + } + } + } + return dest; + } + private void computeMultiCastFragmentParams() throws Exception { for (FragmentExecParams params : fragmentExecParamsMap.values()) { if (!(params.fragment instanceof MultiCastPlanFragment)) { @@ -1556,31 +1577,9 @@ private void computeMultiCastFragmentParams() throws Exception { destParams.instanceExecParams.get(0).bucketSeqSet.add(0); } // process bucket shuffle join on fragment without scan node - TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); while (bucketSeq < bucketNum) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - - dest.fragment_instance_id = new TUniqueId(-1, -1); - dest.server = dummyServer; - dest.setBrpcServer(dummyServer); - - Set hostSet = new HashSet<>(); - for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { - FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); - if (destParams.ignoreDataDistribution - && hostSet.contains(instanceExecParams.host)) { - continue; - } - hostSet.add(instanceExecParams.host); - if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { - dest.fragment_instance_id = instanceExecParams.instanceId; - dest.server = toRpcHost(instanceExecParams.host); - dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); - instanceExecParams.recvrId = params.destinations.size(); - break; - } - } - + TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(), + bucketSeq); bucketSeq++; destinations.add(dest); }