Skip to content

Commit

Permalink
fix 4 (apache#45601)
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon authored Dec 18, 2024
1 parent 52e9e14 commit 989aa0f
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 184 deletions.
174 changes: 83 additions & 91 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,6 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF
io::FileReaderSPtr file_reader) {
// init sub columns
_subcolumn_readers = std::make_unique<SubcolumnColumnReaders>();
std::unordered_map<vectorized::PathInData, uint32_t, vectorized::PathInData::Hash>
column_path_to_footer_ordinal;
for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) {
const auto& column_pb = footer.columns(ordinal);
// column path for accessing subcolumns of variant
if (column_pb.has_column_path_info()) {
vectorized::PathInData path;
path.from_protobuf(column_pb.column_path_info());
column_path_to_footer_ordinal.emplace(path, ordinal);
}
}

const ColumnMetaPB& self_column_pb = footer.columns(column_id);
for (const ColumnMetaPB& column_pb : footer.columns()) {
if (column_pb.unique_id() != self_column_pb.unique_id()) {
Expand All @@ -311,23 +299,25 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF
&_sparse_column_reader));
continue;
}
// init subcolumns
auto relative_path = path.copy_pop_front();
auto get_data_type_fn = [&]() {
if (relative_path.empty()) {
return make_nullable(std::make_unique<vectorized::ColumnObject::MostCommonType>());
}
return vectorized::DataTypeFactory::instance().create_data_type(column_pb);
};
// init subcolumns
if (_subcolumn_readers->get_root() == nullptr) {
_subcolumn_readers->create_root(SubcolumnReader {nullptr, nullptr});
}
if (relative_path.empty()) {
// root column
_subcolumn_readers->get_mutable_root()->modify_to_scalar(SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
_subcolumn_readers->get_mutable_root()->modify_to_scalar(
SubcolumnReader {std::move(reader), get_data_type_fn()});
} else {
// check the root is already a leaf node
_subcolumn_readers->add(
relative_path,
SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
_subcolumn_readers->add(relative_path,
SubcolumnReader {std::move(reader), get_data_type_fn()});
}
}

Expand Down Expand Up @@ -876,7 +866,9 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
return new_map_iterator(iterator);
}
case FieldType::OLAP_FIELD_TYPE_VARIANT: {
*iterator = new VariantRootColumnIterator(new FileColumnIterator(this));
// read from root data
// *iterator = new VariantRootColumnIterator(new FileColumnIterator(this));
*iterator = new FileColumnIterator(this);
return Status::OK();
}
default:
Expand Down Expand Up @@ -1738,75 +1730,75 @@ void DefaultValueColumnIterator::_insert_many_default(vectorized::MutableColumnP
}
}

// Merges an already-read root column into the destination variant column.
//
// dst              - destination column: a ColumnObject, optionally wrapped in a
//                    ColumnNullable.
// root_column      - the root data to merge; it is moved from by this call.
// most_common_type - data type passed along when the root is registered as a
//                    subcolumn of the temporary object column.
//
// Returns Status::OK() on every path visible here.
Status VariantRootColumnIterator::_process_root_column(
vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& root_column,
const vectorized::DataTypePtr& most_common_type) {
// Unwrap the ColumnNullable shell (if present) to reach the ColumnObject itself.
auto& obj =
dst->is_nullable()
? assert_cast<vectorized::ColumnObject&>(
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
: assert_cast<vectorized::ColumnObject&>(*dst);

// fill nullmap: append the root column's null flags to dst's null map.
// This has to run before root_column is moved away below.
// NOTE(review): nothing is appended when dst is nullable but root_column is
// not -- confirm that combination cannot occur.
if (root_column->is_nullable() && dst->is_nullable()) {
vectorized::ColumnUInt8& dst_null_map =
assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
vectorized::ColumnUInt8& src_null_map =
assert_cast<vectorized::ColumnNullable&>(*root_column).get_null_map_column();
dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
}

// add root column to a tmp object column
// The empty path `{}` registers it as the root subcolumn.
// NOTE(review): the meaning of the (true, false) arguments to create() is not
// visible from here -- check ColumnObject::create.
auto tmp = vectorized::ColumnObject::create(true, false);
auto& tmp_obj = assert_cast<vectorized::ColumnObject&>(*tmp);
tmp_obj.add_sub_column({}, std::move(root_column), most_common_type);

// merge tmp object column to dst (all rows of tmp are appended)
obj.insert_range_from(*tmp, 0, tmp->size());

// finalize object if needed
if (!obj.is_finalized()) {
obj.finalize();
}

// Debug builds only: run the object column's consistency check after the merge.
#ifndef NDEBUG
obj.check_consistency();
#endif

return Status::OK();
}

// Reads the next batch of the variant root through the wrapped file iterator
// and merges it into `dst`.
//
// n        - forwarded unchanged to the inner iterator's next_batch.
// dst      - destination column: a ColumnObject, optionally wrapped in a
//            ColumnNullable.
// has_null - forwarded unchanged to the inner iterator's next_batch.
//
// Returns the inner iterator's error if the read fails, otherwise the result
// of _process_root_column.
Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
                                             bool* has_null) {
    // Locate the variant column, looking through the nullable wrapper if present.
    vectorized::ColumnObject* variant = nullptr;
    if (dst->is_nullable()) {
        variant = &assert_cast<vectorized::ColumnObject&>(
                assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column());
    } else {
        variant = &assert_cast<vectorized::ColumnObject&>(*dst);
    }

    // The root is read into a scratch column of the variant's most common type.
    auto root_type = variant->get_most_common_type();
    auto scratch_root = root_type->create_column();
    RETURN_IF_ERROR(_inner_iter->next_batch(n, scratch_root, has_null));

    return _process_root_column(dst, scratch_root, root_type);
}

// Reads the variant root for the given row ids through the wrapped file
// iterator and merges the result into `dst`.
//
// rowids - forwarded unchanged to the inner iterator's read_by_rowids.
// count  - forwarded unchanged to the inner iterator's read_by_rowids.
// dst    - destination column: a ColumnObject, optionally wrapped in a
//          ColumnNullable.
//
// Returns the inner iterator's error if the read fails, otherwise the result
// of _process_root_column.
Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
                                                 vectorized::MutableColumnPtr& dst) {
    // Locate the variant column, looking through the nullable wrapper if present.
    vectorized::ColumnObject* variant = nullptr;
    if (dst->is_nullable()) {
        variant = &assert_cast<vectorized::ColumnObject&>(
                assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column());
    } else {
        variant = &assert_cast<vectorized::ColumnObject&>(*dst);
    }

    // The root is read into a scratch column of the variant's most common type.
    auto root_type = variant->get_most_common_type();
    auto scratch_root = root_type->create_column();
    RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, scratch_root));

    return _process_root_column(dst, scratch_root, root_type);
}
// Status VariantRootColumnIterator::_process_root_column(
// vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& root_column,
// const vectorized::DataTypePtr& most_common_type) {
// auto& obj =
// dst->is_nullable()
// ? assert_cast<vectorized::ColumnObject&>(
// assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
// : assert_cast<vectorized::ColumnObject&>(*dst);
//
// // fill nullmap
// if (root_column->is_nullable() && dst->is_nullable()) {
// vectorized::ColumnUInt8& dst_null_map =
// assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
// vectorized::ColumnUInt8& src_null_map =
// assert_cast<vectorized::ColumnNullable&>(*root_column).get_null_map_column();
// dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
// }
//
// // add root column to a tmp object column
// auto tmp = vectorized::ColumnObject::create(true, false);
// auto& tmp_obj = assert_cast<vectorized::ColumnObject&>(*tmp);
// tmp_obj.add_sub_column({}, std::move(root_column), most_common_type);
//
// // merge tmp object column to dst
// obj.insert_range_from(*tmp, 0, tmp_obj.rows());
//
// // finalize object if needed
// if (!obj.is_finalized()) {
// obj.finalize();
// }
//
// #ifndef NDEBUG
// obj.check_consistency();
// #endif
//
// return Status::OK();
// }
//
// Status VariantRootColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
// bool* has_null) {
// // read root column
// auto& obj =
// dst->is_nullable()
// ? assert_cast<vectorized::ColumnObject&>(
// assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
// : assert_cast<vectorized::ColumnObject&>(*dst);
//
// auto most_common_type = obj.get_most_common_type();
// auto root_column = most_common_type->create_column();
// RETURN_IF_ERROR(_inner_iter->next_batch(n, root_column, has_null));
//
// return _process_root_column(dst, root_column, most_common_type);
// }
//
// Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
// vectorized::MutableColumnPtr& dst) {
// // read root column
// auto& obj =
// dst->is_nullable()
// ? assert_cast<vectorized::ColumnObject&>(
// assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
// : assert_cast<vectorized::ColumnObject&>(*dst);
//
// auto most_common_type = obj.get_most_common_type();
// auto root_column = most_common_type->create_column();
// RETURN_IF_ERROR(_inner_iter->read_by_rowids(rowids, count, root_column));
//
// return _process_root_column(dst, root_column, most_common_type);
// }

Status DefaultNestedColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
bool has_null = false;
Expand Down
72 changes: 37 additions & 35 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class ColumnReader : public MetadataAdder<ColumnReader> {

void disable_index_meta_cache() { _use_index_page_cache = false; }

// Returns the field type stored in _meta_type.
FieldType get_meta_type() { return _meta_type; }
// Returns the field type stored in _meta_type. Virtual so that a derived
// reader can report a different type.
virtual FieldType get_meta_type() { return _meta_type; }

private:
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
Expand Down Expand Up @@ -309,6 +309,8 @@ class VariantColumnReader : public ColumnReader {

~VariantColumnReader() override = default;

// Always reports OLAP_FIELD_TYPE_VARIANT rather than the stored _meta_type.
FieldType get_meta_type() override { return FieldType::OLAP_FIELD_TYPE_VARIANT; }

private:
std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers;
std::unique_ptr<ColumnReader> _sparse_column_reader;
Expand Down Expand Up @@ -661,40 +663,40 @@ class RowIdColumnIterator : public ColumnIterator {
int32_t _segment_id = 0;
};

// Column iterator for the root of a variant column.
//
// Wraps a FileColumnIterator and owns it. Initialisation, seeking and ordinal
// queries are forwarded straight to the wrapped iterator; the three-argument
// next_batch and read_by_rowids are only declared here and defined out of line.
class VariantRootColumnIterator : public ColumnIterator {
public:
    VariantRootColumnIterator() = delete;

    // Takes ownership of `iter`.
    explicit VariantRootColumnIterator(FileColumnIterator* iter) : _inner_iter(iter) {}

    ~VariantRootColumnIterator() override = default;

    // Forwards to the wrapped iterator.
    Status init(const ColumnIteratorOptions& opts) override { return _inner_iter->init(opts); }

    // Forwards to the wrapped iterator.
    Status seek_to_first() override { return _inner_iter->seek_to_first(); }

    // Forwards to the wrapped iterator.
    Status seek_to_ordinal(ordinal_t ord_idx) override {
        return _inner_iter->seek_to_ordinal(ord_idx);
    }

    // Convenience overload for callers that do not need the has_null flag.
    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
        // Initialised so the address handed to the three-argument overload never
        // refers to an indeterminate value.
        bool has_null = false;
        return next_batch(n, dst, &has_null);
    }

    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;

    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          vectorized::MutableColumnPtr& dst) override;

    // Forwards to the wrapped iterator.
    ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); }

private:
    // Merges `root_column` into `dst`; shared by next_batch and read_by_rowids.
    Status _process_root_column(vectorized::MutableColumnPtr& dst,
                                vectorized::MutableColumnPtr& root_column,
                                const vectorized::DataTypePtr& most_common_type);

    // The wrapped iterator that performs the actual reads; owned by this object.
    std::unique_ptr<FileColumnIterator> _inner_iter;
};
// class VariantRootColumnIterator : public ColumnIterator {
// public:
// VariantRootColumnIterator() = delete;
//
// explicit VariantRootColumnIterator(FileColumnIterator* iter) { _inner_iter.reset(iter); }
//
// ~VariantRootColumnIterator() override = default;
//
// Status init(const ColumnIteratorOptions& opts) override { return _inner_iter->init(opts); }
//
// Status seek_to_first() override { return _inner_iter->seek_to_first(); }
//
// Status seek_to_ordinal(ordinal_t ord_idx) override {
// return _inner_iter->seek_to_ordinal(ord_idx);
// }
//
// Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
// bool has_null;
// return next_batch(n, dst, &has_null);
// }
//
// Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
//
// Status read_by_rowids(const rowid_t* rowids, const size_t count,
// vectorized::MutableColumnPtr& dst) override;
//
// ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); }
//
// private:
// Status _process_root_column(vectorized::MutableColumnPtr& dst,
// vectorized::MutableColumnPtr& root_column,
// const vectorized::DataTypePtr& most_common_type);
// std::unique_ptr<FileColumnIterator> _inner_iter;
// };

// This iterator is used to read default value column
class DefaultValueColumnIterator : public ColumnIterator {
Expand Down
31 changes: 15 additions & 16 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,17 @@ Status HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr& con

// add root first
if (_path.get_parts().empty() && _root_reader) {
auto& root_var =
_root_reader->column->is_nullable()
? assert_cast<vectorized::ColumnObject&>(
assert_cast<vectorized::ColumnNullable&>(*_root_reader->column)
.get_nested_column())
: assert_cast<vectorized::ColumnObject&>(*_root_reader->column);
auto column = root_var.get_root();
auto type = root_var.get_root_type();
container_variant.add_sub_column({}, std::move(column), type);
// auto& root_var =
// _root_reader->column->is_nullable()
// ? assert_cast<vectorized::ColumnObject&>(
// assert_cast<vectorized::ColumnNullable&>(*_root_reader->column)
// .get_nested_column())
// : assert_cast<vectorized::ColumnObject&>(*_root_reader->column);
// auto column = root_var.get_root();
// auto type = root_var.get_root_type();
MutableColumnPtr column = _root_reader->column->get_ptr();
container_variant.add_sub_column({}, std::move(column),
ColumnObject::get_most_common_type());
}
// parent path -> subcolumns
std::map<PathInData, PathsWithColumnAndType> nested_subcolumns;
Expand Down Expand Up @@ -361,7 +363,9 @@ Status HierarchicalDataReader::_init_null_map_and_clear_columns(
return Status::OK();
}));
container->clear();
_sparse_column_reader->column->clear();
if (_sparse_column_reader) {
_sparse_column_reader->column->clear();
}
if (_root_reader) {
if (_root_reader->column->is_nullable()) {
// fill nullmap
Expand All @@ -372,13 +376,8 @@ Status HierarchicalDataReader::_init_null_map_and_clear_columns(
dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
// clear nullmap and inner data
src_null_map.clear();
assert_cast<ColumnObject&>(
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
.clear_column_data();
} else {
auto& root_column = assert_cast<ColumnObject&>(*_root_reader->column);
root_column.clear_column_data();
}
_root_reader->column->clear();
} else {
if (dst->is_nullable()) {
// No nullable info exists in hierarchical data, fill nullmap with all non-null
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <unordered_map>
#include <utility>

#include "common/exception.h"
#include "common/status.h"
#include "io/io_common.h"
#include "olap/field.h"
Expand Down
Loading

0 comments on commit 989aa0f

Please sign in to comment.