From effb10d310ddb244df0d0b2d595b8cc41db727b6 Mon Sep 17 00:00:00 2001 From: eldenmoon Date: Thu, 19 Dec 2024 15:11:13 +0800 Subject: [PATCH] fix 7 --- .../segment_v2/hierarchical_data_reader.cpp | 8 ++++ be/src/olap/rowset/segment_v2/segment.cpp | 6 ++- be/src/olap/rowset/segment_v2/segment.h | 2 +- .../olap/rowset/segment_v2/segment_writer.cpp | 1 + .../segment_v2/variant_column_writer_impl.cpp | 39 ++++++++++++------- .../segment_v2/vertical_segment_writer.cpp | 2 +- be/src/vec/columns/column_object.cpp | 7 +++- be/src/vec/columns/column_object.h | 4 +- be/src/vec/functions/function_cast.h | 1 - be/src/vec/json/parse2column.cpp | 7 ++++ 10 files changed, 55 insertions(+), 22 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index a0e8b3fd0eecd8..de0123a330a904 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -104,6 +104,10 @@ Status HierarchicalDataReader::seek_to_ordinal(ordinal_t ord) { DCHECK(_root_reader->inited); RETURN_IF_ERROR(_root_reader->iterator->seek_to_ordinal(ord)); } + if (_sparse_column_reader) { + DCHECK(_sparse_column_reader->inited); + RETURN_IF_ERROR(_sparse_column_reader->iterator->seek_to_ordinal(ord)); + } return Status::OK(); } @@ -424,6 +428,10 @@ void SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& *var.get_subcolumn({}) /*root*/, null_map, StringRef {_path.data(), _path.size()}, _sparse_column->get_ptr(), 0, _sparse_column->size()); var.incr_num_rows(_sparse_column->size()); + var.get_sparse_column()->assume_mutable()->insert_many_defaults(_sparse_column->size()); +#ifndef NDEBUG + var.check_consistency(); +#endif _sparse_column->clear(); } diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 238898a74ecc09..9b505e4a4a5d61 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -812,7 +812,11 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column, return Status::OK(); } -ColumnReader* Segment::get_column_reader(int32_t col_unique_id) { +Result Segment::get_column_reader(int32_t col_unique_id) { + auto status = _create_column_readers_once(); + if (!status) { + return ResultError(std::move(status)); + } if (_column_readers.contains(col_unique_id)) { return _column_readers[col_unique_id].get(); } diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 5b88e60e37a29c..9fe545006e3da9 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -209,7 +209,7 @@ class Segment : public std::enable_shared_from_this, public MetadataAdd const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; } - ColumnReader* get_column_reader(int32_t col_unique_id); + Result get_column_reader(int32_t col_unique_id); private: DISALLOW_COPY_AND_ASSIGN(Segment); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index b76acf68978266..60fd7cea28a3da 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -1151,6 +1151,7 @@ Status SegmentWriter::_write_footer() { // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4) std::string footer_buf; + VLOG_DEBUG << "footer " << _footer.DebugString(); if (!_footer.SerializeToString(&footer_buf)) { return Status::InternalError("failed to serialize segment footer"); } diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index 6588e7dbe4f588..33499a8e7e2acc 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -30,6 +30,7 @@ #include "vec/columns/column_object.h" #include "vec/columns/columns_number.h" #include "vec/common/schema_util.h" +#include "vec/json/path_in_data.h" #include "vec/olap/olap_data_convertor.h" namespace doris::segment_v2 { @@ -47,12 +48,12 @@ Status VariantColumnWriterImpl::init() { if (dynamic_paths.empty()) { _column = vectorized::ColumnObject::create(true, false); } else { - vectorized::ColumnObject::Subcolumns dynamic_subcolumns; - for (const auto& path : dynamic_paths) { - dynamic_subcolumns.add(vectorized::PathInData(path), - vectorized::ColumnObject::Subcolumn {0, true}); + // create root + auto col = vectorized::ColumnObject::create(true, true); + for (const auto& str_path : dynamic_paths) { + DCHECK(col->add_sub_column(vectorized::PathInData(str_path), 0)); } - _column = vectorized::ColumnObject::create(std::move(dynamic_subcolumns), true); + _column = std::move(col); } if (_tablet_column->is_nullable()) { _null_column = vectorized::ColumnUInt8::create(0); @@ -69,7 +70,8 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::setload_segments( std::static_pointer_cast(reader->rowset()), &segment_cache)); for (const auto& segment : segment_cache.get_segments()) { - ColumnReader* column_reader = segment->get_column_reader(_tablet_column->unique_id()); + ColumnReader* column_reader = + DORIS_TRY(segment->get_column_reader(_tablet_column->unique_id())); if (!column_reader) { continue; } @@ -104,10 +106,10 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::setensure_root_node_type(expected_root_type); converter->add_column_data_convertor(*_tablet_column); + DCHECK_EQ(ptr->get_root()->get_ptr()->size(), num_rows); RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( {ptr->get_root()->get_ptr(), nullptr, ""}, 0, num_rows, column_id)); auto [status, column] = converter->convert_column_data(column_id); @@ -228,12 +231,17 @@ Status VariantColumnWriterImpl::_process_sparse_column( // convert root column data from engine format to storage layer format converter->add_column_data_convertor(sparse_column); + DCHECK_EQ(ptr->get_sparse_column()->size(), num_rows); RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( - {ptr->get_sparse_column()->get_ptr(), nullptr, ""}, 0, num_rows, column_id)); + {ptr->get_sparse_column(), nullptr, ""}, 0, num_rows, column_id)); auto [status, column] = converter->convert_column_data(column_id); if (!status.ok()) { return status; } + VLOG_DEBUG << "dump sparse " + << vectorized::schema_util::dump_column( + vectorized::ColumnObject::get_sparse_column_type(), + ptr->get_sparse_column()); RETURN_IF_ERROR( _sparse_column_writer->append(column->get_nullmap(), column->get_data(), num_rows)); ++column_id; @@ -253,7 +261,6 @@ Status VariantColumnWriterImpl::_process_sparse_column( _statistics.sparse_column_non_null_size.emplace(path, 1); } } - sparse_writer_opts.meta->set_num_rows(num_rows); return Status::OK(); } @@ -294,6 +301,10 @@ Status VariantColumnWriterImpl::finalize() { ptr->create_root(root_type, std::move(root_col)); } +#ifndef NDEBUG + ptr->check_consistency(); +#endif + size_t num_rows = _column->size(); int column_id = 0; @@ -333,10 +344,10 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() { return _column->byte_size(); } uint64_t size = 0; + size += _root_writer->estimate_buffer_size(); for (auto& column_writer : _subcolumn_writers) { size += column_writer->estimate_buffer_size(); } - size += _root_writer->estimate_buffer_size(); size += _sparse_column_writer->estimate_buffer_size(); return size; } @@ -346,10 +357,10 @@ Status VariantColumnWriterImpl::finish() { RETURN_IF_ERROR(finalize()); } RETURN_IF_ERROR(_root_writer->finish()); - RETURN_IF_ERROR(_sparse_column_writer->finish()); for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->finish()); } + RETURN_IF_ERROR(_sparse_column_writer->finish()); return Status::OK(); } Status VariantColumnWriterImpl::write_data() { @@ -357,10 +368,10 @@ Status VariantColumnWriterImpl::write_data() { RETURN_IF_ERROR(finalize()); } RETURN_IF_ERROR(_root_writer->write_data()); - RETURN_IF_ERROR(_sparse_column_writer->write_data()); for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->write_data()); } + RETURN_IF_ERROR(_sparse_column_writer->write_data()); return Status::OK(); } Status VariantColumnWriterImpl::write_ordinal_index() { @@ -368,10 +379,10 @@ Status VariantColumnWriterImpl::write_ordinal_index() { RETURN_IF_ERROR(finalize()); } RETURN_IF_ERROR(_root_writer->write_ordinal_index()); - RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index()); for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->write_ordinal_index()); } + RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index()); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 714ed6b7d6e02e..1b6da1dbf4c40d 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -1506,7 +1506,7 @@ Status VerticalSegmentWriter::_write_footer() { _footer.set_num_rows(_row_count); // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4) - LOG(INFO) << "footer " << _footer.DebugString(); + VLOG_DEBUG << "footer " << _footer.DebugString(); std::string footer_buf; if (!_footer.SerializeToString(&footer_buf)) { return Status::InternalError("failed to serialize segment footer"); diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 31b77d549fb65a..91a0936673ff27 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -701,6 +701,7 @@ void ColumnObject::Subcolumn::finalize(FinalizeMode mode) { } data = {std::move(result_column)}; data_types = {std::move(to_type)}; + data_serdes = {data_types[0]->get_serde()}; num_of_defaults_in_prefix = 0; } @@ -1253,10 +1254,11 @@ bool ColumnObject::try_add_new_subcolumn(const PathInData& path) { } void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t length) { + const auto& src_object = assert_cast(src); #ifndef NDEBUG check_consistency(); + src_object.check_consistency(); #endif - const auto& src_object = assert_cast(src); // First, insert src subcolumns // We can reach the limit of subcolumns, and in this case @@ -2224,6 +2226,9 @@ void ColumnObject::create_root(const DataTypePtr& type, MutableColumnPtr&& colum num_rows = column->size(); } add_sub_column({}, std::move(column), type); + if (serialized_sparse_column->empty()) { + serialized_sparse_column->insert_many_defaults(num_rows); + } } const DataTypePtr& ColumnObject::get_most_common_type() { diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 9d9d40f9075d65..c7859ab4b932af 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -363,9 +363,7 @@ class ColumnObject final : public COWHelper { Subcolumns& get_subcolumns() { return subcolumns; } - ColumnPtr get_sparse_column() const { - return serialized_sparse_column->convert_to_full_column_if_const(); - } + ColumnPtr get_sparse_column() const { return serialized_sparse_column; } // use sparse_subcolumns_schema to record sparse column's path info and type static MutableColumnPtr create_sparse_column_fn() { diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h index 0e7a8c495d307c..5de820dfa3a24f 100644 --- a/be/src/vec/functions/function_cast.h +++ b/be/src/vec/functions/function_cast.h @@ -1933,7 +1933,6 @@ class FunctionCast final : public IFunctionBase { // set variant root column/type to from column/type auto variant = ColumnObject::create(true /*always nullable*/); variant->create_root(from_type, col_from->assume_mutable()); - variant->get_sparse_column()->assume_mutable()->insert_many_defaults(input_rows_count); block.replace_by_position(result, std::move(variant)); return Status::OK(); } diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp index ba18083a95c5f6..0e8472928a5627 100644 --- a/be/src/vec/json/parse2column.cpp +++ b/be/src/vec/json/parse2column.cpp @@ -191,6 +191,13 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length, } } column_object.incr_num_rows(); + auto sparse_column = column_object.get_sparse_column(); + if (sparse_column->size() == old_num_rows) { + sparse_column->assume_mutable()->insert_default(); + } +#ifndef NDEBUG + column_object.check_consistency(); +#endif } // exposed interfaces