Skip to content

Commit

Permalink
fix number backends
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Nov 18, 2024
1 parent ea17e9e commit d146e3f
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ public void createScanRangeLocations() throws UserException {
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
curLocations.addToLocations(location);
scanRangeLocations.add(curLocations);
scanBackendIds.add(backendId);
return;
}
}
Expand Down Expand Up @@ -353,6 +354,7 @@ public void createScanRangeLocations() throws UserException {
// However, even one ScanNode instance can provide maximum scanning concurrency.
scanRangeLocations.add(curLocations);
setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
scanBackendIds.add(backend.getId());
}
} else {
List<Split> inputSplits = getSplits(numBackends);
Expand All @@ -370,6 +372,7 @@ public void createScanRangeLocations() throws UserException {
scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys));
totalFileSize += split.getLength();
}
scanBackendIds.add(backend.getId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public List<Split> getSplits(int numBackends) throws UserException {
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();

// Push down count optimization.
// See commend in HiveScanNode.getFileSplitByPartitions
// See comment in HiveScanNode.getFileSplitByPartitions
boolean needSplit = true;
if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT
&& noNeedSplitForCountPushDown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ public class OlapScanNode extends ScanNode {
private Set<Long> sampleTabletIds = Sets.newHashSet();
private TableSample tableSample;

private HashSet<Long> scanBackendIds = new HashSet<>();

private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
// a bucket seq may map to many tablets, and each tablet has a
// TScanRangeLocations.
Expand Down Expand Up @@ -1919,9 +1917,4 @@ public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column co
public int getScanRangeNum() {
return getScanTabletIds().size();
}

@Override
public int numScanBackends() {
return scanBackendIds.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {

protected TableSnapshot tableSnapshot;

// Save the id of backends which this scan node will be executed on.
// This is also important for local shuffle logic.
// Now only OlapScanNode and FileQueryScanNode implement this.
protected HashSet<Long> scanBackendIds = new HashSet<>();

public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) {
super(id, desc.getId().asList(), planNodeName, statisticalType);
this.desc = desc;
Expand Down Expand Up @@ -731,7 +736,7 @@ && getScanRangeNum()
}

public int numScanBackends() {
return 0;
return scanBackendIds.size();
}

public int getScanRangeNum() {
Expand Down

0 comments on commit d146e3f

Please sign in to comment.