Skip to content

Commit

Permalink
[fix](coordinator) Fix wrong bucket assginment in old-version coordin… (
Browse files Browse the repository at this point in the history
apache#44539)

…ator

Follow-up for : apache#44459
  • Loading branch information
Gabriel39 authored Nov 25, 2024
1 parent aa92038 commit a89926a
Showing 1 changed file with 47 additions and 48 deletions.
95 changes: 47 additions & 48 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1433,31 +1433,8 @@ protected void computeFragmentExecParams() throws Exception {
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
// process bucket shuffle join on fragment without scan node
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = new TPlanFragmentDestination();

dest.fragment_instance_id = new TUniqueId(-1, -1);
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

Set<TNetworkAddress> hostSet = new HashSet<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = params.destinations.size();
break;
}
}

TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(), bucketSeq);
bucketSeq++;
params.destinations.add(dest);
}
Expand Down Expand Up @@ -1504,6 +1481,50 @@ protected void computeFragmentExecParams() throws Exception {
}
}

private TPlanFragmentDestination setDestination(FragmentExecParams destParams, int recvrId, int bucketSeq)
throws Exception {
TPlanFragmentDestination dest = new TPlanFragmentDestination();
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
dest.fragment_instance_id = new TUniqueId(-1, -1);
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

if (destParams.ignoreDataDistribution) {
Map<TNetworkAddress, Pair<TUniqueId, Set<Integer>>> hostToInstanceIdAndBucketSeq
= new HashMap<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
hostToInstanceIdAndBucketSeq.putIfAbsent(instanceExecParams.host,
Pair.of(instanceExecParams.instanceId, new HashSet<>()));
hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.addAll(
instanceExecParams.bucketSeqSet);
}
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.contains(bucketSeq)) {
dest.fragment_instance_id = hostToInstanceIdAndBucketSeq.get(instanceExecParams.host)
.first;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = recvrId;
break;
}
}
} else {
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = recvrId;
break;
}
}
}
return dest;
}

private void computeMultiCastFragmentParams() throws Exception {
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
if (!(params.fragment instanceof MultiCastPlanFragment)) {
Expand Down Expand Up @@ -1556,31 +1577,9 @@ private void computeMultiCastFragmentParams() throws Exception {
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
// process bucket shuffle join on fragment without scan node
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = new TPlanFragmentDestination();

dest.fragment_instance_id = new TUniqueId(-1, -1);
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

Set<TNetworkAddress> hostSet = new HashSet<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = params.destinations.size();
break;
}
}

TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(),
bucketSeq);
bucketSeq++;
destinations.add(dest);
}
Expand Down

0 comments on commit a89926a

Please sign in to comment.