Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test](index compaction)Add index compaction exception fault injection cases #45127

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 114 additions & 29 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,35 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
Status Compaction::do_inverted_index_compaction() {
qidaye marked this conversation as resolved.
Show resolved Hide resolved
const auto& ctx = _output_rs_writer->context();
if (!config::inverted_index_compaction_enable || _input_row_num <= 0 ||
!_stats.rowid_conversion || ctx.columns_to_do_index_compaction.empty()) {
ctx.columns_to_do_index_compaction.empty()) {
return Status::OK();
}

auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) {
LOG(WARNING) << "failed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id
<< ". index_id=" << index_id;
for (auto& rowset : _input_rowsets) {
rowset->set_skip_index_compaction(column_uniq_id);
LOG(INFO) << "mark skipping inverted index compaction next time"
<< ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id()
<< ", column uniq id=" << column_uniq_id << ", index_id=" << index_id;
}
};

DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_rowid_conversion_null",
{ _stats.rowid_conversion = nullptr; })
if (!_stats.rowid_conversion) {
LOG(WARNING) << "failed to do index compaction, rowid conversion is null"
<< ". tablet=" << _tablet->tablet_id()
<< ", input row number=" << _input_row_num;
mark_skip_index_compaction(ctx, error_handler);

return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"failed to do index compaction, rowid conversion is null. tablet={}",
_tablet->tablet_id());
}

OlapStopWatch inverted_watch;

// translation vec
Expand All @@ -521,8 +546,7 @@ Status Compaction::do_inverted_index_compaction() {
auto src_segment_num = src_seg_to_id_map.size();
auto dest_segment_num = dest_segment_num_rows.size();

DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_dest_segment_num_is_zero",
{ dest_segment_num = 0; })
// when all the input rowsets are deleted, the output rowset will be empty and dest_segment_num will be 0.
if (dest_segment_num <= 0) {
LOG(INFO) << "skip doing index compaction due to no output segments"
<< ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
Expand Down Expand Up @@ -600,35 +624,83 @@ Status Compaction::do_inverted_index_compaction() {
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_find_rowset_error",
{ find_it = rs_id_to_rowset_map.end(); })
if (find_it == rs_id_to_rowset_map.end()) [[unlikely]] {
// DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id;
return Status::InternalError("cannot find rowset. tablet_id={} rowset_id={}",
_tablet->tablet_id(), rowset_id.to_string());
LOG(WARNING) << "failed to do index compaction, cannot find rowset. tablet_id="
<< _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string();
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"failed to do index compaction, cannot find rowset. tablet_id={} rowset_id={}",
_tablet->tablet_id(), rowset_id.to_string());
}

auto* rowset = find_it->second;
auto fs = rowset->rowset_meta()->fs();
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_get_fs_error", { fs = nullptr; })
if (!fs) {
return Status::InternalError("get fs failed, resource_id={}",
rowset->rowset_meta()->resource_id());
LOG(WARNING) << "failed to do index compaction, get fs failed. resource_id="
<< rowset->rowset_meta()->resource_id();
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"get fs failed, resource_id={}", rowset->rowset_meta()->resource_id());
}

auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
auto seg_path = rowset->segment_path(seg_id);
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_seg_path_nullptr", {
seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>(
"do_inverted_index_compaction_seg_path_nullptr"));
})
if (!seg_path.has_value()) {
LOG(WARNING) << "failed to do index compaction, get segment path failed. tablet_id="
<< _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string()
<< " seg_id=" << seg_id;
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"get segment path failed. tablet_id={} rowset_id={} seg_id={}",
_tablet->tablet_id(), rowset_id.to_string(), seg_id);
}
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
fs,
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value())},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(seg_id));
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_reader->init(config::inverted_index_read_buffer_size),
"inverted_index_file_reader init faiqled");
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_init_inverted_index_file_reader",
{
st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: "
"Compaction::do_inverted_index_compaction_init_inverted_index_"
"file_reader error");
})
if (!st.ok()) {
LOG(WARNING) << "failed to do index compaction, init inverted index file reader "
"failed. tablet_id="
<< _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string()
<< " seg_id=" << seg_id;
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"init inverted index file reader failed. tablet_id={} rowset_id={} seg_id={}",
_tablet->tablet_id(), rowset_id.to_string(), seg_id);
}
inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader);
}

// dest index files
// format: rowsetId_segmentId
auto& inverted_index_file_writers = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get())
->inverted_index_file_writers();
DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num);
DBUG_EXECUTE_IF(
"Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error",
{ inverted_index_file_writers.clear(); })
if (inverted_index_file_writers.size() != dest_segment_num) {
LOG(WARNING) << "failed to do index compaction, dest segment num not match. tablet_id="
<< _tablet->tablet_id() << " dest_segment_num=" << dest_segment_num
<< " inverted_index_file_writers.size()="
<< inverted_index_file_writers.size();
mark_skip_index_compaction(ctx, error_handler);
return Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"dest segment num not match. tablet_id={} dest_segment_num={} "
"inverted_index_file_writers.size()={}",
_tablet->tablet_id(), dest_segment_num, inverted_index_file_writers.size());
}

// use tmp file dir to store index files
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
Expand All @@ -637,18 +709,6 @@ Status Compaction::do_inverted_index_compaction() {
<< ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num
<< ", destination index size=" << dest_segment_num << ".";

auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) {
LOG(WARNING) << "failed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id
<< ". index_id=" << index_id;
for (auto& rowset : _input_rowsets) {
rowset->set_skip_index_compaction(column_uniq_id);
LOG(INFO) << "mark skipping inverted index compaction next time"
<< ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id()
<< ", column uniq id=" << column_uniq_id << ", index_id=" << index_id;
}
};

Status status = Status::OK();
for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
Expand All @@ -658,6 +718,10 @@ Status Compaction::do_inverted_index_compaction() {
if (index_meta == nullptr) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
fmt::format("Can not find index_meta for col {}", col.name()));
LOG(WARNING) << "failed to do index compaction, can not find index_meta for column"
<< ". tablet=" << _tablet->tablet_id()
<< ", column uniq id=" << column_uniq_id;
error_handler(-1, column_uniq_id);
break;
}

Expand All @@ -671,6 +735,11 @@ Status Compaction::do_inverted_index_compaction() {
"debug point: Compaction::open_index_file_reader error"));
})
if (!res.has_value()) {
LOG(WARNING) << "failed to do index compaction, open inverted index file "
"reader failed"
<< ". tablet=" << _tablet->tablet_id()
<< ", column uniq id=" << column_uniq_id
<< ", src_segment_id=" << src_segment_id;
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
src_idx_dirs[src_segment_id] = std::move(res.value());
Expand All @@ -682,6 +751,11 @@ Status Compaction::do_inverted_index_compaction() {
"debug point: Compaction::open_inverted_index_file_writer error"));
})
if (!res.has_value()) {
LOG(WARNING) << "failed to do index compaction, open inverted index file "
"writer failed"
<< ". tablet=" << _tablet->tablet_id()
<< ", column uniq id=" << column_uniq_id
<< ", dest_segment_id=" << dest_segment_id;
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
// Destination directories in dest_index_dirs do not need to be deconstructed,
Expand Down Expand Up @@ -714,6 +788,16 @@ Status Compaction::do_inverted_index_compaction() {
return Status::OK();
}

void Compaction::mark_skip_index_compaction(
const RowsetWriterContext& context,
const std::function<void(int64_t, int64_t)>& error_handler) {
for (auto&& column_uniq_id : context.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->inverted_index(col);
qidaye marked this conversation as resolved.
Show resolved Hide resolved
error_handler(index_meta->index_id(), column_uniq_id);
}
}

void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
for (const auto& index : _cur_tablet_schema->inverted_indexes()) {
auto col_unique_ids = index->col_unique_ids();
Expand Down Expand Up @@ -789,7 +873,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
// TODO: inverted_index_path
auto seg_path = rowset->segment_path(i);
DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_seg_path_nullptr", {
seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>("error"));
seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>(
"construct_skip_inverted_index_seg_path_nullptr"));
})
if (!seg_path) {
LOG(WARNING) << seg_path.error();
Expand All @@ -800,8 +885,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
try {
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs,
std::string {
InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)},
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(
seg_path.value())},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(i));
auto st = inverted_index_file_reader->init(
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class Compaction {
// merge inverted index files
Status do_inverted_index_compaction();

// mark all columns in columns_to_do_index_compaction to skip index compaction next time.
void mark_skip_index_compaction(const RowsetWriterContext& context,
const std::function<void(int64_t, int64_t)>& error_handler);

void construct_index_compaction_columns(RowsetWriterContext& ctx);

virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) = 0;
Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,16 @@ Status compact_column(int64_t index_id,

// delete temporary segment_path, only when inverted_index_ram_dir_enable is false
if (!config::inverted_index_ram_dir_enable) {
std::ignore = io::global_local_filesystem()->delete_directory(tmp_path.data());
auto st = io::global_local_filesystem()->delete_directory(tmp_path.data());
DBUG_EXECUTE_IF("compact_column_delete_tmp_path_error", {
st = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
"debug point: compact_column_delete_tmp_path_error in index compaction");
})
if (!st.ok()) {
LOG(WARNING) << "compact column failed to delete tmp path: " << tmp_path
<< ", error: " << st.to_string();
return st;
}
}
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,30 +181,26 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") {
"compact_column_create_index_writer_error",
"compact_column_indexCompaction_error",
"compact_column_index_writer_close_error",
"compact_column_src_index_dirs_close_error",
"compact_column_delete_tmp_path_error",
"Compaction::do_inverted_index_compaction_find_rowset_error",
"Compaction::do_inverted_index_compaction_get_fs_error",
"Compaction::do_inverted_index_compaction_index_file_reader_init_error",
// "Compaction::do_inverted_index_compaction_file_size_status_not_ok", // v2 do not do index compaction
"Compaction::do_inverted_index_compaction_can_not_find_index_meta",
"Compaction::do_inverted_index_compaction_index_properties_different",
"Compaction::do_inverted_index_compaction_index_file_writer_close_not_ok",
"Compaction::construct_skip_inverted_index_index_reader_close_error"
"Compaction::do_inverted_index_compaction_rowid_conversion_null",
"Compaction::do_inverted_index_compaction_seg_path_nullptr",
"Compaction::do_inverted_index_compaction_init_inverted_index_file_reader",
"Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error"
]

def debug_points_normal_compaction = [
"compact_column_local_tmp_dir_delete_error",
// "Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", // query result not match without inverted index
"Compaction::do_inverted_index_compaction_index_file_reader_init_not_found",
"Compaction::construct_skip_inverted_index_is_skip_index_compaction",
"Compaction::construct_skip_inverted_index_get_fs_error",
"Compaction::construct_skip_inverted_index_index_meta_nullptr",
"Compaction::construct_skip_inverted_index_seg_path_nullptr",
"Compaction::do_inverted_index_compaction_index_properties_different",
"Compaction::construct_skip_inverted_index_index_files_count",
"Compaction::construct_skip_inverted_index_index_reader_close_error",
"Compaction::construct_skip_inverted_index_index_file_reader_init_status_not_ok",
"Compaction::construct_skip_inverted_index_index_file_reader_exist_status_not_ok",
"Compaction::construct_skip_inverted_index_index_file_reader_exist_false",
"Compaction::construct_skip_inverted_index_index_file_reader_open_error",
"Compaction::construct_skip_inverted_index_index_files_count"
"Compaction::construct_skip_inverted_index_index_file_reader_open_error"
]

def run_test = { tablets, debug_point, abnormal ->
Expand All @@ -221,6 +217,9 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") {
}
}

if (debug_point == "compact_column_delete_tmp_path_error") {
set_be_config.call("inverted_index_ram_dir_enable", "false")
}
// before full compaction, there are 7 rowsets.
int rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 7 * replicaNum)
Expand Down Expand Up @@ -258,6 +257,10 @@ suite("test_index_compaction_exception_fault_injection", "nonConcurrent") {
}

run_sql.call()

if (debug_point == "compact_column_delete_tmp_path_error") {
set_be_config.call("inverted_index_ram_dir_enable", "true")
}
}

def create_and_test_table = { table_name, key_type, debug_points, is_abnormal ->
Expand Down
Loading