Skip to content

Commit

Permalink
[opt](profile) add index page profile for io
Browse files Browse the repository at this point in the history
  • Loading branch information
airborne12 committed Dec 20, 2024
1 parent db3aff9 commit 51747fe
Show file tree
Hide file tree
Showing 22 changed files with 163 additions and 132 deletions.
6 changes: 3 additions & 3 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest

for (auto id : picked_segments) {
Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid,
&loc, encoded_seq_value, stats);
&loc, stats, encoded_seq_value);
if (s.is<KEY_NOT_FOUND>()) {
continue;
}
Expand Down Expand Up @@ -615,7 +615,7 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
vectorized::Block ordered_block = block.clone_empty();
uint32_t pos = 0;

RETURN_IF_ERROR(seg->load_pk_index_and_bf()); // We need index blocks to iterate
RETURN_IF_ERROR(seg->load_pk_index_and_bf(nullptr)); // We need index blocks to iterate
const auto* pk_idx = seg->get_primary_key_index();
int total = pk_idx->num_rows();
uint32_t row_id = 0;
Expand All @@ -629,7 +629,7 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
while (remaining > 0) {
std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
RETURN_IF_ERROR(pk_idx->new_iterator(&iter));
RETURN_IF_ERROR(pk_idx->new_iterator(&iter, nullptr));

size_t num_to_read = std::min(batch_size, remaining);
auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/delete_bitmap_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,11 @@ Status MergeIndexDeleteBitmapCalculator::init(RowsetId rowset_id,
MergeIndexDeleteBitmapCalculatorContext::Comparator(seq_col_length, _rowid_length);
_contexts.reserve(segments.size());
_heap = std::make_unique<Heap>(_comparator);

for (auto& segment : segments) {
RETURN_IF_ERROR(segment->load_index());
RETURN_IF_ERROR(segment->load_index(nullptr));
auto pk_idx = segment->get_primary_key_index();
std::unique_ptr<segment_v2::IndexedColumnIterator> index;
RETURN_IF_ERROR(pk_idx->new_iterator(&index));
RETURN_IF_ERROR(pk_idx->new_iterator(&index, nullptr));
auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
pk_idx->type_info()->type(), 1, 0);
_contexts.emplace_back(std::move(index), index_type, segment->id(), pk_idx->num_rows());
Expand Down
12 changes: 7 additions & 5 deletions be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,29 @@ Status PrimaryKeyIndexBuilder::finalize(segment_v2::PrimaryKeyIndexMetaPB* meta)
}

Status PrimaryKeyIndexReader::parse_index(io::FileReaderSPtr file_reader,
const segment_v2::PrimaryKeyIndexMetaPB& meta) {
const segment_v2::PrimaryKeyIndexMetaPB& meta,
OlapReaderStatistics* pk_index_load_stats) {
// parse primary key index
_index_reader.reset(new segment_v2::IndexedColumnReader(file_reader, meta.primary_key_index()));
_index_reader->set_is_pk_index(true);
RETURN_IF_ERROR(_index_reader->load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));
pk_index_load_stats));

_index_parsed = true;
return Status::OK();
}

Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader,
const segment_v2::PrimaryKeyIndexMetaPB& meta) {
const segment_v2::PrimaryKeyIndexMetaPB& meta,
OlapReaderStatistics* pk_index_load_stats) {
// parse bloom filter
segment_v2::ColumnIndexMetaPB column_index_meta = meta.bloom_filter_index();
segment_v2::BloomFilterIndexReader bf_index_reader(std::move(file_reader),
column_index_meta.bloom_filter_index());
RETURN_IF_ERROR(bf_index_reader.load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));
pk_index_load_stats));
std::unique_ptr<segment_v2::BloomFilterIndexIterator> bf_iter;
RETURN_IF_ERROR(bf_index_reader.new_iterator(&bf_iter));
RETURN_IF_ERROR(bf_index_reader.new_iterator(&bf_iter, pk_index_load_stats));
RETURN_IF_ERROR(bf_iter->read_bloom_filter(0, &_bf));
segment_v2::g_pk_total_bloom_filter_num << 1;
segment_v2::g_pk_total_bloom_filter_total_bytes << _bf->size();
Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/primary_key_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ class PrimaryKeyIndexBuilder {

class PrimaryKeyIndexReader {
public:
PrimaryKeyIndexReader(OlapReaderStatistics* pk_index_load_stats = nullptr)
: _index_parsed(false), _bf_parsed(false), _pk_index_load_stats(pk_index_load_stats) {}
PrimaryKeyIndexReader() : _index_parsed(false), _bf_parsed(false) {}

~PrimaryKeyIndexReader() {
segment_v2::g_pk_total_bloom_filter_num << -static_cast<int64_t>(_bf_num);
Expand All @@ -109,12 +108,14 @@ class PrimaryKeyIndexReader {
}

Status parse_index(io::FileReaderSPtr file_reader,
const segment_v2::PrimaryKeyIndexMetaPB& meta);
const segment_v2::PrimaryKeyIndexMetaPB& meta,
OlapReaderStatistics* pk_index_load_stats);

Status parse_bf(io::FileReaderSPtr file_reader, const segment_v2::PrimaryKeyIndexMetaPB& meta);
Status parse_bf(io::FileReaderSPtr file_reader, const segment_v2::PrimaryKeyIndexMetaPB& meta,
OlapReaderStatistics* pk_index_load_stats);

Status new_iterator(std::unique_ptr<segment_v2::IndexedColumnIterator>* index_iterator,
OlapReaderStatistics* stats = nullptr) const {
OlapReaderStatistics* stats) const {
DCHECK(_index_parsed);
index_iterator->reset(new segment_v2::IndexedColumnIterator(_index_reader.get(), stats));
return Status::OK();
Expand Down Expand Up @@ -155,7 +156,6 @@ class PrimaryKeyIndexReader {
std::unique_ptr<segment_v2::BloomFilter> _bf;
size_t _bf_num = 0;
uint64 _bf_bytes = 0;
OlapReaderStatistics* _pk_index_load_stats = nullptr;
};

} // namespace doris
16 changes: 8 additions & 8 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ namespace segment_v2 {
Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats) {
// TODO yyq: implement a new once flag to avoid status construct.
_index_load_stats = index_load_stats;
return _load_once.call([this, use_page_cache, kept_in_memory] {
return _load(use_page_cache, kept_in_memory);
return _load_once.call([this, use_page_cache, kept_in_memory, index_load_stats] {
return _load(use_page_cache, kept_in_memory, index_load_stats);
});
}

Expand All @@ -45,20 +44,21 @@ int64_t BloomFilterIndexReader::get_metadata_size() const {
(_bloom_filter_index_meta ? _bloom_filter_index_meta->ByteSizeLong() : 0);
}

Status BloomFilterIndexReader::_load(bool use_page_cache, bool kept_in_memory) {
Status BloomFilterIndexReader::_load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats) {
const IndexedColumnMetaPB& bf_index_meta = _bloom_filter_index_meta->bloom_filter();

_bloom_filter_reader.reset(new IndexedColumnReader(_file_reader, bf_index_meta));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory, _index_load_stats));
update_metadata_size();
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory, index_load_stats));
return Status::OK();
}

Status BloomFilterIndexReader::new_iterator(std::unique_ptr<BloomFilterIndexIterator>* iterator) {
Status BloomFilterIndexReader::new_iterator(std::unique_ptr<BloomFilterIndexIterator>* iterator,
OlapReaderStatistics* index_load_stats) {
DBUG_EXECUTE_IF("BloomFilterIndexReader::new_iterator.fail", {
return Status::InternalError("new_iterator for bloom filter index failed");
});
iterator->reset(new BloomFilterIndexIterator(this));
iterator->reset(new BloomFilterIndexIterator(this, index_load_stats));
return Status::OK();
}

Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,18 @@ class BloomFilterIndexReader : public MetadataAdder<BloomFilterIndexReader> {
}

Status load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* _bf_index_load_stats = nullptr);
OlapReaderStatistics* bf_index_load_stats);

BloomFilterAlgorithmPB algorithm() { return _bloom_filter_index_meta->algorithm(); }

// create a new column iterator.
Status new_iterator(std::unique_ptr<BloomFilterIndexIterator>* iterator);
Status new_iterator(std::unique_ptr<BloomFilterIndexIterator>* iterator,
OlapReaderStatistics* index_load_stats = nullptr);

const TypeInfo* type_info() const { return _type_info; }

private:
Status _load(bool use_page_cache, bool kept_in_memory);
Status _load(bool use_page_cache, bool kept_in_memory, OlapReaderStatistics* index_load_stats);

int64_t get_metadata_size() const override;

Expand All @@ -70,13 +71,12 @@ class BloomFilterIndexReader : public MetadataAdder<BloomFilterIndexReader> {
const TypeInfo* _type_info = nullptr;
std::unique_ptr<BloomFilterIndexPB> _bloom_filter_index_meta = nullptr;
std::unique_ptr<IndexedColumnReader> _bloom_filter_reader;
OlapReaderStatistics* _index_load_stats = nullptr;
};

class BloomFilterIndexIterator {
public:
explicit BloomFilterIndexIterator(BloomFilterIndexReader* reader)
: _reader(reader), _bloom_filter_iter(reader->_bloom_filter_reader.get()) {}
explicit BloomFilterIndexIterator(BloomFilterIndexReader* reader, OlapReaderStatistics* stats)
: _reader(reader), _bloom_filter_iter(reader->_bloom_filter_reader.get(), stats) {}

// Read bloom filter at the given ordinal into `bf`.
Status read_bloom_filter(rowid_t ordinal, std::unique_ptr<BloomFilter>* bf);
Expand Down
62 changes: 36 additions & 26 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,12 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag

Status ColumnReader::get_row_ranges_by_zone_map(
const AndBlockColumnPredicate* col_predicates,
const std::vector<const ColumnPredicate*>* delete_predicates, RowRanges* row_ranges) {
const std::vector<const ColumnPredicate*>* delete_predicates, RowRanges* row_ranges,
const ColumnIteratorOptions& iter_opts) {
std::vector<uint32_t> page_indexes;
RETURN_IF_ERROR(_get_filtered_pages(col_predicates, delete_predicates, &page_indexes));
RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, row_ranges));
RETURN_IF_ERROR(
_get_filtered_pages(col_predicates, delete_predicates, &page_indexes, iter_opts));
RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, row_ranges, iter_opts));
return Status::OK();
}

Expand Down Expand Up @@ -514,8 +516,8 @@ bool ColumnReader::_zone_map_match_condition(const ZoneMapPB& zone_map,
Status ColumnReader::_get_filtered_pages(
const AndBlockColumnPredicate* col_predicates,
const std::vector<const ColumnPredicate*>* delete_predicates,
std::vector<uint32_t>* page_indexes) {
RETURN_IF_ERROR(_load_zone_map_index(_use_index_page_cache, _opts.kept_in_memory));
std::vector<uint32_t>* page_indexes, const ColumnIteratorOptions& iter_opts) {
RETURN_IF_ERROR(_load_zone_map_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts));

FieldType type = _type_info->type();
const std::vector<ZoneMapPB>& zone_maps = _zone_map_index->page_zone_maps();
Expand Down Expand Up @@ -553,9 +555,10 @@ Status ColumnReader::_get_filtered_pages(
}

Status ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>& page_indexes,
RowRanges* row_ranges) {
RowRanges* row_ranges,
const ColumnIteratorOptions& iter_opts) {
row_ranges->clear();
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory));
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts));
for (auto i : page_indexes) {
ordinal_t page_first_id = _ordinal_index->get_first_ordinal(i);
ordinal_t page_last_id = _ordinal_index->get_last_ordinal(i);
Expand All @@ -566,12 +569,14 @@ Status ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>& page_ind
}

Status ColumnReader::get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
RowRanges* row_ranges) {
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory));
RETURN_IF_ERROR(_load_bloom_filter_index(_use_index_page_cache, _opts.kept_in_memory));
RowRanges* row_ranges,
const ColumnIteratorOptions& iter_opts) {
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts));
RETURN_IF_ERROR(
_load_bloom_filter_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts));
RowRanges bf_row_ranges;
std::unique_ptr<BloomFilterIndexIterator> bf_iter;
RETURN_IF_ERROR(_bloom_filter_index->new_iterator(&bf_iter));
RETURN_IF_ERROR(_bloom_filter_index->new_iterator(&bf_iter, iter_opts.stats));
size_t range_size = row_ranges->range_size();
// get covered page ids
std::set<uint32_t> page_ids;
Expand All @@ -598,16 +603,18 @@ Status ColumnReader::get_row_ranges_by_bloom_filter(const AndBlockColumnPredicat
return Status::OK();
}

Status ColumnReader::_load_ordinal_index(bool use_page_cache, bool kept_in_memory) {
Status ColumnReader::_load_ordinal_index(bool use_page_cache, bool kept_in_memory,
const ColumnIteratorOptions& iter_opts) {
if (!_ordinal_index) {
return Status::InternalError("ordinal_index not inited");
}
return _ordinal_index->load(use_page_cache, kept_in_memory);
return _ordinal_index->load(use_page_cache, kept_in_memory, iter_opts.stats);
}

Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memory) {
Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memory,
const ColumnIteratorOptions& iter_opts) {
if (_zone_map_index != nullptr) {
return _zone_map_index->load(use_page_cache, kept_in_memory);
return _zone_map_index->load(use_page_cache, kept_in_memory, iter_opts.stats);
}
return Status::OK();
}
Expand Down Expand Up @@ -681,24 +688,27 @@ bool ColumnReader::has_bloom_filter_index(bool ngram) const {
}
}

Status ColumnReader::_load_bloom_filter_index(bool use_page_cache, bool kept_in_memory) {
Status ColumnReader::_load_bloom_filter_index(bool use_page_cache, bool kept_in_memory,
const ColumnIteratorOptions& iter_opts) {
if (_bloom_filter_index != nullptr) {
return _bloom_filter_index->load(use_page_cache, kept_in_memory);
return _bloom_filter_index->load(use_page_cache, kept_in_memory, iter_opts.stats);
}
return Status::OK();
}

Status ColumnReader::seek_to_first(OrdinalPageIndexIterator* iter) {
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory));
Status ColumnReader::seek_to_first(OrdinalPageIndexIterator* iter,
const ColumnIteratorOptions& iter_opts) {
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts));
*iter = _ordinal_index->begin();
if (!iter->valid()) {
return Status::NotFound("Failed to seek to first rowid");
}
return Status::OK();
}

Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter) {
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory));
Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter,
const ColumnIteratorOptions& iter_opts) {
RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts));
*iter = _ordinal_index->seek_at_or_before(ordinal);
if (!iter->valid()) {
return Status::NotFound("Failed to seek to ordinal {}, ", ordinal);
Expand Down Expand Up @@ -1172,7 +1182,7 @@ Status FileColumnIterator::init(const ColumnIteratorOptions& opts) {
FileColumnIterator::~FileColumnIterator() = default;

Status FileColumnIterator::seek_to_first() {
RETURN_IF_ERROR(_reader->seek_to_first(&_page_iter));
RETURN_IF_ERROR(_reader->seek_to_first(&_page_iter, _opts));
RETURN_IF_ERROR(_read_data_page(_page_iter));

_seek_to_pos_in_page(&_page, 0);
Expand All @@ -1183,7 +1193,7 @@ Status FileColumnIterator::seek_to_first() {
Status FileColumnIterator::seek_to_ordinal(ordinal_t ord) {
// if current page contains this row, we don't need to seek
if (!_page || !_page.contains(ord) || !_page_iter.valid()) {
RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter));
RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter, _opts));
RETURN_IF_ERROR(_read_data_page(_page_iter));
}
_seek_to_pos_in_page(&_page, ord - _page.first_ordinal);
Expand Down Expand Up @@ -1431,8 +1441,8 @@ Status FileColumnIterator::get_row_ranges_by_zone_map(
const AndBlockColumnPredicate* col_predicates,
const std::vector<const ColumnPredicate*>* delete_predicates, RowRanges* row_ranges) {
if (_reader->has_zone_map()) {
RETURN_IF_ERROR(
_reader->get_row_ranges_by_zone_map(col_predicates, delete_predicates, row_ranges));
RETURN_IF_ERROR(_reader->get_row_ranges_by_zone_map(col_predicates, delete_predicates,
row_ranges, _opts));
}
return Status::OK();
}
Expand All @@ -1441,7 +1451,7 @@ Status FileColumnIterator::get_row_ranges_by_bloom_filter(
const AndBlockColumnPredicate* col_predicates, RowRanges* row_ranges) {
if ((col_predicates->can_do_bloom_filter(false) && _reader->has_bloom_filter_index(false)) ||
(col_predicates->can_do_bloom_filter(true) && _reader->has_bloom_filter_index(true))) {
RETURN_IF_ERROR(_reader->get_row_ranges_by_bloom_filter(col_predicates, row_ranges));
RETURN_IF_ERROR(_reader->get_row_ranges_by_bloom_filter(col_predicates, row_ranges, _opts));
}
return Status::OK();
}
Expand Down
Loading

0 comments on commit 51747fe

Please sign in to comment.