diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 45760d56ad8eef9..99dff605e031c77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -303,6 +303,7 @@ public void createScanRangeLocations() throws UserException { location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); curLocations.addToLocations(location); scanRangeLocations.add(curLocations); + scanBackendIds.add(backendId); return; } } @@ -353,6 +354,7 @@ public void createScanRangeLocations() throws UserException { // However, even one ScanNode instance can provide maximum scanning concurrency. scanRangeLocations.add(curLocations); setLocationPropertiesIfNecessary(backend, locationType, locationProperties); + scanBackendIds.add(backend.getId()); } } else { List inputSplits = getSplits(numBackends); @@ -370,6 +372,7 @@ public void createScanRangeLocations() throws UserException { scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys)); totalFileSize += split.getLength(); } + scanBackendIds.add(backend.getId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index 0de351d812cb864..e4aab923506cb16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -137,7 +137,7 @@ public List getSplits(int numBackends) throws UserException { List fileStatuses = tableValuedFunction.getFileStatuses(); // Push down count optimization. 
- // See commend in HiveScanNode.getFileSplitByPartitions + // See comment in HiveScanNode.getFileSplitByPartitions boolean needSplit = true; if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT && noNeedSplitForCountPushDown()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index afdffc748c01a65..41c055062e3e9bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -192,8 +192,6 @@ public class OlapScanNode extends ScanNode { private Set sampleTabletIds = Sets.newHashSet(); private TableSample tableSample; - private HashSet scanBackendIds = new HashSet<>(); - private Map tabletId2BucketSeq = Maps.newHashMap(); // a bucket seq may map to many tablets, and each tablet has a // TScanRangeLocations. @@ -1919,9 +1917,4 @@ public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column co public int getScanRangeNum() { return getScanTabletIds().size(); } - - @Override - public int numScanBackends() { - return scanBackendIds.size(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 85c8de68b8c1a5a..b4cc83c352db481 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -110,6 +110,11 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { protected TableSnapshot tableSnapshot; + // Stores the ids of the backends on which this scan node will be executed. + // This is also important for the local shuffle logic. + // Currently only OlapScanNode and FileQueryScanNode populate this set. 
+ protected HashSet scanBackendIds = new HashSet<>(); + public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) { super(id, desc.getId().asList(), planNodeName, statisticalType); this.desc = desc; @@ -731,7 +736,7 @@ && getScanRangeNum() } public int numScanBackends() { - return 0; + return scanBackendIds.size(); } public int getScanRangeNum() {