Skip to content

Commit

Permalink
branch-3.0: [fix](parquet) impl has_dict_page to replace old logic and fix write empty parquet row group bug #45740 (#45953)
Browse files Browse the repository at this point in the history

Cherry-picked from #45740

Co-authored-by: Socrates <[email protected]>
  • Loading branch information
github-actions[bot] and suxiaogang223 authored Dec 25, 2024
1 parent d94ff8f commit 09cacab
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 60 deletions.
32 changes: 24 additions & 8 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ ColumnChunkReader::ColumnChunkReader(io::BufferedStreamReader* reader,
_io_ctx(io_ctx) {}

Status ColumnChunkReader::init() {
size_t start_offset =
_has_dict_page() ? _metadata.dictionary_page_offset : _metadata.data_page_offset;
size_t start_offset = has_dict_page(_metadata) ? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
// create page reader
_page_reader = create_page_reader(_stream_reader, _io_ctx, start_offset, chunk_size,
_metadata.num_values, _offset_index);
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec));
if (_has_dict_page()) {
if (has_dict_page(_metadata)) {
// seek to the directory page
_page_reader->seek_to_page(_metadata.dictionary_page_offset);
// Parse dictionary data when reading
Expand All @@ -81,10 +81,6 @@ Status ColumnChunkReader::init() {
return Status::OK();
}

bool ColumnChunkReader::_has_dict_page() const {
return _metadata.__isset.dictionary_page_offset && _metadata.dictionary_page_offset > 0;
}

Status ColumnChunkReader::next_page() {
if (_state == HEADER_PARSED) {
return Status::OK();
Expand Down Expand Up @@ -337,4 +333,24 @@ int32_t ColumnChunkReader::_get_type_length() {
return -1;
}
}
} // namespace doris::vectorized

/**
 * Checks if the given column has a dictionary page.
 *
 * This function determines the presence of a dictionary page by checking the
 * dictionary_page_offset field in the column metadata. The dictionary_page_offset
 * must be set and greater than 0, and it must be less than the data_page_offset.
 *
 * The reason for these checks is based on the implementation in the Java version
 * of Parquet (parquet-mr), where a dictionary_page_offset of 0 is used to indicate
 * the absence of a dictionary. Additionally, Parquet may write an empty row group,
 * in which case the dictionary page content would be empty, and thus the dictionary
 * page should not be read.
 *
 * See https://github.com/apache/arrow/pull/2667/files
 */
bool has_dict_page(const tparquet::ColumnMetaData& column) {
    // All three conditions must hold: the field is set, it is a real (non-zero)
    // offset, and it precedes the first data page (a dictionary page, when
    // present, is always written before the data pages of its column chunk).
    return column.__isset.dictionary_page_offset && column.dictionary_page_offset > 0 &&
           column.dictionary_page_offset < column.data_page_offset;
}

} // namespace doris::vectorized
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ class ColumnChunkReader {
private:
enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };

bool _has_dict_page() const;
Status _decode_dict_page();
void _reserve_decompress_buf(size_t size);
int32_t _get_type_length();
Expand Down Expand Up @@ -230,4 +229,6 @@ class ColumnChunkReader {
Statistics _statistics;
};

bool has_dict_page(const tparquet::ColumnMetaData& column);

} // namespace doris::vectorized
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,8 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size) {
_field_schema = field;
auto& chunk_meta = _chunk_meta.meta_data;
int64_t chunk_start =
chunk_meta.__isset.dictionary_page_offset && chunk_meta.dictionary_page_offset > 0
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t chunk_len = chunk_meta.total_compressed_size;
size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
if (typeid_cast<io::MergeRangeFileReader*>(file.get())) {
Expand Down
8 changes: 2 additions & 6 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
const tparquet::ColumnChunk& chunk =
row_group.columns[field->physical_column_index];
auto& chunk_meta = chunk.meta_data;
int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
int64_t chunk_start = has_dict_page(chunk_meta)
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size;
Expand Down Expand Up @@ -1007,11 +1007,7 @@ Status ParquetReader::_process_bloom_filter(bool* filter_group) {
}

int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
if (column.__isset.dictionary_page_offset) {
DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
return column.dictionary_page_offset;
}
return column.data_page_offset;
return has_dict_page(column) ? column.dictionary_page_offset : column.data_page_offset;
}

void ParquetReader::_collect_profile() {
Expand Down
13 changes: 5 additions & 8 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,22 @@

#include "iceberg_reader.h"

#include <ctype.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/parquet_types.h>
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
#include <rapidjson/allocators.h>
#include <rapidjson/document.h>
#include <string.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "olap/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
Expand All @@ -54,6 +51,7 @@
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/schema_desc.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
#include "vec/exec/format/table/table_format_reader.h"

namespace cctz {
Expand Down Expand Up @@ -580,10 +578,9 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d

const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data();
bool dictionary_coded = true;
for (int j = 0; j < meta_data->row_groups.size(); ++j) {
auto& column_chunk = meta_data->row_groups[j].columns[ICEBERG_FILE_PATH_INDEX];
if (!(column_chunk.__isset.meta_data &&
column_chunk.meta_data.__isset.dictionary_page_offset)) {
for (const auto& row_group : meta_data->row_groups) {
const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX];
if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
dictionary_coded = false;
break;
}
Expand Down
33 changes: 4 additions & 29 deletions be/src/vec/runtime/vparquet_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,29 @@
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <glog/logging.h>
#include <math.h>
#include <parquet/column_writer.h>
#include <parquet/platform.h>
#include <parquet/schema.h>
#include <parquet/type_fwd.h>
#include <parquet/types.h>
#include <time.h>

#include <algorithm>
#include <cstdint>
#include <ctime>
#include <exception>
#include <ostream>
#include <string>

#include "common/config.h"
#include "common/status.h"
#include "gutil/endian.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/utils.h"
#include "util/binary_cast.hpp"
#include "util/debug_util.h"
#include "util/mysql_global.h"
#include "util/types.h"
#include "vec/columns/column.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris::vectorized {

Expand Down Expand Up @@ -306,11 +280,12 @@ Status VParquetTransformer::write(const Block& block) {
RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema,
ExecEnv::GetInstance()->arrow_memory_pool(), &result,
_state->timezone_obj()));

if (_write_size == 0) {
RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup());
}
RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result));
_write_size += block.bytes();
if (_write_size >= doris::config::min_row_group_size) {
RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup());
_write_size = 0;
}
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/runtime/vparquet_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
#include <parquet/file_writer.h>
#include <parquet/properties.h>
#include <parquet/types.h>
#include <stdint.h>

#include <cstdint>

#include "vec/exec/format/table/iceberg/schema.h"
#include "vfile_format_transformer.h"
Expand Down
5 changes: 2 additions & 3 deletions be/test/vec/exec/parquet/parquet_thrift_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
FieldSchema* field_schema, ColumnPtr& doris_column,
DataTypePtr& data_type, level_t* definitions) {
tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t start_offset = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t chunk_size = chunk_meta.total_compressed_size;

cctz::time_zone ctz;
Expand Down

0 comments on commit 09cacab

Please sign in to comment.