From 828174943b2ae0858a6aab626b65ba6ddfb25d53 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 9 Dec 2024 05:24:07 +0800 Subject: [PATCH] test --- .../vec/exec/format/table/iceberg_reader.cpp | 18 +++-- be/src/vec/exec/format/table/iceberg_reader.h | 4 +- .../vec/exec/format/table/paimon_reader.cpp | 3 + be/src/vec/exec/scan/vfile_scanner.cpp | 2 + .../paimon/test_paimon_catalog.out | 80 +++++++++++++++++++ .../test_iceberg_optimize_count.groovy | 2 +- .../paimon/test_paimon_catalog.groovy | 14 ++++ 7 files changed, 113 insertions(+), 10 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 8f130ca6002d5d4..837269b0bb355d4 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -96,25 +96,25 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma _iceberg_profile.delete_rows_sort_time = ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); if (range.table_format_params.iceberg_params.__isset.row_count) { - _remaining_push_down_count = range.table_format_params.iceberg_params.row_count; + _remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count; } else { - _remaining_push_down_count = -1; + _remaining_table_level_row_count = -1; } } Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { // already get rows from be - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { - auto rows = - std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size); - _remaining_push_down_count -= rows; + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { + auto rows = std::min(_remaining_table_level_row_count, + (int64_t)_state->query_options().batch_size); + _remaining_table_level_row_count -= rows; auto mutate_columns = block->mutate_columns(); for (auto& col : mutate_columns) { col->resize(rows); } block->set_columns(std::move(mutate_columns)); *read_rows = rows; - if (_remaining_push_down_count == 0) { + if (_remaining_table_level_row_count == 0) { *eof = true; } @@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns( Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) { // We get the count value by doris's be, so we don't need to read the delete file - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { return Status::OK(); } @@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOC if (position_delete_files.size() > 0) { RETURN_IF_ERROR( _position_delete_base(table_desc.original_file_path, position_delete_files)); + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } if (equality_delete_files.size() > 0) { RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 2e240f465b6a2cd..ee7dcdd68d24fa6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -167,7 +167,9 @@ class IcebergTableReader : public TableFormatReader { bool _has_schema_change = false; bool _has_iceberg_schema = false; - int64_t _remaining_push_down_count; + // the table level row count for optimizing query like: + // select count(*) from table; + int64_t _remaining_table_level_row_count; Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index 263fdc8014bc36a..055d6179b2c4221 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext return Status::OK(); } + // set push down agg type to NONE because we can not do count push down opt + // if there are delete files. + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); const auto& deletion_file = table_desc.deletion_file; io::FileSystemProperties properties = { .system_type = _params.file_type, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 3944846a4307a59..76639e4bed4a288 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -822,6 +822,8 @@ Status VFileScanner::_get_next_reader() { _should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache() : nullptr, _state->query_options().enable_parquet_lazy_mat); + // ATTN: the push down agg type may be set back to NONE, + // see IcebergTableReader::init_row_filters for example. parquet_reader->set_push_down_agg_type(_get_push_down_agg_type()); { SCOPED_TIMER(_open_reader_timer); diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out index f3b44964915230f..a394836625d751f 100644 --- a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out +++ b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out @@ -578,6 +578,26 @@ bbb -- !c109 -- +-- !c110 -- +3 + +-- !c111 -- +3 + +-- !c112 -- +2 + +-- !c113 -- +2 + +-- !c114 -- +3 3_1 +4 4_1 + +-- !c115 -- +3 3_1 +4 4_1 + -- !all -- 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 @@ -1157,6 +1177,26 @@ bbb -- !c109 -- +-- !c110 -- +3 + +-- !c111 -- +3 + +-- !c112 -- +2 + +-- !c113 -- +2 + +-- !c114 -- +3 3_1 +4 4_1 + +-- !c115 -- +3 3_1 +4 4_1 + -- !all -- 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 @@ -1736,6 +1776,26 @@ bbb -- !c109 -- +-- !c110 -- +3 + +-- !c111 -- +3 + +-- !c112 -- +2 + +-- !c113 -- +2 + +-- !c114 -- +3 3_1 +4 4_1 + +-- !c115 -- +3 3_1 +4 4_1 + -- !all -- 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 @@ -2315,3 +2375,23 @@ bbb -- !c109 -- +-- !c110 -- +3 + +-- !c111 -- +3 + +-- !c112 -- +2 + +-- !c113 -- +2 + +-- !c114 -- +3 3_1 +4 4_1 + +-- !c115 -- +3 3_1 +4 4_1 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy index 306af3b2cb28525..7a9e90a61fe2cb1 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy @@ -110,7 +110,7 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external } finally { sql """ set enable_count_push_down_for_external_table=true; """ - sql """drop catalog if exists ${catalog_name}""" + // sql """drop catalog if exists ${catalog_name}""" } } diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy index 41afb02e0f932d0..9668cbb0950c5dd 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy @@ -181,6 +181,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ def c108= """ select id from tb_with_upper_case where id = 1 """ def c109= """ select id from tb_with_upper_case where id < 1 """ + def c110 = """select count(*) from deletion_vector_orc;""" + def c111 = """select count(*) from deletion_vector_parquet;""" + def c112 = """select count(*) from deletion_vector_orc where id > 2;""" + def c113 = """select count(*) from deletion_vector_parquet where id > 2;""" + def c114 = """select * from deletion_vector_orc where id > 2;""" + def c115 = """select * from deletion_vector_parquet where id > 2;""" + String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") String catalog_name = "ctl_test_paimon_catalog" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -289,6 +296,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ qt_c107 c107 qt_c108 c108 qt_c109 c109 + + qt_c110 c110 + qt_c111 c111 + qt_c112 c112 + qt_c113 c113 + qt_c114 c114 + qt_c115 c115 } test_cases("false", "false")