From e1ce2bf54829b220514e5e40cfc89c67bcc3dbc5 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 25 Dec 2024 20:45:10 +0800 Subject: [PATCH] [fix](parquet) impl has_dict_page to replace old logic and fix write empty parquet row group bug (#45740) ### What problem does this PR solve? Problem Summary: Checks if the given column has a dictionary page. This function determines the presence of a dictionary page by checking the `dictionary_page_offset` field in the column metadata. The `dictionary_page_offset` must be set and greater than 0, and it must be less than the `data_page_offset`. The reason for these checks is based on the implementation in the Java version of ORC, where `dictionary_page_offset` is used to indicate the absence of a dictionary. Additionally, Parquet may write an empty row group, in which case the dictionary page content would be empty, and thus the dictionary page should not be read. See https://github.com/apache/arrow/pull/2667/files --- .../parquet/vparquet_column_chunk_reader.cpp | 32 +++++++++++++----- .../parquet/vparquet_column_chunk_reader.h | 3 +- .../format/parquet/vparquet_column_reader.cpp | 6 ++-- .../exec/format/parquet/vparquet_reader.cpp | 8 ++--- .../vec/exec/format/table/iceberg_reader.cpp | 13 +++----- be/src/vec/runtime/vparquet_transformer.cpp | 33 +++---------------- be/src/vec/runtime/vparquet_transformer.h | 3 +- .../vec/exec/parquet/parquet_thrift_test.cpp | 5 ++- 8 files changed, 43 insertions(+), 60 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index dfd44892453cda..5c5ee475bd6b0a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -59,15 +59,15 @@ ColumnChunkReader::ColumnChunkReader(io::BufferedStreamReader* reader, _io_ctx(io_ctx) {} Status ColumnChunkReader::init() { - size_t start_offset = - _has_dict_page() ? _metadata.dictionary_page_offset : _metadata.data_page_offset; + size_t start_offset = has_dict_page(_metadata) ? _metadata.dictionary_page_offset + : _metadata.data_page_offset; size_t chunk_size = _metadata.total_compressed_size; // create page reader _page_reader = create_page_reader(_stream_reader, _io_ctx, start_offset, chunk_size, _metadata.num_values, _offset_index); // get the block compression codec RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec)); - if (_has_dict_page()) { + if (has_dict_page(_metadata)) { // seek to the directory page _page_reader->seek_to_page(_metadata.dictionary_page_offset); // Parse dictionary data when reading @@ -81,10 +81,6 @@ Status ColumnChunkReader::init() { return Status::OK(); } -bool ColumnChunkReader::_has_dict_page() const { - return _metadata.__isset.dictionary_page_offset && _metadata.dictionary_page_offset > 0; -} - Status ColumnChunkReader::next_page() { if (_state == HEADER_PARSED) { return Status::OK(); @@ -337,4 +333,24 @@ int32_t ColumnChunkReader::_get_type_length() { return -1; } } -} // namespace doris::vectorized \ No newline at end of file + +/** + * Checks if the given column has a dictionary page. + * + * This function determines the presence of a dictionary page by checking the + * dictionary_page_offset field in the column metadata. The dictionary_page_offset + * must be set and greater than 0, and it must be less than the data_page_offset. + * + * The reason for these checks is based on the implementation in the Java version + * of ORC, where dictionary_page_offset is used to indicate the absence of a dictionary. + * Additionally, Parquet may write an empty row group, in which case the dictionary page + * content would be empty, and thus the dictionary page should not be read. + * + * See https://github.com/apache/arrow/pull/2667/files + */ +bool has_dict_page(const tparquet::ColumnMetaData& column) { + return column.__isset.dictionary_page_offset && column.dictionary_page_offset > 0 && + column.dictionary_page_offset < column.data_page_offset; +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 531ae511a3e0e3..57f7c0c72d0e6f 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -191,7 +191,6 @@ class ColumnChunkReader { private: enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED }; - bool _has_dict_page() const; Status _decode_dict_page(); void _reserve_decompress_buf(size_t size); int32_t _get_type_length(); @@ -230,4 +229,6 @@ class ColumnChunkReader { Statistics _statistics; }; +bool has_dict_page(const tparquet::ColumnMetaData& column); + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 11fec1d5a79042..d09b1a261abd81 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -179,10 +179,8 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size) { _field_schema = field; auto& chunk_meta = _chunk_meta.meta_data; - int64_t chunk_start = - chunk_meta.__isset.dictionary_page_offset && chunk_meta.dictionary_page_offset > 0 - ? chunk_meta.dictionary_page_offset - : chunk_meta.data_page_offset; + int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset + : chunk_meta.data_page_offset; size_t chunk_len = chunk_meta.total_compressed_size; size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size); if (typeid_cast(file.get())) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 44522454846978..dd62640e23666d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -744,7 +744,7 @@ std::vector ParquetReader::_generate_random_access_ranges( const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index]; auto& chunk_meta = chunk.meta_data; - int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset + int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset : chunk_meta.data_page_offset; int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size; @@ -1007,11 +1007,7 @@ Status ParquetReader::_process_bloom_filter(bool* filter_group) { } int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) { - if (column.__isset.dictionary_page_offset) { - DCHECK_LT(column.dictionary_page_offset, column.data_page_offset); - return column.dictionary_page_offset; - } - return column.data_page_offset; + return has_dict_page(column) ? column.dictionary_page_offset : column.data_page_offset; } void ParquetReader::_collect_profile() { diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 837269b0bb355d..21a98f79acb171 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -17,7 +17,6 @@ #include "iceberg_reader.h" -#include #include #include #include @@ -25,17 +24,15 @@ #include #include #include -#include #include #include +#include #include #include -#include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" -#include "olap/olap_common.h" #include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" #include "runtime/runtime_state.h" @@ -54,6 +51,7 @@ #include "vec/exec/format/generic_reader.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/schema_desc.h" +#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" #include "vec/exec/format/table/table_format_reader.h" namespace cctz { @@ -580,10 +578,9 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data(); bool dictionary_coded = true; - for (int j = 0; j < meta_data->row_groups.size(); ++j) { - auto& column_chunk = meta_data->row_groups[j].columns[ICEBERG_FILE_PATH_INDEX]; - if (!(column_chunk.__isset.meta_data && - column_chunk.meta_data.__isset.dictionary_page_offset)) { + for (const auto& row_group : meta_data->row_groups) { + const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX]; + if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) { dictionary_coded = false; break; } diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index 86ca54909f7d5e..8343cf3b46806c 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -21,55 +21,29 @@ #include #include #include -#include #include #include #include #include #include -#include -#include -#include +#include #include #include #include #include "common/config.h" #include "common/status.h" -#include "gutil/endian.h" #include "io/fs/file_writer.h" -#include "olap/olap_common.h" -#include "runtime/decimalv2_value.h" -#include "runtime/define_primitive_type.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" -#include "runtime/types.h" #include "util/arrow/block_convertor.h" #include "util/arrow/row_batch.h" #include "util/arrow/utils.h" -#include "util/binary_cast.hpp" #include "util/debug_util.h" -#include "util/mysql_global.h" -#include "util/types.h" -#include "vec/columns/column.h" -#include "vec/columns/column_complex.h" -#include "vec/columns/column_decimal.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_string.h" -#include "vec/columns/column_vector.h" -#include "vec/columns/columns_number.h" -#include "vec/common/assert_cast.h" -#include "vec/common/string_ref.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/core/types.h" -#include "vec/data_types/data_type_decimal.h" -#include "vec/data_types/data_type_nullable.h" #include "vec/exec/format/table/iceberg/arrow_schema_util.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" -#include "vec/functions/function_helpers.h" -#include "vec/runtime/vdatetime_value.h" namespace doris::vectorized { @@ -306,11 +280,12 @@ Status VParquetTransformer::write(const Block& block) { RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, ExecEnv::GetInstance()->arrow_memory_pool(), &result, _state->timezone_obj())); - + if (_write_size == 0) { + RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup()); + } RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result)); _write_size += block.bytes(); if (_write_size >= doris::config::min_row_group_size) { - RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup()); _write_size = 0; } return Status::OK(); diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h index 03c9aeb0816788..5f3173af18104e 100644 --- a/be/src/vec/runtime/vparquet_transformer.h +++ b/be/src/vec/runtime/vparquet_transformer.h @@ -25,7 +25,8 @@ #include #include #include -#include + +#include #include "vec/exec/format/table/iceberg/schema.h" #include "vfile_format_transformer.h" diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 132de072127e14..9759f91cad9903 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -186,9 +186,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column FieldSchema* field_schema, ColumnPtr& doris_column, DataTypePtr& data_type, level_t* definitions) { tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data; - size_t start_offset = chunk_meta.__isset.dictionary_page_offset - ? chunk_meta.dictionary_page_offset - : chunk_meta.data_page_offset; + size_t start_offset = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset + : chunk_meta.data_page_offset; size_t chunk_size = chunk_meta.total_compressed_size; cctz::time_zone ctz;