diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 8f130ca6002d5d4..837269b0bb355d4 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -96,25 +96,25 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma _iceberg_profile.delete_rows_sort_time = ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); if (range.table_format_params.iceberg_params.__isset.row_count) { - _remaining_push_down_count = range.table_format_params.iceberg_params.row_count; + _remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count; } else { - _remaining_push_down_count = -1; + _remaining_table_level_row_count = -1; } } Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { // already get rows from be - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { - auto rows = - std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size); - _remaining_push_down_count -= rows; + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { + auto rows = std::min(_remaining_table_level_row_count, + (int64_t)_state->query_options().batch_size); + _remaining_table_level_row_count -= rows; auto mutate_columns = block->mutate_columns(); for (auto& col : mutate_columns) { col->resize(rows); } block->set_columns(std::move(mutate_columns)); *read_rows = rows; - if (_remaining_push_down_count == 0) { + if (_remaining_table_level_row_count == 0) { *eof = true; } @@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns( Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) { // We get the count value by doris's be, so we don't need to read the delete file - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { return Status::OK(); } @@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOC if (position_delete_files.size() > 0) { RETURN_IF_ERROR( _position_delete_base(table_desc.original_file_path, position_delete_files)); + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } if (equality_delete_files.size() > 0) { RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 2e240f465b6a2cd..ee7dcdd68d24fa6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -167,7 +167,9 @@ class IcebergTableReader : public TableFormatReader { bool _has_schema_change = false; bool _has_iceberg_schema = false; - int64_t _remaining_push_down_count; + // the table level row count for optimizing query like: + // select count(*) from table; + int64_t _remaining_table_level_row_count; Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index 263fdc8014bc36a..055d6179b2c4221 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ 
b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext return Status::OK(); } + // set push down agg type to NONE because we can not do count push down opt + // if there are delete files. + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); const auto& deletion_file = table_desc.deletion_file; io::FileSystemProperties properties = { .system_type = _params.file_type, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 296e59f8df16e70..d53bb105c70600d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -788,6 +788,9 @@ Status VFileScanner::_get_next_reader() { _should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache() : nullptr, _state->query_options().enable_parquet_lazy_mat); + // ATTN: the push down agg type may be set back to NONE, + // see IcebergTableReader::init_row_filters for example. + parquet_reader->set_push_down_agg_type(_get_push_down_agg_type()); { SCOPED_TIMER(_open_reader_timer); RETURN_IF_ERROR(parquet_reader->open()); @@ -853,6 +856,7 @@ Status VFileScanner::_get_next_reader() { _profile, _state, *_params, range, _state->query_options().batch_size, _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat, unsupported_pushdown_types); + orc_reader->set_push_down_agg_type(_get_push_down_agg_type()); if (push_down_predicates) { RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java index b1e7c7c89e91f5b..9eacb8a0422695c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java @@ -26,6 +26,7 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.tablefunction.BackendsTableValuedFunction; import org.apache.doris.tablefunction.LocalTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionIf; @@ -119,8 +120,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException { analyzeJoin(analyzer); } - public ScanNode getScanNode(PlanNodeId id) { - return tableFunction.getScanNode(id, desc); + public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) { + return tableFunction.getScanNode(id, desc, sv); } public TableValuedFunctionIf getTableFunction() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 5ea2a2637d86e27..4a071fa6682c579 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -39,6 +39,7 @@ import org.apache.doris.datasource.hive.source.HiveSplit; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; @@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode { protected String brokerName; protected TableSnapshot 
tableSnapshot;
+    // Save the reference of session variable, so that we don't need to get it from connection context.
+    // Connection context is a thread-local variable; it is not available when running in another thread.
+    protected SessionVariable sessionVariable;

     /**
      * External file scan node for Query hms table
@@ -102,8 +106,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
      * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
      */
     public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
-            StatisticalType statisticalType, boolean needCheckColumnPriv) {
+            StatisticalType statisticalType, boolean needCheckColumnPriv,
+            SessionVariable sv) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+        this.sessionVariable = sv;
     }

     @Override
@@ -112,7 +118,6 @@ public void init(Analyzer analyzer) throws UserException {
             ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
         }
         super.init(analyzer);
-        initFileSplitSize();
         doInitialize();
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
         }
@@ -314,6 +319,7 @@ public void createScanRangeLocations() throws UserException {
             params.setProperties(locationProperties);
         }
+        int numBackends = backendPolicy.numBackends();
         List pathPartitionKeys = getPathPartitionKeys();
         if (isBatchMode()) {
             // File splits are generated lazily, and fetched by backends while scanning.
@@ -356,7 +362,7 @@ public void createScanRangeLocations() throws UserException {
                 scanBackendIds.add(backend.getId());
             }
         } else {
-            List inputSplits = getSplits();
+            List inputSplits = getSplits(numBackends);
             if (ConnectContext.get().getExecutor() != null) {
                 ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
             }
@@ -605,4 +611,19 @@ public TableSnapshot getQueryTableSnapshot() {
         }
         return this.tableSnapshot;
     }
+
+    /**
+     * The real file split size is determined by:
+     * 1. If the user specifies the split size in session variable `file_split_size`, use the user-specified value.
+     * 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size.
+ * @param blockSize, got from file system, eg, hdfs + * @return the real file split size + */ + protected long getRealFileSplitSize(long blockSize) { + long realSplitSize = sessionVariable.getFileSplitSize(); + if (realSplitSize <= 0) { + realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize); + } + return realSplitSize; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 29fdb2b09acfcd6..b7d343123133086 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -25,15 +25,10 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.LocationPath; -import org.apache.doris.common.util.Util; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileScanNode; import org.apache.doris.thrift.TFileScanRangeParams; @@ -46,11 +41,9 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import org.apache.hadoop.fs.BlockLocation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -68,8 +61,6 @@ public abstract class FileScanNode extends ExternalScanNode { // For explain protected long totalFileSize = 0; protected long totalPartitionNum = 0; - protected long fileSplitSize; - protected boolean isSplitSizeSetBySession = false; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -77,19 +68,6 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, St this.needCheckColumnPriv = needCheckColumnPriv; } - @Override - public void init() throws UserException { - initFileSplitSize(); - } - - protected void initFileSplitSize() { - this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); - this.isSplitSizeSetBySession = this.fileSplitSize > 0; - if (this.fileSplitSize <= 0) { - this.fileSplitSize = DEFAULT_SPLIT_SIZE; - } - } - @Override protected void toThrift(TPlanNode planNode) { planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); @@ -256,61 +234,4 @@ protected void setDefaultValueExprs(TableIf tbl, } } } - - protected List splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length, - long modificationTime, boolean splittable, List partitionValues, SplitCreator splitCreator) - throws IOException { - if (blockLocations == null) { - blockLocations = new BlockLocation[0]; - } - List result = Lists.newArrayList(); - TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get()); - if (!splittable || compressType != TFileCompressType.PLAIN) { - if (LOG.isDebugEnabled()) { - LOG.debug("Path {} is not splittable.", path); - } - String[] hosts = blockLocations.length == 0 ? 
null : blockLocations[0].getHosts(); - result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues)); - return result; - } - // if file split size is set by session variable, use session variable. - // Otherwise, use max(file split size, block size) - if (!isSplitSizeSetBySession) { - fileSplitSize = Math.max(fileSplitSize, blockSize); - } - long bytesRemaining; - for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; - bytesRemaining -= fileSplitSize) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize, - length, modificationTime, hosts, partitionValues)); - } - if (bytesRemaining != 0L) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, - length, modificationTime, hosts, partitionValues)); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Path {} includes {} splits.", path, result.size()); - } - return result; - } - - protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { - if (blkLocations == null || blkLocations.length == 0) { - return -1; - } - for (int i = 0; i < blkLocations.length; ++i) { - if (blkLocations[i].getOffset() <= offset - && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) { - return i; - } - } - BlockLocation last = blkLocations[blkLocations.length - 1]; - long fileLength = last.getOffset() + last.getLength() - 1L; - throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java index 1ebb390e90438f7..37e66c7056fbd79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java @@ -87,9 +87,12 @@ public static class FileSplitCreator implements SplitCreator { @Override public Split create(LocationPath path, long start, long length, long fileLength, + long fileSplitSize, long modificationTime, String[] hosts, List partitionValues) { - return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); + FileSplit split = new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); + split.setTargetSplitSize(fileSplitSize); + return split; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java new file mode 100644 index 000000000000000..b923c87d3ac96d1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.common.util.Util; +import org.apache.doris.spi.Split; +import org.apache.doris.thrift.TFileCompressType; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.List; + +public class FileSplitter { + private static final Logger LOG = LogManager.getLogger(FileSplitter.class); + + // If the number of files is larger than parallel instances * num of backends, + // we don't need to split the file. + // Otherwise, split the file to avoid local shuffle. + public static boolean needSplitForCountPushdown(int parallelism, int numBackends, long totalFileNum) { + return totalFileNum < parallelism * numBackends; + } + + public static List splitFile( + LocationPath path, + long fileSplitSize, + BlockLocation[] blockLocations, + long length, + long modificationTime, + boolean splittable, + List partitionValues, + SplitCreator splitCreator) + throws IOException { + if (blockLocations == null) { + blockLocations = new BlockLocation[0]; + } + List result = Lists.newArrayList(); + TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get()); + if (!splittable || compressType != TFileCompressType.PLAIN) { + if (LOG.isDebugEnabled()) { + LOG.debug("Path {} is not splittable.", path); + } + String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts(); + result.add(splitCreator.create(path, 0, length, length, fileSplitSize, + modificationTime, hosts, partitionValues)); + return result; + } + long bytesRemaining; + for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; + bytesRemaining -= fileSplitSize) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize, + length, fileSplitSize, modificationTime, hosts, partitionValues)); + } + if (bytesRemaining != 0L) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? 
null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, + length, fileSplitSize, modificationTime, hosts, partitionValues)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Path {} includes {} splits.", path, result.size()); + } + return result; + } + + private static int getBlockIndex(BlockLocation[] blkLocations, long offset) { + if (blkLocations == null || blkLocations.length == 0) { + return -1; + } + for (int i = 0; i < blkLocations.length; ++i) { + if (blkLocations[i].getOffset() <= offset + && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1L; + throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); + } + + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java index 928854b91d18102..a26abc7fc5e037f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java @@ -69,7 +69,7 @@ public SplitAssignment( } public void init() throws UserException { - splitGenerator.startSplit(); + splitGenerator.startSplit(backendPolicy.numBackends()); synchronized (assignLock) { while (sampleSplit == null && waitFirstSplit()) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java index 4df30459db7021c..6df84d2f0f5ee94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java @@ -23,6 +23,6 @@ import java.util.List; public interface SplitCreator { - Split create(LocationPath path, long start, long length, long fileLength, + Split create(LocationPath path, long start, long length, long fileLength, long fileSplitSize, long modificationTime, String[] hosts, List partitionValues); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java index c4a373bc85b3b12..34ff3911445bfeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java @@ -30,8 +30,9 @@ public interface SplitGenerator { /** * Get all file splits if the producer doesn't support batch mode. + * @param numBackends the number of backends, this is useful when determine the number of splits. 
*/ - default List getSplits() throws UserException { + default List getSplits(int numBackends) throws UserException { // todo: remove this interface if batch mode is stable throw new NotImplementedException("Not implement"); } @@ -51,7 +52,7 @@ default int numApproximateSplits() { return -1; } - default void startSplit() { + default void startSplit(int numBackends) { } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 99d3cd1cd216226..35b21c368ea9f92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -32,6 +32,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.FileSplitter; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -44,12 +45,14 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TPushAggOp; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -98,15 +101,13 @@ public class HiveScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv); - hmsTable = (HMSExternalTable) desc.getTable(); - brokerName = hmsTable.getCatalog().bindBrokerName(); + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv); } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - StatisticalType statisticalType, boolean needCheckColumnPriv) { - super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); } @@ -163,7 +164,7 @@ protected List getPartitions() throws AnalysisException { } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { long start = System.currentTimeMillis(); try { if (!partitionInit) { @@ -174,7 +175,7 @@ public List getSplits() throws UserException { .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); String bindBrokerName = hmsTable.getCatalog().bindBrokerName(); List allFiles = Lists.newArrayList(); - getFileSplitByPartitions(cache, prunedPartitions, allFiles, 
bindBrokerName); + getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName, numBackends); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime(); } @@ -193,7 +194,7 @@ public List getSplits() throws UserException { } @Override - public void startSplit() { + public void startSplit(int numBackends) { if (prunedPartitions.isEmpty()) { splitAssignment.finishSchedule(); return; @@ -214,12 +215,12 @@ public void startSplit() { try { List allFiles = Lists.newArrayList(); getFileSplitByPartitions( - cache, Collections.singletonList(partition), allFiles, bindBrokerName); + cache, Collections.singletonList(partition), allFiles, bindBrokerName, numBackends); if (allFiles.size() > numSplitsPerPartition.get()) { numSplitsPerPartition.set(allFiles.size()); } splitAssignment.addToQueue(allFiles); - } catch (IOException e) { + } catch (Exception e) { batchException.set(new UserException(e.getMessage(), e)); } finally { splittersOnFlight.release(); @@ -263,7 +264,7 @@ public int numApproximateSplits() { } private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, - List allFiles, String bindBrokerName) throws IOException { + List allFiles, String bindBrokerName, int numBackends) throws IOException, UserException { List fileCaches; if (hiveTransaction != null) { fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); @@ -276,11 +277,34 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List allFiles, List hiveFileStatuses) throws IOException { for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) { - allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(), + allFiles.addAll(FileSplitter.splitFile(status.getPath(), getRealFileSplitSize(status.getBlockSize()), status.getBlockLocations(), status.getLength(), status.getModificationTime(), status.isSplittable(), status.getPartitionValues(), new HiveSplitCreator(status.getAcidInfo()))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java index 5dd63e734c9c911..58bfb32e617e340 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java @@ -54,9 +54,13 @@ public HiveSplitCreator(AcidInfo acidInfo) { @Override public Split create(LocationPath path, long start, long length, long fileLength, + long fileSplitSize, long modificationTime, String[] hosts, List partitionValues) { - return new HiveSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues, acidInfo); + HiveSplit split = new HiveSplit(path, start, length, fileLength, modificationTime, + hosts, partitionValues, acidInfo); + split.setTargetSplitSize(fileSplitSize); + return split; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index b2cad8ab710178f..e1dfaa40aefce19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -125,8 +125,8 @@ public class HudiScanNode extends HiveScanNode { */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, Optional scanParams, Optional 
incrementalRelation,
-            SessionVariable sessionVariable) {
-        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
+            SessionVariable sv) {
+        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv);
         isCowTable = hmsTable.isHoodieCowTable();
         if (LOG.isDebugEnabled()) {
             if (isCowTable) {
@@ -390,7 +390,7 @@ private void getPartitionsSplits(List partitions, List spl
     }

     @Override
-    public List getSplits() throws UserException {
+    public List getSplits(int numBackends) throws UserException {
         if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
             return getIncrementalSplits();
         }
@@ -406,7 +406,7 @@ public List getSplits() throws UserException {
     }

     @Override
-    public void startSplit() {
+    public void startSplit(int numBackends) {
         if (prunedPartitions.isEmpty()) {
             splitAssignment.finishSchedule();
             return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f7b58158d1a72c9..c78140b9d3cd995 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -36,7 +36,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -87,7 +87,13 @@ public class IcebergScanNode extends FileQueryScanNode {
     private IcebergSource source;
     private Table icebergTable;
     private List pushdownIcebergPredicates = Lists.newArrayList();
-    private boolean pushDownCount = false;
+    // If tableLevelPushDownCount is true, the count push down opt can be done at table level,
+    // which means no split has position/equality delete files,
+    // so for a query like "select count(*) from ice_tbl", the count can be taken directly from the snapshot's row count info.
+    // If tableLevelPushDownCount is false, the count push down opt cannot be done at table level,
+    // but for the splits that have no position/equality delete files, the opt can still be done at split level.
+    // For split level count push down opt, the flag is set in each split.
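// Editor's illustrative sketch (not part of the patch): how a table-level row count is spread
// over at most parallelism * numBackends splits, mirroring the doGetSplits / assignCountToSplits
// changes in this file. The CountAssignmentSketch name and the long[] return shape are
// hypothetical; each entry stands in for an IcebergSplit's tableLevelRowCount.
class CountAssignmentSketch {
    static long[] assignCount(long countFromSnapshot, int splitNum, int parallelism, int numBackends) {
        // Limit the number of splits that carry the count, as doGetSplits does with subList().
        int used = Math.min(splitNum, parallelism * numBackends);
        long[] counts = new long[used];
        long countPerSplit = countFromSnapshot / used;
        for (int i = 0; i < used - 1; i++) {
            counts[i] = countPerSplit;
        }
        // The last split takes the remainder, as assignCountToSplits does.
        counts[used - 1] = countPerSplit + countFromSnapshot % used;
        return counts;
    }
}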
+ private boolean tableLevelPushDownCount = false; private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; /** @@ -96,8 +102,8 @@ public class IcebergScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv); + public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv, sv); ExternalTable table = (ExternalTable) desc.getTable(); if (table instanceof HMSExternalTable) { @@ -140,8 +146,8 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli int formatVersion = icebergSplit.getFormatVersion(); fileDesc.setFormatVersion(formatVersion); fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); - if (pushDownCount) { - fileDesc.setRowCount(icebergSplit.getRowCount()); + if (tableLevelPushDownCount) { + fileDesc.setRowCount(icebergSplit.getTableLevelRowCount()); } if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); @@ -177,11 +183,12 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli } @Override - public List getSplits() throws UserException { - return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), this::doGetSplits); + public List getSplits(int numBackends) throws UserException { + return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), + () -> doGetSplits(numBackends)); } - private List doGetSplits() throws UserException { + private List doGetSplits(int numBackends) throws UserException { TableScan scan = icebergTable.newScan(); // set snapshot @@ -209,9 +216,10 @@ private List doGetSplits() throws UserException { HashSet partitionPathSet = new HashSet<>(); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); - CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize); + long realFileSplitSize = getRealFileSplitSize(0); + CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize); try (CloseableIterable combinedScanTasks = - TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) { + TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { List partitionValues = new ArrayList<>(); if (isPartitionedTable) { @@ -250,6 +258,7 @@ private List doGetSplits() throws UserException { source.getCatalog().getProperties(), partitionValues, originalPath); + split.setTargetSplitSize(realFileSplitSize); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); } @@ -268,11 +277,11 @@ private List doGetSplits() throws UserException { } long countFromSnapshot = getCountFromSnapshot(); if (countFromSnapshot >= 0) { - pushDownCount = true; + tableLevelPushDownCount = true; List pushDownCountSplits; if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) { - int parallelNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); - pushDownCountSplits = splits.subList(0, Math.min(splits.size(), parallelNum)); + int minSplits = sessionVariable.getParallelExecInstanceNum() * 
numBackends; + pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits)); } else { pushDownCountSplits = Collections.singletonList(splits.get(0)); } @@ -282,7 +291,6 @@ private List doGetSplits() throws UserException { } selectedPartitionNum = partitionPathSet.size(); - splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } @@ -422,8 +430,8 @@ private void assignCountToSplits(List splits, long totalCount) { int size = splits.size(); long countPerSplit = totalCount / size; for (int i = 0; i < size - 1; i++) { - ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit); + ((IcebergSplit) splits.get(i)).setTableLevelRowCount(countPerSplit); } - ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size); + ((IcebergSplit) splits.get(size - 1)).setTableLevelRowCount(countPerSplit + totalCount % size); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 580d3cf1bb23f38..0520612935a7781 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -37,7 +37,8 @@ public class IcebergSplit extends FileSplit { private Integer formatVersion; private List deleteFileFilters; private Map config; - private long rowCount = -1; + // tableLevelRowCount will be set only table-level count push down opt is available. + private long tableLevelRowCount = -1; // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, @@ -50,14 +51,6 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength, this.selfSplitWeight = length; } - public long getRowCount() { - return rowCount; - } - - public void setRowCount(long rowCount) { - this.rowCount = rowCount; - } - public void setDeleteFileFilters(List deleteFileFilters) { this.deleteFileFilters = deleteFileFilters; this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java index a7311055cffcc9b..80f037239ce3f25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.property.constants.MinioProperties; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; @@ -80,8 +81,8 @@ public class LakeSoulScanNode extends FileQueryScanNode { String readType; - public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv); + public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, 
needCheckColumnPriv, sv); } @Override @@ -209,7 +210,7 @@ private void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulS rangeDesc.setTableFormatParams(tableFormatFileDesc); } - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { if (LOG.isDebugEnabled()) { LOG.debug("getSplits with columnFilters={}", columnFilters); LOG.debug("getSplits with columnNameToRange={}", columnNameToRange); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index 9fa22a0fffaab8b..e4bb8b5e9dcc533 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -44,7 +44,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.nereids.util.DateUtils; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; @@ -107,21 +107,23 @@ public class MaxComputeScanNode extends FileQueryScanNode { // For new planner public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, - SelectedPartitions selectedPartitions, boolean needCheckColumnPriv) { + SelectedPartitions selectedPartitions, boolean needCheckColumnPriv, + SessionVariable sv) { this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, - selectedPartitions, needCheckColumnPriv); + selectedPartitions, needCheckColumnPriv, sv); } // For old planner - public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + SessionVariable sv) { this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, - SelectedPartitions.NOT_PRUNED, needCheckColumnPriv); + SelectedPartitions.NOT_PRUNED, needCheckColumnPriv, sv); } private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, SelectedPartitions selectedPartitions, - boolean needCheckColumnPriv) { - super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv); table = (MaxComputeExternalTable) desc.getTable(); this.selectedPartitions = selectedPartitions; } @@ -214,7 +216,7 @@ public boolean isBatchMode() { return false; } - int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode(); + int numPartitions = sessionVariable.getNumPartitionsInBatchMode(); return numPartitions > 0 && selectedPartitions != SelectedPartitions.NOT_PRUNED && selectedPartitions.selectedPartitions.size() >= numPartitions; @@ -226,7 +228,7 @@ public int numApproximateSplits() { } @Override - public void startSplit() { + public void startSplit(int numBackends) { this.totalPartitionNum = selectedPartitions.totalPartitionNum; this.selectedPartitionNum = selectedPartitions.selectedPartitions.size(); @@ -241,8 +243,7 @@ public void startSplit() { (key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key)) ); - - int batchNumPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode(); + int batchNumPartitions = sessionVariable.getNumPartitionsInBatchMode(); Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor(); AtomicReference batchException = new AtomicReference<>(null); @@ -546,7 +547,7 @@ protected Map getLocationProperties() throws UserException { return new HashMap<>(); } - List getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws java.io.IOException { + private List getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws IOException { List result = new ArrayList<>(); String scanSessionSerialize = serializeSession(tableBatchReadSession); InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner(); @@ -595,9 +596,8 @@ List getSplitByTableSession(TableBatchReadSession tableBatchReadSession) return result; } - @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { List result = new ArrayList<>(); com.aliyun.odps.Table odpsTable = table.getOdpsTable(); if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 5009ec3c9049fb6..28efbc58f51b84e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -25,6 +25,7 @@ import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.FileSplitter; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.planner.PlanNodeId; @@ -36,6 +37,7 @@ import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TPaimonDeletionFileDesc; import org.apache.doris.thrift.TPaimonFileDesc; +import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; @@ -101,15 +103,16 @@ public String toString() { private int rawFileSplitNum = 0; private int paimonSplitNum = 0; private List splitStats = new ArrayList<>(); - private SessionVariable sessionVariable; private String serializedTable; + private boolean pushDownCount = false; + private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; + public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, - SessionVariable sessionVariable) { - super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv); - this.sessionVariable = sessionVariable; + SessionVariable sv) { + super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv, sv); } @Override @@ -199,7 +202,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { boolean forceJniScanner = sessionVariable.isForceJniScanner(); SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType .valueOf(sessionVariable.getIgnoreSplitType()); @@ -211,6 +214,8 @@ public List getSplits() throws UserException { List paimonSplits = 
readBuilder.withFilter(predicates) .withProjection(projected) .newScan().plan().splits(); + + boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT; // Just for counting the number of selected partitions for this paimon table Set selectedPartitionValues = Sets.newHashSet(); for (org.apache.paimon.table.source.Split split : paimonSplits) { @@ -238,9 +243,9 @@ public List getSplits() throws UserException { LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties()); try { - List dorisSplits = splitFile( + List dorisSplits = FileSplitter.splitFile( locationPath, - 0, + getRealFileSplitSize(0), null, file.length(), -1, @@ -261,25 +266,7 @@ public List getSplits() throws UserException { } } } else { - for (RawFile file : rawFiles) { - LocationPath locationPath = new LocationPath(file.path(), - source.getCatalog().getProperties()); - try { - splits.addAll( - splitFile( - locationPath, - 0, - null, - file.length(), - -1, - true, - null, - PaimonSplit.PaimonSplitCreator.DEFAULT)); - ++rawFileSplitNum; - } catch (IOException e) { - throw new UserException("Paimon error to split file: " + e.getMessage(), e); - } - } + createRawFileSplits(rawFiles, splits, applyCountPushdown ? Long.MAX_VALUE : 0); } } else { if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { @@ -297,14 +284,34 @@ public List getSplits() throws UserException { } splitStats.add(splitStat); } + this.selectedPartitionNum = selectedPartitionValues.size(); // TODO: get total partition number - // We should set fileSplitSize at the end because fileSplitSize may be modified - // in splitFile. - splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } + private void createRawFileSplits(List rawFiles, List splits, long blockSize) throws UserException { + for (RawFile file : rawFiles) { + LocationPath locationPath = new LocationPath(file.path(), + source.getCatalog().getProperties()); + try { + splits.addAll( + FileSplitter.splitFile( + locationPath, + getRealFileSplitSize(blockSize), + null, + file.length(), + -1, + true, + null, + PaimonSplit.PaimonSplitCreator.DEFAULT)); + ++rawFileSplitNum; + } catch (IOException e) { + throw new UserException("Paimon error to split file: " + e.getMessage(), e); + } + } + } + private String getFileFormat(String path) { return FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index 3ab38c7db28e9e8..988f043ad0e7d7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -38,7 +38,6 @@ public class PaimonSplit extends FileSplit { private TableFormatType tableFormatType; private Optional optDeletionFile; - public PaimonSplit(Split split) { super(DUMMY_PATH, 0, 0, 0, 0, null, null); this.split = split; @@ -100,10 +99,14 @@ public org.apache.doris.spi.Split create(LocationPath path, long start, long length, long fileLength, + long fileSplitSize, long modificationTime, String[] hosts, List partitionValues) { - return new PaimonSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); + PaimonSplit split = new PaimonSplit(path, start, length, fileLength, + modificationTime, hosts, partitionValues); + split.setTargetSplitSize(fileSplitSize); 
+ return split; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java index 6f660993d6350ab..50c1d5752a1fe30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileAttributes; @@ -97,8 +98,10 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { private ConnectorMetadata connectorMetadata; private Constraint constraint; - public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv); + public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + SessionVariable sv) { + super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv, + sv); } @Override @@ -129,7 +132,7 @@ protected void convertPredicate() { } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { // 1. Get necessary objects Connector connector = source.getConnector(); connectorMetadata = source.getConnectorMetadata(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index b0f0406c215c340..5e65093036517e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -29,7 +29,9 @@ import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.FileSplit.FileSplitCreator; +import org.apache.doris.datasource.FileSplitter; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; @@ -40,6 +42,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPushAggOp; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -63,8 +66,8 @@ public class TVFScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, needCheckColumnPriv); + public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, needCheckColumnPriv, sv); table = (FunctionGenTable) this.desc.getTable(); 
tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf(); } @@ -126,16 +129,27 @@ public TableIf getTargetTable() { } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { List splits = Lists.newArrayList(); if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM) { return splits; } + List fileStatuses = tableValuedFunction.getFileStatuses(); + + // Push down count optimization. + boolean needSplit = true; + if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT) { + int parallelNum = sessionVariable.getParallelExecInstanceNum(); + int totalFileNum = fileStatuses.size(); + needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum); + } + for (TBrokerFileStatus fileStatus : fileStatuses) { Map prop = Maps.newHashMap(); try { - splits.addAll(splitFile(new LocationPath(fileStatus.getPath(), prop), fileStatus.getBlockSize(), + splits.addAll(FileSplitter.splitFile(new LocationPath(fileStatus.getPath(), prop), + getRealFileSplitSize(needSplit ? fileStatus.getBlockSize() : Long.MAX_VALUE), null, fileStatus.getSize(), fileStatus.getModificationTime(), fileStatus.isSplitable, null, FileSplitCreator.DEFAULT)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 7f4d23c61303b3c..28b14398c86deb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -196,6 +196,7 @@ import org.apache.doris.planner.TableFunctionNode; import org.apache.doris.planner.UnionNode; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TFetchOption; @@ -555,15 +556,16 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla ExternalTable table = fileScan.getTable(); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); + SessionVariable sv = ConnectContext.get().getSessionVariable(); // TODO(cmy): determine the needCheckColumnPriv param ScanNode scanNode; if (table instanceof HMSExternalTable) { switch (((HMSExternalTable) table).getDlaType()) { case ICEBERG: - scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); break; case HIVE: - scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); HiveScanNode hiveScanNode = (HiveScanNode) scanNode; hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions()); if (fileScan.getTableSample().isPresent()) { @@ -575,17 +577,16 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType()); } } else if (table instanceof IcebergExternalTable) { - scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); } else if (table instanceof PaimonExternalTable) { - scanNode = new 
PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false, - ConnectContext.get().getSessionVariable()); + scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); } else if (table instanceof TrinoConnectorExternalTable) { - scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); } else if (table instanceof MaxComputeExternalTable) { scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor, - fileScan.getSelectedPartitions(), false); + fileScan.getSelectedPartitions(), false, sv); } else if (table instanceof LakeSoulExternalTable) { - scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); } else { throw new RuntimeException("do not support table type " + table.getType()); } @@ -911,7 +912,8 @@ public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, Pl TupleDescriptor tupleDescriptor = generateTupleDesc(slots, tvfRelation.getFunction().getTable(), context); TableValuedFunctionIf catalogFunction = tvfRelation.getFunction().getCatalogFunction(); - ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor); + SessionVariable sv = ConnectContext.get().getSessionVariable(); + ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv); scanNode.setNereidsId(tvfRelation.getId()); Utils.execWithUncheckedException(scanNode::init); context.getRuntimeTranslator().ifPresent( diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index ae5d562bd5bd306..f29ff4dba32b9ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -75,6 +75,7 @@ import org.apache.doris.datasource.paimon.source.PaimonScanNode; import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.thrift.TPushAggOp; @@ -1916,8 +1917,8 @@ private boolean canMigrateConjuncts(InlineViewRef inlineViewRef) { */ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt) throws UserException { - ScanNode scanNode = null; - + SessionVariable sv = ConnectContext.get().getSessionVariable(); + ScanNode scanNode; switch (tblRef.getTable().getType()) { case OLAP: case MATERIALIZED_VIEW: @@ -1955,7 +1956,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), false); break; case TABLE_VALUED_FUNCTION: - scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId()); + scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId(), sv); break; case HMS_EXTERNAL_TABLE: TableIf table = tblRef.getDesc().getTable(); @@ -1968,13 +1969,13 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s + "please set enable_nereids_planner = true to enable new optimizer"); } scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, - Optional.empty(), Optional.empty(), 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index ae5d562bd5bd306..f29ff4dba32b9ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -75,6 +75,7 @@ import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.thrift.TPushAggOp;
@@ -1916,8 +1917,8 @@ private boolean canMigrateConjuncts(InlineViewRef inlineViewRef) {
      */
     private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt)
             throws UserException {
-        ScanNode scanNode = null;
-
+        SessionVariable sv = ConnectContext.get().getSessionVariable();
+        ScanNode scanNode;
         switch (tblRef.getTable().getType()) {
             case OLAP:
             case MATERIALIZED_VIEW:
@@ -1955,7 +1956,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), false);
                 break;
             case TABLE_VALUED_FUNCTION:
-                scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId());
+                scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId(), sv);
                 break;
             case HMS_EXTERNAL_TABLE:
                 TableIf table = tblRef.getDesc().getTable();
@@ -1968,13 +1969,13 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                                     + "please set enable_nereids_planner = true to enable new optimizer");
                         }
                         scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
-                                Optional.empty(), Optional.empty(), ConnectContext.get().getSessionVariable());
+                                Optional.empty(), Optional.empty(), sv);
                         break;
                     case ICEBERG:
-                        scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                        scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                         break;
                     case HIVE:
-                        scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                        scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                         ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
                         break;
                     default:
@@ -1982,17 +1983,16 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 }
                 break;
             case ICEBERG_EXTERNAL_TABLE:
-                scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case PAIMON_EXTERNAL_TABLE:
-                scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
-                        ConnectContext.get().getSessionVariable());
+                scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case TRINO_CONNECTOR_EXTERNAL_TABLE:
-                scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case MAX_COMPUTE_EXTERNAL_TABLE:
-                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case ES_EXTERNAL_TABLE:
                 scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
@@ -2001,7 +2001,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
                 break;
             case LAKESOUl_EXTERNAL_TABLE:
-                scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case TEST_EXTERNAL_TABLE:
                 scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), tblRef.getDesc());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index 629b410e676590a..66f344f03be0a32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -22,6 +22,7 @@ import org.apache.doris.planner.DataGenScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TDataGenFunctionName;
 import java.util.List;
@@ -32,7 +33,7 @@ public abstract class DataGenTableValuedFunction extends TableValuedFunctionIf {
     public abstract TDataGenFunctionName getDataGenFunctionName();
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         return new DataGenScanNode(id, desc, this);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e137d5d200cc842..6f45a1cc0eb03dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -52,6 +52,7 @@ import org.apache.doris.proto.Types.PTypeDesc;
 import org.apache.doris.proto.Types.PTypeNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
@@ -301,8 +302,8 @@ public TFileAttributes getFileAttributes() {
     }
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
-        return new TVFScanNode(id, desc, false);
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
+        return new TVFScanNode(id, desc, false, sv);
     }
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 3bd262f467d6a3e..324e17d4f24490f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -30,6 +30,7 @@ import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TFileType;
 import java.util.ArrayList;
@@ -83,7 +84,7 @@ public List<Column> getTableColumns() throws AnalysisException {
     }
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         return new GroupCommitScanNode(id, desc, tableId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
index b884dab38824b23..a9847c7eadd3295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,7 +48,7 @@ public List<Column> getTableColumns() throws AnalysisException {
     }
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
         JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(), desc.getTable().getFullSchema(),
                 TableType.JDBC);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index a7e25bc7f824453..7bd28f363e7523a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -22,6 +22,7 @@ import org.apache.doris.datasource.tvf.source.MetadataScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMetadataType;
@@ -60,7 +61,7 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co
     public abstract TMetaScanRange getMetaScanRange();
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         return new MetadataScanNode(id, desc, this);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
index cb0f51002290605..07a125836b75fd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
@@ -28,6 +28,7 @@ import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -87,5 +88,5 @@ public String getTableName() {
     public abstract List<Column> getTableColumns() throws AnalysisException;
     @Override
-    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index d4faa46019541c2..eb323a7667224af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -25,6 +25,7 @@ import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import java.util.List;
 import java.util.Map;
@@ -88,7 +89,7 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String, String> params)
     public abstract List<Column> getTableColumns() throws AnalysisException;
-    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv);
     public void checkAuth(ConnectContext ctx) {
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
index f3b44964915230f..a394836625d751f 100644
--- a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
+++ b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
@@ -578,6 +578,26 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3	3_1
+4	4_1
+
+-- !c115 --
+3	3_1
+4	4_1
+
 -- !all --
 1	2	3	4	5	6	7	8	9.1	10.1	11.10	2020-02-02	13str	14varchar	a	true	aaaa	2023-08-13T09:32:38.530
 10	20	30	40	50	60	70	80	90.1	100.1	110.10	2020-03-02	130str	140varchar	b	false	bbbb	2023-08-14T08:32:52.821
@@ -1157,6 +1177,26 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3	3_1
+4	4_1
+
+-- !c115 --
+3	3_1
+4	4_1
+
 -- !all --
 1	2	3	4	5	6	7	8	9.1	10.1	11.10	2020-02-02	13str	14varchar	a	true	aaaa	2023-08-13T09:32:38.530
 10	20	30	40	50	60	70	80	90.1	100.1	110.10	2020-03-02	130str	140varchar	b	false	bbbb	2023-08-14T08:32:52.821
@@ -1736,6 +1776,26 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3	3_1
+4	4_1
+
+-- !c115 --
+3	3_1
+4	4_1
+
 -- !all --
 1	2	3	4	5	6	7	8	9.1	10.1	11.10	2020-02-02	13str	14varchar	a	true	aaaa	2023-08-13T09:32:38.530
 10	20	30	40	50	60	70	80	90.1	100.1	110.10	2020-03-02	130str	140varchar	b	false	bbbb	2023-08-14T08:32:52.821
@@ -2315,3 +2375,23 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3	3_1
+4	4_1
+
+-- !c115 --
+3	3_1
+4	4_1
+
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
index 306af3b2cb28525..7a9e90a61fe2cb1 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -110,7 +110,7 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external
     } finally {
         sql """ set enable_count_push_down_for_external_table=true; """
-        sql """drop catalog if exists ${catalog_name}"""
+        // sql """drop catalog if exists ${catalog_name}"""
     }
 }
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
index 41afb02e0f932d0..9668cbb0950c5dd 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -181,6 +181,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
         def c108= """ select id from tb_with_upper_case where id = 1 """
         def c109= """ select id from tb_with_upper_case where id < 1 """
+        def c110 = """select count(*) from deletion_vector_orc;"""
+        def c111 = """select count(*) from deletion_vector_parquet;"""
+        def c112 = """select count(*) from deletion_vector_orc where id > 2;"""
+        def c113 = """select count(*) from deletion_vector_parquet where id > 2;"""
+        def c114 = """select * from deletion_vector_orc where id > 2;"""
+        def c115 = """select * from deletion_vector_parquet where id > 2;"""
+
         String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
         String catalog_name = "ctl_test_paimon_catalog"
         String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -289,6 +296,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
             qt_c107 c107
             qt_c108 c108
             qt_c109 c109
+
+            qt_c110 c110
+            qt_c111 c111
+            qt_c112 c112
+            qt_c113 c113
+            qt_c114 c114
+            qt_c115 c115
         }
     test_cases("false", "false")