Skip to content

Commit

Permalink
fix 7
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Dec 19, 2024
1 parent 1c62ffe commit effb10d
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 22 deletions.
8 changes: 8 additions & 0 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ Status HierarchicalDataReader::seek_to_ordinal(ordinal_t ord) {
DCHECK(_root_reader->inited);
RETURN_IF_ERROR(_root_reader->iterator->seek_to_ordinal(ord));
}
if (_sparse_column_reader) {
DCHECK(_sparse_column_reader->inited);
RETURN_IF_ERROR(_sparse_column_reader->iterator->seek_to_ordinal(ord));
}
return Status::OK();
}

Expand Down Expand Up @@ -424,6 +428,10 @@ void SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr&
*var.get_subcolumn({}) /*root*/, null_map, StringRef {_path.data(), _path.size()},
_sparse_column->get_ptr(), 0, _sparse_column->size());
var.incr_num_rows(_sparse_column->size());
var.get_sparse_column()->assume_mutable()->insert_many_defaults(_sparse_column->size());
#ifndef NDEBUG
var.check_consistency();
#endif
_sparse_column->clear();
}

Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,11 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
return Status::OK();
}

ColumnReader* Segment::get_column_reader(int32_t col_unique_id) {
Result<ColumnReader*> Segment::get_column_reader(int32_t col_unique_id) {
auto status = _create_column_readers_once();
if (!status) {
return ResultError(std::move(status));
}
if (_column_readers.contains(col_unique_id)) {
return _column_readers[col_unique_id].get();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; }

ColumnReader* get_column_reader(int32_t col_unique_id);
Result<ColumnReader*> get_column_reader(int32_t col_unique_id);

private:
DISALLOW_COPY_AND_ASSIGN(Segment);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,7 @@ Status SegmentWriter::_write_footer() {

// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
std::string footer_buf;
VLOG_DEBUG << "footer " << _footer.DebugString();
if (!_footer.SerializeToString(&footer_buf)) {
return Status::InternalError("failed to serialize segment footer");
}
Expand Down
39 changes: 25 additions & 14 deletions be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "vec/columns/column_object.h"
#include "vec/columns/columns_number.h"
#include "vec/common/schema_util.h"
#include "vec/json/path_in_data.h"
#include "vec/olap/olap_data_convertor.h"

namespace doris::segment_v2 {
Expand All @@ -47,12 +48,12 @@ Status VariantColumnWriterImpl::init() {
if (dynamic_paths.empty()) {
_column = vectorized::ColumnObject::create(true, false);
} else {
vectorized::ColumnObject::Subcolumns dynamic_subcolumns;
for (const auto& path : dynamic_paths) {
dynamic_subcolumns.add(vectorized::PathInData(path),
vectorized::ColumnObject::Subcolumn {0, true});
// create root
auto col = vectorized::ColumnObject::create(true, true);
for (const auto& str_path : dynamic_paths) {
DCHECK(col->add_sub_column(vectorized::PathInData(str_path), 0));
}
_column = vectorized::ColumnObject::create(std::move(dynamic_subcolumns), true);
_column = std::move(col);
}
if (_tablet_column->is_nullable()) {
_null_column = vectorized::ColumnUInt8::create(0);
Expand All @@ -69,7 +70,8 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::static_pointer_cast<BetaRowset>(reader->rowset()), &segment_cache));
for (const auto& segment : segment_cache.get_segments()) {
ColumnReader* column_reader = segment->get_column_reader(_tablet_column->unique_id());
ColumnReader* column_reader =
DORIS_TRY(segment->get_column_reader(_tablet_column->unique_id()));
if (!column_reader) {
continue;
}
Expand Down Expand Up @@ -104,10 +106,10 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st
paths_with_sizes.emplace_back(size, path);
}
std::sort(paths_with_sizes.begin(), paths_with_sizes.end(), std::greater());

// Fill dynamic_paths with first max_dynamic_paths paths in sorted list.
// reserve 1 for root column
for (const auto& [size, path] : paths_with_sizes) {
if (paths.size() < vectorized::ColumnObject::MAX_SUBCOLUMNS) {
if (paths.size() < vectorized::ColumnObject::MAX_SUBCOLUMNS - 1) {
paths.emplace(path);
}
// // todo : Add all remaining paths into shared data statistics until we reach its max size;
Expand Down Expand Up @@ -141,6 +143,7 @@ Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* p
ptr->ensure_root_node_type(expected_root_type);

converter->add_column_data_convertor(*_tablet_column);
DCHECK_EQ(ptr->get_root()->get_ptr()->size(), num_rows);
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
{ptr->get_root()->get_ptr(), nullptr, ""}, 0, num_rows, column_id));
auto [status, column] = converter->convert_column_data(column_id);
Expand Down Expand Up @@ -228,12 +231,17 @@ Status VariantColumnWriterImpl::_process_sparse_column(

// convert root column data from engine format to storage layer format
converter->add_column_data_convertor(sparse_column);
DCHECK_EQ(ptr->get_sparse_column()->size(), num_rows);
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
{ptr->get_sparse_column()->get_ptr(), nullptr, ""}, 0, num_rows, column_id));
{ptr->get_sparse_column(), nullptr, ""}, 0, num_rows, column_id));
auto [status, column] = converter->convert_column_data(column_id);
if (!status.ok()) {
return status;
}
VLOG_DEBUG << "dump sparse "
<< vectorized::schema_util::dump_column(
vectorized::ColumnObject::get_sparse_column_type(),
ptr->get_sparse_column());
RETURN_IF_ERROR(
_sparse_column_writer->append(column->get_nullmap(), column->get_data(), num_rows));
++column_id;
Expand All @@ -253,7 +261,6 @@ Status VariantColumnWriterImpl::_process_sparse_column(
_statistics.sparse_column_non_null_size.emplace(path, 1);
}
}

sparse_writer_opts.meta->set_num_rows(num_rows);
return Status::OK();
}
Expand Down Expand Up @@ -294,6 +301,10 @@ Status VariantColumnWriterImpl::finalize() {
ptr->create_root(root_type, std::move(root_col));
}

#ifndef NDEBUG
ptr->check_consistency();
#endif

size_t num_rows = _column->size();
int column_id = 0;

Expand Down Expand Up @@ -333,10 +344,10 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() {
return _column->byte_size();
}
uint64_t size = 0;
size += _root_writer->estimate_buffer_size();
for (auto& column_writer : _subcolumn_writers) {
size += column_writer->estimate_buffer_size();
}
size += _root_writer->estimate_buffer_size();
size += _sparse_column_writer->estimate_buffer_size();
return size;
}
Expand All @@ -346,32 +357,32 @@ Status VariantColumnWriterImpl::finish() {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->finish());
RETURN_IF_ERROR(_sparse_column_writer->finish());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->finish());
}
RETURN_IF_ERROR(_sparse_column_writer->finish());
return Status::OK();
}
Status VariantColumnWriterImpl::write_data() {
if (!is_finalized()) {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->write_data());
RETURN_IF_ERROR(_sparse_column_writer->write_data());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_data());
}
RETURN_IF_ERROR(_sparse_column_writer->write_data());
return Status::OK();
}
Status VariantColumnWriterImpl::write_ordinal_index() {
if (!is_finalized()) {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->write_ordinal_index());
RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_ordinal_index());
}
RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index());
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ Status VerticalSegmentWriter::_write_footer() {
_footer.set_num_rows(_row_count);

// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
LOG(INFO) << "footer " << _footer.DebugString();
VLOG_DEBUG << "footer " << _footer.DebugString();
std::string footer_buf;
if (!_footer.SerializeToString(&footer_buf)) {
return Status::InternalError("failed to serialize segment footer");
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ void ColumnObject::Subcolumn::finalize(FinalizeMode mode) {
}
data = {std::move(result_column)};
data_types = {std::move(to_type)};
data_serdes = {data_types[0]->get_serde()};
num_of_defaults_in_prefix = 0;
}

Expand Down Expand Up @@ -1253,10 +1254,11 @@ bool ColumnObject::try_add_new_subcolumn(const PathInData& path) {
}

void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t length) {
const auto& src_object = assert_cast<const ColumnObject&>(src);
#ifndef NDEBUG
check_consistency();
src_object.check_consistency();
#endif
const auto& src_object = assert_cast<const ColumnObject&>(src);

// First, insert src subcolumns
// We can reach the limit of subcolumns, and in this case
Expand Down Expand Up @@ -2224,6 +2226,9 @@ void ColumnObject::create_root(const DataTypePtr& type, MutableColumnPtr&& colum
num_rows = column->size();
}
add_sub_column({}, std::move(column), type);
if (serialized_sparse_column->empty()) {
serialized_sparse_column->insert_many_defaults(num_rows);
}
}

const DataTypePtr& ColumnObject::get_most_common_type() {
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,7 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

Subcolumns& get_subcolumns() { return subcolumns; }

ColumnPtr get_sparse_column() const {
return serialized_sparse_column->convert_to_full_column_if_const();
}
ColumnPtr get_sparse_column() const { return serialized_sparse_column; }

// use sparse_subcolumns_schema to record sparse column's path info and type
static MutableColumnPtr create_sparse_column_fn() {
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/functions/function_cast.h
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,6 @@ class FunctionCast final : public IFunctionBase {
// set variant root column/type to from column/type
auto variant = ColumnObject::create(true /*always nullable*/);
variant->create_root(from_type, col_from->assume_mutable());
variant->get_sparse_column()->assume_mutable()->insert_many_defaults(input_rows_count);
block.replace_by_position(result, std::move(variant));
return Status::OK();
}
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/json/parse2column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
}
}
column_object.incr_num_rows();
auto sparse_column = column_object.get_sparse_column();
if (sparse_column->size() == old_num_rows) {
sparse_column->assume_mutable()->insert_default();
}
#ifndef NDEBUG
column_object.check_consistency();
#endif
}

// exposed interfaces
Expand Down

0 comments on commit effb10d

Please sign in to comment.