Skip to content

Commit

Permalink
[opt](ShortCircuit) add more stats info to trace (apache#42697)
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Nov 6, 2024
1 parent bd327c4 commit f5dd935
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 83 deletions.
56 changes: 27 additions & 29 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = segment->file_reader().get(),
.stats = stats,
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY},
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY,
.file_cache_stats = &stats->file_cache_stats},
};
RETURN_IF_ERROR((*column_iterator)->init(opt));
return Status::OK();
Expand Down Expand Up @@ -441,7 +442,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
const std::vector<RowsetSharedPtr>& specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset, bool with_rowid) {
RowsetSharedPtr* rowset, bool with_rowid,
OlapReaderStatistics* stats) {
SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
size_t seq_col_length = 0;
// use the latest tablet schema to decide if the tablet has sequence column currently
Expand Down Expand Up @@ -489,37 +491,33 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest

for (auto id : picked_segments) {
Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid,
&loc);
if (s.is<KEY_NOT_FOUND>()) {
&loc, stats);
continue;
}
if (s.ok() && _tablet_meta->delete_bitmap().contains_agg_without_cache(
{loc.rowset_id, loc.segment_id, version}, loc.row_id)) {
// if has sequence col, we continue to compare the sequence_id of
// all rowsets, util we find an existing key.
if (schema->has_sequence_col()) {
continue;
}
if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) {
return s;
}
if (s.ok() && _tablet_meta->delete_bitmap().contains_agg_without_cache(
{loc.rowset_id, loc.segment_id, version}, loc.row_id)) {
// if has sequence col, we continue to compare the sequence_id of
// all rowsets, util we find an existing key.
if (schema->has_sequence_col()) {
continue;
}
// The key is deleted, we don't need to search for it any more.
break;
}
// `st` is either OK or KEY_ALREADY_EXISTS now.
// for partial update, even if the key is already exists, we still need to
// read it's original values to keep all columns align.
*row_location = loc;
if (rowset) {
// return it's rowset
*rowset = rs;
}
// find it and return
return s;
// The key is deleted, we don't need to search for it any more.
break;
}
// `st` is either OK or KEY_ALREADY_EXISTS now.
// for partial update, even if the key is already exists, we still need to
// read it's original values to keep all columns align.
*row_location = loc;
if (rowset) {
// return it's rowset
*rowset = rs;
}
// find it and return
return s;
}
g_tablet_pk_not_found << 1;
return Status::Error<ErrorCode::KEY_NOT_FOUND>("can't find key in all rowsets");
}
g_tablet_pk_not_found << 1;
return Status::Error<ErrorCode::KEY_NOT_FOUND>("can't find key in all rowsets");
}

// if user pass a token, then all calculation works will submit to a threadpool,
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ class BaseTablet {
const std::vector<RowsetSharedPtr>& specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset = nullptr, bool with_rowid = true);
RowsetSharedPtr* rowset = nullptr, bool with_rowid = true,
OlapReaderStatistics* stats = nullptr);

// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/primary_key_index.h"

#include <butil/time.h>
#include <gen_cpp/segment_v2.pb.h>

#include <utility>
Expand Down Expand Up @@ -95,7 +96,8 @@ Status PrimaryKeyIndexReader::parse_index(io::FileReaderSPtr file_reader,
// parse primary key index
_index_reader.reset(new segment_v2::IndexedColumnReader(file_reader, meta.primary_key_index()));
_index_reader->set_is_pk_index(true);
RETURN_IF_ERROR(_index_reader->load(!config::disable_pk_storage_page_cache, false));
RETURN_IF_ERROR(_index_reader->load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));

_index_parsed = true;
return Status::OK();
Expand All @@ -107,7 +109,8 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader,
segment_v2::ColumnIndexMetaPB column_index_meta = meta.bloom_filter_index();
segment_v2::BloomFilterIndexReader bf_index_reader(std::move(file_reader),
column_index_meta.bloom_filter_index());
RETURN_IF_ERROR(bf_index_reader.load(!config::disable_pk_storage_page_cache, false));
RETURN_IF_ERROR(bf_index_reader.load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));
std::unique_ptr<segment_v2::BloomFilterIndexIterator> bf_iter;
RETURN_IF_ERROR(bf_index_reader.new_iterator(&bf_iter));
RETURN_IF_ERROR(bf_iter->read_bloom_filter(0, &_bf));
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/primary_key_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/bloom_filter_index_writer.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
Expand Down Expand Up @@ -97,7 +98,8 @@ class PrimaryKeyIndexBuilder {

class PrimaryKeyIndexReader {
public:
PrimaryKeyIndexReader() : _index_parsed(false), _bf_parsed(false) {}
PrimaryKeyIndexReader(OlapReaderStatistics* pk_index_load_stats = nullptr)
: _index_parsed(false), _bf_parsed(false), _pk_index_load_stats(pk_index_load_stats) {}

~PrimaryKeyIndexReader() {
segment_v2::g_pk_total_bloom_filter_num << -static_cast<int64_t>(_bf_num);
Expand All @@ -111,9 +113,10 @@ class PrimaryKeyIndexReader {

Status parse_bf(io::FileReaderSPtr file_reader, const segment_v2::PrimaryKeyIndexMetaPB& meta);

Status new_iterator(std::unique_ptr<segment_v2::IndexedColumnIterator>* index_iterator) const {
Status new_iterator(std::unique_ptr<segment_v2::IndexedColumnIterator>* index_iterator,
OlapReaderStatistics* stats = nullptr) const {
DCHECK(_index_parsed);
index_iterator->reset(new segment_v2::IndexedColumnIterator(_index_reader.get()));
index_iterator->reset(new segment_v2::IndexedColumnIterator(_index_reader.get(), stats));
return Status::OK();
}

Expand Down Expand Up @@ -152,6 +155,7 @@ class PrimaryKeyIndexReader {
std::unique_ptr<segment_v2::BloomFilter> _bf;
size_t _bf_num = 0;
uint64 _bf_bytes = 0;
OlapReaderStatistics* _pk_index_load_stats = nullptr;
};

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
namespace doris {
namespace segment_v2 {

Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory) {
Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats) {
// TODO yyq: implement a new once flag to avoid status construct.
_index_load_stats = index_load_stats;
return _load_once.call([this, use_page_cache, kept_in_memory] {
return _load(use_page_cache, kept_in_memory);
});
Expand All @@ -42,7 +44,7 @@ Status BloomFilterIndexReader::_load(bool use_page_cache, bool kept_in_memory) {
const IndexedColumnMetaPB& bf_index_meta = _bloom_filter_index_meta->bloom_filter();

_bloom_filter_reader.reset(new IndexedColumnReader(_file_reader, bf_index_meta));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory, _index_load_stats));
return Status::OK();
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class BloomFilterIndexReader {
_bloom_filter_index_meta.reset(new BloomFilterIndexPB(bloom_filter_index_meta));
}

Status load(bool use_page_cache, bool kept_in_memory);
Status load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* _bf_index_load_stats = nullptr);

BloomFilterAlgorithmPB algorithm() { return _bloom_filter_index_meta->algorithm(); }

Expand All @@ -67,6 +68,7 @@ class BloomFilterIndexReader {
const TypeInfo* _type_info = nullptr;
std::unique_ptr<BloomFilterIndexPB> _bloom_filter_index_meta = nullptr;
std::unique_ptr<IndexedColumnReader> _bloom_filter_reader;
OlapReaderStatistics* _index_load_stats = nullptr;
};

class BloomFilterIndexIterator {
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ static bvar::Adder<uint64_t> g_index_reader_memory_bytes("doris_index_reader_mem

using strings::Substitute;

Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats) {
_use_page_cache = use_page_cache;
_kept_in_memory = kept_in_memory;
_index_load_stats = index_load_stats;

_type_info = get_scalar_type_info((FieldType)_meta.data_type());
if (_type_info == nullptr) {
Expand Down Expand Up @@ -105,16 +107,18 @@ Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle*
BlockCompressionCodec* local_compress_codec;
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec));
RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
local_compress_codec, false));
local_compress_codec, false, _index_load_stats));
RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
_mem_size += body.get_size();
return Status::OK();
}

Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body,
PageFooterPB* footer, PageTypePB type,
BlockCompressionCodec* codec, bool pre_decode) const {
BlockCompressionCodec* codec, bool pre_decode,
OlapReaderStatistics* stats) const {
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
PageReadOptions opts {
.use_page_cache = _use_page_cache,
.kept_in_memory = _kept_in_memory,
Expand All @@ -123,9 +127,10 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle,
.file_reader = _file_reader.get(),
.page_pointer = pp,
.codec = codec,
.stats = &tmp_stats,
.stats = stats_ptr,
.encoding_info = _encoding_info,
.io_ctx = io::IOContext {.is_index_data = true},
.io_ctx = io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats},
};
if (_is_pk_index) {
opts.type = PRIMARY_KEY_INDEX_PAGE;
Expand Down Expand Up @@ -154,8 +159,8 @@ Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
PageHandle handle;
Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(
_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec, true));
RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec,
true, _stats));
// parse data page
// note that page_index is not used in IndexedColumnIterator, so we pass 0
PageDecoderOptions opts;
Expand Down
15 changes: 11 additions & 4 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/index_page.h"
#include "olap/rowset/segment_v2/page_handle.h"
Expand All @@ -53,11 +54,13 @@ class IndexedColumnReader {

~IndexedColumnReader();

Status load(bool use_page_cache, bool kept_in_memory);
Status load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats = nullptr);

// read a page specified by `pp' from `file' into `handle'
Status read_page(const PagePointer& pp, PageHandle* handle, Slice* body, PageFooterPB* footer,
PageTypePB type, BlockCompressionCodec* codec, bool pre_decode) const;
PageTypePB type, BlockCompressionCodec* codec, bool pre_decode,
OlapReaderStatistics* stats = nullptr) const;

int64_t num_values() const { return _num_values; }
const EncodingInfo* encoding_info() const { return _encoding_info; }
Expand Down Expand Up @@ -95,14 +98,17 @@ class IndexedColumnReader {
const KeyCoder* _value_key_coder = nullptr;
uint64_t _mem_size = 0;
bool _is_pk_index = false;
OlapReaderStatistics* _index_load_stats = nullptr;
};

class IndexedColumnIterator {
public:
explicit IndexedColumnIterator(const IndexedColumnReader* reader)
explicit IndexedColumnIterator(const IndexedColumnReader* reader,
OlapReaderStatistics* stats = nullptr)
: _reader(reader),
_ordinal_iter(&reader->_ordinal_index_reader),
_value_iter(&reader->_value_index_reader) {}
_value_iter(&reader->_value_index_reader),
_stats(stats) {}

// Seek to the given ordinal entry. Entry 0 is the first entry.
// Return Status::Error<ENTRY_NOT_FOUND> if provided seek point is past the end.
Expand Down Expand Up @@ -151,6 +157,7 @@ class IndexedColumnIterator {
ordinal_t _current_ordinal = 0;
// iterator owned compress codec, should NOT be shared by threads, initialized before used
BlockCompressionCodec* _compress_codec = nullptr;
OlapReaderStatistics* _stats = nullptr;
};

} // namespace segment_v2
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ Status Segment::_load_pk_bloom_filter() {
});
}

Status Segment::load_pk_index_and_bf() {
Status Segment::load_pk_index_and_bf(OlapReaderStatistics* index_load_stats) {
_pk_index_load_stats = index_load_stats;
RETURN_IF_ERROR(load_index());
RETURN_IF_ERROR(_load_pk_bloom_filter());
return Status::OK();
Expand All @@ -467,7 +468,7 @@ Status Segment::load_pk_index_and_bf() {
Status Segment::load_index() {
return _load_index_once.call([this] {
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
_pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
_pk_index_reader = std::make_unique<PrimaryKeyIndexReader>(_pk_index_load_stats);
RETURN_IF_ERROR(_pk_index_reader->parse_index(_file_reader, *_pk_index_meta));
// _meta_mem_usage += _pk_index_reader->get_memory_size();
return Status::OK();
Expand Down Expand Up @@ -926,7 +927,8 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column,
}

Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema,
bool with_seq_col, bool with_rowid, RowLocation* row_location) {
bool with_seq_col, bool with_rowid, RowLocation* row_location,
OlapReaderStatistics* stats) {
RETURN_IF_ERROR(load_pk_index_and_bf());
bool has_seq_col = latest_schema->has_sequence_col();
bool has_rowid = !latest_schema->cluster_key_idxes().empty();
Expand All @@ -946,7 +948,7 @@ Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_sche
}
bool exact_match = false;
std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator));
RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator, stats));
auto st = index_iterator->seek_at_or_after(&key_without_seq, &exact_match);
if (!st.ok() && !st.is<ErrorCode::ENTRY_NOT_FOUND>()) {
return st;
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
}

Status lookup_row_key(const Slice& key, const TabletSchema* latest_schema, bool with_seq_col,
bool with_rowid, RowLocation* row_location);
bool with_rowid, RowLocation* row_location,
OlapReaderStatistics* stats = nullptr);

Status read_key_by_rowid(uint32_t row_id, std::string* key);

Expand All @@ -140,7 +141,7 @@ class Segment : public std::enable_shared_from_this<Segment> {

Status load_index();

Status load_pk_index_and_bf();
Status load_pk_index_and_bf(OlapReaderStatistics* index_load_stats = nullptr);

void update_healthy_status(Status new_status) { _healthy_status.update(new_status); }
// The segment is loaded into SegmentCache and then will load indices, if there are something wrong
Expand Down Expand Up @@ -294,6 +295,7 @@ class Segment : public std::enable_shared_from_this<Segment> {
InvertedIndexFileInfo _idx_file_info;

int _be_exec_version = BeExecVersionManager::get_newest_version();
OlapReaderStatistics* _pk_index_load_stats = nullptr;
};

} // namespace segment_v2
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "olap/segment_loader.h"

#include <butil/time.h>

#include "common/config.h"
#include "common/status.h"
#include "olap/olap_define.h"
Expand Down Expand Up @@ -52,7 +54,8 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {

Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool use_cache,
bool need_load_pk_index_and_bf) {
bool need_load_pk_index_and_bf,
OlapReaderStatistics* index_load_stats) {
if (cache_handle->is_inited()) {
return Status::OK();
}
Expand All @@ -70,7 +73,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf());
RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
}
if (use_cache && !config::disable_segment_cache) {
// memory of SegmentCache::CacheValue will be handled by SegmentCache
Expand Down
Loading

0 comments on commit f5dd935

Please sign in to comment.