From a61a4d4034eac5d33b737ef330a940ade0cbce7a Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 8 Dec 2024 12:56:33 -0800 Subject: [PATCH] table level row count --- .../iceberg/source/IcebergScanNode.java | 18 ++++++++++++------ .../iceberg/source/IcebergSplit.java | 11 ++--------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index efcd646b873b2ae..c78140b9d3cd995 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -87,7 +87,13 @@ public class IcebergScanNode extends FileQueryScanNode { private IcebergSource source; private Table icebergTable; private List pushdownIcebergPredicates = Lists.newArrayList(); - private boolean pushDownCount = false; + // If tableLevelPushDownCount is true, means we can do count push down opt at table level. + // which means all splits have no position/equality delete files, + // so for query like "select count(*) from ice_tbl", we can get count from snapshot row count info directly. + // If tableLevelPushDownCount is false, means we can't do count push down opt at table level, + // But for part of splits which have no position/equality delete files, we can still do count push down opt. + // And for split level count push down opt, the flag is set in each split. + private boolean tableLevelPushDownCount = false; private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; /** @@ -140,8 +146,8 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli int formatVersion = icebergSplit.getFormatVersion(); fileDesc.setFormatVersion(formatVersion); fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); - if (pushDownCount) { - fileDesc.setRowCount(icebergSplit.getRowCount()); + if (tableLevelPushDownCount) { + fileDesc.setRowCount(icebergSplit.getTableLevelRowCount()); } if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); @@ -271,7 +277,7 @@ private List doGetSplits(int numBackends) throws UserException { } long countFromSnapshot = getCountFromSnapshot(); if (countFromSnapshot >= 0) { - pushDownCount = true; + tableLevelPushDownCount = true; List pushDownCountSplits; if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) { int minSplits = sessionVariable.getParallelExecInstanceNum() * numBackends; @@ -424,8 +430,8 @@ private void assignCountToSplits(List splits, long totalCount) { int size = splits.size(); long countPerSplit = totalCount / size; for (int i = 0; i < size - 1; i++) { - ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit); + ((IcebergSplit) splits.get(i)).setTableLevelRowCount(countPerSplit); } - ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size); + ((IcebergSplit) splits.get(size - 1)).setTableLevelRowCount(countPerSplit + totalCount % size); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 580d3cf1bb23f38..0520612935a7781 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -37,7 +37,8 @@ public class IcebergSplit extends FileSplit { private Integer formatVersion; private List deleteFileFilters; private Map config; - private long rowCount = -1; + // tableLevelRowCount will be set only table-level count push down opt is available. + private long tableLevelRowCount = -1; // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, @@ -50,14 +51,6 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength, this.selfSplitWeight = length; } - public long getRowCount() { - return rowCount; - } - - public void setRowCount(long rowCount) { - this.rowCount = rowCount; - } - public void setDeleteFileFilters(List deleteFileFilters) { this.deleteFileFilters = deleteFileFilters; this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();