[fix](catalog) opt the count pushdown rule for iceberg/paimon/hive scan node (#44038) (#45224)

bp #44038
morningman authored Dec 15, 2024
1 parent c804f5c commit af0c1ac
Showing 35 changed files with 450 additions and 228 deletions.
18 changes: 10 additions & 8 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -96,25 +96,25 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
_remaining_push_down_count = range.table_format_params.iceberg_params.row_count;
_remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count;
} else {
_remaining_push_down_count = -1;
_remaining_table_level_row_count = -1;
}
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
// already get rows from be
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
auto rows =
std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size);
_remaining_push_down_count -= rows;
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
auto rows = std::min(_remaining_table_level_row_count,
(int64_t)_state->query_options().batch_size);
_remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
if (_remaining_push_down_count == 0) {
if (_remaining_table_level_row_count == 0) {
*eof = true;
}

@@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns(

Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
// We get the count value by doris's be, so we don't need to read the delete file
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
return Status::OK();
}

@@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOC
if (position_delete_files.size() > 0) {
RETURN_IF_ERROR(
_position_delete_base(table_desc.original_file_path, position_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
if (equality_delete_files.size() > 0) {
RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}

COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
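For context, the count pushdown path in the iceberg_reader.cpp change above never touches the data files: once the planner supplies a table-level row count, the reader just emits that many rows in batch-sized chunks of empty columns and flags EOF when the count is exhausted. A minimal standalone sketch of that batching arithmetic (plain Java, for illustration only, not the BE code; `remaining` and `batchSize` are hypothetical names):

// Illustration of the COUNT pushdown batching: emit `remaining` rows in chunks of
// `batchSize` without reading any file, flagging EOF when the count is exhausted.
public final class CountPushdownBatching {
    public static void main(String[] args) {
        long remaining = 10_000_123L;   // table-level row count from the planner (hypothetical value)
        final long batchSize = 4064;    // analogous to query_options().batch_size
        long emitted = 0;
        boolean eof = false;
        while (!eof) {
            long rows = Math.min(remaining, batchSize);
            remaining -= rows;
            emitted += rows;            // the real reader resizes the block's columns to `rows`
            eof = (remaining == 0);
        }
        System.out.println("emitted=" + emitted);  // prints the original row count
    }
}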
4 changes: 3 additions & 1 deletion be/src/vec/exec/format/table/iceberg_reader.h
@@ -167,7 +167,9 @@ class IcebergTableReader : public TableFormatReader {
bool _has_schema_change = false;
bool _has_iceberg_schema = false;

int64_t _remaining_push_down_count;
// The table-level row count, used to optimize queries like:
// select count(*) from table;
int64_t _remaining_table_level_row_count;
Fileformat _file_format = Fileformat::NONE;

const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/table/paimon_reader.cpp
@@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
return Status::OK();
}

// Set the push down agg type to NONE because we cannot apply the count pushdown opt
// if there are delete files.
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
const auto& deletion_file = table_desc.deletion_file;
io::FileSystemProperties properties = {
.system_type = _params.file_type,
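The reasoning behind the new comment in paimon_reader.cpp: a pre-computed row count over-counts whenever delete files apply to the split, so the reader must fall back to the normal scan path. A hedged sketch of that decision rule (illustrative Java, not the Doris implementation):

// Illustrative decision rule: count pushdown is only kept when no delete files apply.
public final class CountPushdownGuard {
    enum PushAggOp { NONE, COUNT }

    static PushAggOp effectiveAggType(PushAggOp requested, boolean hasDeleteFiles) {
        // With delete files, the stored row count includes rows that are logically deleted,
        // so the shortcut would return a wrong COUNT(*); downgrade to NONE.
        return (requested == PushAggOp.COUNT && hasDeleteFiles) ? PushAggOp.NONE : requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveAggType(PushAggOp.COUNT, true));   // NONE
        System.out.println(effectiveAggType(PushAggOp.COUNT, false));  // COUNT
    }
}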
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -788,6 +788,9 @@ Status VFileScanner::_get_next_reader() {
_should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
{
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
@@ -853,6 +856,7 @@ Status VFileScanner::_get_next_reader() {
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
unsupported_pushdown_types);
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
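The ATTN comment in vfile_scanner.cpp is about ordering: the scanner hands the requested push-down agg type to the file-format reader first, and the wrapping table-format reader may later reset it to NONE while initializing row filters. A rough sketch of that forwarding pattern (hypothetical Java classes, not the actual readers):

// Sketch of the set-then-maybe-reset sequencing between scanner, table reader and format reader.
public final class AggTypeForwarding {
    enum PushAggOp { NONE, COUNT }

    static final class FormatReader {                 // stands in for the parquet/orc reader
        PushAggOp pushDownAggType = PushAggOp.NONE;
        void setPushDownAggType(PushAggOp op) { pushDownAggType = op; }
    }

    static final class TableReader {                  // stands in for the table-format reader
        private final FormatReader inner;
        TableReader(FormatReader inner) { this.inner = inner; }
        void initRowFilters(boolean hasDeleteFiles) {
            if (hasDeleteFiles) {
                inner.setPushDownAggType(PushAggOp.NONE);   // mirrors init_row_filters above
            }
        }
    }

    public static void main(String[] args) {
        FormatReader fmt = new FormatReader();
        fmt.setPushDownAggType(PushAggOp.COUNT);      // scanner sets the requested type first
        new TableReader(fmt).initRowFilters(true);    // table reader downgrades it afterwards
        System.out.println(fmt.pushDownAggType);      // NONE
    }
}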
@@ -26,6 +26,7 @@
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
@@ -119,8 +120,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
analyzeJoin(analyzer);
}

public ScanNode getScanNode(PlanNodeId id) {
return tableFunction.getScanNode(id, desc);
public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) {
return tableFunction.getScanNode(id, desc, sv);
}

public TableValuedFunctionIf getTableFunction() {
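Since getScanNode now receives the SessionVariable explicitly, scan nodes no longer depend on the thread-local ConnectContext being available. A small sketch of that pattern (hypothetical classes, not the Doris API):

// Capture session state at plan time and pass it down explicitly, so worker threads
// that have no thread-local context can still read it safely.
public final class ExplicitSessionState {
    record SessionVars(long fileSplitSize) {}        // stand-in for SessionVariable

    static final class ScanNodeLike {
        private final SessionVars sv;
        ScanNodeLike(SessionVars sv) { this.sv = sv; }   // injected, not ThreadLocal.get()
        long fileSplitSize() { return sv.fileSplitSize(); }
    }

    public static void main(String[] args) throws Exception {
        ScanNodeLike node = new ScanNodeLike(new SessionVars(64L << 20));
        Thread worker = new Thread(() -> System.out.println(node.fileSplitSize())); // works off-thread
        worker.start();
        worker.join();
    }
}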
@@ -39,6 +39,7 @@
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected String brokerName;

protected TableSnapshot tableSnapshot;
// Save a reference to the session variable so that we don't need to fetch it from the connect context.
// The connect context is a thread-local variable, and it is not available when running in another thread.
protected SessionVariable sessionVariable;

/**
* External file scan node for Query hms table
@@ -102,8 +106,10 @@ public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv) {
StatisticalType statisticalType, boolean needCheckColumnPriv,
SessionVariable sv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
this.sessionVariable = sv;
}

@Override
@@ -112,7 +118,6 @@ public void init(Analyzer analyzer) throws UserException {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
}
super.init(analyzer);
initFileSplitSize();
doInitialize();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
@@ -314,6 +319,7 @@ public void createScanRangeLocations() throws UserException {
params.setProperties(locationProperties);
}

int numBackends = backendPolicy.numBackends();
List<String> pathPartitionKeys = getPathPartitionKeys();
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
@@ -356,7 +362,7 @@ public void createScanRangeLocations() throws UserException {
scanBackendIds.add(backend.getId());
}
} else {
List<Split> inputSplits = getSplits();
List<Split> inputSplits = getSplits(numBackends);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
@@ -605,4 +611,19 @@ public TableSnapshot getQueryTableSnapshot() {
}
return this.tableSnapshot;
}

/**
* The real file split size is determined by:
* 1. If the user specifies a split size via the session variable `file_split_size`, use that value.
* 2. Otherwise, use the max of DEFAULT_SPLIT_SIZE and the block size.
* @param blockSize the block size reported by the file system, e.g. HDFS
* @return the real file split size
*/
protected long getRealFileSplitSize(long blockSize) {
long realSplitSize = sessionVariable.getFileSplitSize();
if (realSplitSize <= 0) {
realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
}
return realSplitSize;
}
}
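To make the rule in getRealFileSplitSize concrete, here is a standalone sketch with example values (the 128 MB DEFAULT_SPLIT_SIZE is an assumption for illustration, not taken from this diff):

// Split-size rule: a positive session file_split_size wins; otherwise max(default, block size).
public final class SplitSizeRule {
    static final long DEFAULT_SPLIT_SIZE = 128L << 20;   // assumed default, for illustration only

    static long realFileSplitSize(long sessionFileSplitSize, long blockSize) {
        return sessionFileSplitSize > 0
                ? sessionFileSplitSize
                : Math.max(DEFAULT_SPLIT_SIZE, blockSize);
    }

    public static void main(String[] args) {
        System.out.println(realFileSplitSize(0, 256L << 20));          // 268435456: block size wins
        System.out.println(realFileSplitSize(64L << 20, 256L << 20));  // 67108864: session value wins
    }
}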
@@ -25,15 +25,10 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanNode;
import org.apache.doris.thrift.TFileScanRangeParams;
@@ -46,11 +41,9 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -68,28 +61,13 @@ public abstract class FileScanNode extends ExternalScanNode {
// For explain
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
protected long fileSplitSize;
protected boolean isSplitSizeSetBySession = false;

public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
this.needCheckColumnPriv = needCheckColumnPriv;
}

@Override
public void init() throws UserException {
initFileSplitSize();
}

protected void initFileSplitSize() {
this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
this.isSplitSizeSetBySession = this.fileSplitSize > 0;
if (this.fileSplitSize <= 0) {
this.fileSplitSize = DEFAULT_SPLIT_SIZE;
}
}

@Override
protected void toThrift(TPlanNode planNode) {
planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
@@ -256,61 +234,4 @@ protected void setDefaultValueExprs(TableIf tbl,
}
}
}

protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
List<Split> result = Lists.newArrayList();
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
if (!splittable || compressType != TFileCompressType.PLAIN) {
if (LOG.isDebugEnabled()) {
LOG.debug("Path {} is not splittable.", path);
}
String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
return result;
}
// if file split size is set by session variable, use session variable.
// Otherwise, use max(file split size, block size)
if (!isSplitSizeSetBySession) {
fileSplitSize = Math.max(fileSplitSize, blockSize);
}
long bytesRemaining;
for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D;
bytesRemaining -= fileSplitSize) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize,
length, modificationTime, hosts, partitionValues));
}
if (bytesRemaining != 0L) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining,
length, modificationTime, hosts, partitionValues));
}

if (LOG.isDebugEnabled()) {
LOG.debug("Path {} includes {} splits.", path, result.size());
}
return result;
}

protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
if (blkLocations == null || blkLocations.length == 0) {
return -1;
}
for (int i = 0; i < blkLocations.length; ++i) {
if (blkLocations[i].getOffset() <= offset
&& offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length - 1];
long fileLength = last.getOffset() + last.getLength() - 1L;
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
}
}
@@ -87,9 +87,12 @@ public static class FileSplitCreator implements SplitCreator {

@Override
public Split create(LocationPath path, long start, long length, long fileLength,
long fileSplitSize,
long modificationTime, String[] hosts,
List<String> partitionValues) {
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
FileSplit split = new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
split.setTargetSplitSize(fileSplitSize);
return split;
}
}

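The FileSplitCreator change above stamps each split with the target split size chosen by the scan node, instead of relying on a field on the node itself. A tiny sketch of why that helps (illustrative classes, not the real FileSplit):

// Each split carries its own target size, so code that consumes splits (possibly on another
// thread, or during lazy batch-mode split generation) does not need to reach back into the node.
public final class PerSplitTargetSize {
    static final class SplitLike {
        private final String path;
        private long targetSplitSize;
        SplitLike(String path) { this.path = path; }
        void setTargetSplitSize(long size) { this.targetSplitSize = size; }
        @Override public String toString() { return path + " -> " + targetSplitSize; }
    }

    public static void main(String[] args) {
        SplitLike split = new SplitLike("s3://bucket/warehouse/t/part-0.parquet"); // hypothetical path
        split.setTargetSplitSize(256L << 20);   // value computed by getRealFileSplitSize-style logic
        System.out.println(split);
    }
}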