Skip to content

Commit

Permalink
[opt](inverted index) inverted Index File Cache Queue Optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed Dec 26, 2024
1 parent 041db85 commit 7b8d12e
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 38 deletions.
2 changes: 1 addition & 1 deletion be/src/clucene
41 changes: 30 additions & 11 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,18 @@ namespace segment_v2 {
class CSIndexInput : public lucene::store::BufferedIndexInput {
private:
CL_NS(store)::IndexInput* base;
std::string file_name;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;
bool _is_index_file = false; // Indicates if the file is a TII file

protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
void seekInternal(const int64_t /*pos*/) override {}

public:
CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, const int64_t length,
CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
CSIndexInput(const CSIndexInput& clone);
~CSIndexInput() override;
Expand All @@ -78,13 +79,14 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
void setIndexFile(bool isIndexFile) override;
};

CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
const int64_t length, const int32_t read_buffer_size)
CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size)
: BufferedIndexInput(read_buffer_size) {
this->base = base;
this->file_name = file_name;
this->fileOffset = fileOffset;
this->_length = length;
}
Expand All @@ -101,7 +103,27 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
base->setIoContext(_io_ctx);
}

base->setIndexFile(_is_index_file);
DBUG_EXECUTE_IF("CSIndexInput.readInternal", {
for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (!static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be true for inverted index meta "
"files.");
}
}
}
for (const auto& entry : InvertedIndexDescriptor::normal_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be false for non-meta inverted "
"index files.");
}
}
}
});

base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
Expand All @@ -119,6 +141,7 @@ lucene::store::IndexInput* CSIndexInput::clone() const {

CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone) {
this->base = clone.base;
this->file_name = clone.file_name;
this->fileOffset = clone.fileOffset;
this->_length = clone._length;
}
Expand All @@ -129,10 +152,6 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

void CSIndexInput::setIndexFile(bool isIndexFile) {
_is_index_file = isIndexFile;
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
Expand Down Expand Up @@ -299,7 +318,7 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput*
bufferSize = _read_buffer_size;
}

ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, bufferSize);
ret = _CLNEW CSIndexInput(_stream, entry->file_name, entry->offset, entry->length, bufferSize);
return true;
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

namespace doris::segment_v2 {

const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::index_file_info_map = {
{"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4}, {"tii", 5}};

const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::normal_file_info_map = {
{"tis", 1}, {"frq", 2}, {"prx", 3}};

// {tmp_dir}/{rowset_id}_{seg_id}_{index_id}@{suffix}
std::string InvertedIndexDescriptor::get_temporary_index_path(std::string_view tmp_dir_path,
std::string_view rowset_id,
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ namespace segment_v2 {

class InvertedIndexDescriptor {
public:
static const std::unordered_map<std::string, int32_t> index_file_info_map;
static const std::unordered_map<std::string, int32_t> normal_file_info_map;

static constexpr std::string_view segment_suffix = ".dat";
static constexpr std::string_view index_suffix = ".idx";
static std::string get_temporary_index_path(std::string_view tmp_dir_path,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOConte
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);
}
}
}
Expand Down Expand Up @@ -83,6 +84,7 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::I
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);

// 3. read file
int32_t version = _stream->readInt(); // Read version number
Expand Down
15 changes: 6 additions & 9 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,14 @@ Status InvertedIndexFileWriter::close() {

void InvertedIndexFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
auto file_priority = [](const std::string& filename) {
if (filename.find("segments") != std::string::npos) {
return 1;
}
if (filename.find("fnm") != std::string::npos) {
return 2;
}
if (filename.find("tii") != std::string::npos) {
return 3;
for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) {
if (filename.find(entry.first) != std::string::npos) {
return entry.second;
}
}
return 4; // Other files
return 6; // Other files
};

std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, const FileInfo& b) {
int32_t priority_a = file_priority(a.filename);
int32_t priority_b = file_priority(b.filename);
Expand Down
16 changes: 0 additions & 16 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,6 @@ lucene::store::IndexInput* DorisFSDirectory::FSIndexInput::clone() const {
}
void DorisFSDirectory::FSIndexInput::close() {
BufferedIndexInput::close();
/*if (_handle != nullptr) {
std::mutex* lock = _handle->_shared_lock;
bool ref = false;
{
std::lock_guard<std::mutex> wlock(*lock);
//determine if we are about to delete the handle...
ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
//decdelete (deletes if refcount is down to 0
_CLDECDELETE(_handle);
}
//if _handle is not ref by other FSIndexInput, try to release mutex lock, or it will be leaked.
if (!ref) {
delete lock;
}
}*/
}

void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
*searcher = searcher_result;

// When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries.
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);
auto stream = static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput();
stream->setIoContext(nullptr);
stream->setIndexFile(false);

// NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1790

-- !sql --
2000

-- !sql --
4

-- !sql --
58

-- !sql --
0

-- !sql --
16

-- !sql --
12

-- !sql --
16

-- !sql --
12

-- !sql --
10

-- !sql --
88

-- !sql --
648

-- !sql --
386

-- !sql --
78

-- !sql --
746

-- !sql --
476

-- !sql --
2000

Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_index_file_cache_fault_injection", "nonConcurrent") {
def indexTbName = "test_index_file_cache_fault_injection"

sql "DROP TABLE IF EXISTS ${indexTbName}"
sql """
CREATE TABLE ${indexTbName} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` varchar(20) NULL COMMENT "",
`request` text NULL COMMENT "",
`status` int(11) NULL COMMENT "",
`size` int(11) NULL COMMENT "",
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true"
);
"""

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->

// load the json data
streamLoad {
table "${table_name}"

// set http request header params
set 'label', label + "_" + UUID.randomUUID().toString()
set 'read_json_by_line', read_flag
set 'format', format_flag
file file_name // import json file
time 10000 // limit inflight 10s
if (expected_succ_rows >= 0) {
set 'max_filter_ratio', '1'
}

// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (ignore_failure && expected_succ_rows < 0) { return }
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
}
}
}

try {
load_httplogs_data.call(indexTbName, 'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(indexTbName, 'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')

sql "sync"
sql """ set enable_common_expr_pushdown = true; """

try {
GetDebugPoint().enableDebugPointForAllBEs("CSIndexInput.readInternal")
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '0'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '1'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '2'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '3'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '4'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '5'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '6'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '7'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '8'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '9'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'a'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'b'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'c'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'd'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'e'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'f'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'g'; """
} finally {
GetDebugPoint().disableDebugPointForAllBEs("CSIndexInput.readInternal")
}
} finally {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ suite("test_inverted_index_cache", "nonConcurrent") {
load_httplogs_data.call(indexTbName, 'test_index_inlist_fault_injection', 'true', 'json', 'documents-1000.json')
sql "sync"

sql """ set enable_common_expr_pushdown = true; """
qt_sql """ select count() from ${indexTbName} where (request match 'images'); """

// query cache hit
Expand Down

0 comments on commit 7b8d12e

Please sign in to comment.