From 2efca61bafa49f98d4e743e1fd167417999e4344 Mon Sep 17 00:00:00 2001
From: "Mingyu Chen (Rayner)"
Date: Mon, 9 Dec 2024 19:46:13 -0800
Subject: [PATCH] [fix](catalog) opt the count pushdown rule for iceberg/paimon/hive scan node (#44038)

### What problem does this PR solve?

1. Optimize the parallelism of the count push-down optimization.

   Count push-down optimization speeds up queries such as `select count(*) from table`. In this scenario, the number of rows can be obtained directly from the external table's row count statistics or from the metadata of the Parquet/ORC files, without reading the actual file content. Currently, count push-down optimization is supported for Hive, Iceberg, and Paimon tables. There are two ways to obtain the row count:

   1. Obtain it directly from statistics.
      For Iceberg tables, the row count can be read directly from statistics. However, due to historical issues in Iceberg, this method cannot be used if the table contains position/equality deletes, otherwise the row count would be incorrect. In that case, the optimization falls back to reading the row count from file metadata.

   2. Obtain it from file metadata.
      For Hive, Paimon, and some Iceberg tables, the row count can be read directly from the metadata of the Parquet/ORC files. For text-format tables, efficiency can still be improved by performing only row separation, skipping column separation.

   In the task splitting logic, the number of split tasks for count push-down optimization should take into account the file format, the number of files, the parallelism, the number of BE nodes, and local shuffle: count push-down optimization should avoid local shuffle, so the number of split tasks should be greater than or equal to `parallelism * number of BE nodes` (see the sketch at the end of this description).

2. Fix the incorrect logic of count push-down optimization.

   In the previous code, count push-down optimization did not take effect for Iceberg and Paimon tables because the count push-down information was not pushed to the FileFormatReader inside the TableFormatReader. This PR fixes that.

3. Store the SessionVariable reference in FileQueryScanNode.

   SessionVariable is a field of ConnectContext, and ConnectContext is a ThreadLocal variable. In some cases, FileQueryScanNode accesses SessionVariable from other threads, where the ThreadLocal value is not available. Therefore, the SessionVariable reference is stored in FileQueryScanNode to avoid such illegal access.

4. Extract an independent FileSplitter class.

   FileSplitter is a utility class that splits files into `Split`s according to different strategies. This PR does not change the splitting strategy; it only extracts that logic into its own class so it can be optimized later.
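To make the two rules above concrete, here is a minimal sketch, assuming simplified constants and wiring. The method names mirror the `FileSplitter.needSplitForCountPushdown` and `FileQueryScanNode.getRealFileSplitSize` helpers added in this PR, but this is an illustration of the rules, not the exact implementation:

```java
// Illustrative sketch only. The real logic lives in FileSplitter and
// FileQueryScanNode; the default split size below is an assumption.
public class SplitRuleSketch {
    // Assumed default; the real value is FileScanNode.DEFAULT_SPLIT_SIZE.
    private static final long DEFAULT_SPLIT_SIZE = 128L * 1024 * 1024;

    // Count push-down should avoid local shuffle: keep splitting files only
    // while the file count is smaller than parallelism * number of BE nodes.
    public static boolean needSplitForCountPushdown(int parallelism, int numBackends, long totalFileNum) {
        return totalFileNum < (long) parallelism * numBackends;
    }

    // Split size rule: the session variable `file_split_size` wins if set (> 0);
    // otherwise use max(DEFAULT_SPLIT_SIZE, file system block size).
    public static long getRealFileSplitSize(long sessionFileSplitSize, long blockSize) {
        return sessionFileSplitSize > 0 ? sessionFileSplitSize : Math.max(DEFAULT_SPLIT_SIZE, blockSize);
    }

    public static void main(String[] args) {
        // 8 files, 4 instances per BE, 4 BEs -> 16 tasks needed, so keep splitting.
        System.out.println(needSplitForCountPushdown(4, 4, 8));   // true
        // 100 files already exceed 16 tasks, so no further splitting is needed.
        System.out.println(needSplitForCountPushdown(4, 4, 100)); // false
        // Session variable unset (0) -> fall back to max(default, block size).
        System.out.println(getRealFileSplitSize(0, 256L << 20));
    }
}
```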
--- .../vec/exec/format/table/iceberg_reader.cpp | 18 +-- be/src/vec/exec/format/table/iceberg_reader.h | 4 +- .../vec/exec/format/table/paimon_reader.cpp | 3 + be/src/vec/exec/scan/vfile_scanner.cpp | 4 + .../analysis/TableValuedFunctionRef.java | 5 +- .../doris/datasource/FileQueryScanNode.java | 27 ++++- .../apache/doris/datasource/FileScanNode.java | 79 ------------- .../apache/doris/datasource/FileSplit.java | 5 +- .../apache/doris/datasource/FileSplitter.java | 104 ++++++++++++++++++ .../doris/datasource/SplitAssignment.java | 2 +- .../apache/doris/datasource/SplitCreator.java | 2 +- .../doris/datasource/SplitGenerator.java | 5 +- .../datasource/hive/source/HiveScanNode.java | 52 ++++++--- .../datasource/hive/source/HiveSplit.java | 6 +- .../datasource/hudi/source/HudiScanNode.java | 8 +- .../iceberg/source/IcebergScanNode.java | 42 ++++--- .../iceberg/source/IcebergSplit.java | 11 +- .../lakesoul/source/LakeSoulScanNode.java | 7 +- .../maxcompute/source/MaxComputeScanNode.java | 28 ++--- .../paimon/source/PaimonScanNode.java | 65 ++++++----- .../datasource/paimon/source/PaimonSplit.java | 7 +- .../source/TrinoConnectorScanNode.java | 9 +- .../datasource/tvf/source/TVFScanNode.java | 22 +++- .../translator/PhysicalPlanTranslator.java | 20 ++-- .../doris/planner/SingleNodePlanner.java | 24 ++-- .../DataGenTableValuedFunction.java | 3 +- .../ExternalFileTableValuedFunction.java | 5 +- .../GroupCommitTableValuedFunction.java | 3 +- .../JdbcQueryTableValueFunction.java | 3 +- .../MetadataTableValuedFunction.java | 3 +- .../QueryTableValueFunction.java | 3 +- .../tablefunction/TableValuedFunctionIf.java | 3 +- .../paimon/test_paimon_catalog.out | 80 ++++++++++++++ .../test_iceberg_optimize_count.groovy | 2 +- .../paimon/test_paimon_catalog.groovy | 14 +++ 35 files changed, 450 insertions(+), 228 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 8f130ca6002d5d4..837269b0bb355d4 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -96,25 +96,25 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma _iceberg_profile.delete_rows_sort_time = ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); if (range.table_format_params.iceberg_params.__isset.row_count) { - _remaining_push_down_count = range.table_format_params.iceberg_params.row_count; + _remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count; } else { - _remaining_push_down_count = -1; + _remaining_table_level_row_count = -1; } } Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { // already get rows from be - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { - auto rows = - std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size); - _remaining_push_down_count -= rows; + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { + auto rows = std::min(_remaining_table_level_row_count, + (int64_t)_state->query_options().batch_size); + _remaining_table_level_row_count -= rows; auto mutate_columns = block->mutate_columns(); for (auto& col : mutate_columns) { col->resize(rows); } block->set_columns(std::move(mutate_columns)); *read_rows = rows; - if (_remaining_push_down_count == 0) { + if (_remaining_table_level_row_count 
== 0) { *eof = true; } @@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns( Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) { // We get the count value by doris's be, so we don't need to read the delete file - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { return Status::OK(); } @@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOC if (position_delete_files.size() > 0) { RETURN_IF_ERROR( _position_delete_base(table_desc.original_file_path, position_delete_files)); + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } if (equality_delete_files.size() > 0) { RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 2e240f465b6a2cd..ee7dcdd68d24fa6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -167,7 +167,9 @@ class IcebergTableReader : public TableFormatReader { bool _has_schema_change = false; bool _has_iceberg_schema = false; - int64_t _remaining_push_down_count; + // the table level row count for optimizing query like: + // select count(*) from table; + int64_t _remaining_table_level_row_count; Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index 263fdc8014bc36a..055d6179b2c4221 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext return Status::OK(); } + // set push down agg type to NONE because we can not do count push down opt + // if there are delete files. + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); const auto& deletion_file = table_desc.deletion_file; io::FileSystemProperties properties = { .system_type = _params.file_type, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 296e59f8df16e70..d53bb105c70600d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -788,6 +788,9 @@ Status VFileScanner::_get_next_reader() { _should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache() : nullptr, _state->query_options().enable_parquet_lazy_mat); + // ATTN: the push down agg type may be set back to NONE, + // see IcebergTableReader::init_row_filters for example. 
+ parquet_reader->set_push_down_agg_type(_get_push_down_agg_type()); { SCOPED_TIMER(_open_reader_timer); RETURN_IF_ERROR(parquet_reader->open()); @@ -853,6 +856,7 @@ Status VFileScanner::_get_next_reader() { _profile, _state, *_params, range, _state->query_options().batch_size, _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat, unsupported_pushdown_types); + orc_reader->set_push_down_agg_type(_get_push_down_agg_type()); if (push_down_predicates) { RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java index b1e7c7c89e91f5b..9eacb8a0422695c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java @@ -26,6 +26,7 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.tablefunction.BackendsTableValuedFunction; import org.apache.doris.tablefunction.LocalTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionIf; @@ -119,8 +120,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException { analyzeJoin(analyzer); } - public ScanNode getScanNode(PlanNodeId id) { - return tableFunction.getScanNode(id, desc); + public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) { + return tableFunction.getScanNode(id, desc, sv); } public TableValuedFunctionIf getTableFunction() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 5ea2a2637d86e27..4a071fa6682c579 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -39,6 +39,7 @@ import org.apache.doris.datasource.hive.source.HiveSplit; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; @@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode { protected String brokerName; protected TableSnapshot tableSnapshot; + // Save the reference of session variable, so that we don't need to get it from connection context. + // connection context is a thread local variable, it is not available is running in other thread. 
+ protected SessionVariable sessionVariable; /** * External file scan node for Query hms table @@ -102,8 +106,10 @@ public abstract class FileQueryScanNode extends FileScanNode { * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - StatisticalType statisticalType, boolean needCheckColumnPriv) { + StatisticalType statisticalType, boolean needCheckColumnPriv, + SessionVariable sv) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + this.sessionVariable = sv; } @Override @@ -112,7 +118,6 @@ public void init(Analyzer analyzer) throws UserException { ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime(); } super.init(analyzer); - initFileSplitSize(); doInitialize(); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime(); @@ -314,6 +319,7 @@ public void createScanRangeLocations() throws UserException { params.setProperties(locationProperties); } + int numBackends = backendPolicy.numBackends(); List pathPartitionKeys = getPathPartitionKeys(); if (isBatchMode()) { // File splits are generated lazily, and fetched by backends while scanning. @@ -356,7 +362,7 @@ public void createScanRangeLocations() throws UserException { scanBackendIds.add(backend.getId()); } } else { - List inputSplits = getSplits(); + List inputSplits = getSplits(numBackends); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); } @@ -605,4 +611,19 @@ public TableSnapshot getQueryTableSnapshot() { } return this.tableSnapshot; } + + /** + * The real file split size is determined by: + * 1. If user specify the split size in session variable `file_split_size`, use user specified value. + * 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size. 
+ * @param blockSize, got from file system, eg, hdfs + * @return the real file split size + */ + protected long getRealFileSplitSize(long blockSize) { + long realSplitSize = sessionVariable.getFileSplitSize(); + if (realSplitSize <= 0) { + realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize); + } + return realSplitSize; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 29fdb2b09acfcd6..b7d343123133086 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -25,15 +25,10 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.LocationPath; -import org.apache.doris.common.util.Util; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileScanNode; import org.apache.doris.thrift.TFileScanRangeParams; @@ -46,11 +41,9 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import org.apache.hadoop.fs.BlockLocation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -68,8 +61,6 @@ public abstract class FileScanNode extends ExternalScanNode { // For explain protected long totalFileSize = 0; protected long totalPartitionNum = 0; - protected long fileSplitSize; - protected boolean isSplitSizeSetBySession = false; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -77,19 +68,6 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, St this.needCheckColumnPriv = needCheckColumnPriv; } - @Override - public void init() throws UserException { - initFileSplitSize(); - } - - protected void initFileSplitSize() { - this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); - this.isSplitSizeSetBySession = this.fileSplitSize > 0; - if (this.fileSplitSize <= 0) { - this.fileSplitSize = DEFAULT_SPLIT_SIZE; - } - } - @Override protected void toThrift(TPlanNode planNode) { planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); @@ -256,61 +234,4 @@ protected void setDefaultValueExprs(TableIf tbl, } } } - - protected List splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length, - long modificationTime, boolean splittable, List partitionValues, SplitCreator splitCreator) - throws IOException { - if (blockLocations == null) { - blockLocations = new BlockLocation[0]; - } - List result = Lists.newArrayList(); - TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get()); - if (!splittable || compressType != TFileCompressType.PLAIN) { - if (LOG.isDebugEnabled()) { - LOG.debug("Path {} is not splittable.", path); - } - String[] hosts = blockLocations.length == 0 ? 
null : blockLocations[0].getHosts(); - result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues)); - return result; - } - // if file split size is set by session variable, use session variable. - // Otherwise, use max(file split size, block size) - if (!isSplitSizeSetBySession) { - fileSplitSize = Math.max(fileSplitSize, blockSize); - } - long bytesRemaining; - for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; - bytesRemaining -= fileSplitSize) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize, - length, modificationTime, hosts, partitionValues)); - } - if (bytesRemaining != 0L) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, - length, modificationTime, hosts, partitionValues)); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Path {} includes {} splits.", path, result.size()); - } - return result; - } - - protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { - if (blkLocations == null || blkLocations.length == 0) { - return -1; - } - for (int i = 0; i < blkLocations.length; ++i) { - if (blkLocations[i].getOffset() <= offset - && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) { - return i; - } - } - BlockLocation last = blkLocations[blkLocations.length - 1]; - long fileLength = last.getOffset() + last.getLength() - 1L; - throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java index 1ebb390e90438f7..37e66c7056fbd79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java @@ -87,9 +87,12 @@ public static class FileSplitCreator implements SplitCreator { @Override public Split create(LocationPath path, long start, long length, long fileLength, + long fileSplitSize, long modificationTime, String[] hosts, List partitionValues) { - return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); + FileSplit split = new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); + split.setTargetSplitSize(fileSplitSize); + return split; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java new file mode 100644 index 000000000000000..b923c87d3ac96d1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.common.util.Util; +import org.apache.doris.spi.Split; +import org.apache.doris.thrift.TFileCompressType; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.List; + +public class FileSplitter { + private static final Logger LOG = LogManager.getLogger(FileSplitter.class); + + // If the number of files is larger than parallel instances * num of backends, + // we don't need to split the file. + // Otherwise, split the file to avoid local shuffle. + public static boolean needSplitForCountPushdown(int parallelism, int numBackends, long totalFileNum) { + return totalFileNum < parallelism * numBackends; + } + + public static List splitFile( + LocationPath path, + long fileSplitSize, + BlockLocation[] blockLocations, + long length, + long modificationTime, + boolean splittable, + List partitionValues, + SplitCreator splitCreator) + throws IOException { + if (blockLocations == null) { + blockLocations = new BlockLocation[0]; + } + List result = Lists.newArrayList(); + TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get()); + if (!splittable || compressType != TFileCompressType.PLAIN) { + if (LOG.isDebugEnabled()) { + LOG.debug("Path {} is not splittable.", path); + } + String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts(); + result.add(splitCreator.create(path, 0, length, length, fileSplitSize, + modificationTime, hosts, partitionValues)); + return result; + } + long bytesRemaining; + for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; + bytesRemaining -= fileSplitSize) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize, + length, fileSplitSize, modificationTime, hosts, partitionValues)); + } + if (bytesRemaining != 0L) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? 
null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, + length, fileSplitSize, modificationTime, hosts, partitionValues)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Path {} includes {} splits.", path, result.size()); + } + return result; + } + + private static int getBlockIndex(BlockLocation[] blkLocations, long offset) { + if (blkLocations == null || blkLocations.length == 0) { + return -1; + } + for (int i = 0; i < blkLocations.length; ++i) { + if (blkLocations[i].getOffset() <= offset + && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1L; + throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); + } + + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java index 928854b91d18102..a26abc7fc5e037f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java @@ -69,7 +69,7 @@ public SplitAssignment( } public void init() throws UserException { - splitGenerator.startSplit(); + splitGenerator.startSplit(backendPolicy.numBackends()); synchronized (assignLock) { while (sampleSplit == null && waitFirstSplit()) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java index 4df30459db7021c..6df84d2f0f5ee94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java @@ -23,6 +23,6 @@ import java.util.List; public interface SplitCreator { - Split create(LocationPath path, long start, long length, long fileLength, + Split create(LocationPath path, long start, long length, long fileLength, long fileSplitSize, long modificationTime, String[] hosts, List partitionValues); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java index c4a373bc85b3b12..34ff3911445bfeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java @@ -30,8 +30,9 @@ public interface SplitGenerator { /** * Get all file splits if the producer doesn't support batch mode. + * @param numBackends the number of backends, this is useful when determine the number of splits. 
*/ - default List getSplits() throws UserException { + default List getSplits(int numBackends) throws UserException { // todo: remove this interface if batch mode is stable throw new NotImplementedException("Not implement"); } @@ -51,7 +52,7 @@ default int numApproximateSplits() { return -1; } - default void startSplit() { + default void startSplit(int numBackends) { } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 99d3cd1cd216226..35b21c368ea9f92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -32,6 +32,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.FileSplitter; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -44,12 +45,14 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TPushAggOp; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -98,15 +101,13 @@ public class HiveScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv); - hmsTable = (HMSExternalTable) desc.getTable(); - brokerName = hmsTable.getCatalog().bindBrokerName(); + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv); } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - StatisticalType statisticalType, boolean needCheckColumnPriv) { - super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); } @@ -163,7 +164,7 @@ protected List getPartitions() throws AnalysisException { } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { long start = System.currentTimeMillis(); try { if (!partitionInit) { @@ -174,7 +175,7 @@ public List getSplits() throws UserException { .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); String bindBrokerName = hmsTable.getCatalog().bindBrokerName(); List allFiles = Lists.newArrayList(); - getFileSplitByPartitions(cache, prunedPartitions, allFiles, 
bindBrokerName); + getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName, numBackends); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime(); } @@ -193,7 +194,7 @@ public List getSplits() throws UserException { } @Override - public void startSplit() { + public void startSplit(int numBackends) { if (prunedPartitions.isEmpty()) { splitAssignment.finishSchedule(); return; @@ -214,12 +215,12 @@ public void startSplit() { try { List allFiles = Lists.newArrayList(); getFileSplitByPartitions( - cache, Collections.singletonList(partition), allFiles, bindBrokerName); + cache, Collections.singletonList(partition), allFiles, bindBrokerName, numBackends); if (allFiles.size() > numSplitsPerPartition.get()) { numSplitsPerPartition.set(allFiles.size()); } splitAssignment.addToQueue(allFiles); - } catch (IOException e) { + } catch (Exception e) { batchException.set(new UserException(e.getMessage(), e)); } finally { splittersOnFlight.release(); @@ -263,7 +264,7 @@ public int numApproximateSplits() { } private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, - List allFiles, String bindBrokerName) throws IOException { + List allFiles, String bindBrokerName, int numBackends) throws IOException, UserException { List fileCaches; if (hiveTransaction != null) { fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); @@ -276,11 +277,34 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List allFiles, List hiveFileStatuses) throws IOException { for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) { - allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(), + allFiles.addAll(FileSplitter.splitFile(status.getPath(), getRealFileSplitSize(status.getBlockSize()), status.getBlockLocations(), status.getLength(), status.getModificationTime(), status.isSplittable(), status.getPartitionValues(), new HiveSplitCreator(status.getAcidInfo()))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java index 5dd63e734c9c911..58bfb32e617e340 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java @@ -54,9 +54,13 @@ public HiveSplitCreator(AcidInfo acidInfo) { @Override public Split create(LocationPath path, long start, long length, long fileLength, + long fileSplitSize, long modificationTime, String[] hosts, List partitionValues) { - return new HiveSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues, acidInfo); + HiveSplit split = new HiveSplit(path, start, length, fileLength, modificationTime, + hosts, partitionValues, acidInfo); + split.setTargetSplitSize(fileSplitSize); + return split; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index b2cad8ab710178f..e1dfaa40aefce19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -125,8 +125,8 @@ public class HudiScanNode extends HiveScanNode { */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, Optional scanParams, Optional 
incrementalRelation, - SessionVariable sessionVariable) { - super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); + SessionVariable sv) { + super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv); isCowTable = hmsTable.isHoodieCowTable(); if (LOG.isDebugEnabled()) { if (isCowTable) { @@ -390,7 +390,7 @@ private void getPartitionsSplits(List partitions, List spl } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) { return getIncrementalSplits(); } @@ -406,7 +406,7 @@ public List getSplits() throws UserException { } @Override - public void startSplit() { + public void startSplit(int numBackends) { if (prunedPartitions.isEmpty()) { splitAssignment.finishSchedule(); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index f7b58158d1a72c9..c78140b9d3cd995 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -36,7 +36,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -87,7 +87,13 @@ public class IcebergScanNode extends FileQueryScanNode { private IcebergSource source; private Table icebergTable; private List pushdownIcebergPredicates = Lists.newArrayList(); - private boolean pushDownCount = false; + // If tableLevelPushDownCount is true, means we can do count push down opt at table level. + // which means all splits have no position/equality delete files, + // so for query like "select count(*) from ice_tbl", we can get count from snapshot row count info directly. + // If tableLevelPushDownCount is false, means we can't do count push down opt at table level, + // But for part of splits which have no position/equality delete files, we can still do count push down opt. + // And for split level count push down opt, the flag is set in each split. 
+ private boolean tableLevelPushDownCount = false; private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; /** @@ -96,8 +102,8 @@ public class IcebergScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv); + public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv, sv); ExternalTable table = (ExternalTable) desc.getTable(); if (table instanceof HMSExternalTable) { @@ -140,8 +146,8 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli int formatVersion = icebergSplit.getFormatVersion(); fileDesc.setFormatVersion(formatVersion); fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); - if (pushDownCount) { - fileDesc.setRowCount(icebergSplit.getRowCount()); + if (tableLevelPushDownCount) { + fileDesc.setRowCount(icebergSplit.getTableLevelRowCount()); } if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); @@ -177,11 +183,12 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli } @Override - public List getSplits() throws UserException { - return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), this::doGetSplits); + public List getSplits(int numBackends) throws UserException { + return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), + () -> doGetSplits(numBackends)); } - private List doGetSplits() throws UserException { + private List doGetSplits(int numBackends) throws UserException { TableScan scan = icebergTable.newScan(); // set snapshot @@ -209,9 +216,10 @@ private List doGetSplits() throws UserException { HashSet partitionPathSet = new HashSet<>(); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); - CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize); + long realFileSplitSize = getRealFileSplitSize(0); + CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize); try (CloseableIterable combinedScanTasks = - TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) { + TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { List partitionValues = new ArrayList<>(); if (isPartitionedTable) { @@ -250,6 +258,7 @@ private List doGetSplits() throws UserException { source.getCatalog().getProperties(), partitionValues, originalPath); + split.setTargetSplitSize(realFileSplitSize); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); } @@ -268,11 +277,11 @@ private List doGetSplits() throws UserException { } long countFromSnapshot = getCountFromSnapshot(); if (countFromSnapshot >= 0) { - pushDownCount = true; + tableLevelPushDownCount = true; List pushDownCountSplits; if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) { - int parallelNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); - pushDownCountSplits = splits.subList(0, Math.min(splits.size(), parallelNum)); + int minSplits = sessionVariable.getParallelExecInstanceNum() * 
numBackends; + pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits)); } else { pushDownCountSplits = Collections.singletonList(splits.get(0)); } @@ -282,7 +291,6 @@ private List doGetSplits() throws UserException { } selectedPartitionNum = partitionPathSet.size(); - splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } @@ -422,8 +430,8 @@ private void assignCountToSplits(List splits, long totalCount) { int size = splits.size(); long countPerSplit = totalCount / size; for (int i = 0; i < size - 1; i++) { - ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit); + ((IcebergSplit) splits.get(i)).setTableLevelRowCount(countPerSplit); } - ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size); + ((IcebergSplit) splits.get(size - 1)).setTableLevelRowCount(countPerSplit + totalCount % size); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 580d3cf1bb23f38..0520612935a7781 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -37,7 +37,8 @@ public class IcebergSplit extends FileSplit { private Integer formatVersion; private List deleteFileFilters; private Map config; - private long rowCount = -1; + // tableLevelRowCount will be set only table-level count push down opt is available. + private long tableLevelRowCount = -1; // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, @@ -50,14 +51,6 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength, this.selfSplitWeight = length; } - public long getRowCount() { - return rowCount; - } - - public void setRowCount(long rowCount) { - this.rowCount = rowCount; - } - public void setDeleteFileFilters(List deleteFileFilters) { this.deleteFileFilters = deleteFileFilters; this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java index a7311055cffcc9b..80f037239ce3f25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.property.constants.MinioProperties; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; @@ -80,8 +81,8 @@ public class LakeSoulScanNode extends FileQueryScanNode { String readType; - public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv); + public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, 
needCheckColumnPriv, sv); } @Override @@ -209,7 +210,7 @@ private void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulS rangeDesc.setTableFormatParams(tableFormatFileDesc); } - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { if (LOG.isDebugEnabled()) { LOG.debug("getSplits with columnFilters={}", columnFilters); LOG.debug("getSplits with columnNameToRange={}", columnNameToRange); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index 9fa22a0fffaab8b..e4bb8b5e9dcc533 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -44,7 +44,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.nereids.util.DateUtils; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; @@ -107,21 +107,23 @@ public class MaxComputeScanNode extends FileQueryScanNode { // For new planner public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, - SelectedPartitions selectedPartitions, boolean needCheckColumnPriv) { + SelectedPartitions selectedPartitions, boolean needCheckColumnPriv, + SessionVariable sv) { this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, - selectedPartitions, needCheckColumnPriv); + selectedPartitions, needCheckColumnPriv, sv); } // For old planner - public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + SessionVariable sv) { this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, - SelectedPartitions.NOT_PRUNED, needCheckColumnPriv); + SelectedPartitions.NOT_PRUNED, needCheckColumnPriv, sv); } private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, SelectedPartitions selectedPartitions, - boolean needCheckColumnPriv) { - super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv); table = (MaxComputeExternalTable) desc.getTable(); this.selectedPartitions = selectedPartitions; } @@ -214,7 +216,7 @@ public boolean isBatchMode() { return false; } - int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode(); + int numPartitions = sessionVariable.getNumPartitionsInBatchMode(); return numPartitions > 0 && selectedPartitions != SelectedPartitions.NOT_PRUNED && selectedPartitions.selectedPartitions.size() >= numPartitions; @@ -226,7 +228,7 @@ public int numApproximateSplits() { } @Override - public void startSplit() { + public void startSplit(int numBackends) { this.totalPartitionNum = selectedPartitions.totalPartitionNum; this.selectedPartitionNum = selectedPartitions.selectedPartitions.size(); @@ -241,8 +243,7 @@ public void startSplit() { (key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key)) ); - - int batchNumPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode(); + int batchNumPartitions = sessionVariable.getNumPartitionsInBatchMode(); Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor(); AtomicReference batchException = new AtomicReference<>(null); @@ -546,7 +547,7 @@ protected Map getLocationProperties() throws UserException { return new HashMap<>(); } - List getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws java.io.IOException { + private List getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws IOException { List result = new ArrayList<>(); String scanSessionSerialize = serializeSession(tableBatchReadSession); InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner(); @@ -595,9 +596,8 @@ List getSplitByTableSession(TableBatchReadSession tableBatchReadSession) return result; } - @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { List result = new ArrayList<>(); com.aliyun.odps.Table odpsTable = table.getOdpsTable(); if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 5009ec3c9049fb6..28efbc58f51b84e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -25,6 +25,7 @@ import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.FileSplitter; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.planner.PlanNodeId; @@ -36,6 +37,7 @@ import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TPaimonDeletionFileDesc; import org.apache.doris.thrift.TPaimonFileDesc; +import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; @@ -101,15 +103,16 @@ public String toString() { private int rawFileSplitNum = 0; private int paimonSplitNum = 0; private List splitStats = new ArrayList<>(); - private SessionVariable sessionVariable; private String serializedTable; + private boolean pushDownCount = false; + private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; + public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, - SessionVariable sessionVariable) { - super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv); - this.sessionVariable = sessionVariable; + SessionVariable sv) { + super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv, sv); } @Override @@ -199,7 +202,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { boolean forceJniScanner = sessionVariable.isForceJniScanner(); SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType .valueOf(sessionVariable.getIgnoreSplitType()); @@ -211,6 +214,8 @@ public List getSplits() throws UserException { List paimonSplits = 
readBuilder.withFilter(predicates) .withProjection(projected) .newScan().plan().splits(); + + boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT; // Just for counting the number of selected partitions for this paimon table Set selectedPartitionValues = Sets.newHashSet(); for (org.apache.paimon.table.source.Split split : paimonSplits) { @@ -238,9 +243,9 @@ public List getSplits() throws UserException { LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties()); try { - List dorisSplits = splitFile( + List dorisSplits = FileSplitter.splitFile( locationPath, - 0, + getRealFileSplitSize(0), null, file.length(), -1, @@ -261,25 +266,7 @@ public List getSplits() throws UserException { } } } else { - for (RawFile file : rawFiles) { - LocationPath locationPath = new LocationPath(file.path(), - source.getCatalog().getProperties()); - try { - splits.addAll( - splitFile( - locationPath, - 0, - null, - file.length(), - -1, - true, - null, - PaimonSplit.PaimonSplitCreator.DEFAULT)); - ++rawFileSplitNum; - } catch (IOException e) { - throw new UserException("Paimon error to split file: " + e.getMessage(), e); - } - } + createRawFileSplits(rawFiles, splits, applyCountPushdown ? Long.MAX_VALUE : 0); } } else { if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { @@ -297,14 +284,34 @@ public List getSplits() throws UserException { } splitStats.add(splitStat); } + this.selectedPartitionNum = selectedPartitionValues.size(); // TODO: get total partition number - // We should set fileSplitSize at the end because fileSplitSize may be modified - // in splitFile. - splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } + private void createRawFileSplits(List rawFiles, List splits, long blockSize) throws UserException { + for (RawFile file : rawFiles) { + LocationPath locationPath = new LocationPath(file.path(), + source.getCatalog().getProperties()); + try { + splits.addAll( + FileSplitter.splitFile( + locationPath, + getRealFileSplitSize(blockSize), + null, + file.length(), + -1, + true, + null, + PaimonSplit.PaimonSplitCreator.DEFAULT)); + ++rawFileSplitNum; + } catch (IOException e) { + throw new UserException("Paimon error to split file: " + e.getMessage(), e); + } + } + } + private String getFileFormat(String path) { return FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index 3ab38c7db28e9e8..988f043ad0e7d7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -38,7 +38,6 @@ public class PaimonSplit extends FileSplit { private TableFormatType tableFormatType; private Optional optDeletionFile; - public PaimonSplit(Split split) { super(DUMMY_PATH, 0, 0, 0, 0, null, null); this.split = split; @@ -100,10 +99,14 @@ public org.apache.doris.spi.Split create(LocationPath path, long start, long length, long fileLength, + long fileSplitSize, long modificationTime, String[] hosts, List partitionValues) { - return new PaimonSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); + PaimonSplit split = new PaimonSplit(path, start, length, fileLength, + modificationTime, hosts, partitionValues); + split.setTargetSplitSize(fileSplitSize); 
+ return split; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java index 6f660993d6350ab..50c1d5752a1fe30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileAttributes; @@ -97,8 +98,10 @@ public class TrinoConnectorScanNode extends FileQueryScanNode { private ConnectorMetadata connectorMetadata; private Constraint constraint; - public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv); + public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + SessionVariable sv) { + super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv, + sv); } @Override @@ -129,7 +132,7 @@ protected void convertPredicate() { } @Override - public List getSplits() throws UserException { + public List getSplits(int numBackends) throws UserException { // 1. Get necessary objects Connector connector = source.getConnector(); connectorMetadata = source.getConnectorMetadata(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index b0f0406c215c340..5e65093036517e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -29,7 +29,9 @@ import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.FileSplit.FileSplitCreator; +import org.apache.doris.datasource.FileSplitter; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; @@ -40,6 +42,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPushAggOp; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -63,8 +66,8 @@ public class TVFScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { - super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, needCheckColumnPriv); + public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { + super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, needCheckColumnPriv, sv); table = (FunctionGenTable) this.desc.getTable(); 
         tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
     }
@@ -126,16 +129,27 @@ public TableIf getTargetTable() {
     }
 
     @Override
-    public List getSplits() throws UserException {
+    public List getSplits(int numBackends) throws UserException {
         List splits = Lists.newArrayList();
         if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM) {
             return splits;
         }
+
         List fileStatuses = tableValuedFunction.getFileStatuses();
+
+        // Push down count optimization.
+        boolean needSplit = true;
+        if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT) {
+            int parallelNum = sessionVariable.getParallelExecInstanceNum();
+            int totalFileNum = fileStatuses.size();
+            needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum);
+        }
+
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             Map prop = Maps.newHashMap();
             try {
-                splits.addAll(splitFile(new LocationPath(fileStatus.getPath(), prop), fileStatus.getBlockSize(),
+                splits.addAll(FileSplitter.splitFile(new LocationPath(fileStatus.getPath(), prop),
+                        getRealFileSplitSize(needSplit ? fileStatus.getBlockSize() : Long.MAX_VALUE),
                         null, fileStatus.getSize(),
                         fileStatus.getModificationTime(), fileStatus.isSplitable, null,
                         FileSplitCreator.DEFAULT));
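The TVFScanNode hunk above decides whether files still need to be split under count pushdown by calling FileSplitter.needSplitForCountPushdown(), whose body is not part of this section. A plausible reading of that check, based on how its three arguments are used at the call site, is sketched below; treat the exact rule as an assumption rather than the patched implementation.

// Hypothetical sketch of the needSplitForCountPushdown() heuristic; the real
// FileSplitter class introduced by this patch is not shown in these hunks.
public final class CountPushdownParallelismSketch {

    // Assumption: once there are at least parallelism * backends whole files,
    // every scan instance already gets a task, so splitting individual files
    // adds nothing for a count(*) that only reads file-level metadata.
    static boolean needSplitForCountPushdown(int parallelNum, int numBackends, int totalFileNum) {
        return totalFileNum < parallelNum * numBackends;
    }

    public static void main(String[] args) {
        // 8 instances per backend * 3 backends = 24 desired tasks.
        System.out.println(needSplitForCountPushdown(8, 3, 100)); // false: enough files already
        System.out.println(needSplitForCountPushdown(8, 3, 5));   // true: split to regain parallelism
    }
}

When splitting is skipped, the call site above substitutes Long.MAX_VALUE for the block size, which is the same one-split-per-file trick used on the Paimon raw-file path.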
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 7f4d23c61303b3c..28b14398c86deb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -196,6 +196,7 @@
 import org.apache.doris.planner.TableFunctionNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.statistics.StatisticConstants;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 import org.apache.doris.thrift.TFetchOption;
@@ -555,15 +556,16 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
         ExternalTable table = fileScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
 
+        SessionVariable sv = ConnectContext.get().getSessionVariable();
         // TODO(cmy): determine the needCheckColumnPriv param
         ScanNode scanNode;
         if (table instanceof HMSExternalTable) {
             switch (((HMSExternalTable) table).getDlaType()) {
                 case ICEBERG:
-                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
                     break;
                 case HIVE:
-                    scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+                    scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
                     HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
                     hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
                     if (fileScan.getTableSample().isPresent()) {
@@ -575,17 +577,16 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
                     throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType());
             }
         } else if (table instanceof IcebergExternalTable) {
-            scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+            scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
         } else if (table instanceof PaimonExternalTable) {
-            scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
-                    ConnectContext.get().getSessionVariable());
+            scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
         } else if (table instanceof TrinoConnectorExternalTable) {
-            scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+            scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
         } else if (table instanceof MaxComputeExternalTable) {
             scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor,
-                    fileScan.getSelectedPartitions(), false);
+                    fileScan.getSelectedPartitions(), false, sv);
         } else if (table instanceof LakeSoulExternalTable) {
-            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
         } else {
             throw new RuntimeException("do not support table type " + table.getType());
         }
@@ -911,7 +912,8 @@ public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, Pl
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, tvfRelation.getFunction().getTable(), context);
 
         TableValuedFunctionIf catalogFunction = tvfRelation.getFunction().getCatalogFunction();
-        ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor);
+        SessionVariable sv = ConnectContext.get().getSessionVariable();
+        ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv);
         scanNode.setNereidsId(tvfRelation.getId());
         Utils.execWithUncheckedException(scanNode::init);
         context.getRuntimeTranslator().ifPresent(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index ae5d562bd5bd306..f29ff4dba32b9ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -75,6 +75,7 @@
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.thrift.TPushAggOp;
 
@@ -1916,8 +1917,8 @@ private boolean canMigrateConjuncts(InlineViewRef inlineViewRef) {
      */
     private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt)
             throws UserException {
-        ScanNode scanNode = null;
-
+        SessionVariable sv = ConnectContext.get().getSessionVariable();
+        ScanNode scanNode;
         switch (tblRef.getTable().getType()) {
             case OLAP:
             case MATERIALIZED_VIEW:
@@ -1955,7 +1956,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), false);
                 break;
             case TABLE_VALUED_FUNCTION:
-                scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId());
+                scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId(), sv);
                 break;
             case HMS_EXTERNAL_TABLE:
                 TableIf table = tblRef.getDesc().getTable();
@@ -1968,13 +1969,13 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                                 + "please set enable_nereids_planner = true to enable new optimizer");
                     }
                     scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
-                            Optional.empty(), Optional.empty(), ConnectContext.get().getSessionVariable());
+                            Optional.empty(), Optional.empty(), sv);
                     break;
                 case ICEBERG:
-                    scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                    scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                     break;
                 case HIVE:
-                    scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                    scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                     ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
                     break;
                 default:
@@ -1982,17 +1983,16 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 }
                 break;
             case ICEBERG_EXTERNAL_TABLE:
-                scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case PAIMON_EXTERNAL_TABLE:
-                scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
-                        ConnectContext.get().getSessionVariable());
+                scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case TRINO_CONNECTOR_EXTERNAL_TABLE:
-                scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case MAX_COMPUTE_EXTERNAL_TABLE:
-                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case ES_EXTERNAL_TABLE:
                 scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
@@ -2001,7 +2001,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
                 break;
             case LAKESOUl_EXTERNAL_TABLE:
-                scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                 break;
             case TEST_EXTERNAL_TABLE:
                 scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), tblRef.getDesc());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index 629b410e676590a..66f344f03be0a32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -22,6 +22,7 @@
 import org.apache.doris.planner.DataGenScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TDataGenFunctionName;
 
 import java.util.List;
@@ -32,7 +33,7 @@ public abstract class DataGenTableValuedFunction extends TableValuedFunctionIf {
     public abstract TDataGenFunctionName getDataGenFunctionName();
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         return new DataGenScanNode(id, desc, this);
     }
 }
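The table-valued-function diffs from here on all make the same mechanical change: getScanNode() gains a SessionVariable parameter, and the planner hunks above capture that value once with ConnectContext.get().getSessionVariable() before handing it to the scan nodes. The self-contained sketch below shows the general ThreadLocal behaviour behind this capture-and-pass pattern: a value set on the planning thread is not visible from another thread unless it was captured up front. The class and field names here are invented for the example and are not Doris code.

// Stand-alone illustration only; PLAN_THREAD_CONTEXT and the worker thread are
// hypothetical, the ThreadLocal behaviour is the point.
public final class ThreadLocalCaptureSketch {

    private static final ThreadLocal<String> PLAN_THREAD_CONTEXT = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        PLAN_THREAD_CONTEXT.set("parallel_exec_instance_num=8");

        // Capture the value on the planning thread, the way the planner captures sv.
        String captured = PLAN_THREAD_CONTEXT.get();

        Thread splitWorker = new Thread(() -> {
            // A ThreadLocal set on the planning thread is invisible here.
            System.out.println("thread-local on worker thread: " + PLAN_THREAD_CONTEXT.get()); // null
            System.out.println("value captured at plan time : " + captured);
        });
        splitWorker.start();
        splitWorker.join();
    }
}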
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e137d5d200cc842..6f45a1cc0eb03dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -52,6 +52,7 @@
 import org.apache.doris.proto.Types.PTypeDesc;
 import org.apache.doris.proto.Types.PTypeNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
@@ -301,8 +302,8 @@ public TFileAttributes getFileAttributes() {
     }
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
-        return new TVFScanNode(id, desc, false);
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
+        return new TVFScanNode(id, desc, false, sv);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 3bd262f467d6a3e..324e17d4f24490f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -30,6 +30,7 @@
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TFileType;
 
 import java.util.ArrayList;
@@ -83,7 +84,7 @@ public List getTableColumns() throws AnalysisException {
     }
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         return new GroupCommitScanNode(id, desc, tableId);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
index b884dab38824b23..a9847c7eadd3295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -26,6 +26,7 @@
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,7 +48,7 @@ public List getTableColumns() throws AnalysisException {
     }
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
         JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(), desc.getTable().getFullSchema(), TableType.JDBC);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index a7e25bc7f824453..7bd28f363e7523a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -22,6 +22,7 @@
 import org.apache.doris.datasource.tvf.source.MetadataScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMetadataType;
@@ -60,7 +61,7 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co
     public abstract TMetaScanRange getMetaScanRange();
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
         return new MetadataScanNode(id, desc, this);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
index cb0f51002290605..07a125836b75fd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
@@ -28,6 +28,7 @@
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -87,5 +88,5 @@ public String getTableName() {
     public abstract List getTableColumns() throws AnalysisException;
 
     @Override
-    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index d4faa46019541c2..eb323a7667224af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -25,6 +25,7 @@
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 
 import java.util.List;
 import java.util.Map;
@@ -88,7 +89,7 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map
     getTableColumns() throws AnalysisException;
 
-    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv);
 
     public void checkAuth(ConnectContext ctx) {
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
index f3b44964915230f..a394836625d751f 100644
--- a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
+++ b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
@@ -578,6 +578,26 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
 -- !all --
 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530
 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821
@@ -1157,6 +1177,26 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
 -- !all --
 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530
 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821
@@ -1736,6 +1776,26 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
 -- !all --
 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530
 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821
@@ -2315,3 +2375,23 @@ bbb
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
index 306af3b2cb28525..7a9e90a61fe2cb1 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -110,7 +110,7 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external
     } finally {
         sql """ set enable_count_push_down_for_external_table=true; """
-        sql """drop catalog if exists ${catalog_name}"""
+        // sql """drop catalog if exists ${catalog_name}"""
     }
 }
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
index 41afb02e0f932d0..9668cbb0950c5dd 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -181,6 +181,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
         def c108= """ select id from tb_with_upper_case where id = 1 """
         def c109= """ select id from tb_with_upper_case where id < 1 """
 
+        def c110 = """select count(*) from deletion_vector_orc;"""
+        def c111 = """select count(*) from deletion_vector_parquet;"""
+        def c112 = """select count(*) from deletion_vector_orc where id > 2;"""
+        def c113 = """select count(*) from deletion_vector_parquet where id > 2;"""
+        def c114 = """select * from deletion_vector_orc where id > 2;"""
+        def c115 = """select * from deletion_vector_parquet where id > 2;"""
+
         String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
         String catalog_name = "ctl_test_paimon_catalog"
         String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -289,6 +296,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
         qt_c107 c107
         qt_c108 c108
        qt_c109 c109
+
+        qt_c110 c110
+        qt_c111 c111
+        qt_c112 c112
+        qt_c113 c113
+        qt_c114 c114
+        qt_c115 c115
     }
 
    test_cases("false", "false")