diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index a92fe28abf57f7..abbd84001c8420 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -660,9 +660,11 @@ Status Compaction::do_inverted_index_compaction() { DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta)); } for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; dest_segment_id++) { - auto* dest_dir = + auto dest_dir = DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta)); - dest_index_dirs[dest_segment_id] = dest_dir; + // Destination directories in dest_index_dirs do not need to be deconstructed, + // but their lifecycle must be managed by inverted_index_file_writers. + dest_index_dirs[dest_segment_id] = dest_dir.get(); } auto st = compact_column(index_meta->index_id(), src_idx_dirs, dest_index_dirs, index_tmp_path.native(), trans_vec, dest_segment_num_rows); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp index 88a8f2417228bc..f988c46c027c26 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -76,13 +76,6 @@ Status compact_column(int64_t index_id, // when index_writer is destroyed, if closeDir is set, dir will be close // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed. _CLDECDELETE(dir) - for (auto* d : dest_index_dirs) { - if (d != nullptr) { - // NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize. - //d->close(); - //_CLDELETE(d); - } - } // delete temporary segment_path, only when inverted_index_ram_dir_enable is false if (!config::inverted_index_ram_dir_enable) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index 0e2dbe7d6bdc89..74f7398ea4a46f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -19,17 +19,14 @@ #include +#include #include #include "common/status.h" -#include "io/fs/file_writer.h" -#include "io/fs/local_file_system.h" -#include "olap/rowset/segment_v2/inverted_index_cache.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" #include "olap/rowset/segment_v2/inverted_index_reader.h" #include "olap/tablet_schema.h" -#include "runtime/exec_env.h" namespace doris::segment_v2 { @@ -38,32 +35,11 @@ Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_di return Status::OK(); } -Result InvertedIndexFileWriter::open(const TabletIndex* index_meta) { - auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); - const auto& local_fs = io::global_local_filesystem(); - auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path( - tmp_file_dir.native(), _rowset_id, _seg_id, index_meta->index_id(), - index_meta->get_index_suffix()); - bool exists = false; - auto st = local_fs->exists(local_fs_index_path, &exists); - DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_error", - { st = Status::Error("debug point: no such file error"); }) - if (!st.ok()) { - LOG(ERROR) << "index_path:" << local_fs_index_path << " exists error:" << st; - return ResultError(st); - } - DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_true", { exists = true; }) - if (exists) { - LOG(ERROR) << "try to init a directory:" << local_fs_index_path << " already exists"; - return ResultError( - Status::InternalError("InvertedIndexFileWriter::open directory already exists")); - } - - bool can_use_ram_dir = true; - auto* dir = DorisFSDirectoryFactory::getDirectory(local_fs, local_fs_index_path.c_str(), - can_use_ram_dir); - auto key = std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()); - auto [it, inserted] = _indices_dirs.emplace(key, std::unique_ptr(dir)); +Status InvertedIndexFileWriter::_insert_directory_into_map(int64_t index_id, + const std::string& index_suffix, + std::shared_ptr dir) { + auto key = std::make_pair(index_id, index_suffix); + auto [it, inserted] = _indices_dirs.emplace(key, std::move(dir)); if (!inserted) { LOG(ERROR) << "InvertedIndexFileWriter::open attempted to insert a duplicate key: (" << key.first << ", " << key.second << ")"; @@ -71,8 +47,23 @@ Result InvertedIndexFileWriter::open(const TabletIndex* index for (const auto& entry : _indices_dirs) { LOG(ERROR) << "Key: (" << entry.first.first << ", " << entry.first.second << ")"; } - return ResultError(Status::InternalError( - "InvertedIndexFileWriter::open attempted to insert a duplicate dir")); + return Status::InternalError( + "InvertedIndexFileWriter::open attempted to insert a duplicate dir"); + } + return Status::OK(); +} + +Result> InvertedIndexFileWriter::open( + const TabletIndex* index_meta) { + auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path( + _tmp_dir, _rowset_id, _seg_id, index_meta->index_id(), index_meta->get_index_suffix()); + bool can_use_ram_dir = true; + auto dir = std::shared_ptr(DorisFSDirectoryFactory::getDirectory( + _local_fs, local_fs_index_path.c_str(), can_use_ram_dir)); + auto st = + _insert_directory_into_map(index_meta->index_id(), index_meta->get_index_suffix(), dir); + if (!st.ok()) { + return ResultError(st); } return dir; @@ -222,7 +213,7 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire int64_t chunk = bufferLength; while (remainder > 0) { - int64_t len = std::min(std::min(chunk, length), remainder); + int64_t len = std::min({chunk, length, remainder}); input->readBytes(buffer, len); output->writeBytes(buffer, len); remainder -= len; @@ -252,125 +243,46 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire Status InvertedIndexFileWriter::write_v1() { int64_t total_size = 0; + lucene::store::Directory* out_dir = nullptr; + std::unique_ptr output = nullptr; for (const auto& entry : _indices_dirs) { const int64_t index_id = entry.first.first; const auto& index_suffix = entry.first.second; try { - const auto& directory = entry.second; - std::vector files; - directory->list(&files); - // remove write.lock file - auto it = std::find(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE); - if (it != files.end()) { - files.erase(it); - } + const auto& directory = entry.second.get(); - std::vector sorted_files; - for (auto file : files) { - FileInfo file_info; - file_info.filename = file; - file_info.filesize = directory->fileLength(file.c_str()); - sorted_files.emplace_back(std::move(file_info)); - } - sort_files(sorted_files); - - int32_t file_count = sorted_files.size(); - - io::Path cfs_path(InvertedIndexDescriptor::get_index_file_path_v1( - _index_path_prefix, index_id, index_suffix)); - auto idx_path = cfs_path.parent_path(); - std::string idx_name = cfs_path.filename(); - // write file entries to ram directory to get header length - lucene::store::RAMDirectory ram_dir; - auto* out_idx = ram_dir.createOutput(idx_name.c_str()); - DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_ram_output_is_nullptr", - { out_idx = nullptr; }) - if (out_idx == nullptr) { - LOG(WARNING) << "Write compound file error: RAMDirectory output is nullptr."; - _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error"); - } + // Prepare sorted file list + auto sorted_files = prepare_sorted_files(directory); + + // Calculate header length + auto [header_length, header_file_count] = + calculate_header_length(sorted_files, directory); + + // Create output stream + auto result = create_output_stream_v1(index_id, index_suffix); + out_dir = result.first; + output = std::move(result.second); - std::unique_ptr ram_output(out_idx); - ram_output->writeVInt(file_count); - // write file entries in ram directory - // number of files, which data are in header - int header_file_count = 0; - int64_t header_file_length = 0; - const int64_t buffer_length = 16384; - uint8_t ram_buffer[buffer_length]; - for (auto file : sorted_files) { - ram_output->writeString(file.filename); // file name - ram_output->writeLong(0); // data offset - ram_output->writeLong(file.filesize); // file length - header_file_length += file.filesize; - if (header_file_length <= DorisFSDirectory::MAX_HEADER_DATA_SIZE) { - copyFile(file.filename.c_str(), directory.get(), ram_output.get(), ram_buffer, - buffer_length); - header_file_count++; - } - } - auto header_len = ram_output->getFilePointer(); - ram_output->close(); - ram_dir.deleteFile(idx_name.c_str()); - ram_dir.close(); - - auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, idx_path.c_str()); - out_dir->set_file_writer_opts(_opts); - - auto* out = out_dir->createOutput(idx_name.c_str()); - DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr", - { out = nullptr; }); - if (out == nullptr) { - LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr."; - _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); - } - std::unique_ptr output(out); size_t start = output->getFilePointer(); - output->writeVInt(file_count); - // write file entries - int64_t data_offset = header_len; - uint8_t header_buffer[buffer_length]; - for (int i = 0; i < sorted_files.size(); ++i) { - auto file = sorted_files[i]; - output->writeString(file.filename); // FileName - // DataOffset - if (i < header_file_count) { - // file data write in header, so we set its offset to -1. - output->writeLong(-1); - } else { - output->writeLong(data_offset); - } - output->writeLong(file.filesize); // FileLength - if (i < header_file_count) { - // append data - copyFile(file.filename.c_str(), directory.get(), output.get(), header_buffer, - buffer_length); - } else { - data_offset += file.filesize; - } - } - // write rest files' data - uint8_t data_buffer[buffer_length]; - for (int i = header_file_count; i < sorted_files.size(); ++i) { - auto file = sorted_files[i]; - copyFile(file.filename.c_str(), directory.get(), output.get(), data_buffer, - buffer_length); - } - out_dir->close(); - // NOTE: need to decrease ref count, but not to delete here, - // because index cache may get the same directory from DIRECTORIES - _CLDECDELETE(out_dir) + // Write header and data + write_header_and_data_v1(output.get(), sorted_files, directory, header_length, + header_file_count); + + // Close and clean up + finalize_output_dir(out_dir); + + // Collect file information auto compound_file_size = output->getFilePointer() - start; output->close(); - //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size; total_size += compound_file_size; - InvertedIndexFileInfo_IndexInfo index_info; - index_info.set_index_id(index_id); - index_info.set_index_suffix(index_suffix); - index_info.set_index_file_size(compound_file_size); - auto* new_index_info = _file_info.add_index_info(); - *new_index_info = index_info; + add_index_info(index_id, index_suffix, compound_file_size); + } catch (CLuceneError& err) { + finalize_output_dir(out_dir); + if (output != nullptr) { + output->close(); + output.reset(); + } auto index_path = InvertedIndexDescriptor::get_index_file_path_v1( _index_path_prefix, index_id, index_suffix); LOG(ERROR) << "CLuceneError occur when write_v1 idx file " << index_path @@ -386,108 +298,267 @@ Status InvertedIndexFileWriter::write_v1() { } Status InvertedIndexFileWriter::write_v2() { - io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; - std::unique_ptr compound_file_output; + lucene::store::Directory* out_dir = nullptr; + std::unique_ptr compound_file_output = nullptr; try { - // Create the output stream to write the compound file + // Calculate header length and initialize offset int64_t current_offset = headerLength(); + // Prepare file metadata + auto file_metadata = prepare_file_metadata_v2(current_offset); - io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; - - auto* out_dir = - DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str()); - out_dir->set_file_writer_opts(_opts); - - DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr"; - compound_file_output = std::unique_ptr( - out_dir->createOutputV2(_idx_v2_writer.get())); - - // Write the version number - compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2); - - // Write the number of indices - const auto numIndices = static_cast(_indices_dirs.size()); - compound_file_output->writeInt(numIndices); - - std::vector> - file_metadata; // Store file name, offset, file length, and corresponding directory - - // First, write all index information and file metadata - for (const auto& entry : _indices_dirs) { - const int64_t index_id = entry.first.first; - const auto& index_suffix = entry.first.second; - const auto& dir = entry.second; - std::vector files; - dir->list(&files); - - auto it = std::find(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE); - if (it != files.end()) { - files.erase(it); - } - // sort file list by file length - std::vector> sorted_files; - for (const auto& file : files) { - sorted_files.emplace_back(file, dir->fileLength(file.c_str())); - } - - std::sort( - sorted_files.begin(), sorted_files.end(), - [](const std::pair& a, - const std::pair& b) { return (a.second < b.second); }); - - int32_t file_count = sorted_files.size(); - - // Write the index ID and the number of files - compound_file_output->writeLong(index_id); - compound_file_output->writeInt(static_cast(index_suffix.length())); - compound_file_output->writeBytes(reinterpret_cast(index_suffix.data()), - index_suffix.length()); - compound_file_output->writeInt(file_count); - - // Calculate the offset for each file and write the file metadata - for (const auto& file : sorted_files) { - int64_t file_length = dir->fileLength(file.first.c_str()); - compound_file_output->writeInt(static_cast(file.first.length())); - compound_file_output->writeBytes( - reinterpret_cast(file.first.data()), file.first.length()); - compound_file_output->writeLong(current_offset); - compound_file_output->writeLong(file_length); - - file_metadata.emplace_back(file.first, current_offset, file_length, dir.get()); - current_offset += file_length; // Update the data offset - } - } + // Create output stream + auto result = create_output_stream_v2(); + out_dir = result.first; + compound_file_output = std::move(result.second); - const int64_t buffer_length = 16384; - uint8_t header_buffer[buffer_length]; + // Write version and number of indices + write_version_and_indices_count(compound_file_output.get()); - // Next, write the file data - for (const auto& info : file_metadata) { - const std::string& file = std::get<0>(info); - auto* dir = std::get<3>(info); + // Write index headers and file metadata + write_index_headers_and_metadata(compound_file_output.get(), file_metadata); - // Write the actual file data - copyFile(file.c_str(), dir, compound_file_output.get(), header_buffer, buffer_length); - } + // Copy file data + copy_files_data_v2(compound_file_output.get(), file_metadata); - out_dir->close(); - // NOTE: need to decrease ref count, but not to delete here, - // because index cache may get the same directory from DIRECTORIES - _CLDECDELETE(out_dir) + // Close and clean up + finalize_output_dir(out_dir); _total_file_size = compound_file_output->getFilePointer(); - compound_file_output->close(); _file_info.set_index_size(_total_file_size); + compound_file_output->close(); + return Status::OK(); } catch (CLuceneError& err) { + io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; LOG(ERROR) << "CLuceneError occur when close idx file " << index_path << " error msg: " << err.what(); - if (compound_file_output) { + if (compound_file_output != nullptr) { compound_file_output->close(); compound_file_output.reset(); } + finalize_output_dir(out_dir); return Status::Error( "CLuceneError occur when close idx file: {}, error msg: {}", index_path.c_str(), err.what()); } - return Status::OK(); +} + +// Helper function implementations + +std::vector InvertedIndexFileWriter::prepare_sorted_files( + lucene::store::Directory* directory) { + std::vector files; + directory->list(&files); + + // Remove write.lock file + files.erase(std::remove(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE), + files.end()); + + std::vector sorted_files; + for (const auto& file : files) { + FileInfo file_info; + file_info.filename = file; + file_info.filesize = directory->fileLength(file.c_str()); + sorted_files.push_back(std::move(file_info)); + } + + // Sort the files + sort_files(sorted_files); + return sorted_files; +} + +void InvertedIndexFileWriter::finalize_output_dir(lucene::store::Directory* out_dir) { + if (out_dir != nullptr) { + out_dir->close(); + _CLDECDELETE(out_dir) + } +} + +void InvertedIndexFileWriter::add_index_info(int64_t index_id, const std::string& index_suffix, + int64_t compound_file_size) { + InvertedIndexFileInfo_IndexInfo index_info; + index_info.set_index_id(index_id); + index_info.set_index_suffix(index_suffix); + index_info.set_index_file_size(compound_file_size); + auto* new_index_info = _file_info.add_index_info(); + *new_index_info = index_info; +} + +std::pair InvertedIndexFileWriter::calculate_header_length( + const std::vector& sorted_files, lucene::store::Directory* directory) { + // Use RAMDirectory to calculate header length + lucene::store::RAMDirectory ram_dir; + auto* out_idx = ram_dir.createOutput("temp_idx"); + DBUG_EXECUTE_IF("InvertedIndexFileWriter::calculate_header_length_ram_output_is_nullptr", + { out_idx = nullptr; }) + if (out_idx == nullptr) { + LOG(WARNING) << "InvertedIndexFileWriter::calculate_header_length error: RAMDirectory " + "output is nullptr."; + _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error"); + } + std::unique_ptr ram_output(out_idx); + int32_t file_count = sorted_files.size(); + ram_output->writeVInt(file_count); + + int64_t header_file_length = 0; + const int64_t buffer_length = 16384; + uint8_t ram_buffer[buffer_length]; + int32_t header_file_count = 0; + for (const auto& file : sorted_files) { + ram_output->writeString(file.filename); + ram_output->writeLong(0); + ram_output->writeLong(file.filesize); + header_file_length += file.filesize; + + if (header_file_length <= DorisFSDirectory::MAX_HEADER_DATA_SIZE) { + copyFile(file.filename.c_str(), directory, ram_output.get(), ram_buffer, buffer_length); + header_file_count++; + } + } + + int64_t header_length = ram_output->getFilePointer(); + ram_output->close(); + ram_dir.close(); + return {header_length, header_file_count}; +} + +std::pair> +InvertedIndexFileWriter::create_output_stream_v1(int64_t index_id, + const std::string& index_suffix) { + io::Path cfs_path(InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix, index_id, + index_suffix)); + auto idx_path = cfs_path.parent_path(); + std::string idx_name = cfs_path.filename(); + + auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, idx_path.c_str()); + out_dir->set_file_writer_opts(_opts); + + auto* out = out_dir->createOutput(idx_name.c_str()); + DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr", + { out = nullptr; }); + if (out == nullptr) { + LOG(WARNING) << "InvertedIndexFileWriter::create_output_stream_v1 error: CompoundDirectory " + "output is nullptr."; + _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); + } + + std::unique_ptr output(out); + return {out_dir, std::move(output)}; +} + +void InvertedIndexFileWriter::write_header_and_data_v1(lucene::store::IndexOutput* output, + const std::vector& sorted_files, + lucene::store::Directory* directory, + int64_t header_length, + int32_t header_file_count) { + output->writeVInt(sorted_files.size()); + int64_t data_offset = header_length; + const int64_t buffer_length = 16384; + uint8_t buffer[buffer_length]; + + for (int i = 0; i < sorted_files.size(); ++i) { + auto file = sorted_files[i]; + output->writeString(file.filename); + + // DataOffset + if (i < header_file_count) { + // file data write in header, so we set its offset to -1. + output->writeLong(-1); + } else { + output->writeLong(data_offset); + } + output->writeLong(file.filesize); // FileLength + if (i < header_file_count) { + // append data + copyFile(file.filename.c_str(), directory, output, buffer, buffer_length); + } else { + data_offset += file.filesize; + } + } + + for (size_t i = header_file_count; i < sorted_files.size(); ++i) { + copyFile(sorted_files[i].filename.c_str(), directory, output, buffer, buffer_length); + } +} + +std::pair> +InvertedIndexFileWriter::create_output_stream_v2() { + io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; + auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str()); + out_dir->set_file_writer_opts(_opts); + DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr"; + auto compound_file_output = std::unique_ptr( + out_dir->createOutputV2(_idx_v2_writer.get())); + return std::make_pair(out_dir, std::move(compound_file_output)); +} + +void InvertedIndexFileWriter::write_version_and_indices_count(lucene::store::IndexOutput* output) { + // Write the version number + output->writeInt(InvertedIndexStorageFormatPB::V2); + + // Write the number of indices + const auto num_indices = static_cast(_indices_dirs.size()); + output->writeInt(num_indices); +} + +std::vector +InvertedIndexFileWriter::prepare_file_metadata_v2(int64_t& current_offset) { + std::vector file_metadata; + + for (const auto& entry : _indices_dirs) { + const int64_t index_id = entry.first.first; + const auto& index_suffix = entry.first.second; + auto* dir = entry.second.get(); + + // Get sorted files + auto sorted_files = prepare_sorted_files(dir); + + for (const auto& file : sorted_files) { + file_metadata.emplace_back(index_id, index_suffix, file.filename, current_offset, + file.filesize, dir); + current_offset += file.filesize; // Update the data offset + } + } + return file_metadata; +} + +void InvertedIndexFileWriter::write_index_headers_and_metadata( + lucene::store::IndexOutput* output, const std::vector& file_metadata) { + // Group files by index_id and index_suffix + std::map, std::vector> indices; + + for (const auto& meta : file_metadata) { + indices[{meta.index_id, meta.index_suffix}].push_back(meta); + } + + for (const auto& index_entry : indices) { + int64_t index_id = index_entry.first.first; + const std::string& index_suffix = index_entry.first.second; + const auto& files = index_entry.second; + + // Write the index ID and the number of files + output->writeLong(index_id); + output->writeInt(static_cast(index_suffix.length())); + output->writeBytes(reinterpret_cast(index_suffix.data()), + index_suffix.length()); + output->writeInt(static_cast(files.size())); + + // Write file metadata + for (const auto& file : files) { + output->writeInt(static_cast(file.filename.length())); + output->writeBytes(reinterpret_cast(file.filename.data()), + file.filename.length()); + output->writeLong(file.offset); + output->writeLong(file.length); + } + } +} + +void InvertedIndexFileWriter::copy_files_data_v2(lucene::store::IndexOutput* output, + const std::vector& file_metadata) { + const int64_t buffer_length = 16384; + uint8_t buffer[buffer_length]; + + for (const auto& meta : file_metadata) { + copyFile(meta.filename.c_str(), meta.directory, output, buffer, buffer_length); + } } } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h index 31e287d6dd3f71..3a2fcc1e6acaa7 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h @@ -28,7 +28,9 @@ #include "io/fs/file_system.h" #include "io/fs/file_writer.h" +#include "io/fs/local_file_system.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "runtime/exec_env.h" namespace doris { class TabletIndex; @@ -36,7 +38,7 @@ class TabletIndex; namespace segment_v2 { class DorisFSDirectory; using InvertedIndexDirectoryMap = - std::map, std::unique_ptr>; + std::map, std::shared_ptr>; class InvertedIndexFileWriter; using InvertedIndexFileWriterPtr = std::unique_ptr; @@ -58,16 +60,19 @@ class InvertedIndexFileWriter { _rowset_id(std::move(rowset_id)), _seg_id(seg_id), _storage_format(storage_format), - _idx_v2_writer(std::move(file_writer)) {} + _local_fs(io::global_local_filesystem()), + _idx_v2_writer(std::move(file_writer)) { + auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); + _tmp_dir = tmp_file_dir.native(); + } - Result open(const TabletIndex* index_meta); + Result> open(const TabletIndex* index_meta); Status delete_index(const TabletIndex* index_meta); Status initialize(InvertedIndexDirectoryMap& indices_dirs); - ~InvertedIndexFileWriter() = default; + virtual ~InvertedIndexFileWriter() = default; Status write_v2(); Status write_v1(); Status close(); - int64_t headerLength(); const InvertedIndexFileInfo* get_index_file_info() const { DCHECK(_closed) << debug_string(); return &_file_info; @@ -77,11 +82,7 @@ class InvertedIndexFileWriter { return _total_file_size; } const io::FileSystemSPtr& get_fs() const { return _fs; } - void sort_files(std::vector& file_infos); - void copyFile(const char* fileName, lucene::store::Directory* dir, - lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength); InvertedIndexStorageFormatPB get_storage_format() const { return _storage_format; } - void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = opts; } std::string debug_string() const { @@ -99,12 +100,61 @@ class InvertedIndexFileWriter { } private: + // Helper functions shared between write_v1 and write_v2 + std::vector prepare_sorted_files(lucene::store::Directory* directory); + void sort_files(std::vector& file_infos); + void copyFile(const char* fileName, lucene::store::Directory* dir, + lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength); + void finalize_output_dir(lucene::store::Directory* out_dir); + void add_index_info(int64_t index_id, const std::string& index_suffix, + int64_t compound_file_size); + int64_t headerLength(); + // Helper functions specific to write_v1 + std::pair calculate_header_length(const std::vector& sorted_files, + lucene::store::Directory* directory); + std::pair> + create_output_stream_v1(int64_t index_id, const std::string& index_suffix); + virtual void write_header_and_data_v1(lucene::store::IndexOutput* output, + const std::vector& sorted_files, + lucene::store::Directory* directory, + int64_t header_length, int32_t header_file_count); + // Helper functions specific to write_v2 + std::pair> + create_output_stream_v2(); + void write_version_and_indices_count(lucene::store::IndexOutput* output); + struct FileMetadata { + int64_t index_id; + std::string index_suffix; + std::string filename; + int64_t offset; + int64_t length; + lucene::store::Directory* directory; + + FileMetadata(int64_t id, const std::string& suffix, const std::string& file, int64_t off, + int64_t len, lucene::store::Directory* dir) + : index_id(id), + index_suffix(suffix), + filename(file), + offset(off), + length(len), + directory(dir) {} + }; + std::vector prepare_file_metadata_v2(int64_t& current_offset); + virtual void write_index_headers_and_metadata(lucene::store::IndexOutput* output, + const std::vector& file_metadata); + void copy_files_data_v2(lucene::store::IndexOutput* output, + const std::vector& file_metadata); + Status _insert_directory_into_map(int64_t index_id, const std::string& index_suffix, + std::shared_ptr dir); + // Member variables... InvertedIndexDirectoryMap _indices_dirs; const io::FileSystemSPtr _fs; std::string _index_path_prefix; std::string _rowset_id; int64_t _seg_id; InvertedIndexStorageFormatPB _storage_format; + std::string _tmp_dir; + const std::shared_ptr& _local_fs; // write to disk or stream io::FileWriterPtr _idx_v2_writer = nullptr; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 29fe4609e59e9c..a4f3ca55dd11c0 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -197,7 +197,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { bool create_index = true; bool close_dir_on_shutdown = true; auto index_writer = std::make_unique( - _dir, _analyzer.get(), create_index, close_dir_on_shutdown); + _dir.get(), _analyzer.get(), create_index, close_dir_on_shutdown); DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error", { index_writer->setRAMBufferSizeMB(-100); }) DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error", @@ -708,7 +708,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { std::unique_ptr _char_string_reader = nullptr; std::shared_ptr _bkd_writer = nullptr; InvertedIndexCtxSPtr _inverted_index_ctx = nullptr; - DorisFSDirectory* _dir = nullptr; + std::shared_ptr _dir = nullptr; const KeyCoder* _value_key_coder; const TabletIndex* _index_meta; InvertedIndexParserType _parser_type; diff --git a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp new file mode 100644 index 00000000000000..dd3b4195c141f9 --- /dev/null +++ b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp @@ -0,0 +1,515 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include +#include +#include +#include + +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/segment_v2/inverted_index_fs_directory.h" +#include "olap/storage_engine.h" + +namespace doris { +namespace segment_v2 { + +using namespace doris::vectorized; + +class InvertedIndexFileWriterTest : public ::testing::Test { +protected: + class MockDorisFSDirectoryFileLength : public DorisFSDirectory { + public: + //MOCK_METHOD(lucene::store::IndexOutput*, createOutput, (const char* name), (override)); + MOCK_METHOD(int64_t, fileLength, (const char* name), (const, override)); + //MOCK_METHOD(void, close, (), (override)); + //MOCK_METHOD(const char*, getObjectName, (), (const, override)); + }; + class MockDorisFSDirectoryOpenInput : public DorisFSDirectory { + public: + MOCK_METHOD(bool, openInput, + (const char* name, lucene::store::IndexInput*& ret, CLuceneError& err, + int32_t bufferSize), + (override)); + }; + void SetUp() override { + char buffer[MAX_PATH_LEN]; + ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); + _current_dir = std::string(buffer); + _absolute_dir = _current_dir + "/" + std::string(dest_dir); + ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok()); + ASSERT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok()); + // tmp dir + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok()); + std::vector paths; + paths.emplace_back(std::string(tmp_dir), -1); + auto tmp_file_dirs = std::make_unique(paths); + EXPECT_TRUE(tmp_file_dirs->init().ok()); + ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs)); + + // use memory limit + int64_t inverted_index_cache_limit = 0; + _inverted_index_searcher_cache = std::unique_ptr( + InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit, + 256)); + + ExecEnv::GetInstance()->set_inverted_index_searcher_cache( + _inverted_index_searcher_cache.get()); + doris::EngineOptions options; + auto engine = std::make_unique(options); + _engine_ref = engine.get(); + _data_dir = std::make_unique(*_engine_ref, _absolute_dir); + ASSERT_TRUE(_data_dir->update_capacity().ok()); + ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); + + _fs = io::global_local_filesystem(); + _index_path_prefix = _absolute_dir + "/index_test"; + _rowset_id = "test_rowset"; + _seg_id = 1; + } + + void TearDown() override { + ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok()); + ExecEnv::GetInstance()->set_storage_engine(nullptr); + } + + std::unique_ptr create_mock_tablet_index(int64_t index_id, + const std::string& index_suffix) { + TabletIndexPB index_pb; + index_pb.set_index_id(index_id); + index_pb.set_index_suffix_name(index_suffix); + index_pb.set_index_type(IndexType::INVERTED); + auto index = std::make_unique(); + index->init_from_pb(index_pb); + return index; + } + + std::string _current_dir; + std::string _absolute_dir; + io::FileSystemSPtr _fs; + std::string _index_path_prefix; + std::string _rowset_id; + int64_t _seg_id; + StorageEngine* _engine_ref = nullptr; + std::unique_ptr _data_dir = nullptr; + std::unique_ptr _inverted_index_searcher_cache; + + constexpr static uint32_t MAX_PATH_LEN = 1024; + constexpr static std::string_view dest_dir = "./ut_dir/inverted_index_file_writer_test"; + constexpr static std::string_view tmp_dir = "./ut_dir/tmp"; +}; + +TEST_F(InvertedIndexFileWriterTest, InitializeTest) { + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2); + + InvertedIndexDirectoryMap indices_dirs; + indices_dirs.emplace(std::make_pair(1, "suffix1"), std::make_unique()); + indices_dirs.emplace(std::make_pair(2, "suffix2"), std::make_unique()); + + Status status = writer.initialize(indices_dirs); + ASSERT_TRUE(status.ok()); + + ASSERT_EQ(writer.get_storage_format(), InvertedIndexStorageFormatPB::V2); +} + +TEST_F(InvertedIndexFileWriterTest, OpenTest) { + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2); + + int64_t index_id = 1; + std::string index_suffix = "suffix1"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + auto open_result = writer.open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + ASSERT_NE(dir, nullptr); + + auto key = std::make_pair(index_id, index_suffix); + ASSERT_TRUE(writer._indices_dirs.find(key) != writer._indices_dirs.end()); + ASSERT_TRUE(writer._indices_dirs.find(key)->second.get() == dir.get()); +} + +TEST_F(InvertedIndexFileWriterTest, DeleteIndexTest) { + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2); + + InvertedIndexDirectoryMap indices_dirs; + int64_t index_id = 1; + std::string index_suffix = "suffix1"; + auto st = writer._insert_directory_into_map(index_id, index_suffix, + std::make_shared()); + if (!st.ok()) { + std::cerr << "_insert_directory_into_map error in DeleteIndexTest: " << st.msg() + << std::endl; + ASSERT_TRUE(false); + return; + } + auto key = std::make_pair(index_id, index_suffix); + ASSERT_TRUE(writer._indices_dirs.find(key) != writer._indices_dirs.end()); + + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + Status del_status = writer.delete_index(index_meta.get()); + ASSERT_TRUE(del_status.ok()); + ASSERT_TRUE(writer._indices_dirs.find(key) == writer._indices_dirs.end()); + + Status del_nonexist_status = writer.delete_index(index_meta.get()); + ASSERT_TRUE(del_nonexist_status.ok()); +} + +TEST_F(InvertedIndexFileWriterTest, WriteV1Test) { + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V1); + + int64_t index_id = 1; + std::string index_suffix = "suffix1"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + auto open_result = writer.open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + auto out_file = std::unique_ptr(dir->createOutput("write_v1_test")); + out_file->writeString("test1"); + out_file->close(); + dir->close(); + + Status close_status = writer.close(); + if (!close_status.ok()) std::cout << "close error:" << close_status.msg() << std::endl; + ASSERT_TRUE(close_status.ok()); + + const InvertedIndexFileInfo* file_info = writer.get_index_file_info(); + ASSERT_NE(file_info, nullptr); + auto index_info = file_info->index_info(0); + ASSERT_GT(index_info.index_file_size(), 0); + + int64_t total_size = writer.get_index_file_total_size(); + ASSERT_GT(total_size, 0); + ASSERT_EQ(total_size, index_info.index_file_size()); + std::cout << "total_size:" << total_size << std::endl; +} + +TEST_F(InvertedIndexFileWriterTest, WriteV2Test) { + io::FileWriterPtr file_writer; + std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); + io::FileWriterOptions opts; + Status st = _fs->create_file(index_path, &file_writer, &opts); + ASSERT_TRUE(st.ok()); + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2, std::move(file_writer)); + + int64_t index_id_1 = 1; + std::string index_suffix_1 = "suffix1"; + auto index_meta_1 = create_mock_tablet_index(index_id_1, index_suffix_1); + ASSERT_NE(index_meta_1, nullptr); + auto open_result_1 = writer.open(index_meta_1.get()); + ASSERT_TRUE(open_result_1.has_value()); + auto dir_1 = open_result_1.value(); + auto out_file_1 = std::unique_ptr( + dir_1->createOutput("write_v2_test_index_1")); + out_file_1->writeString("test1"); + out_file_1->close(); + dir_1->close(); + int64_t index_id_2 = 2; + std::string index_suffix_2 = "suffix2"; + auto index_meta_2 = create_mock_tablet_index(index_id_2, index_suffix_2); + ASSERT_NE(index_meta_2, nullptr); + auto open_result_2 = writer.open(index_meta_2.get()); + ASSERT_TRUE(open_result_2.has_value()); + auto dir_2 = open_result_2.value(); + auto out_file_2 = std::unique_ptr( + dir_2->createOutput("write_v2_test_index_2")); + out_file_2->writeString("test2"); + out_file_2->close(); + dir_2->close(); + Status close_status = writer.close(); + ASSERT_TRUE(close_status.ok()); + + const InvertedIndexFileInfo* file_info = writer.get_index_file_info(); + ASSERT_NE(file_info, nullptr); + ASSERT_GT(file_info->index_size(), 0); + + int64_t total_size = writer.get_index_file_total_size(); + ASSERT_GT(total_size, 0); + ASSERT_EQ(total_size, file_info->index_size()); + std::cout << "total_size:" << total_size << std::endl; +} + +TEST_F(InvertedIndexFileWriterTest, HeaderLengthTest) { + InvertedIndexDirectoryMap indices_dirs; + auto mock_dir1 = std::make_shared(); + auto mock_dir2 = std::make_shared(); + std::string local_fs_index_path_1 = InvertedIndexDescriptor::get_temporary_index_path( + ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id, + _seg_id, 1, "suffix1"); + std::string local_fs_index_path_2 = InvertedIndexDescriptor::get_temporary_index_path( + ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id, + _seg_id, 2, "suffix2"); + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path_1).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path_1).ok()); + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path_2).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path_2).ok()); + mock_dir1->init(_fs, local_fs_index_path_1.c_str()); + mock_dir2->init(_fs, local_fs_index_path_2.c_str()); + std::vector files1 = {"file1.dat", "file2.dat"}; + std::vector files2 = {"file3.dat"}; + for (auto& file : files1) { + auto out_file_1 = + std::unique_ptr(mock_dir1->createOutput(file.c_str())); + out_file_1->writeString("test1"); + out_file_1->close(); + } + for (auto& file : files2) { + auto out_file_2 = + std::unique_ptr(mock_dir2->createOutput(file.c_str())); + out_file_2->writeString("test2"); + out_file_2->close(); + } + auto insertDirectory = [&](InvertedIndexFileWriter& writer, int64_t index_id, + const std::string& suffix, + std::shared_ptr& mock_dir) { + Status st = writer._insert_directory_into_map(index_id, suffix, mock_dir); + if (!st.ok()) { + std::cerr << "_insert_directory_into_map error in HeaderLengthTest: " << st.msg() + << std::endl; + assert(false); + return; + } + }; + + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2); + insertDirectory(writer, 1, "suffix1", mock_dir1); + insertDirectory(writer, 2, "suffix2", mock_dir2); + + int64_t header_length = writer.headerLength(); + + // sizeof(int32_t) * 2 + // + (sizeof(int64_t) + sizeof(int32_t) + suffix.length() + sizeof(int32_t)) * num_indices + // + (sizeof(int32_t) + filename.length() + sizeof(int64_t) + sizeof(int64_t)) * num_files + int64_t expected_header_length = 0; + expected_header_length += sizeof(int32_t) * 2; // version and num_indices + + // Index 1 + expected_header_length += sizeof(int64_t); // index_id + expected_header_length += sizeof(int32_t); // suffix size + expected_header_length += 7; // "suffix1" + expected_header_length += sizeof(int32_t); // file_count + expected_header_length += sizeof(int32_t) + 9 + sizeof(int64_t) + sizeof(int64_t); // file1.dat + expected_header_length += sizeof(int32_t) + 9 + sizeof(int64_t) + sizeof(int64_t); // file2.dat + + // Index 2 + expected_header_length += sizeof(int64_t); // index_id + expected_header_length += sizeof(int32_t); // suffix size + expected_header_length += 7; // "suffix2" + expected_header_length += sizeof(int32_t); // file_count + expected_header_length += sizeof(int32_t) + 9 + sizeof(int64_t) + sizeof(int64_t); // file3.dat + + ASSERT_EQ(header_length, expected_header_length); +} + +TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) { + auto mock_dir = std::make_shared(); + std::string local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path( + ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id, + _seg_id, 1, "suffix1"); + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok()); + mock_dir->init(_fs, local_fs_index_path.c_str()); + std::vector files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"}; + for (auto& file : files) { + auto out_file_1 = + std::unique_ptr(mock_dir->createOutput(file.c_str())); + out_file_1->writeString("test1"); + out_file_1->close(); + } + + EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments"))) + .WillOnce(testing::Return(1000)); + EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000)); + EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500)); + EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500)); + + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2); + auto st = writer._insert_directory_into_map(1, "suffix1", mock_dir); + if (!st.ok()) { + std::cerr << "_insert_directory_into_map error in PrepareSortedFilesTest: " << st.msg() + << std::endl; + ASSERT_TRUE(false); + return; + } + + std::vector sorted_files = + writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, "suffix1")].get()); + + // 1. 0.segments (priority 1, size 1000) + // 2. 0.fnm (priority 2, size 2000) + // 3. 0.tii (priority 3, size 1500) + // 4. nullbitmap (priority 4, size 500) + + std::vector expected_order = {"0.segments", "0.fnm", "0.tii", "nullbitmap"}; + ASSERT_EQ(sorted_files.size(), expected_order.size()); + + for (size_t i = 0; i < expected_order.size(); ++i) { + EXPECT_EQ(sorted_files[i].filename, expected_order[i]); + if (sorted_files[i].filename == "0.segments") { + EXPECT_EQ(sorted_files[i].filesize, 1000); + } else if (sorted_files[i].filename == "0.fnm") { + EXPECT_EQ(sorted_files[i].filesize, 2000); + } else if (sorted_files[i].filename == "0.tii") { + EXPECT_EQ(sorted_files[i].filesize, 1500); + } else if (sorted_files[i].filename == "nullbitmap") { + EXPECT_EQ(sorted_files[i].filesize, 500); + } + } +} +/*TEST_F(InvertedIndexFileWriterTest, CopyFileTest_OpenInputFailure) { + auto mock_dir = std::make_shared(); + std::string local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path( + ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id, + _seg_id, 1, "suffix1"); + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok()); + mock_dir->init(_fs, local_fs_index_path.c_str()); + std::vector files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"}; + for (auto& file : files) { + auto out_file_1 = + std::unique_ptr(mock_dir->createOutput(file.c_str())); + out_file_1->writeString("test1"); + out_file_1->close(); + } + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2); + auto st = writer._insert_directory_into_map(1, "suffix1", mock_dir); + if (!st.ok()) { + std::cerr << "_insert_directory_into_map error in CopyFileTest_OpenInputFailure: " + << st.msg() << std::endl; + ASSERT_TRUE(false); + return; + } + + EXPECT_CALL(*mock_dir, + openInput(::testing::StrEq("0.segments"), ::testing::_, ::testing::_, ::testing::_)) + .WillOnce(::testing::Invoke([&](const char* name, lucene::store::IndexInput*& ret, + CLuceneError& err_ref, int bufferSize) { + err_ref.set(CL_ERR_IO, fmt::format("Could not open file, file is {}", name).data()); + return false; + })); + + uint8_t buffer[16384]; + std::string error_message; + try { + writer.copyFile("0.segments", mock_dir.get(), nullptr, buffer, sizeof(buffer)); + } catch (CLuceneError& err) { + error_message = err.what(); + } + ASSERT_EQ(error_message, "Could not open file, file is 0.segments"); +}*/ +class InvertedIndexFileWriterMock : public InvertedIndexFileWriter { +public: + InvertedIndexFileWriterMock(const io::FileSystemSPtr& fs, const std::string& index_path_prefix, + const std::string& rowset_id, int32_t segment_id, + InvertedIndexStorageFormatPB storage_format) + : InvertedIndexFileWriter(fs, index_path_prefix, rowset_id, segment_id, + storage_format) {} + + MOCK_METHOD(void, write_header_and_data_v1, + (lucene::store::IndexOutput * output, const std::vector& files, + lucene::store::Directory* dir, int64_t header_length, int32_t file_count), + (override)); +}; +TEST_F(InvertedIndexFileWriterTest, WriteV1ExceptionHandlingTest) { + InvertedIndexFileWriterMock writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V1); + + int64_t index_id = 1; + std::string index_suffix = "suffix1"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + auto open_result = writer_mock.open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + + auto out_file = std::unique_ptr(dir->createOutput("test_file")); + out_file->writeString("test data"); + out_file->close(); + dir->close(); + EXPECT_CALL(writer_mock, write_header_and_data_v1(::testing::_, ::testing::_, ::testing::_, + ::testing::_, ::testing::_)) + .WillOnce(::testing::Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false))); + + Status status = writer_mock.write_v1(); + ASSERT_FALSE(status.ok()); + ASSERT_EQ(status.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR); +} +class InvertedIndexFileWriterMockV2 : public InvertedIndexFileWriter { +public: + InvertedIndexFileWriterMockV2(const io::FileSystemSPtr& fs, + const std::string& index_path_prefix, + const std::string& rowset_id, int32_t segment_id, + InvertedIndexStorageFormatPB storage_format, + io::FileWriterPtr file_writer) + : InvertedIndexFileWriter(fs, index_path_prefix, rowset_id, segment_id, storage_format, + std::move(file_writer)) {} + + MOCK_METHOD(void, write_index_headers_and_metadata, + (lucene::store::IndexOutput * compound_file_output, + const std::vector& file_metadata), + (override)); +}; + +TEST_F(InvertedIndexFileWriterTest, WriteV2ExceptionHandlingTest) { + io::FileWriterPtr file_writer; + std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); + io::FileWriterOptions opts; + Status st = _fs->create_file(index_path, &file_writer, &opts); + ASSERT_TRUE(st.ok()); + InvertedIndexFileWriterMockV2 writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2, + std::move(file_writer)); + + int64_t index_id = 1; + std::string index_suffix = "suffix1"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + auto open_result = writer_mock.open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + + auto out_file = std::unique_ptr(dir->createOutput("test_file")); + out_file->writeString("test data"); + out_file->close(); + dir->close(); + + EXPECT_CALL(writer_mock, write_index_headers_and_metadata(::testing::_, ::testing::_)) + .WillOnce(::testing::Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false))); + + Status status = writer_mock.write_v2(); + ASSERT_FALSE(status.ok()); + ASSERT_EQ(status.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR); +} + +} // namespace segment_v2 +} // namespace doris