Skip to content

Commit

Permalink
[fix](catalog) opt the count pushdown rule for iceberg/paimon/hive sc…
Browse files Browse the repository at this point in the history
…an node (#44038)

### What problem does this PR solve?

1. Opt the parallelism when doing count push down optimization

Count push down optimization is used to optimize queries such as `select
count(*) from table`.
In this scenario, we can directly obtain the number of rows through the
row count statistics
    of the external table, or the metadata of the Parquet/ORC file,
without reading the actual file content, thereby speeding up such
queries.

Currently, we support count push down optimization for Hive, Iceberg,
and Paimon tables.
    There are two ways to obtain the number of rows:

    1. Obtain directly from statistics

For Iceberg tables, we can obtain the number of rows directly from
statistics.
However, due to the historical issues of Iceberg, if there is
position/equality delete in the table,
        this method cannot be used to prevent incorrect row count.
In this case, it will degenerate to obtaining from the metadata of the
file.

    2. Obtain from the metadata of the file

For Hive, Paimon, and some of Iceberg tables, the number of rows can be
obtained directly
        from the metadata of the Parquet/ORC file.
For Text format tables, efficiency can also be improved by only
performing row separation, without column separation.

In the task splitting logic, for Count push-down optimization, the
number of split tasks should comprehensively
consider the file format, number of files, parallelism, number of BE
nodes, and the Local Shuffle:

1. Count push-down optimization should avoid Local Shuffle, so the
number of split tasks should be greater than or equal to `parallelism *
number of BE nodes`.

2. Fix the incorrect logic of Count push-down optimization

In the previous code, for Iceberg and Paimon tables, Count push-down
optimization did not take effect because we did not push
CountPushDown information to FileFormatReader inside TableForamtReader.
This PR fixes this problem.

3. Store SessionVaraible variables in FileQueryScanNode.

SessionVaraible is a variable in ConnectionContext. And
ConnectionContext is a ThreadLocal variable.
In FileQueryScanNode, SessionVaraible may be accessed in other threads
in some cases,
    so ThreadLocal variables may not be obtained.
Therefore, the SessionVaraible reference is stored in FileQueryScanNode
to prevent illegal access.

4. Independent FileSplitter class.

The FileSplitter class is a tool class that allows users to split
`Split` according to different strategies.
This PR does not modify the splitting strategy, but only extracts this
part of the logic separately,
    to be able to perform logic optimization later.
  • Loading branch information
morningman committed Dec 10, 2024
1 parent 328c127 commit 2efca61
Show file tree
Hide file tree
Showing 35 changed files with 450 additions and 228 deletions.
18 changes: 10 additions & 8 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,25 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
_remaining_push_down_count = range.table_format_params.iceberg_params.row_count;
_remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count;
} else {
_remaining_push_down_count = -1;
_remaining_table_level_row_count = -1;
}
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
// already get rows from be
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
auto rows =
std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size);
_remaining_push_down_count -= rows;
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
auto rows = std::min(_remaining_table_level_row_count,
(int64_t)_state->query_options().batch_size);
_remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
if (_remaining_push_down_count == 0) {
if (_remaining_table_level_row_count == 0) {
*eof = true;
}

Expand Down Expand Up @@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns(

Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
// We get the count value by doris's be, so we don't need to read the delete file
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
return Status::OK();
}

Expand All @@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOC
if (position_delete_files.size() > 0) {
RETURN_IF_ERROR(
_position_delete_base(table_desc.original_file_path, position_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
if (equality_delete_files.size() > 0) {
RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}

COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ class IcebergTableReader : public TableFormatReader {
bool _has_schema_change = false;
bool _has_iceberg_schema = false;

int64_t _remaining_push_down_count;
// the table level row count for optimizing query like:
// select count(*) from table;
int64_t _remaining_table_level_row_count;
Fileformat _file_format = Fileformat::NONE;

const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
return Status::OK();
}

// set push down agg type to NONE because we can not do count push down opt
// if there are delete files.
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
const auto& deletion_file = table_desc.deletion_file;
io::FileSystemProperties properties = {
.system_type = _params.file_type,
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,9 @@ Status VFileScanner::_get_next_reader() {
_should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
{
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
Expand Down Expand Up @@ -853,6 +856,7 @@ Status VFileScanner::_get_next_reader() {
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
unsupported_pushdown_types);
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
Expand Down Expand Up @@ -119,8 +120,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
analyzeJoin(analyzer);
}

public ScanNode getScanNode(PlanNodeId id) {
return tableFunction.getScanNode(id, desc);
public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) {
return tableFunction.getScanNode(id, desc, sv);
}

public TableValuedFunctionIf getTableFunction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected String brokerName;

protected TableSnapshot tableSnapshot;
// Save the reference of session variable, so that we don't need to get it from connection context.
// connection context is a thread local variable, it is not available is running in other thread.
protected SessionVariable sessionVariable;

/**
* External file scan node for Query hms table
Expand All @@ -102,8 +106,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv) {
StatisticalType statisticalType, boolean needCheckColumnPriv,
SessionVariable sv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
this.sessionVariable = sv;
}

@Override
Expand All @@ -112,7 +118,6 @@ public void init(Analyzer analyzer) throws UserException {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
}
super.init(analyzer);
initFileSplitSize();
doInitialize();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
Expand Down Expand Up @@ -314,6 +319,7 @@ public void createScanRangeLocations() throws UserException {
params.setProperties(locationProperties);
}

int numBackends = backendPolicy.numBackends();
List<String> pathPartitionKeys = getPathPartitionKeys();
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
Expand Down Expand Up @@ -356,7 +362,7 @@ public void createScanRangeLocations() throws UserException {
scanBackendIds.add(backend.getId());
}
} else {
List<Split> inputSplits = getSplits();
List<Split> inputSplits = getSplits(numBackends);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
Expand Down Expand Up @@ -605,4 +611,19 @@ public TableSnapshot getQueryTableSnapshot() {
}
return this.tableSnapshot;
}

/**
* The real file split size is determined by:
* 1. If user specify the split size in session variable `file_split_size`, use user specified value.
* 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size.
* @param blockSize, got from file system, eg, hdfs
* @return the real file split size
*/
protected long getRealFileSplitSize(long blockSize) {
long realSplitSize = sessionVariable.getFileSplitSize();
if (realSplitSize <= 0) {
realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
}
return realSplitSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanNode;
import org.apache.doris.thrift.TFileScanRangeParams;
Expand All @@ -46,11 +41,9 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand All @@ -68,28 +61,13 @@ public abstract class FileScanNode extends ExternalScanNode {
// For explain
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
protected long fileSplitSize;
protected boolean isSplitSizeSetBySession = false;

public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
this.needCheckColumnPriv = needCheckColumnPriv;
}

@Override
public void init() throws UserException {
initFileSplitSize();
}

protected void initFileSplitSize() {
this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
this.isSplitSizeSetBySession = this.fileSplitSize > 0;
if (this.fileSplitSize <= 0) {
this.fileSplitSize = DEFAULT_SPLIT_SIZE;
}
}

@Override
protected void toThrift(TPlanNode planNode) {
planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
Expand Down Expand Up @@ -256,61 +234,4 @@ protected void setDefaultValueExprs(TableIf tbl,
}
}
}

protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
List<Split> result = Lists.newArrayList();
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
if (!splittable || compressType != TFileCompressType.PLAIN) {
if (LOG.isDebugEnabled()) {
LOG.debug("Path {} is not splittable.", path);
}
String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
return result;
}
// if file split size is set by session variable, use session variable.
// Otherwise, use max(file split size, block size)
if (!isSplitSizeSetBySession) {
fileSplitSize = Math.max(fileSplitSize, blockSize);
}
long bytesRemaining;
for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D;
bytesRemaining -= fileSplitSize) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize,
length, modificationTime, hosts, partitionValues));
}
if (bytesRemaining != 0L) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining,
length, modificationTime, hosts, partitionValues));
}

if (LOG.isDebugEnabled()) {
LOG.debug("Path {} includes {} splits.", path, result.size());
}
return result;
}

protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
if (blkLocations == null || blkLocations.length == 0) {
return -1;
}
for (int i = 0; i < blkLocations.length; ++i) {
if (blkLocations[i].getOffset() <= offset
&& offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length - 1];
long fileLength = last.getOffset() + last.getLength() - 1L;
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ public static class FileSplitCreator implements SplitCreator {

@Override
public Split create(LocationPath path, long start, long length, long fileLength,
long fileSplitSize,
long modificationTime, String[] hosts,
List<String> partitionValues) {
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
FileSplit split = new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
split.setTargetSplitSize(fileSplitSize);
return split;
}
}

Expand Down
Loading

0 comments on commit 2efca61

Please sign in to comment.