Skip to content

Commit

Permalink
[opt](inverted index) add performance profiling for remote io access …
Browse files Browse the repository at this point in the history
…in inverted index
  • Loading branch information
zzzxl1993 committed Nov 11, 2024
1 parent 90e49d2 commit 028c8fd
Show file tree
Hide file tree
Showing 26 changed files with 377 additions and 123 deletions.
2 changes: 1 addition & 1 deletion be/src/index-tools/index_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void search(lucene::store::Directory* dir, std::string& field, std::string& toke
std::vector<std::string> terms = split(token, '|');

doris::TQueryOptions queryOptions;
ConjunctionQuery conjunct_query(s, queryOptions);
ConjunctionQuery conjunct_query(s, queryOptions, nullptr);
conjunct_query.add(field_ws, terms);
conjunct_query.search(result);

Expand Down
7 changes: 2 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,8 @@ Status Compaction::do_inverted_index_compaction() {
fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(seg_id));
bool open_idx_file_cache = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache),
inverted_index_file_reader->init(nullptr, config::inverted_index_read_buffer_size),
"inverted_index_file_reader init failed");
inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader);
}
Expand Down Expand Up @@ -764,9 +762,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(i));
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(
config::inverted_index_read_buffer_size, open_idx_file_cache);
nullptr, config::inverted_index_read_buffer_size);
index_file_path = inverted_index_file_reader->get_index_file_path(index_meta);
if (!st.ok()) {
LOG(WARNING) << "init index " << index_file_path << " error:" << st;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ Status BetaRowset::show_nested_index_file(rapidjson::Value* rowset_value,
auto index_file_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(seg_path);
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs, std::string(index_file_path_prefix), storage_format);
RETURN_IF_ERROR(inverted_index_file_reader->init());
RETURN_IF_ERROR(inverted_index_file_reader->init(nullptr));
auto dirs = inverted_index_file_reader->get_all_directories();

auto add_file_info_to_json = [&](const std::string& path,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ Status ColumnReader::new_inverted_index_iterator(
{
std::shared_lock<std::shared_mutex> rlock(_load_index_lock);
if (_inverted_index) {
RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.stats,
RETURN_IF_ERROR(_inverted_index->new_iterator(&read_options.io_ctx, read_options.stats,
read_options.runtime_state, iterator));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
namespace doris::segment_v2 {

ConjunctionQuery::ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_io_ctx(io_ctx),
_index_version(_searcher->getReader()->getIndexVersion()),
_conjunction_ratio(query_options.inverted_index_conjunction_opt_threshold) {}

Expand All @@ -48,7 +49,7 @@ void ConjunctionQuery::add(const std::wstring& field_name, const std::vector<std
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
iterators.emplace_back(term_doc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris::segment_v2 {
class ConjunctionQuery : public Query {
public:
ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~ConjunctionQuery() override;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand All @@ -41,6 +41,7 @@ class ConjunctionQuery : public Query {

public:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

IndexVersion _index_version = IndexVersion::kV0;
int32_t _conjunction_ratio = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
namespace doris::segment_v2 {

DisjunctionQuery::DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
: _searcher(searcher) {}
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher), _io_ctx(io_ctx) {}

void DisjunctionQuery::add(const std::wstring& field_name, const std::vector<std::string>& terms) {
if (terms.empty()) {
Expand All @@ -36,7 +36,7 @@ void DisjunctionQuery::search(roaring::Roaring& roaring) {
auto func = [this, &roaring](const std::string& term, bool first) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
auto* t = _CLNEW Term(_field_name.c_str(), ws_term.c_str());
auto* term_doc = _searcher->getReader()->termDocs(t);
auto* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
TermIterator iterator(term_doc);

DocRange doc_range;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ namespace doris::segment_v2 {
class DisjunctionQuery : public Query {
public:
DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~DisjunctionQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
void search(roaring::Roaring& roaring) override;

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

std::wstring _field_name;
std::vector<std::string> _terms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace doris::segment_v2 {

PhraseEdgeQuery::PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhraseEdgeQuery : public Query {
public:
PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhraseEdgeQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
namespace doris::segment_v2 {

PhrasePrefixQuery::PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options,
const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhrasePrefixQuery : public Query {
public:
PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhrasePrefixQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ bool OrderedSloppyPhraseMatcher::stretch_to_order(PostingsAndPosition* prev_post
}

PhraseQuery::PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
: _searcher(searcher) {}
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher), _io_ctx(io_ctx) {}

PhraseQuery::~PhraseQuery() {
for (auto& term_doc : _term_docs) {
Expand Down Expand Up @@ -173,7 +173,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(terms[0]);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
_lead1 = TermIterator(term_doc);
return;
Expand All @@ -185,7 +185,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermPositions* term_pos = _searcher->getReader()->termPositions(t);
TermPositions* term_pos = _searcher->getReader()->termPositions(t, _io_ctx);
_term_docs.push_back(term_pos);
if (is_save_iter) {
iterators.emplace_back(term_pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ using Matcher = std::variant<ExactPhraseMatcher, OrderedSloppyPhraseMatcher>;
class PhraseQuery : public Query {
public:
PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhraseQuery() override;

void add(const InvertedIndexQueryInfo& query_info) override;
Expand All @@ -112,6 +112,7 @@ class PhraseQuery : public Query {

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

TermIterator _lead1;
TermIterator _lead2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
namespace doris::segment_v2 {

RegexpQuery::RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_max_expansions(query_options.inverted_index_max_expansions),
_query(searcher, query_options) {}
_query(searcher, query_options, io_ctx) {}

void RegexpQuery::add(const std::wstring& field_name, const std::vector<std::string>& patterns) {
if (patterns.size() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace doris::segment_v2 {
class RegexpQuery : public Query {
public:
RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~RegexpQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& patterns) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
CL_NS(store)::IndexInput* base;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;

protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
Expand All @@ -75,6 +76,7 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
const char* getDirectoryType() const override { return DorisCompoundReader::getClassName(); }
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
};

CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
Expand All @@ -92,6 +94,7 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
base->setIoContext(_io_ctx);
base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
Expand All @@ -111,6 +114,10 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone

void CSIndexInput::close() {}

void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
Expand Down Expand Up @@ -278,6 +285,7 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput*
}

ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, bufferSize);
ret->setIoContext(_stream->getIoContext());
return true;
}

Expand Down
19 changes: 15 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,24 @@

namespace doris::segment_v2 {

Status InvertedIndexFileReader::init(int32_t read_buffer_size, bool open_idx_file_cache) {
Status InvertedIndexFileReader::init(const io::IOContext* io_ctx, int32_t read_buffer_size) {
_io_ctx = io_ctx;
if (!_inited) {
_read_buffer_size = read_buffer_size;
_open_idx_file_cache = open_idx_file_cache;
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
auto st = _init_from_v2(read_buffer_size);
if (!st.ok()) {
return st;
}
}
_inited = true;
} else {
// a new query requires setting a new io_ctx.
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(_io_ctx);
}
}
}
return Status::OK();
}
Expand Down Expand Up @@ -76,7 +83,9 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) {
"CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path,
err.what());
}
index_input->setIdxFileCache(_open_idx_file_cache);
if (index_input) {
index_input->setIoContext(_io_ctx);
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);

// 3. read file
Expand Down Expand Up @@ -196,9 +205,11 @@ Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open(
"CLuceneError occur when open idx file {}, error msg: {}", index_file_path,
err.what()));
}
if (index_input) {
index_input->setIoContext(_io_ctx);
}

// 3. read file in DorisCompoundReader
index_input->setIdxFileCache(_open_idx_file_cache);
compound_reader = std::make_unique<DorisCompoundReader>(index_input, _read_buffer_size);
} catch (CLuceneError& err) {
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class InvertedIndexFileReader {
_storage_format(storage_format),
_idx_file_info(idx_file_info) {}

Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size,
bool open_idx_file_cache = false);
Status init(const io::IOContext* io_ctx,
int32_t read_buffer_size = config::inverted_index_read_buffer_size);
Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta) const;
void debug_file_entries();
std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
Expand All @@ -80,11 +80,11 @@ class InvertedIndexFileReader {
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
int32_t _read_buffer_size = -1;
bool _open_idx_file_cache = false;
InvertedIndexStorageFormatPB _storage_format;
mutable std::shared_mutex _mutex; // Use mutable for const read operations
bool _inited = false;
InvertedIndexFileInfo _idx_file_info;
const io::IOContext* _io_ctx = nullptr;
};

} // namespace segment_v2
Expand Down
36 changes: 34 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ void DorisFSDirectory::FSIndexInput::close() {
}*/
}

void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

const void* DorisFSDirectory::FSIndexInput::getIoContext() {
return _io_ctx;
}

void DorisFSDirectory::FSIndexInput::seekInternal(const int64_t position) {
CND_PRECONDITION(position >= 0 && position < _handle->_length, "Seeking out of range");
_pos = position;
Expand All @@ -238,8 +246,32 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len)

Slice result {b, (size_t)len};
size_t bytes_read = 0;
if (!_handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx).ok()) {
_CLTHROWA(CL_ERR_IO, "read past EOF");

DBUG_EXECUTE_IF(
"DorisFSDirectory::FSIndexInput::readInternal", ({
static thread_local std::unordered_map<const TUniqueId*, io::FileCacheStatistics*>
thread_file_cache_map;
if (_io_ctx) {
auto it = thread_file_cache_map.find(_io_ctx->query_id);
if (it != thread_file_cache_map.end()) {
if (_io_ctx->file_cache_stats != it->second) {
_CLTHROWA(CL_ERR_IO, "File cache statistics mismatch");
}
} else {
thread_file_cache_map[_io_ctx->query_id] = _io_ctx->file_cache_stats;
}
}
}));

if (_io_ctx) {
if (!_handle->_reader->read_at(_pos, result, &bytes_read, _io_ctx).ok()) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
} else {
io::IOContext io_ctx;
if (!_handle->_reader->read_at(_pos, result, &bytes_read, &io_ctx).ok()) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
}
bufferLength = len;
if (bytes_read != len) {
Expand Down
Loading

0 comments on commit 028c8fd

Please sign in to comment.