[test](index compaction) Add index compaction exception fault injection cases (#45127)

Problem Summary:

1. Add logic to skip index compaction for the affected columns when index compaction encounters exceptions
2. Optimize the fault injection regression cases
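The first change boils down to a "mark and skip" pattern: when any step of inverted index compaction fails, every input rowset records the failing column's unique id via `set_skip_index_compaction()`, so the next compaction run skips index compaction for that column instead of repeatedly hitting the same error. Below is a minimal, self-contained sketch of that pattern; the `Rowset` struct and `mark_skip()` helper are hypothetical stand-ins, not the actual Doris classes.

```cpp
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

// Hypothetical stand-in for the real Doris Rowset class.
struct Rowset {
    int64_t id;
    std::set<int64_t> skipped_columns;  // column unique ids to skip next time

    void set_skip_index_compaction(int64_t column_uniq_id) {
        skipped_columns.insert(column_uniq_id);
    }
};

// Mirrors the error_handler lambda added in do_inverted_index_compaction():
// on failure, mark the column on every input rowset so the next compaction
// skips index compaction for it instead of failing again.
void mark_skip(std::vector<Rowset>& input_rowsets, int64_t index_id,
               int64_t column_uniq_id) {
    std::cerr << "failed to do index compaction, index_id=" << index_id
              << ", column uniq id=" << column_uniq_id << '\n';
    for (auto& rowset : input_rowsets) {
        rowset.set_skip_index_compaction(column_uniq_id);
    }
}

int main() {
    std::vector<Rowset> input_rowsets = {{1, {}}, {2, {}}};

    // Simulate an exception while compacting the index of column 100.
    mark_skip(input_rowsets, /*index_id=*/7, /*column_uniq_id=*/100);

    for (const auto& rs : input_rowsets) {
        std::cout << "rowset " << rs.id << " will skip "
                  << rs.skipped_columns.size() << " column(s)\n";
    }
    return 0;
}
```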
qidaye authored and Your Name committed Dec 13, 2024
1 parent 0f95d59 commit 3d83142
Showing 4 changed files with 151 additions and 43 deletions.
150 changes: 121 additions & 29 deletions be/src/olap/compaction.cpp
@@ -486,10 +486,35 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
Status Compaction::do_inverted_index_compaction() {
const auto& ctx = _output_rs_writer->context();
if (!config::inverted_index_compaction_enable || _input_row_num <= 0 ||
!_stats.rowid_conversion || ctx.columns_to_do_index_compaction.empty()) {
ctx.columns_to_do_index_compaction.empty()) {
return Status::OK();
}

auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) {
LOG(WARNING) << "failed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id
<< ". index_id=" << index_id;
for (auto& rowset : _input_rowsets) {
rowset->set_skip_index_compaction(column_uniq_id);
LOG(INFO) << "mark skipping inverted index compaction next time"
<< ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id()
<< ", column uniq id=" << column_uniq_id << ", index_id=" << index_id;
}
};

DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_rowid_conversion_null",
{ _stats.rowid_conversion = nullptr; })
if (!_stats.rowid_conversion) {
LOG(WARNING) << "failed to do index compaction, rowid conversion is null"
<< ". tablet=" << _tablet->tablet_id()
<< ", input row number=" << _input_row_num;
mark_skip_index_compaction(ctx, error_handler);

return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"failed to do index compaction, rowid conversion is null. tablet={}",
_tablet->tablet_id());
}

OlapStopWatch inverted_watch;

// translation vec
@@ -512,8 +537,7 @@ Status Compaction::do_inverted_index_compaction() {
auto src_segment_num = src_seg_to_id_map.size();
auto dest_segment_num = dest_segment_num_rows.size();

DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_dest_segment_num_is_zero",
{ dest_segment_num = 0; })
// when all the input rowsets are deleted, the output rowset will be empty and dest_segment_num will be 0.
if (dest_segment_num <= 0) {
LOG(INFO) << "skip doing index compaction due to no output segments"
<< ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
@@ -591,35 +615,83 @@ Status Compaction::do_inverted_index_compaction() {
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_find_rowset_error",
{ find_it = rs_id_to_rowset_map.end(); })
if (find_it == rs_id_to_rowset_map.end()) [[unlikely]] {
// DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id;
return Status::InternalError("cannot find rowset. tablet_id={} rowset_id={}",
_tablet->tablet_id(), rowset_id.to_string());
LOG(WARNING) << "failed to do index compaction, cannot find rowset. tablet_id="
<< _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string();
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"failed to do index compaction, cannot find rowset. tablet_id={} rowset_id={}",
_tablet->tablet_id(), rowset_id.to_string());
}

auto* rowset = find_it->second;
auto fs = rowset->rowset_meta()->fs();
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_get_fs_error", { fs = nullptr; })
if (!fs) {
return Status::InternalError("get fs failed, resource_id={}",
rowset->rowset_meta()->resource_id());
LOG(WARNING) << "failed to do index compaction, get fs failed. resource_id="
<< rowset->rowset_meta()->resource_id();
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"get fs failed, resource_id={}", rowset->rowset_meta()->resource_id());
}

auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
auto seg_path = rowset->segment_path(seg_id);
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_seg_path_nullptr", {
seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>(
"do_inverted_index_compaction_seg_path_nullptr"));
})
if (!seg_path.has_value()) {
LOG(WARNING) << "failed to do index compaction, get segment path failed. tablet_id="
<< _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string()
<< " seg_id=" << seg_id;
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"get segment path failed. tablet_id={} rowset_id={} seg_id={}",
_tablet->tablet_id(), rowset_id.to_string(), seg_id);
}
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
fs,
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value())},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(seg_id));
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_reader->init(config::inverted_index_read_buffer_size),
"inverted_index_file_reader init faiqled");
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_init_inverted_index_file_reader",
{
st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: "
"Compaction::do_inverted_index_compaction_init_inverted_index_"
"file_reader error");
})
if (!st.ok()) {
LOG(WARNING) << "failed to do index compaction, init inverted index file reader "
"failed. tablet_id="
<< _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string()
<< " seg_id=" << seg_id;
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"init inverted index file reader failed. tablet_id={} rowset_id={} seg_id={}",
_tablet->tablet_id(), rowset_id.to_string(), seg_id);
}
inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader);
}

// dest index files
// format: rowsetId_segmentId
auto& inverted_index_file_writers = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get())
->inverted_index_file_writers();
DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num);
DBUG_EXECUTE_IF(
"Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error",
{ inverted_index_file_writers.clear(); })
if (inverted_index_file_writers.size() != dest_segment_num) {
LOG(WARNING) << "failed to do index compaction, dest segment num not match. tablet_id="
<< _tablet->tablet_id() << " dest_segment_num=" << dest_segment_num
<< " inverted_index_file_writers.size()="
<< inverted_index_file_writers.size();
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"dest segment num not match. tablet_id={} dest_segment_num={} "
"inverted_index_file_writers.size()={}",
_tablet->tablet_id(), dest_segment_num, inverted_index_file_writers.size());
}

// use tmp file dir to store index files
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
@@ -628,18 +700,6 @@ Status Compaction::do_inverted_index_compaction() {
<< ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num
<< ", destination index size=" << dest_segment_num << ".";

auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) {
LOG(WARNING) << "failed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id
<< ". index_id=" << index_id;
for (auto& rowset : _input_rowsets) {
rowset->set_skip_index_compaction(column_uniq_id);
LOG(INFO) << "mark skipping inverted index compaction next time"
<< ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id()
<< ", column uniq id=" << column_uniq_id << ", index_id=" << index_id;
}
};

Status status = Status::OK();
for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
@@ -649,6 +709,10 @@ Status Compaction::do_inverted_index_compaction() {
if (index_meta == nullptr) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
fmt::format("Can not find index_meta for col {}", col.name()));
LOG(WARNING) << "failed to do index compaction, can not find index_meta for column"
<< ". tablet=" << _tablet->tablet_id()
<< ", column uniq id=" << column_uniq_id;
error_handler(-1, column_uniq_id);
break;
}

@@ -662,6 +726,11 @@ Status Compaction::do_inverted_index_compaction() {
"debug point: Compaction::open_index_file_reader error"));
})
if (!res.has_value()) {
LOG(WARNING) << "failed to do index compaction, open inverted index file "
"reader failed"
<< ". tablet=" << _tablet->tablet_id()
<< ", column uniq id=" << column_uniq_id
<< ", src_segment_id=" << src_segment_id;
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
src_idx_dirs[src_segment_id] = std::move(res.value());
@@ -673,6 +742,11 @@ Status Compaction::do_inverted_index_compaction() {
"debug point: Compaction::open_inverted_index_file_writer error"));
})
if (!res.has_value()) {
LOG(WARNING) << "failed to do index compaction, open inverted index file "
"writer failed"
<< ". tablet=" << _tablet->tablet_id()
<< ", column uniq id=" << column_uniq_id
<< ", dest_segment_id=" << dest_segment_id;
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
// Destination directories in dest_index_dirs do not need to be deconstructed,
@@ -705,6 +779,23 @@ Status Compaction::do_inverted_index_compaction() {
return Status::OK();
}

void Compaction::mark_skip_index_compaction(
const RowsetWriterContext& context,
const std::function<void(int64_t, int64_t)>& error_handler) {
for (auto&& column_uniq_id : context.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->inverted_index(col);
if (index_meta == nullptr) {
LOG(WARNING) << "mark skip index compaction, can not find index_meta for column"
<< ". tablet=" << _tablet->tablet_id()
<< ", column uniq id=" << column_uniq_id;
error_handler(-1, column_uniq_id);
continue;
}
error_handler(index_meta->index_id(), column_uniq_id);
}
}

void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
for (const auto& index : _cur_tablet_schema->inverted_indexes()) {
auto col_unique_ids = index->col_unique_ids();
@@ -780,7 +871,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
// TODO: inverted_index_path
auto seg_path = rowset->segment_path(i);
DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_seg_path_nullptr", {
seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>("error"));
seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>(
"construct_skip_inverted_index_seg_path_nullptr"));
})
if (!seg_path) {
LOG(WARNING) << seg_path.error();
Expand All @@ -791,8 +883,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
try {
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs,
std::string {
InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)},
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(
seg_path.value())},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(i));
auto st = inverted_index_file_reader->init(
4 changes: 4 additions & 0 deletions be/src/olap/compaction.h
@@ -70,6 +70,10 @@ class Compaction {
// merge inverted index files
Status do_inverted_index_compaction();

// mark all columns in columns_to_do_index_compaction to skip index compaction next time.
void mark_skip_index_compaction(const RowsetWriterContext& context,
const std::function<void(int64_t, int64_t)>& error_handler);

void construct_index_compaction_columns(RowsetWriterContext& ctx);

virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) = 0;
11 changes: 10 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -79,7 +79,16 @@ Status compact_column(int64_t index_id,

// delete temporary segment_path, only when inverted_index_ram_dir_enable is false
if (!config::inverted_index_ram_dir_enable) {
std::ignore = io::global_local_filesystem()->delete_directory(tmp_path.data());
auto st = io::global_local_filesystem()->delete_directory(tmp_path.data());
DBUG_EXECUTE_IF("compact_column_delete_tmp_path_error", {
st = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
"debug point: compact_column_delete_tmp_path_error in index compaction");
})
if (!st.ok()) {
LOG(WARNING) << "compact column failed to delete tmp path: " << tmp_path
<< ", error: " << st.to_string();
return st;
}
}
return Status::OK();
}
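The injected failures above are gated by named debug points that the regression suite (diffed below) enables on the BE. As a rough mental model only — a toy sketch, not the actual Doris DebugPoints/DBUG_EXECUTE_IF machinery, which is toggled from the regression test framework — a debug point is a named hook that is compiled in but does nothing until a test explicitly enables it:

```cpp
#include <iostream>
#include <mutex>
#include <set>
#include <string>

// Toy registry of enabled debug points; only illustrates the idea of a
// "named, normally-disabled hook", not the real Doris implementation.
class DebugPointRegistry {
public:
    static DebugPointRegistry& instance() {
        static DebugPointRegistry reg;
        return reg;
    }
    void enable(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mu);
        _enabled.insert(name);
    }
    bool is_enabled(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mu);
        return _enabled.count(name) > 0;
    }

private:
    std::mutex _mu;
    std::set<std::string> _enabled;
};

// Hypothetical stand-in for DBUG_EXECUTE_IF: run the block only when the
// named point is enabled, so fault injection costs nothing in normal runs.
#define TOY_DEBUG_EXECUTE_IF(name, block)                           \
    do {                                                            \
        if (DebugPointRegistry::instance().is_enabled(name)) block  \
    } while (0)

int main() {
    bool rowid_conversion_ok = true;

    // A test would enable the point before triggering compaction.
    DebugPointRegistry::instance().enable(
            "Compaction::do_inverted_index_compaction_rowid_conversion_null");

    TOY_DEBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_rowid_conversion_null",
                         { rowid_conversion_ok = false; });

    std::cout << "rowid conversion ok: " << std::boolalpha << rowid_conversion_ok << "\n";
    return 0;
}
```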
@@ -181,30 +181,26 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") {
"compact_column_create_index_writer_error",
"compact_column_indexCompaction_error",
"compact_column_index_writer_close_error",
"compact_column_src_index_dirs_close_error",
"compact_column_delete_tmp_path_error",
"Compaction::do_inverted_index_compaction_find_rowset_error",
"Compaction::do_inverted_index_compaction_get_fs_error",
"Compaction::do_inverted_index_compaction_index_file_reader_init_error",
// "Compaction::do_inverted_index_compaction_file_size_status_not_ok", // v2 do not do index compaction
"Compaction::do_inverted_index_compaction_can_not_find_index_meta",
"Compaction::do_inverted_index_compaction_index_properties_different",
"Compaction::do_inverted_index_compaction_index_file_writer_close_not_ok",
"Compaction::construct_skip_inverted_index_index_reader_close_error"
"Compaction::do_inverted_index_compaction_rowid_conversion_null",
"Compaction::do_inverted_index_compaction_seg_path_nullptr",
"Compaction::do_inverted_index_compaction_init_inverted_index_file_reader",
"Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error"
]

def debug_points_normal_compaction = [
"compact_column_local_tmp_dir_delete_error",
// "Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", // query result not match without inverted index
"Compaction::do_inverted_index_compaction_index_file_reader_init_not_found",
"Compaction::construct_skip_inverted_index_is_skip_index_compaction",
"Compaction::construct_skip_inverted_index_get_fs_error",
"Compaction::construct_skip_inverted_index_index_meta_nullptr",
"Compaction::construct_skip_inverted_index_seg_path_nullptr",
"Compaction::do_inverted_index_compaction_index_properties_different",
"Compaction::construct_skip_inverted_index_index_files_count",
"Compaction::construct_skip_inverted_index_index_reader_close_error",
"Compaction::construct_skip_inverted_index_index_file_reader_init_status_not_ok",
"Compaction::construct_skip_inverted_index_index_file_reader_exist_status_not_ok",
"Compaction::construct_skip_inverted_index_index_file_reader_exist_false",
"Compaction::construct_skip_inverted_index_index_file_reader_open_error",
"Compaction::construct_skip_inverted_index_index_files_count"
"Compaction::construct_skip_inverted_index_index_file_reader_open_error"
]

def run_test = { tablets, debug_point, abnormal ->
@@ -221,6 +217,9 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") {
}
}

if (debug_point == "compact_column_delete_tmp_path_error") {
set_be_config.call("inverted_index_ram_dir_enable", "false")
}
// before full compaction, there are 7 rowsets.
int rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 7 * replicaNum)
@@ -258,6 +257,10 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") {
}

run_sql.call()

if (debug_point == "compact_column_delete_tmp_path_error") {
set_be_config.call("inverted_index_ram_dir_enable", "true")
}
}

def create_and_test_table = { table_name, key_type, debug_points, is_abnormal ->