Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

branch-3.0: [fix](parquet) impl has_dict_page to replace old logic and fix write empty parquet row group bug #45740 #45953

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ ColumnChunkReader::ColumnChunkReader(io::BufferedStreamReader* reader,
_io_ctx(io_ctx) {}

Status ColumnChunkReader::init() {
size_t start_offset =
_has_dict_page() ? _metadata.dictionary_page_offset : _metadata.data_page_offset;
size_t start_offset = has_dict_page(_metadata) ? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
// create page reader
_page_reader = create_page_reader(_stream_reader, _io_ctx, start_offset, chunk_size,
_metadata.num_values, _offset_index);
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec));
if (_has_dict_page()) {
if (has_dict_page(_metadata)) {
// seek to the dictionary page
_page_reader->seek_to_page(_metadata.dictionary_page_offset);
// Parse dictionary data when reading
Expand All @@ -81,10 +81,6 @@ Status ColumnChunkReader::init() {
return Status::OK();
}

bool ColumnChunkReader::_has_dict_page() const {
return _metadata.__isset.dictionary_page_offset && _metadata.dictionary_page_offset > 0;
}

Status ColumnChunkReader::next_page() {
if (_state == HEADER_PARSED) {
return Status::OK();
Expand Down Expand Up @@ -337,4 +333,24 @@ int32_t ColumnChunkReader::_get_type_length() {
return -1;
}
}
} // namespace doris::vectorized

/**
 * Returns true if the column chunk described by `column` carries a usable
 * dictionary page.
 *
 * A dictionary page is considered present only when all of the following hold:
 *   1. the optional thrift field `dictionary_page_offset` is actually set,
 *   2. its value is positive (some writers — e.g. the Java Parquet
 *      implementation, per the linked Arrow discussion — leave it 0 to signal
 *      "no dictionary"), and
 *   3. it precedes `data_page_offset` (a dictionary page, when written, always
 *      comes before the first data page; an offset at/after the data pages —
 *      as seen with empty row groups — means there is no dictionary content
 *      to read).
 *
 * See https://github.com/apache/arrow/pull/2667/files
 */
bool has_dict_page(const tparquet::ColumnMetaData& column) {
    if (!column.__isset.dictionary_page_offset) {
        return false;
    }
    const int64_t dict_offset = column.dictionary_page_offset;
    return dict_offset > 0 && dict_offset < column.data_page_offset;
}

} // namespace doris::vectorized
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ class ColumnChunkReader {
private:
enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };

bool _has_dict_page() const;
Status _decode_dict_page();
void _reserve_decompress_buf(size_t size);
int32_t _get_type_length();
Expand Down Expand Up @@ -230,4 +229,6 @@ class ColumnChunkReader {
Statistics _statistics;
};

bool has_dict_page(const tparquet::ColumnMetaData& column);

} // namespace doris::vectorized
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,8 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size) {
_field_schema = field;
auto& chunk_meta = _chunk_meta.meta_data;
int64_t chunk_start =
chunk_meta.__isset.dictionary_page_offset && chunk_meta.dictionary_page_offset > 0
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t chunk_len = chunk_meta.total_compressed_size;
size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
if (typeid_cast<io::MergeRangeFileReader*>(file.get())) {
Expand Down
8 changes: 2 additions & 6 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
const tparquet::ColumnChunk& chunk =
row_group.columns[field->physical_column_index];
auto& chunk_meta = chunk.meta_data;
int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
int64_t chunk_start = has_dict_page(chunk_meta)
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size;
Expand Down Expand Up @@ -1007,11 +1007,7 @@ Status ParquetReader::_process_bloom_filter(bool* filter_group) {
}

int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
if (column.__isset.dictionary_page_offset) {
DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
return column.dictionary_page_offset;
}
return column.data_page_offset;
return has_dict_page(column) ? column.dictionary_page_offset : column.data_page_offset;
}

void ParquetReader::_collect_profile() {
Expand Down
13 changes: 5 additions & 8 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,22 @@

#include "iceberg_reader.h"

#include <ctype.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/parquet_types.h>
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
#include <rapidjson/allocators.h>
#include <rapidjson/document.h>
#include <string.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "olap/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
Expand All @@ -54,6 +51,7 @@
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/schema_desc.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
#include "vec/exec/format/table/table_format_reader.h"

namespace cctz {
Expand Down Expand Up @@ -580,10 +578,9 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d

const tparquet::FileMetaData* meta_data = parquet_delete_reader.get_meta_data();
bool dictionary_coded = true;
for (int j = 0; j < meta_data->row_groups.size(); ++j) {
auto& column_chunk = meta_data->row_groups[j].columns[ICEBERG_FILE_PATH_INDEX];
if (!(column_chunk.__isset.meta_data &&
column_chunk.meta_data.__isset.dictionary_page_offset)) {
for (const auto& row_group : meta_data->row_groups) {
const auto& column_chunk = row_group.columns[ICEBERG_FILE_PATH_INDEX];
if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
dictionary_coded = false;
break;
}
Expand Down
33 changes: 4 additions & 29 deletions be/src/vec/runtime/vparquet_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,29 @@
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <glog/logging.h>
#include <math.h>
#include <parquet/column_writer.h>
#include <parquet/platform.h>
#include <parquet/schema.h>
#include <parquet/type_fwd.h>
#include <parquet/types.h>
#include <time.h>

#include <algorithm>
#include <cstdint>
#include <ctime>
#include <exception>
#include <ostream>
#include <string>

#include "common/config.h"
#include "common/status.h"
#include "gutil/endian.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/utils.h"
#include "util/binary_cast.hpp"
#include "util/debug_util.h"
#include "util/mysql_global.h"
#include "util/types.h"
#include "vec/columns/column.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris::vectorized {

Expand Down Expand Up @@ -306,11 +280,12 @@ Status VParquetTransformer::write(const Block& block) {
RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema,
ExecEnv::GetInstance()->arrow_memory_pool(), &result,
_state->timezone_obj()));

if (_write_size == 0) {
RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup());
}
RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result));
_write_size += block.bytes();
if (_write_size >= doris::config::min_row_group_size) {
RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup());
_write_size = 0;
}
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/runtime/vparquet_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
#include <parquet/file_writer.h>
#include <parquet/properties.h>
#include <parquet/types.h>
#include <stdint.h>

#include <cstdint>

#include "vec/exec/format/table/iceberg/schema.h"
#include "vfile_format_transformer.h"
Expand Down
5 changes: 2 additions & 3 deletions be/test/vec/exec/parquet/parquet_thrift_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
FieldSchema* field_schema, ColumnPtr& doris_column,
DataTypePtr& data_type, level_t* definitions) {
tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t start_offset = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t chunk_size = chunk_meta.total_compressed_size;

cctz::time_zone ctz;
Expand Down
Loading