Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](inverted index) inverted Index File Cache Queue Optimization #46024

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/clucene
41 changes: 30 additions & 11 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,18 @@ namespace segment_v2 {
class CSIndexInput : public lucene::store::BufferedIndexInput {
private:
CL_NS(store)::IndexInput* base;
std::string file_name;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;
bool _is_index_file = false; // Indicates if the file is a TII file

protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
void seekInternal(const int64_t /*pos*/) override {}

public:
CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, const int64_t length,
CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
CSIndexInput(const CSIndexInput& clone);
~CSIndexInput() override;
Expand All @@ -78,13 +79,14 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
void setIndexFile(bool isIndexFile) override;
};

CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
const int64_t length, const int32_t read_buffer_size)
CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size)
: BufferedIndexInput(read_buffer_size) {
this->base = base;
this->file_name = file_name;
this->fileOffset = fileOffset;
this->_length = length;
}
Expand All @@ -101,7 +103,27 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
base->setIoContext(_io_ctx);
}

base->setIndexFile(_is_index_file);
DBUG_EXECUTE_IF("CSIndexInput.readInternal", {
for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (!static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be true for inverted index meta "
"files.");
}
}
}
for (const auto& entry : InvertedIndexDescriptor::normal_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be false for non-meta inverted "
"index files.");
}
}
}
});

base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
Expand All @@ -119,6 +141,7 @@ lucene::store::IndexInput* CSIndexInput::clone() const {

CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone) {
this->base = clone.base;
this->file_name = clone.file_name;
this->fileOffset = clone.fileOffset;
this->_length = clone._length;
}
Expand All @@ -129,10 +152,6 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

void CSIndexInput::setIndexFile(bool isIndexFile) {
_is_index_file = isIndexFile;
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
Expand Down Expand Up @@ -299,7 +318,7 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput*
bufferSize = _read_buffer_size;
}

ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, bufferSize);
ret = _CLNEW CSIndexInput(_stream, entry->file_name, entry->offset, entry->length, bufferSize);
return true;
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

namespace doris::segment_v2 {

const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::index_file_info_map = {
{"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4}, {"tii", 5}};

const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::normal_file_info_map = {
{"tis", 1}, {"frq", 2}, {"prx", 3}};

// {tmp_dir}/{rowset_id}_{seg_id}_{index_id}@{suffix}
std::string InvertedIndexDescriptor::get_temporary_index_path(std::string_view tmp_dir_path,
std::string_view rowset_id,
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <stdint.h>

#include <string>
#include <unordered_map>

namespace doris {
struct RowsetId;
Expand All @@ -28,6 +29,9 @@ namespace segment_v2 {

class InvertedIndexDescriptor {
public:
static const std::unordered_map<std::string, int32_t> index_file_info_map;
static const std::unordered_map<std::string, int32_t> normal_file_info_map;

static constexpr std::string_view segment_suffix = ".dat";
static constexpr std::string_view index_suffix = ".idx";
static std::string get_temporary_index_path(std::string_view tmp_dir_path,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOConte
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);
}
}
}
Expand Down Expand Up @@ -83,6 +84,7 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::I
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);

// 3. read file
int32_t version = _stream->readInt(); // Read version number
Expand Down
15 changes: 6 additions & 9 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,14 @@ Status InvertedIndexFileWriter::close() {

void InvertedIndexFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
auto file_priority = [](const std::string& filename) {
if (filename.find("segments") != std::string::npos) {
return 1;
}
if (filename.find("fnm") != std::string::npos) {
return 2;
}
if (filename.find("tii") != std::string::npos) {
return 3;
for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) {
if (filename.find(entry.first) != std::string::npos) {
return entry.second;
}
}
return 4; // Other files
return 6; // Other files
};

std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, const FileInfo& b) {
int32_t priority_a = file_priority(a.filename);
int32_t priority_b = file_priority(b.filename);
Expand Down
16 changes: 0 additions & 16 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,6 @@ lucene::store::IndexInput* DorisFSDirectory::FSIndexInput::clone() const {
}
void DorisFSDirectory::FSIndexInput::close() {
BufferedIndexInput::close();
/*if (_handle != nullptr) {
std::mutex* lock = _handle->_shared_lock;
bool ref = false;
{
std::lock_guard<std::mutex> wlock(*lock);
//determine if we are about to delete the handle...
ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
//decdelete (deletes if refcount is down to 0
_CLDECDELETE(_handle);
}

//if _handle is not ref by other FSIndexInput, try to release mutex lock, or it will be leaked.
if (!ref) {
delete lock;
}
}*/
}

void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
*searcher = searcher_result;

// When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries.
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);
auto stream = static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput();
stream->setIoContext(nullptr);
stream->setIndexFile(false);

// NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
Expand Down
30 changes: 19 additions & 11 deletions be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,23 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
mock_dir->init(_fs, local_fs_index_path.c_str());
std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"};
std::vector<std::string> files = {"segments_0", "segments.gen", "0.fnm",
"0.tii", "null_bitmap", "write.lock"};
for (auto& file : files) {
auto out_file_1 =
std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
out_file_1->writeString("test1");
out_file_1->close();
}

EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments")))
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments_0")))
.WillOnce(testing::Return(1000));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments.gen")))
.WillOnce(testing::Return(1200));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("null_bitmap")))
.WillOnce(testing::Return(500));

InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
InvertedIndexStorageFormatPB::V2);
Expand All @@ -362,24 +366,28 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) {
std::vector<FileInfo> sorted_files =
writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, "suffix1")].get());

// 1. 0.segments (priority 1, size 1000)
// 2. 0.fnm (priority 2, size 2000)
// 3. 0.tii (priority 3, size 1500)
// 4. nullbitmap (priority 4, size 500)
// 1. null_bitmap (priority 1, size 500)
// 2. segments.gen (priority 2, size 1200)
// 3. segments_0 (priority 3, size 1000)
// 4. 0.fnm (priority 4, size 2000)
// 5. 0.tii (priority 5, size 1500)

std::vector<std::string> expected_order = {"0.segments", "0.fnm", "0.tii", "nullbitmap"};
std::vector<std::string> expected_order = {"null_bitmap", "segments.gen", "segments_0", "0.fnm",
"0.tii"};
ASSERT_EQ(sorted_files.size(), expected_order.size());

for (size_t i = 0; i < expected_order.size(); ++i) {
EXPECT_EQ(sorted_files[i].filename, expected_order[i]);
if (sorted_files[i].filename == "0.segments") {
if (sorted_files[i].filename == "null_bitmap") {
EXPECT_EQ(sorted_files[i].filesize, 500);
} else if (sorted_files[i].filename == "segments.gen") {
EXPECT_EQ(sorted_files[i].filesize, 1200);
} else if (sorted_files[i].filename == "segments_0") {
EXPECT_EQ(sorted_files[i].filesize, 1000);
} else if (sorted_files[i].filename == "0.fnm") {
EXPECT_EQ(sorted_files[i].filesize, 2000);
} else if (sorted_files[i].filename == "0.tii") {
EXPECT_EQ(sorted_files[i].filesize, 1500);
} else if (sorted_files[i].filename == "nullbitmap") {
EXPECT_EQ(sorted_files[i].filesize, 500);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1790

-- !sql --
2000

-- !sql --
4

-- !sql --
58

-- !sql --
0

-- !sql --
16

-- !sql --
12

-- !sql --
16

-- !sql --
12

-- !sql --
10

-- !sql --
88

-- !sql --
648

-- !sql --
386

-- !sql --
78

-- !sql --
746

-- !sql --
476

-- !sql --
2000

Loading
Loading