Skip to content

Commit

Permalink
[opt](inverted index) Add NumInvertedIndexRemoteIOTotal statistics in…
Browse files Browse the repository at this point in the history
… profile (apache#44863)

Add NumInvertedIndexRemoteIOTotal to count the number of remote IO
operations in the inverted index
Related PR: apache#43542
  • Loading branch information
zzzxl1993 committed Dec 19, 2024
1 parent ecdd44e commit 6f2f8e0
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 15 deletions.
5 changes: 5 additions & 0 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct FileCacheProfile {
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
RuntimeProfile::Counter* num_inverted_index_remote_io_total = nullptr;
RuntimeProfile::Counter* local_io_timer = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
Expand All @@ -90,6 +91,8 @@ struct FileCacheProfileReporter {
cache_profile, 1);
num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
num_inverted_index_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "NumInvertedIndexRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1);
remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1);
write_cache_io_timer =
Expand All @@ -107,6 +110,8 @@ struct FileCacheProfileReporter {
void update(const FileCacheStatistics* statistics) const {
COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total);
COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total);
COUNTER_UPDATE(num_inverted_index_remote_io_total,
statistics->num_inverted_index_remote_io_total);
COUNTER_UPDATE(local_io_timer, statistics->local_io_timer);
COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer);
COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer);
Expand Down
8 changes: 6 additions & 2 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
_update_state(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
Expand Down Expand Up @@ -312,14 +312,18 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}

void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
FileCacheStatistics* statis) const {
FileCacheStatistics* statis,
bool is_inverted_index) const {
if (statis == nullptr) {
return;
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
if (is_inverted_index) {
statis->num_inverted_index_remote_io_total++;
}
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class CachedRemoteFileReader final : public FileReader {
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
};
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state) const;
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
};

} // namespace doris::io
2 changes: 2 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace io {
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
int64_t num_inverted_index_remote_io_total = 0;
int64_t local_io_timer = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
Expand All @@ -60,6 +61,7 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
bool is_inverted_index = false;
};

} // namespace io
Expand Down
11 changes: 9 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,19 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
base->setIoContext(_io_ctx);

if (_io_ctx) {
base->setIoContext(_io_ctx);
}

base->setIndexFile(_is_index_file);
base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
base->setIoContext(nullptr);

if (_io_ctx) {
base->setIoContext(nullptr);
}
}

CSIndexInput::~CSIndexInput() = default;
Expand Down
15 changes: 11 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,27 @@

namespace doris::segment_v2 {

Status InvertedIndexFileReader::init(int32_t read_buffer_size) {
Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) {
if (!_inited) {
_read_buffer_size = read_buffer_size;
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
auto st = _init_from_v2(read_buffer_size);
if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
auto st = _init_from(read_buffer_size, io_ctx);
if (!st.ok()) {
return st;
}
}
_inited = true;
} else {
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(io_ctx);
}
}
}
return Status::OK();
}

Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) {
Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) {
auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);

std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
Expand Down Expand Up @@ -76,6 +82,7 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) {
err.what());
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);

// 3. read file
int32_t version = _stream->readInt(); // Read version number
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class InvertedIndexFileReader {
_storage_format(storage_format),
_idx_file_info(idx_file_info) {}

Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size);
Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size,
const io::IOContext* io_ctx = nullptr);
Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta) const;
void debug_file_entries();
std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
Expand All @@ -70,7 +71,7 @@ class InvertedIndexFileReader {
int64_t get_inverted_file_size() const { return _stream == nullptr ? 0 : _stream->length(); }

private:
Status _init_from_v2(int32_t read_buffer_size);
Status _init_from(int32_t read_buffer_size, const io::IOContext* io_ctx);
Result<std::unique_ptr<DorisCompoundReader>> _open(int64_t index_id,
const std::string& index_suffix) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput
: BufferedIndexInput(buffer_size) {
this->_pos = 0;
this->_handle = std::move(handle);
_io_ctx.is_inverted_index = true;
}

protected:
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx,

if (!dir) {
// TODO: ugly code here, try to refact.
auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
io_ctx);
if (!st.ok()) {
LOG(WARNING) << st;
return st;
Expand All @@ -137,7 +138,6 @@ Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx,
InvertedIndexDescriptor::get_temporary_null_bitmap_file_name();
if (dir->fileExists(null_bitmap_file_name)) {
null_bitmap_in = dir->openInput(null_bitmap_file_name);
null_bitmap_in->setIoContext(io_ctx);
size_t null_bitmap_size = null_bitmap_in->length();
faststring buf;
buf.resize(null_bitmap_size);
Expand Down Expand Up @@ -180,7 +180,8 @@ Status InvertedIndexReader::handle_searcher_cache(
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_open_timer);
IndexSearcherPtr searcher;

auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
auto st =
_inverted_index_file_reader->init(config::inverted_index_read_buffer_size, io_ctx);
if (!st.ok()) {
LOG(WARNING) << st;
return st;
Expand Down Expand Up @@ -211,6 +212,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
auto searcher_result = DORIS_TRY(index_searcher_builder->get_index_searcher(dir));
*searcher = searcher_result;

// When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries.
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);

// NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ suite("test_index_io_context", "nonConcurrent") {

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexInput::readInternal")

qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """
qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """
qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """
Expand Down

0 comments on commit 6f2f8e0

Please sign in to comment.