fix 6 (apache#45633)

eldenmoon authored Dec 19, 2024
1 parent ba260c3 commit 1c62ffe
Showing 10 changed files with 87 additions and 73 deletions.
15 changes: 10 additions & 5 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -245,7 +245,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
// Node contains column with children columns or has corresponding sparse columns
// Create reader with hierarchical data.
std::unique_ptr<ColumnIterator> sparse_iter;
if (!_sparse_column_set_in_stats.empty()) {
if (_statistics && !_statistics->sparse_column_non_null_size.empty()) {
// Sparse column exists or reached sparse size limit, read sparse column
ColumnIterator* iter;
RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&iter));
@@ -259,9 +259,10 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
read_type, std::move(sparse_iter)));
}
} else {
if (_sparse_column_set_in_stats.contains(StringRef {relative_path.get_path()}) ||
_sparse_column_set_in_stats.size() >
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
if (_statistics &&
(_statistics->sparse_column_non_null_size.contains(relative_path.get_path()) ||
_statistics->sparse_column_non_null_size.size() >
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE)) {
// Sparse column exists or reached sparse size limit, read sparse column
ColumnIterator* inner_iter;
RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
@@ -323,9 +324,13 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF

// init sparse column set in stats
if (self_column_pb.has_variant_statistics()) {
_statistics = std::make_unique<VariantStatistics>();
const auto& variant_stats = self_column_pb.variant_statistics();
for (const auto& [path, _] : variant_stats.sparse_column_non_null_size()) {
_sparse_column_set_in_stats.emplace(path.data(), path.size());
_statistics->sparse_column_non_null_size.emplace(path.data(), path.size());
}
for (const auto& [path, _] : variant_stats.subcolumn_non_null_size()) {
_statistics->subcolumns_non_null_size.emplace(path.data(), path.size());
}
}
return Status::OK();
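The heart of this file's change is the read-path decision: instead of consulting a standalone StringRef set, the reader now asks its owned VariantStatistics whether the requested path lives in the sparse column. A minimal standalone sketch of that decision, assuming simplified types (the struct fields and MAX_SPARSE_DATA_STATISTICS_SIZE mirror the diff; the helper name is illustrative):

#include <string>
#include <unordered_map>

// Simplified stand-in for the statistics owned by the variant column reader.
struct VariantStatistics {
    static constexpr size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
    std::unordered_map<std::string, size_t> subcolumns_non_null_size;
    std::unordered_map<std::string, size_t> sparse_column_non_null_size;
};

// A path is served from the sparse column when the statistics recorded it,
// or when the statistics overflowed the size cap and can no longer prove
// the path is absent. A missing statistics object means "no sparse data".
bool should_read_from_sparse(const VariantStatistics* stats, const std::string& relative_path) {
    if (stats == nullptr) {
        return false;
    }
    return stats->sparse_column_non_null_size.contains(relative_path) ||
           stats->sparse_column_non_null_size.size() >
                   VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE;
}

This needs C++20 for unordered_map::contains; on an older standard, count(relative_path) != 0 is the equivalent check.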
10 changes: 4 additions & 6 deletions be/src/olap/rowset/segment_v2/column_reader.h
@@ -79,8 +79,7 @@ class InvertedIndexFileReader;
class PageDecoder;
class RowRanges;
class ZoneMapIndexReader;
// struct SubcolumnReader;
// using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SubcolumnReader>;
struct VariantStatistics;

struct ColumnReaderOptions {
// whether verify checksum when read page
@@ -311,13 +310,12 @@ class VariantColumnReader : public ColumnReader {

FieldType get_meta_type() override { return FieldType::OLAP_FIELD_TYPE_VARIANT; }

const VariantStatistics* get_stats() const { return _statistics.get(); }

private:
std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers;
std::unique_ptr<ColumnReader> _sparse_column_reader;
// Some sparse column record in stats, use StringRef to reduce memory usage,
// notice: make sure the ref is not released before the ColumnReader is destructed,
// used to decide whether to read from sparse column
std::unordered_set<StringRef> _sparse_column_set_in_stats;
std::unique_ptr<VariantStatistics> _statistics;
};

// Base iterator to read one column data
25 changes: 19 additions & 6 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -24,6 +24,7 @@
#include "olap/rowset/segment_v2/column_reader.h"
#include "vec/columns/column.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
@@ -82,6 +83,10 @@ Status HierarchicalDataReader::init(const ColumnIteratorOptions& opts) {
RETURN_IF_ERROR(_root_reader->iterator->init(opts));
_root_reader->inited = true;
}
if (_sparse_column_reader && !_sparse_column_reader->inited) {
RETURN_IF_ERROR(_sparse_column_reader->iterator->init(opts));
_sparse_column_reader->inited = true;
}
return Status::OK();
}

@@ -402,15 +407,23 @@ Status SparseColumnExtractReader::seek_to_ordinal(ordinal_t ord) {
}

void SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& dst) {
vectorized::ColumnNullable* nullable_column = nullptr;
if (dst->is_nullable()) {
nullable_column = assert_cast<vectorized::ColumnNullable*>(dst.get());
}
vectorized::ColumnObject& var =
dst->is_nullable()
? assert_cast<vectorized::ColumnObject&>(
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
nullable_column != nullptr
? assert_cast<vectorized::ColumnObject&>(nullable_column->get_nested_column())
: assert_cast<vectorized::ColumnObject&>(*dst);
DCHECK(!var.is_null_root());
vectorized::ColumnObject::fill_path_olumn_from_sparse_data(
*var.get_subcolumn({}) /*root*/, StringRef {_path.data(), _path.size()},
if (var.is_null_root()) {
var.add_sub_column({}, dst->size());
}
vectorized::NullMap* null_map =
nullable_column ? &nullable_column->get_null_map_data() : nullptr;
vectorized::ColumnObject::fill_path_column_from_sparse_data(
*var.get_subcolumn({}) /*root*/, null_map, StringRef {_path.data(), _path.size()},
_sparse_column->get_ptr(), 0, _sparse_column->size());
var.incr_num_rows(_sparse_column->size());
_sparse_column->clear();
}

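_fill_path_column now resolves the optional ColumnNullable wrapper once, creates the root subcolumn on demand, and threads an optional null map down into the fill routine. A simplified sketch of that unwrap-and-dispatch pattern with stand-in types (none of these classes are the real Doris columns; they only model the shape of the call):

#include <cstddef>
#include <cstdint>
#include <vector>

using NullMap = std::vector<uint8_t>;

// Stand-in for ColumnObject: only tracks whether a root subcolumn exists.
struct VariantColumn {
    bool has_root = false;
    void add_root_sub_column() { has_root = true; }
};

// Stand-in for ColumnNullable wrapping a variant column.
struct NullableVariantColumn {
    VariantColumn nested;
    NullMap null_map;
};

// The fill routine receives the null map only when the destination is nullable.
void fill_from_sparse(VariantColumn& var, NullMap* null_map, size_t rows) {
    if (!var.has_root) {
        var.add_root_sub_column();  // mirror the diff: create the root if it is missing
    }
    // ... deserialize the requested path from the sparse column per row ...
    if (null_map != nullptr) {
        null_map->insert(null_map->end(), rows, 0);  // 0 = value present (not null)
    }
}

void fill_path_column(NullableVariantColumn* nullable, VariantColumn* plain, size_t rows) {
    VariantColumn& var = nullable != nullptr ? nullable->nested : *plain;
    NullMap* null_map = nullable != nullptr ? &nullable->null_map : nullptr;
    fill_from_sparse(var, null_map, rows);
}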
1 change: 0 additions & 1 deletion be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -149,7 +149,6 @@ class HierarchicalDataReader : public ColumnIterator {
MutableColumnPtr container;
RETURN_IF_ERROR(_init_container(container, nrows));
auto& container_variant = assert_cast<ColumnObject&>(*container);

variant.insert_range_from(container_variant, 0, nrows);

_rows_read += nrows;
23 changes: 7 additions & 16 deletions be/src/olap/rowset/segment_v2/segment.cpp
@@ -201,25 +201,9 @@ Status Segment::_open() {
// 0.01 comes from PrimaryKeyIndexBuilder::init
_meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8;

// collec variant statistics
for (const auto& column_pb : _footer_pb->columns()) {
if (column_pb.has_variant_statistics()) {
_variant_column_stats.try_emplace(column_pb.unique_id(),
column_pb.variant_statistics());
}
}

return Status::OK();
}

const VariantStatisticsPB* Segment::get_stats(int32_t unique_id) const {
auto it = _variant_column_stats.find(unique_id);
if (it == _variant_column_stats.end()) {
return nullptr;
}
return &it->second;
}

Status Segment::_open_inverted_index() {
_inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>(
_fs,
@@ -828,6 +812,13 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
return Status::OK();
}

ColumnReader* Segment::get_column_reader(int32_t col_unique_id) {
if (_column_readers.contains(col_unique_id)) {
return _column_readers[col_unique_id].get();
}
return nullptr;
}

ColumnReader* Segment::_get_column_reader(const TabletColumn& col) {
// init column iterator by path info
if (col.has_path_info() || col.is_variant_type()) {
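The segment no longer caches VariantStatisticsPB per column; callers fetch the ColumnReader by unique id and ask it for statistics. A compact sketch of the new accessor, assuming the same unordered_map member as the diff (written with a single find instead of the contains-then-index pair, which avoids a second hash lookup):

#include <cstdint>
#include <memory>
#include <unordered_map>

struct ColumnReader {};  // stand-in for segment_v2::ColumnReader

class Segment {
public:
    // Return the reader for a column unique id, or nullptr when the segment
    // has no reader for that column. Never inserts into the map.
    ColumnReader* get_column_reader(int32_t col_unique_id) {
        auto it = _column_readers.find(col_unique_id);
        return it == _column_readers.end() ? nullptr : it->second.get();
    }

private:
    std::unordered_map<int32_t, std::unique_ptr<ColumnReader>> _column_readers;
};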
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.h
@@ -66,6 +66,7 @@ class BitmapIndexIterator;
class Segment;
class InvertedIndexIterator;
class InvertedIndexFileReader;
struct VariantStatistics;

using SegmentSharedPtr = std::shared_ptr<Segment>;
// A Segment is used to represent a segment in memory format. When segment is
@@ -208,7 +209,7 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; }

const VariantStatisticsPB* get_stats(int32_t unique_id) const;
ColumnReader* get_column_reader(int32_t col_unique_id);

private:
DISALLOW_COPY_AND_ASSIGN(Segment);
@@ -288,7 +289,6 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

int _be_exec_version = BeExecVersionManager::get_newest_version();
OlapReaderStatistics* _pk_index_load_stats = nullptr;
std::unordered_map<int32_t, VariantStatisticsPB> _variant_column_stats;
};

} // namespace segment_v2
38 changes: 22 additions & 16 deletions be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -19,6 +19,7 @@
#include <gen_cpp/segment_v2.pb.h>

#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer_context.h"
@@ -68,19 +69,24 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::static_pointer_cast<BetaRowset>(reader->rowset()), &segment_cache));
for (const auto& segment : segment_cache.get_segments()) {
const VariantStatisticsPB* source_statistics =
segment->get_stats(_tablet_column->unique_id());
ColumnReader* column_reader = segment->get_column_reader(_tablet_column->unique_id());
if (!column_reader) {
continue;
}
CHECK(column_reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT);
const VariantStatistics* source_statistics =
static_cast<const VariantColumnReader*>(column_reader)->get_stats();
if (!source_statistics) {
continue;
}
for (const auto& [path, size] : source_statistics->subcolumn_non_null_size()) {
for (const auto& [path, size] : source_statistics->subcolumns_non_null_size) {
auto it = path_to_total_number_of_non_null_values.find(path);
if (it == path_to_total_number_of_non_null_values.end()) {
it = path_to_total_number_of_non_null_values.emplace(path, 0).first;
}
it->second += size;
}
for (const auto& [path, size] : source_statistics->sparse_column_non_null_size()) {
for (const auto& [path, size] : source_statistics->sparse_column_non_null_size) {
auto it = path_to_total_number_of_non_null_values.find(path);
if (it == path_to_total_number_of_non_null_values.end()) {
it = path_to_total_number_of_non_null_values.emplace(path, 0).first;
@@ -201,8 +207,8 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
_subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);

// get statistics
_statistics._subcolumns_non_null_size.emplace(entry->path.get_path(),
entry->data.get_non_null_value_size());
_statistics.subcolumns_non_null_size.emplace(entry->path.get_path(),
entry->data.get_non_null_value_size());
}
return Status::OK();
}
@@ -239,12 +245,12 @@ Status VariantColumnWriterImpl::_process_sparse_column(
const auto [sparse_data_paths, _] = ptr->get_sparse_data_paths_and_values();
for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
auto path = sparse_data_paths->get_data_at(i);
if (auto it = _statistics._sparse_column_non_null_size.find(path);
it != _statistics._sparse_column_non_null_size.end()) {
if (auto it = _statistics.sparse_column_non_null_size.find(path.to_string());
it != _statistics.sparse_column_non_null_size.end()) {
++it->second;
} else if (_statistics._sparse_column_non_null_size.size() <
} else if (_statistics.sparse_column_non_null_size.size() <
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
_statistics._sparse_column_non_null_size.emplace(path, 1);
_statistics.sparse_column_non_null_size.emplace(path, 1);
}
}

@@ -253,21 +259,21 @@ Status VariantColumnWriterImpl::_process_sparse_column(
}

void VariantStatistics::to_pb(VariantStatisticsPB* stats) const {
for (const auto& [path, value] : _subcolumns_non_null_size) {
stats->mutable_subcolumn_non_null_size()->emplace(path.to_string(), value);
for (const auto& [path, value] : subcolumns_non_null_size) {
stats->mutable_subcolumn_non_null_size()->emplace(path, value);
}
for (const auto& [path, value] : _sparse_column_non_null_size) {
stats->mutable_sparse_column_non_null_size()->emplace(path.to_string(), value);
for (const auto& [path, value] : sparse_column_non_null_size) {
stats->mutable_sparse_column_non_null_size()->emplace(path, value);
}
}

void VariantStatistics::from_pb(const VariantStatisticsPB& stats) {
// make sure the ref of path, todo not use ref
for (const auto& [path, value] : stats.subcolumn_non_null_size()) {
_subcolumns_non_null_size[StringRef(path.data(), path.size())] = value;
subcolumns_non_null_size[path] = value;
}
for (const auto& [path, value] : stats.sparse_column_non_null_size()) {
_sparse_column_non_null_size[StringRef(path.data(), path.size())] = value;
sparse_column_non_null_size[path] = value;
}
}

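When picking subcolumn paths during compaction, the writer now walks each source segment's VariantColumnReader, checks that it really is a variant reader, and accumulates per-path non-null counts. The accumulation itself reduces to a small merge; a sketch under the assumption that both sides use the plain string-keyed maps introduced by this commit (operator[] value-initializes a missing count to zero, so the find/emplace pair in the diff collapses to one line):

#include <string>
#include <unordered_map>

using PathCounts = std::unordered_map<std::string, size_t>;

// Add one segment's per-path non-null counts into the running totals that
// later decide which paths are materialized as real subcolumns.
void accumulate_non_null_counts(const PathCounts& segment_counts, PathCounts& totals) {
    for (const auto& [path, count] : segment_counts) {
        totals[path] += count;
    }
}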
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -38,8 +38,8 @@ class ScalarColumnWriter;
struct VariantStatistics {
// If reached the size of this, we should stop writing statistics for sparse data
constexpr static size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
std::map<StringRef, size_t> _subcolumns_non_null_size;
std::map<StringRef, size_t> _sparse_column_non_null_size;
std::unordered_map<std::string, size_t> subcolumns_non_null_size;
std::unordered_map<std::string, size_t> sparse_column_non_null_size;

void to_pb(VariantStatisticsPB* stats) const;
void from_pb(const VariantStatisticsPB& stats);
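Switching the statistics maps from StringRef keys to owned std::string keys removes the lifetime coupling the old comment warned about and lets to_pb/from_pb copy entries directly. A self-contained round-trip sketch (VariantStatisticsPB is a protobuf message in the real code; a plain map stands in for it here):

#include <cassert>
#include <string>
#include <unordered_map>

// Stand-in for the map fields of VariantStatisticsPB.
struct StatsPBStandIn {
    std::unordered_map<std::string, size_t> subcolumn_non_null_size;
    std::unordered_map<std::string, size_t> sparse_column_non_null_size;
};

struct Stats {
    std::unordered_map<std::string, size_t> subcolumns_non_null_size;
    std::unordered_map<std::string, size_t> sparse_column_non_null_size;

    void to_pb(StatsPBStandIn* pb) const {
        for (const auto& [path, v] : subcolumns_non_null_size) pb->subcolumn_non_null_size.emplace(path, v);
        for (const auto& [path, v] : sparse_column_non_null_size) pb->sparse_column_non_null_size.emplace(path, v);
    }
    void from_pb(const StatsPBStandIn& pb) {
        for (const auto& [path, v] : pb.subcolumn_non_null_size) subcolumns_non_null_size[path] = v;
        for (const auto& [path, v] : pb.sparse_column_non_null_size) sparse_column_non_null_size[path] = v;
    }
};

int main() {
    Stats written;
    written.sparse_column_non_null_size["user.id"] = 3;
    StatsPBStandIn pb;
    written.to_pb(&pb);

    Stats read_back;
    read_back.from_pb(pb);
    assert(read_back.sparse_column_non_null_size.at("user.id") == 3);  // keys survive as owned strings
    return 0;
}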
32 changes: 16 additions & 16 deletions be/src/vec/columns/column_object.cpp
@@ -1362,7 +1362,6 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
size_t sorted_src_subcolumn_for_sparse_column_idx = 0;
size_t sorted_src_subcolumn_for_sparse_column_size =
sorted_src_subcolumn_for_sparse_column.size();
int null_count = 0;

size_t offset = src_serialized_sparse_column_offsets[row - 1];
size_t end = src_serialized_sparse_column_offsets[row];
@@ -1379,7 +1378,7 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
subcolumn->insert(data.first, data.second);
} else {
// Before inserting this path into sparse column check if we need to
// insert suibcolumns from sorted_src_subcolumn_for_sparse_column before.
// insert subcolumns from sorted_src_subcolumn_for_sparse_column before.
while (sorted_src_subcolumn_for_sparse_column_idx <
sorted_src_subcolumn_for_sparse_column_size &&
sorted_src_subcolumn_for_sparse_column
Expand All @@ -1390,9 +1389,6 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
bool is_null = false;
src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path,
sparse_column_values, row, is_null);
if (is_null) {
++null_count;
}
}

/// Insert path and value from src sparse column to our sparse column.
@@ -1409,17 +1405,10 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
bool is_null = false;
src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path,
sparse_column_values, row, is_null);
if (is_null) {
++null_count;
}
}

// All the sparse columns in this row are null.
if (null_count == sorted_src_subcolumn_for_sparse_column.size()) {
serialized_sparse_column->insert_default();
} else {
sparse_column_offsets.push_back(sparse_column_path->size());
}
sparse_column_offsets.push_back(sparse_column_path->size());

// Insert default values in all remaining dense columns.
for (const auto& entry : subcolumns) {
@@ -2041,6 +2030,7 @@ Status ColumnObject::finalize(FinalizeMode mode) {
new_subcolumns.get_mutable_root()->data.finalize(mode);
} else if (mode == FinalizeMode::WRITE_MODE) {
new_subcolumns.create_root(Subcolumn(num_rows, is_nullable, true));
new_subcolumns.get_mutable_root()->data.finalize(mode);
}

const bool need_pick_subcolumn_to_sparse_column =
@@ -2493,15 +2483,19 @@ size_t ColumnObject::find_path_lower_bound_in_sparse_data(StringRef path,
return it.index;
}

void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, StringRef path,
const ColumnPtr& sparse_data_column,
size_t start, size_t end) {
void ColumnObject::fill_path_column_from_sparse_data(Subcolumn& subcolumn, NullMap* null_map,
StringRef path,
const ColumnPtr& sparse_data_column,
size_t start, size_t end) {
const auto& sparse_data_map = assert_cast<const ColumnMap&>(*sparse_data_column);
const auto& sparse_data_offsets = sparse_data_map.get_offsets();
size_t first_offset = sparse_data_offsets[static_cast<ssize_t>(start) - 1];
size_t last_offset = sparse_data_offsets[static_cast<ssize_t>(end) - 1];
// Check if we have at least one row with data.
if (first_offset == last_offset) {
if (null_map) {
null_map->resize_fill(end - start, 1);
}
subcolumn.insert_many_defaults(end - start);
return;
}
@@ -2513,6 +2507,7 @@ void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, String
size_t paths_end = sparse_data_offsets[static_cast<ssize_t>(i)];
auto lower_bound_path_index = ColumnObject::find_path_lower_bound_in_sparse_data(
path, sparse_data_paths, paths_start, paths_end);
bool is_null = false;
if (lower_bound_path_index != paths_end &&
sparse_data_paths.get_data_at(lower_bound_path_index) == path) {
// auto value_data = sparse_data_values.get_data_at(lower_bound_path_index);
Expand All @@ -2521,8 +2516,13 @@ void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, String
const auto& data = ColumnObject::deserialize_from_sparse_column(&sparse_data_values,
lower_bound_path_index);
subcolumn.insert(data.first, data.second);
is_null = false;
} else {
subcolumn.insert_default();
is_null = true;
}
if (null_map) {
null_map->push_back(is_null);
}
}
}
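fill_path_column_from_sparse_data now reports per-row nullness through an optional NullMap instead of leaving the caller to guess, and insert_from_sparse_column_and_fill_remaing_dense_column always records a sparse offset rather than special-casing all-null rows. The per-row contract of the fill routine can be modeled like this (std::optional stands in for "path found in this row's sparse data"; the real code deserializes typed values instead of strings):

#include <cstdint>
#include <optional>
#include <string>
#include <vector>

using NullMap = std::vector<uint8_t>;

// For each row, `found` holds the value stored for the requested path in the
// sparse column, or nullopt when the path does not appear in that row.
void fill_path_from_sparse(const std::vector<std::optional<std::string>>& found,
                           std::vector<std::string>& out_values, NullMap* null_map) {
    for (const auto& value : found) {
        const bool is_null = !value.has_value();
        out_values.push_back(is_null ? std::string{} : *value);  // default value on miss
        if (null_map != nullptr) {
            null_map->push_back(is_null ? 1 : 0);  // 1 marks a null row, matching the diff
        }
    }
}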