[fix](catalog) opt the count pushdown rule for iceberg/paimon/hive scan node (#44038) #45224

Merged · 1 commit · Dec 15, 2024
18 changes: 10 additions & 8 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -96,25 +96,25 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
_remaining_push_down_count = range.table_format_params.iceberg_params.row_count;
_remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count;
} else {
_remaining_push_down_count = -1;
_remaining_table_level_row_count = -1;
}
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
// The count rows are produced directly by the BE, so no file data needs to be read.
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
auto rows =
std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size);
_remaining_push_down_count -= rows;
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
auto rows = std::min(_remaining_table_level_row_count,
(int64_t)_state->query_options().batch_size);
_remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
if (_remaining_push_down_count == 0) {
if (_remaining_table_level_row_count == 0) {
*eof = true;
}
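Outside the diff, here is a minimal standalone sketch of the batching idea in get_next_block above (all names and the main() driver are illustrative, not Doris code): when COUNT(*) is answered from the table-level row count, the BE only splits that count into batch-sized chunks and resizes empty columns, without reading any data file.

```cpp
// Hypothetical sketch of the count-pushdown batching shown in get_next_block.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Split a table-level row count into per-block row counts of at most batch_size.
std::vector<int64_t> plan_count_batches(int64_t table_level_row_count, int64_t batch_size) {
    std::vector<int64_t> batches;
    int64_t remaining = table_level_row_count;
    while (remaining > 0) {
        int64_t rows = std::min(remaining, batch_size);
        remaining -= rows;
        batches.push_back(rows); // each entry corresponds to one Block resized to `rows` rows
    }
    return batches;
}

int main() {
    // e.g. 10000 rows with batch_size 4096 -> 4096, 4096, 1808
    for (int64_t rows : plan_count_batches(10000, 4096)) {
        std::cout << rows << '\n';
    }
    return 0;
}
```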

@@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns(

Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
// The count value is computed directly by Doris BE, so the delete files do not need to be read.
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
return Status::OK();
}

@@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOC
if (position_delete_files.size() > 0) {
RETURN_IF_ERROR(
_position_delete_base(table_desc.original_file_path, position_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
if (equality_delete_files.size() > 0) {
RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}

COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
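The two hunks above carry the core rule of this change, and the PaimonReader hunk further down applies the same rule to its deletion files. A rough restatement as a hedged sketch (types and names invented for illustration, not the Doris implementation):

```cpp
#include <cstdint>

struct SplitInfo {
    int64_t table_level_row_count = -1; // -1: no count pushed down by the planner
    bool has_position_deletes = false;
    bool has_equality_deletes = false;
};

// A COUNT can be answered from metadata only when either the planner already
// pushed down a table-level row count, or no delete files apply to the split.
// Otherwise the inner parquet/orc reader is reset to TPushAggOp::NONE and the
// rows are counted by actually reading the files.
bool count_from_metadata_is_safe(const SplitInfo& split) {
    if (split.table_level_row_count > 0) {
        return true; // the table reader serves the count itself
    }
    return !split.has_position_deletes && !split.has_equality_deletes;
}
```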
4 changes: 3 additions & 1 deletion be/src/vec/exec/format/table/iceberg_reader.h
@@ -167,7 +167,9 @@ class IcebergTableReader : public TableFormatReader {
bool _has_schema_change = false;
bool _has_iceberg_schema = false;

int64_t _remaining_push_down_count;
// The table-level row count, used to optimize queries like:
// select count(*) from table;
int64_t _remaining_table_level_row_count;
Fileformat _file_format = Fileformat::NONE;

const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/table/paimon_reader.cpp
@@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
return Status::OK();
}

// Set the push-down agg type to NONE because the count pushdown optimization
// cannot be applied when there are delete files.
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
const auto& deletion_file = table_desc.deletion_file;
io::FileSystemProperties properties = {
.system_type = _params.file_type,
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -788,6 +788,9 @@ Status VFileScanner::_get_next_reader() {
_should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
// ATTN: the push-down agg type may be set back to NONE later;
// see IcebergTableReader::init_row_filters for an example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
{
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
@@ -853,6 +856,7 @@ Status VFileScanner::_get_next_reader() {
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
unsupported_pushdown_types);
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
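To make the ATTN comment concrete, a simplified sketch of the hand-off (invented types, not the actual Doris classes): VFileScanner sets the requested COUNT pushdown on the format reader it creates, and the wrapping table reader may later revoke it in init_row_filters once delete files are discovered.

```cpp
// Hypothetical propagation sketch; the real call chain lives in
// VFileScanner::_get_next_reader and IcebergTableReader/PaimonReader.
#include <memory>

enum class PushAggOp { NONE, COUNT };

struct FormatReader {                 // stands in for a parquet/orc reader
    PushAggOp agg = PushAggOp::NONE;
    void set_push_down_agg_type(PushAggOp op) { agg = op; }
};

struct TableReader {                  // stands in for an iceberg/paimon table reader
    std::shared_ptr<FormatReader> inner;
    bool has_delete_files = false;

    void init_row_filters() {
        // Revoke the inner reader's COUNT pushdown once delete files are found.
        if (has_delete_files) {
            inner->set_push_down_agg_type(PushAggOp::NONE);
        }
    }
};

// Order of operations mirrored from the scanner: request the pushdown first,
// then let the table reader revoke it while initializing row filters.
void wire_up_readers(TableReader& table_reader, PushAggOp requested) {
    table_reader.inner->set_push_down_agg_type(requested);
    table_reader.init_row_filters();
}
```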
@@ -26,6 +26,7 @@
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
@@ -119,8 +120,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
analyzeJoin(analyzer);
}

public ScanNode getScanNode(PlanNodeId id) {
return tableFunction.getScanNode(id, desc);
public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) {
return tableFunction.getScanNode(id, desc, sv);
}

public TableValuedFunctionIf getTableFunction() {
@@ -39,6 +39,7 @@
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected String brokerName;

protected TableSnapshot tableSnapshot;
// Save a reference to the session variable so that we don't need to fetch it from the connection context.
// The connection context is a thread-local variable and is not available when running in another thread.
protected SessionVariable sessionVariable;

/**
* External file scan node for Query hms table
@@ -102,8 +106,10 @@
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv) {
StatisticalType statisticalType, boolean needCheckColumnPriv,
SessionVariable sv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
this.sessionVariable = sv;
}

@Override
@@ -112,7 +118,6 @@ public void init(Analyzer analyzer) throws UserException {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
}
super.init(analyzer);
initFileSplitSize();
doInitialize();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
@@ -314,6 +319,7 @@ public void createScanRangeLocations() throws UserException {
params.setProperties(locationProperties);
}

int numBackends = backendPolicy.numBackends();
List<String> pathPartitionKeys = getPathPartitionKeys();
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
@@ -356,7 +362,7 @@ public void createScanRangeLocations() throws UserException {
scanBackendIds.add(backend.getId());
}
} else {
List<Split> inputSplits = getSplits();
List<Split> inputSplits = getSplits(numBackends);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
@@ -605,4 +611,19 @@ public TableSnapshot getQueryTableSnapshot() {
}
return this.tableSnapshot;
}

/**
* The real file split size is determined as follows:
* 1. If the user specifies a split size via the session variable `file_split_size`, use that value.
* 2. Otherwise, use the maximum of DEFAULT_SPLIT_SIZE and the block size.
* @param blockSize the block size reported by the file system, e.g., HDFS
* @return the real file split size
*/
protected long getRealFileSplitSize(long blockSize) {
long realSplitSize = sessionVariable.getFileSplitSize();
if (realSplitSize <= 0) {
realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
}
return realSplitSize;
}
}
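As a quick worked example of getRealFileSplitSize (numbers illustrative; the concrete default is whatever DEFAULT_SPLIT_SIZE is defined as in FileScanNode): with no session override (`file_split_size` <= 0) and a 256 MB HDFS block size larger than the default, splits come out at 256 MB, while setting `file_split_size` to 32 MB forces 32 MB splits regardless of the block size.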
@@ -25,15 +25,10 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanNode;
import org.apache.doris.thrift.TFileScanRangeParams;
@@ -46,11 +41,9 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -68,28 +61,13 @@ public abstract class FileScanNode extends ExternalScanNode {
// For explain
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
protected long fileSplitSize;
protected boolean isSplitSizeSetBySession = false;

public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
this.needCheckColumnPriv = needCheckColumnPriv;
}

@Override
public void init() throws UserException {
initFileSplitSize();
}

protected void initFileSplitSize() {
this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
this.isSplitSizeSetBySession = this.fileSplitSize > 0;
if (this.fileSplitSize <= 0) {
this.fileSplitSize = DEFAULT_SPLIT_SIZE;
}
}

@Override
protected void toThrift(TPlanNode planNode) {
planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
@@ -256,61 +234,4 @@ protected void setDefaultValueExprs(TableIf tbl,
}
}
}

protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
List<Split> result = Lists.newArrayList();
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
if (!splittable || compressType != TFileCompressType.PLAIN) {
if (LOG.isDebugEnabled()) {
LOG.debug("Path {} is not splittable.", path);
}
String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
return result;
}
// if file split size is set by session variable, use session variable.
// Otherwise, use max(file split size, block size)
if (!isSplitSizeSetBySession) {
fileSplitSize = Math.max(fileSplitSize, blockSize);
}
long bytesRemaining;
for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D;
bytesRemaining -= fileSplitSize) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize,
length, modificationTime, hosts, partitionValues));
}
if (bytesRemaining != 0L) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining,
length, modificationTime, hosts, partitionValues));
}

if (LOG.isDebugEnabled()) {
LOG.debug("Path {} includes {} splits.", path, result.size());
}
return result;
}

protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
if (blkLocations == null || blkLocations.length == 0) {
return -1;
}
for (int i = 0; i < blkLocations.length; ++i) {
if (blkLocations[i].getOffset() <= offset
&& offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length - 1];
long fileLength = last.getOffset() + last.getLength() - 1L;
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
}
}
@@ -87,9 +87,12 @@ public static class FileSplitCreator implements SplitCreator {

@Override
public Split create(LocationPath path, long start, long length, long fileLength,
long fileSplitSize,
long modificationTime, String[] hosts,
List<String> partitionValues) {
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
FileSplit split = new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
split.setTargetSplitSize(fileSplitSize);
return split;
}
}
