From f4059eb7cfdad028346cedcc967975bbe1893432 Mon Sep 17 00:00:00 2001 From: zzzxl1993 Date: Thu, 26 Dec 2024 16:22:25 +0800 Subject: [PATCH] [opt](inverted index) inverted Index File Cache Queue Optimization --- be/src/clucene | 2 +- .../inverted_index_compound_reader.cpp | 41 +++++-- .../rowset/segment_v2/inverted_index_desc.cpp | 6 ++ .../rowset/segment_v2/inverted_index_desc.h | 4 + .../segment_v2/inverted_index_file_reader.cpp | 2 + .../segment_v2/inverted_index_file_writer.cpp | 15 ++- .../inverted_index_fs_directory.cpp | 16 --- .../segment_v2/inverted_index_reader.cpp | 4 +- .../inverted_index_file_writer_test.cpp | 30 ++++-- .../test_index_file_cache_fault_injection.out | 52 +++++++++ ...st_index_file_cache_fault_injection.groovy | 101 ++++++++++++++++++ .../test_inverted_index_cache.groovy | 1 + 12 files changed, 225 insertions(+), 49 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out create mode 100644 regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy diff --git a/be/src/clucene b/be/src/clucene index 2204eaec46a68e..a1b2a2aefcecad 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit 2204eaec46a68e5e9a1876b7021f24839ecb2cf0 +Subproject commit a1b2a2aefcecad157448de49a88abf01d18d9ccf diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index f1b2b0eaedd4fd..87b0f925566946 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -57,17 +57,18 @@ namespace segment_v2 { class CSIndexInput : public lucene::store::BufferedIndexInput { private: CL_NS(store)::IndexInput* base; + std::string file_name; int64_t fileOffset; int64_t _length; const io::IOContext* _io_ctx = nullptr; - bool _is_index_file = false; // Indicates if the file is a TII file protected: void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override; void seekInternal(const int64_t /*pos*/) override {} public: - CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, const int64_t length, + CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name, + const int64_t fileOffset, const int64_t length, const int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE); CSIndexInput(const CSIndexInput& clone); ~CSIndexInput() override; @@ -78,13 +79,14 @@ class CSIndexInput : public lucene::store::BufferedIndexInput { const char* getObjectName() const override { return getClassName(); } static const char* getClassName() { return "CSIndexInput"; } void setIoContext(const void* io_ctx) override; - void setIndexFile(bool isIndexFile) override; }; -CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, - const int64_t length, const int32_t read_buffer_size) +CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name, + const int64_t fileOffset, const int64_t length, + const int32_t read_buffer_size) : BufferedIndexInput(read_buffer_size) { this->base = base; + this->file_name = file_name; this->fileOffset = fileOffset; this->_length = length; } @@ -101,7 +103,27 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) { base->setIoContext(_io_ctx); } - base->setIndexFile(_is_index_file); + DBUG_EXECUTE_IF("CSIndexInput.readInternal", { + for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) { + if (file_name.find(entry.first) != std::string::npos) { + if (!static_cast(base->getIoContext())->is_index_data) { + _CLTHROWA(CL_ERR_IO, + "The 'is_index_data' flag should be true for inverted index meta " + "files."); + } + } + } + for (const auto& entry : InvertedIndexDescriptor::normal_file_info_map) { + if (file_name.find(entry.first) != std::string::npos) { + if (static_cast(base->getIoContext())->is_index_data) { + _CLTHROWA(CL_ERR_IO, + "The 'is_index_data' flag should be false for non-meta inverted " + "index files."); + } + } + } + }); + base->seek(fileOffset + start); bool read_from_buffer = true; base->readBytes(b, len, read_from_buffer); @@ -119,6 +141,7 @@ lucene::store::IndexInput* CSIndexInput::clone() const { CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone) { this->base = clone.base; + this->file_name = clone.file_name; this->fileOffset = clone.fileOffset; this->_length = clone._length; } @@ -129,10 +152,6 @@ void CSIndexInput::setIoContext(const void* io_ctx) { _io_ctx = static_cast(io_ctx); } -void CSIndexInput::setIndexFile(bool isIndexFile) { - _is_index_file = isIndexFile; -} - DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size) : _ram_dir(new lucene::store::RAMDirectory()), _stream(stream), @@ -299,7 +318,7 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput* bufferSize = _read_buffer_size; } - ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, bufferSize); + ret = _CLNEW CSIndexInput(_stream, entry->file_name, entry->offset, entry->length, bufferSize); return true; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp b/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp index 8eac73f13a6f6d..e909bc1e0a9bc7 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp @@ -24,6 +24,12 @@ namespace doris::segment_v2 { +const std::unordered_map InvertedIndexDescriptor::index_file_info_map = { + {"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4}, {"tii", 5}}; + +const std::unordered_map InvertedIndexDescriptor::normal_file_info_map = { + {"tis", 1}, {"frq", 2}, {"prx", 3}}; + // {tmp_dir}/{rowset_id}_{seg_id}_{index_id}@{suffix} std::string InvertedIndexDescriptor::get_temporary_index_path(std::string_view tmp_dir_path, std::string_view rowset_id, diff --git a/be/src/olap/rowset/segment_v2/inverted_index_desc.h b/be/src/olap/rowset/segment_v2/inverted_index_desc.h index 37f9cf3f4a1513..f421c7f37903ae 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_desc.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_desc.h @@ -20,6 +20,7 @@ #include #include +#include namespace doris { struct RowsetId; @@ -28,6 +29,9 @@ namespace segment_v2 { class InvertedIndexDescriptor { public: + static const std::unordered_map index_file_info_map; + static const std::unordered_map normal_file_info_map; + static constexpr std::string_view segment_suffix = ".dat"; static constexpr std::string_view index_suffix = ".idx"; static std::string get_temporary_index_path(std::string_view tmp_dir_path, diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp index 813a78f2a3fa86..57d0ee5f14018e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp @@ -41,6 +41,7 @@ Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOConte if (_storage_format == InvertedIndexStorageFormatPB::V2) { if (_stream) { _stream->setIoContext(io_ctx); + _stream->setIndexFile(true); } } } @@ -83,6 +84,7 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::I } _stream = std::unique_ptr(index_input); _stream->setIoContext(io_ctx); + _stream->setIndexFile(true); // 3. read file int32_t version = _stream->readInt(); // Read version number diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index 4d6892aa78568f..e42e7144ea9e37 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -171,17 +171,14 @@ Status InvertedIndexFileWriter::close() { void InvertedIndexFileWriter::sort_files(std::vector& file_infos) { auto file_priority = [](const std::string& filename) { - if (filename.find("segments") != std::string::npos) { - return 1; - } - if (filename.find("fnm") != std::string::npos) { - return 2; - } - if (filename.find("tii") != std::string::npos) { - return 3; + for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) { + if (filename.find(entry.first) != std::string::npos) { + return entry.second; + } } - return 4; // Other files + return 6; // Other files }; + std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, const FileInfo& b) { int32_t priority_a = file_priority(a.filename); int32_t priority_b = file_priority(b.filename); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index fe0a81c41a6970..ff37ca9135caaf 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -168,22 +168,6 @@ lucene::store::IndexInput* DorisFSDirectory::FSIndexInput::clone() const { } void DorisFSDirectory::FSIndexInput::close() { BufferedIndexInput::close(); - /*if (_handle != nullptr) { - std::mutex* lock = _handle->_shared_lock; - bool ref = false; - { - std::lock_guard wlock(*lock); - //determine if we are about to delete the handle... - ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1); - //decdelete (deletes if refcount is down to 0 - _CLDECDELETE(_handle); - } - - //if _handle is not ref by other FSIndexInput, try to release mutex lock, or it will be leaked. - if (!ref) { - delete lock; - } - }*/ } void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index 9790d7273e1bff..4fe45283cd2fff 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -245,7 +245,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir, *searcher = searcher_result; // When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries. - static_cast(dir)->getDorisIndexInput()->setIoContext(nullptr); + auto stream = static_cast(dir)->getDorisIndexInput(); + stream->setIoContext(nullptr); + stream->setIndexFile(false); // NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand. mem_tracker->consume(index_searcher_builder->get_reader_size()); diff --git a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp index 41703d49d5e013..94dc2c03bac8cb 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp @@ -335,7 +335,8 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) { EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok()); EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok()); mock_dir->init(_fs, local_fs_index_path.c_str()); - std::vector files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"}; + std::vector files = {"segments_0", "segments.gen", "0.fnm", + "0.tii", "null_bitmap", "write.lock"}; for (auto& file : files) { auto out_file_1 = std::unique_ptr(mock_dir->createOutput(file.c_str())); @@ -343,11 +344,14 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) { out_file_1->close(); } - EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments"))) + EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments_0"))) .WillOnce(testing::Return(1000)); + EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments.gen"))) + .WillOnce(testing::Return(1200)); EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000)); EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500)); - EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500)); + EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("null_bitmap"))) + .WillOnce(testing::Return(500)); InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, InvertedIndexStorageFormatPB::V2); @@ -362,24 +366,28 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) { std::vector sorted_files = writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, "suffix1")].get()); - // 1. 0.segments (priority 1, size 1000) - // 2. 0.fnm (priority 2, size 2000) - // 3. 0.tii (priority 3, size 1500) - // 4. nullbitmap (priority 4, size 500) + // 1. null_bitmap (priority 1, size 500) + // 2. segments.gen (priority 2, size 1200) + // 3. segments_0 (priority 3, size 1000) + // 4. 0.fnm (priority 4, size 2000) + // 5. 0.tii (priority 5, size 1500) - std::vector expected_order = {"0.segments", "0.fnm", "0.tii", "nullbitmap"}; + std::vector expected_order = {"null_bitmap", "segments.gen", "segments_0", "0.fnm", + "0.tii"}; ASSERT_EQ(sorted_files.size(), expected_order.size()); for (size_t i = 0; i < expected_order.size(); ++i) { EXPECT_EQ(sorted_files[i].filename, expected_order[i]); - if (sorted_files[i].filename == "0.segments") { + if (sorted_files[i].filename == "null_bitmap") { + EXPECT_EQ(sorted_files[i].filesize, 500); + } else if (sorted_files[i].filename == "segments.gen") { + EXPECT_EQ(sorted_files[i].filesize, 1200); + } else if (sorted_files[i].filename == "segments_0") { EXPECT_EQ(sorted_files[i].filesize, 1000); } else if (sorted_files[i].filename == "0.fnm") { EXPECT_EQ(sorted_files[i].filesize, 2000); } else if (sorted_files[i].filename == "0.tii") { EXPECT_EQ(sorted_files[i].filesize, 1500); - } else if (sorted_files[i].filename == "nullbitmap") { - EXPECT_EQ(sorted_files[i].filesize, 500); } } } diff --git a/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out new file mode 100644 index 00000000000000..b096fdedd128f9 --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out @@ -0,0 +1,52 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1790 + +-- !sql -- +2000 + +-- !sql -- +4 + +-- !sql -- +58 + +-- !sql -- +0 + +-- !sql -- +16 + +-- !sql -- +12 + +-- !sql -- +16 + +-- !sql -- +12 + +-- !sql -- +10 + +-- !sql -- +88 + +-- !sql -- +648 + +-- !sql -- +386 + +-- !sql -- +78 + +-- !sql -- +746 + +-- !sql -- +476 + +-- !sql -- +2000 + diff --git a/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy new file mode 100644 index 00000000000000..8a04c15b839f46 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_index_file_cache_fault_injection", "nonConcurrent") { + def indexTbName = "test_index_file_cache_fault_injection" + + sql "DROP TABLE IF EXISTS ${indexTbName}" + sql """ + CREATE TABLE ${indexTbName} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '', + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1, load_to_single_tablet = 'true' -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + } + } + } + + try { + load_httplogs_data.call(indexTbName, 'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(indexTbName, 'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json') + + sql "sync" + sql """ set enable_common_expr_pushdown = true; """ + + try { + GetDebugPoint().enableDebugPointForAllBEs("CSIndexInput.readInternal") + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '0'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '1'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '2'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '3'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '4'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '5'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '6'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '7'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '8'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '9'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'a'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'b'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'c'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'd'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'e'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'f'; """ + qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'g'; """ + } finally { + GetDebugPoint().disableDebugPointForAllBEs("CSIndexInput.readInternal") + } + } finally { + } +} \ No newline at end of file diff --git a/regression-test/suites/fault_injection_p0/test_inverted_index_cache.groovy b/regression-test/suites/fault_injection_p0/test_inverted_index_cache.groovy index fd250a7d4fd528..9371f6401cbf39 100644 --- a/regression-test/suites/fault_injection_p0/test_inverted_index_cache.groovy +++ b/regression-test/suites/fault_injection_p0/test_inverted_index_cache.groovy @@ -71,6 +71,7 @@ suite("test_inverted_index_cache", "nonConcurrent") { load_httplogs_data.call(indexTbName, 'test_index_inlist_fault_injection', 'true', 'json', 'documents-1000.json') sql "sync" + sql """ set enable_common_expr_pushdown = true; """ qt_sql """ select count() from ${indexTbName} where (request match 'images'); """ // query cache hit