Skip to content

Commit

Permalink
[opt](inverted index) inverted Index File Cache Queue Optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed Dec 28, 2024
1 parent 041db85 commit f4059eb
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 49 deletions.
2 changes: 1 addition & 1 deletion be/src/clucene
41 changes: 30 additions & 11 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,18 @@ namespace segment_v2 {
class CSIndexInput : public lucene::store::BufferedIndexInput {
private:
CL_NS(store)::IndexInput* base;
std::string file_name;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;
bool _is_index_file = false; // Indicates if the file is a TII file

protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
void seekInternal(const int64_t /*pos*/) override {}

public:
CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, const int64_t length,
CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
CSIndexInput(const CSIndexInput& clone);
~CSIndexInput() override;
Expand All @@ -78,13 +79,14 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
void setIndexFile(bool isIndexFile) override;
};

CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
const int64_t length, const int32_t read_buffer_size)
CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size)
: BufferedIndexInput(read_buffer_size) {
this->base = base;
this->file_name = file_name;
this->fileOffset = fileOffset;
this->_length = length;
}
Expand All @@ -101,7 +103,27 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
base->setIoContext(_io_ctx);
}

base->setIndexFile(_is_index_file);
DBUG_EXECUTE_IF("CSIndexInput.readInternal", {
for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (!static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be true for inverted index meta "
"files.");
}
}
}
for (const auto& entry : InvertedIndexDescriptor::normal_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be false for non-meta inverted "
"index files.");
}
}
}
});

base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
Expand All @@ -119,6 +141,7 @@ lucene::store::IndexInput* CSIndexInput::clone() const {

CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone) {
this->base = clone.base;
this->file_name = clone.file_name;
this->fileOffset = clone.fileOffset;
this->_length = clone._length;
}
Expand All @@ -129,10 +152,6 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

void CSIndexInput::setIndexFile(bool isIndexFile) {
_is_index_file = isIndexFile;
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
Expand Down Expand Up @@ -299,7 +318,7 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput*
bufferSize = _read_buffer_size;
}

ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, bufferSize);
ret = _CLNEW CSIndexInput(_stream, entry->file_name, entry->offset, entry->length, bufferSize);
return true;
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

namespace doris::segment_v2 {

const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::index_file_info_map = {
{"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4}, {"tii", 5}};

const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::normal_file_info_map = {
{"tis", 1}, {"frq", 2}, {"prx", 3}};

// {tmp_dir}/{rowset_id}_{seg_id}_{index_id}@{suffix}
std::string InvertedIndexDescriptor::get_temporary_index_path(std::string_view tmp_dir_path,
std::string_view rowset_id,
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <stdint.h>

#include <string>
#include <unordered_map>

namespace doris {
struct RowsetId;
Expand All @@ -28,6 +29,9 @@ namespace segment_v2 {

class InvertedIndexDescriptor {
public:
static const std::unordered_map<std::string, int32_t> index_file_info_map;
static const std::unordered_map<std::string, int32_t> normal_file_info_map;

static constexpr std::string_view segment_suffix = ".dat";
static constexpr std::string_view index_suffix = ".idx";
static std::string get_temporary_index_path(std::string_view tmp_dir_path,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOConte
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);
}
}
}
Expand Down Expand Up @@ -83,6 +84,7 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::I
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);

// 3. read file
int32_t version = _stream->readInt(); // Read version number
Expand Down
15 changes: 6 additions & 9 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,14 @@ Status InvertedIndexFileWriter::close() {

void InvertedIndexFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
auto file_priority = [](const std::string& filename) {
if (filename.find("segments") != std::string::npos) {
return 1;
}
if (filename.find("fnm") != std::string::npos) {
return 2;
}
if (filename.find("tii") != std::string::npos) {
return 3;
for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) {
if (filename.find(entry.first) != std::string::npos) {
return entry.second;
}
}
return 4; // Other files
return 6; // Other files
};

std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, const FileInfo& b) {
int32_t priority_a = file_priority(a.filename);
int32_t priority_b = file_priority(b.filename);
Expand Down
16 changes: 0 additions & 16 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,6 @@ lucene::store::IndexInput* DorisFSDirectory::FSIndexInput::clone() const {
}
void DorisFSDirectory::FSIndexInput::close() {
BufferedIndexInput::close();
/*if (_handle != nullptr) {
std::mutex* lock = _handle->_shared_lock;
bool ref = false;
{
std::lock_guard<std::mutex> wlock(*lock);
//determine if we are about to delete the handle...
ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
//decdelete (deletes if refcount is down to 0
_CLDECDELETE(_handle);
}
//if _handle is not ref by other FSIndexInput, try to release mutex lock, or it will be leaked.
if (!ref) {
delete lock;
}
}*/
}

void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
*searcher = searcher_result;

// When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries.
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);
auto stream = static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput();
stream->setIoContext(nullptr);
stream->setIndexFile(false);

// NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
Expand Down
30 changes: 19 additions & 11 deletions be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,23 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
mock_dir->init(_fs, local_fs_index_path.c_str());
std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"};
std::vector<std::string> files = {"segments_0", "segments.gen", "0.fnm",
"0.tii", "null_bitmap", "write.lock"};
for (auto& file : files) {
auto out_file_1 =
std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
out_file_1->writeString("test1");
out_file_1->close();
}

EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments")))
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments_0")))
.WillOnce(testing::Return(1000));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments.gen")))
.WillOnce(testing::Return(1200));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("null_bitmap")))
.WillOnce(testing::Return(500));

InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
InvertedIndexStorageFormatPB::V2);
Expand All @@ -362,24 +366,28 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) {
std::vector<FileInfo> sorted_files =
writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, "suffix1")].get());

// 1. 0.segments (priority 1, size 1000)
// 2. 0.fnm (priority 2, size 2000)
// 3. 0.tii (priority 3, size 1500)
// 4. nullbitmap (priority 4, size 500)
// 1. null_bitmap (priority 1, size 500)
// 2. segments.gen (priority 2, size 1200)
// 3. segments_0 (priority 3, size 1000)
// 4. 0.fnm (priority 4, size 2000)
// 5. 0.tii (priority 5, size 1500)

std::vector<std::string> expected_order = {"0.segments", "0.fnm", "0.tii", "nullbitmap"};
std::vector<std::string> expected_order = {"null_bitmap", "segments.gen", "segments_0", "0.fnm",
"0.tii"};
ASSERT_EQ(sorted_files.size(), expected_order.size());

for (size_t i = 0; i < expected_order.size(); ++i) {
EXPECT_EQ(sorted_files[i].filename, expected_order[i]);
if (sorted_files[i].filename == "0.segments") {
if (sorted_files[i].filename == "null_bitmap") {
EXPECT_EQ(sorted_files[i].filesize, 500);
} else if (sorted_files[i].filename == "segments.gen") {
EXPECT_EQ(sorted_files[i].filesize, 1200);
} else if (sorted_files[i].filename == "segments_0") {
EXPECT_EQ(sorted_files[i].filesize, 1000);
} else if (sorted_files[i].filename == "0.fnm") {
EXPECT_EQ(sorted_files[i].filesize, 2000);
} else if (sorted_files[i].filename == "0.tii") {
EXPECT_EQ(sorted_files[i].filesize, 1500);
} else if (sorted_files[i].filename == "nullbitmap") {
EXPECT_EQ(sorted_files[i].filesize, 500);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1790

-- !sql --
2000

-- !sql --
4

-- !sql --
58

-- !sql --
0

-- !sql --
16

-- !sql --
12

-- !sql --
16

-- !sql --
12

-- !sql --
10

-- !sql --
88

-- !sql --
648

-- !sql --
386

-- !sql --
78

-- !sql --
746

-- !sql --
476

-- !sql --
2000

Loading

0 comments on commit f4059eb

Please sign in to comment.