diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index d4a374ad206485..34e52ccb5b0be5 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -486,10 +486,35 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) { Status Compaction::do_inverted_index_compaction() { const auto& ctx = _output_rs_writer->context(); if (!config::inverted_index_compaction_enable || _input_row_num <= 0 || - !_stats.rowid_conversion || ctx.columns_to_do_index_compaction.empty()) { + ctx.columns_to_do_index_compaction.empty()) { return Status::OK(); } + auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { + LOG(WARNING) << "failed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id + << ". index_id=" << index_id; + for (auto& rowset : _input_rowsets) { + rowset->set_skip_index_compaction(column_uniq_id); + LOG(INFO) << "mark skipping inverted index compaction next time" + << ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() + << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; + } + }; + + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_rowid_conversion_null", + { _stats.rowid_conversion = nullptr; }) + if (!_stats.rowid_conversion) { + LOG(WARNING) << "failed to do index compaction, rowid conversion is null" + << ". tablet=" << _tablet->tablet_id() + << ", input row number=" << _input_row_num; + mark_skip_index_compaction(ctx, error_handler); + + return Status::Error( + "failed to do index compaction, rowid conversion is null. tablet={}", + _tablet->tablet_id()); + } + OlapStopWatch inverted_watch; // translation vec @@ -512,8 +537,7 @@ Status Compaction::do_inverted_index_compaction() { auto src_segment_num = src_seg_to_id_map.size(); auto dest_segment_num = dest_segment_num_rows.size(); - DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", - { dest_segment_num = 0; }) + // when all the input rowsets are deleted, the output rowset will be empty and dest_segment_num will be 0. if (dest_segment_num <= 0) { LOG(INFO) << "skip doing index compaction due to no output segments" << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num @@ -591,27 +615,62 @@ Status Compaction::do_inverted_index_compaction() { DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_find_rowset_error", { find_it = rs_id_to_rowset_map.end(); }) if (find_it == rs_id_to_rowset_map.end()) [[unlikely]] { - // DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id; - return Status::InternalError("cannot find rowset. tablet_id={} rowset_id={}", - _tablet->tablet_id(), rowset_id.to_string()); + LOG(WARNING) << "failed to do index compaction, cannot find rowset. tablet_id=" + << _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string(); + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "failed to do index compaction, cannot find rowset. tablet_id={} rowset_id={}", + _tablet->tablet_id(), rowset_id.to_string()); } auto* rowset = find_it->second; auto fs = rowset->rowset_meta()->fs(); DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_get_fs_error", { fs = nullptr; }) if (!fs) { - return Status::InternalError("get fs failed, resource_id={}", - rowset->rowset_meta()->resource_id()); + LOG(WARNING) << "failed to do index compaction, get fs failed. resource_id=" + << rowset->rowset_meta()->resource_id(); + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "get fs failed, resource_id={}", rowset->rowset_meta()->resource_id()); } - auto seg_path = DORIS_TRY(rowset->segment_path(seg_id)); + auto seg_path = rowset->segment_path(seg_id); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_seg_path_nullptr", { + seg_path = ResultError(Status::Error( + "do_inverted_index_compaction_seg_path_nullptr")); + }) + if (!seg_path.has_value()) { + LOG(WARNING) << "failed to do index compaction, get segment path failed. tablet_id=" + << _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string() + << " seg_id=" << seg_id; + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "get segment path failed. tablet_id={} rowset_id={} seg_id={}", + _tablet->tablet_id(), rowset_id.to_string(), seg_id); + } auto inverted_index_file_reader = std::make_unique( - fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, + fs, + std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value())}, _cur_tablet_schema->get_inverted_index_storage_format(), rowset->rowset_meta()->inverted_index_file_info(seg_id)); - RETURN_NOT_OK_STATUS_WITH_WARN( - inverted_index_file_reader->init(config::inverted_index_read_buffer_size), - "inverted_index_file_reader init faiqled"); + auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_init_inverted_index_file_reader", + { + st = Status::Error( + "debug point: " + "Compaction::do_inverted_index_compaction_init_inverted_index_" + "file_reader error"); + }) + if (!st.ok()) { + LOG(WARNING) << "failed to do index compaction, init inverted index file reader " + "failed. tablet_id=" + << _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string() + << " seg_id=" << seg_id; + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "init inverted index file reader failed. tablet_id={} rowset_id={} seg_id={}", + _tablet->tablet_id(), rowset_id.to_string(), seg_id); + } inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader); } @@ -619,7 +678,20 @@ Status Compaction::do_inverted_index_compaction() { // format: rowsetId_segmentId auto& inverted_index_file_writers = dynamic_cast(_output_rs_writer.get()) ->inverted_index_file_writers(); - DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num); + DBUG_EXECUTE_IF( + "Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error", + { inverted_index_file_writers.clear(); }) + if (inverted_index_file_writers.size() != dest_segment_num) { + LOG(WARNING) << "failed to do index compaction, dest segment num not match. tablet_id=" + << _tablet->tablet_id() << " dest_segment_num=" << dest_segment_num + << " inverted_index_file_writers.size()=" + << inverted_index_file_writers.size(); + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "dest segment num not match. tablet_id={} dest_segment_num={} " + "inverted_index_file_writers.size()={}", + _tablet->tablet_id(), dest_segment_num, inverted_index_file_writers.size()); + } // use tmp file dir to store index files auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); @@ -628,18 +700,6 @@ Status Compaction::do_inverted_index_compaction() { << ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num << ", destination index size=" << dest_segment_num << "."; - auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { - LOG(WARNING) << "failed to do index compaction" - << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id - << ". index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; - } - }; - Status status = Status::OK(); for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) { auto col = _cur_tablet_schema->column_by_uid(column_uniq_id); @@ -649,6 +709,10 @@ Status Compaction::do_inverted_index_compaction() { if (index_meta == nullptr) { status = Status::Error( fmt::format("Can not find index_meta for col {}", col.name())); + LOG(WARNING) << "failed to do index compaction, can not find index_meta for column" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id; + error_handler(-1, column_uniq_id); break; } @@ -662,6 +726,11 @@ Status Compaction::do_inverted_index_compaction() { "debug point: Compaction::open_index_file_reader error")); }) if (!res.has_value()) { + LOG(WARNING) << "failed to do index compaction, open inverted index file " + "reader failed" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id + << ", src_segment_id=" << src_segment_id; throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg()); } src_idx_dirs[src_segment_id] = std::move(res.value()); @@ -673,6 +742,11 @@ Status Compaction::do_inverted_index_compaction() { "debug point: Compaction::open_inverted_index_file_writer error")); }) if (!res.has_value()) { + LOG(WARNING) << "failed to do index compaction, open inverted index file " + "writer failed" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id + << ", dest_segment_id=" << dest_segment_id; throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg()); } // Destination directories in dest_index_dirs do not need to be deconstructed, @@ -705,6 +779,23 @@ Status Compaction::do_inverted_index_compaction() { return Status::OK(); } +void Compaction::mark_skip_index_compaction( + const RowsetWriterContext& context, + const std::function& error_handler) { + for (auto&& column_uniq_id : context.columns_to_do_index_compaction) { + auto col = _cur_tablet_schema->column_by_uid(column_uniq_id); + const auto* index_meta = _cur_tablet_schema->inverted_index(col); + if (index_meta == nullptr) { + LOG(WARNING) << "mark skip index compaction, can not find index_meta for column" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id; + error_handler(-1, column_uniq_id); + continue; + } + error_handler(index_meta->index_id(), column_uniq_id); + } +} + void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { for (const auto& index : _cur_tablet_schema->inverted_indexes()) { auto col_unique_ids = index->col_unique_ids(); @@ -780,7 +871,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { // TODO: inverted_index_path auto seg_path = rowset->segment_path(i); DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_seg_path_nullptr", { - seg_path = ResultError(Status::Error("error")); + seg_path = ResultError(Status::Error( + "construct_skip_inverted_index_seg_path_nullptr")); }) if (!seg_path) { LOG(WARNING) << seg_path.error(); @@ -791,8 +883,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { try { auto inverted_index_file_reader = std::make_unique( fs, - std::string { - InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)}, + std::string {InvertedIndexDescriptor::get_index_file_path_prefix( + seg_path.value())}, _cur_tablet_schema->get_inverted_index_storage_format(), rowset->rowset_meta()->inverted_index_file_info(i)); auto st = inverted_index_file_reader->init( diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index d81c1444e08e6a..88bdca59cf8f63 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -70,6 +70,10 @@ class Compaction { // merge inverted index files Status do_inverted_index_compaction(); + // mark all columns in columns_to_do_index_compaction to skip index compaction next time. + void mark_skip_index_compaction(const RowsetWriterContext& context, + const std::function& error_handler); + void construct_index_compaction_columns(RowsetWriterContext& ctx); virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) = 0; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp index f988c46c027c26..dcbdca921ab8e8 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -79,7 +79,16 @@ Status compact_column(int64_t index_id, // delete temporary segment_path, only when inverted_index_ram_dir_enable is false if (!config::inverted_index_ram_dir_enable) { - std::ignore = io::global_local_filesystem()->delete_directory(tmp_path.data()); + auto st = io::global_local_filesystem()->delete_directory(tmp_path.data()); + DBUG_EXECUTE_IF("compact_column_delete_tmp_path_error", { + st = Status::Error( + "debug point: compact_column_delete_tmp_path_error in index compaction"); + }) + if (!st.ok()) { + LOG(WARNING) << "compact column failed to delete tmp path: " << tmp_path + << ", error: " << st.to_string(); + return st; + } } return Status::OK(); } diff --git a/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy index ac3cd8125a863c..b54f6374d833b8 100644 --- a/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy @@ -181,30 +181,26 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") { "compact_column_create_index_writer_error", "compact_column_indexCompaction_error", "compact_column_index_writer_close_error", - "compact_column_src_index_dirs_close_error", + "compact_column_delete_tmp_path_error", "Compaction::do_inverted_index_compaction_find_rowset_error", "Compaction::do_inverted_index_compaction_get_fs_error", - "Compaction::do_inverted_index_compaction_index_file_reader_init_error", - // "Compaction::do_inverted_index_compaction_file_size_status_not_ok", // v2 do not do index compaction "Compaction::do_inverted_index_compaction_can_not_find_index_meta", - "Compaction::do_inverted_index_compaction_index_properties_different", - "Compaction::do_inverted_index_compaction_index_file_writer_close_not_ok", - "Compaction::construct_skip_inverted_index_index_reader_close_error" + "Compaction::do_inverted_index_compaction_rowid_conversion_null", + "Compaction::do_inverted_index_compaction_seg_path_nullptr", + "Compaction::do_inverted_index_compaction_init_inverted_index_file_reader", + "Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error" ] def debug_points_normal_compaction = [ - "compact_column_local_tmp_dir_delete_error", - // "Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", // query result not match without inverted index - "Compaction::do_inverted_index_compaction_index_file_reader_init_not_found", "Compaction::construct_skip_inverted_index_is_skip_index_compaction", "Compaction::construct_skip_inverted_index_get_fs_error", "Compaction::construct_skip_inverted_index_index_meta_nullptr", "Compaction::construct_skip_inverted_index_seg_path_nullptr", + "Compaction::do_inverted_index_compaction_index_properties_different", + "Compaction::construct_skip_inverted_index_index_files_count", + "Compaction::construct_skip_inverted_index_index_reader_close_error", "Compaction::construct_skip_inverted_index_index_file_reader_init_status_not_ok", - "Compaction::construct_skip_inverted_index_index_file_reader_exist_status_not_ok", - "Compaction::construct_skip_inverted_index_index_file_reader_exist_false", - "Compaction::construct_skip_inverted_index_index_file_reader_open_error", - "Compaction::construct_skip_inverted_index_index_files_count" + "Compaction::construct_skip_inverted_index_index_file_reader_open_error" ] def run_test = { tablets, debug_point, abnormal -> @@ -221,6 +217,9 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") { } } + if (debug_point == "compact_column_delete_tmp_path_error") { + set_be_config.call("inverted_index_ram_dir_enable", "false") + } // before full compaction, there are 7 rowsets. int rowsetCount = get_rowset_count.call(tablets); assert (rowsetCount == 7 * replicaNum) @@ -258,6 +257,10 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") { } run_sql.call() + + if (debug_point == "compact_column_delete_tmp_path_error") { + set_be_config.call("inverted_index_ram_dir_enable", "true") + } } def create_and_test_table = { table_name, key_type, debug_points, is_abnormal ->