[Enhancement](checker) Enhance inverted checker #43294

Merged · 3 commits · Nov 14, 2024
Changes from all commits
be/src/olap/compaction.cpp (12 additions, 0 deletions)
@@ -1127,6 +1127,18 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_rowset_id", {
DCHECK(compaction_type() == ReaderType::READER_FULL_COMPACTION);
RowsetId id;
id.version = 2;
id.hi = _output_rowset->rowset_meta()->rowset_id().hi + ((int64_t)(1) << 56);
id.mi = _output_rowset->rowset_meta()->rowset_id().mi;
id.lo = _output_rowset->rowset_meta()->rowset_id().lo;
_output_rowset->rowset_meta()->set_rowset_id(id);
LOG(INFO) << "[Debug wrong rowset id]:"
<< _output_rowset->rowset_meta()->rowset_id().to_string();
})

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

// 4. modify rowsets in memory
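The debug point above deliberately rewrites the output rowset id after the merged segments have already been written, so the id committed to the meta-service no longer matches the objects on storage. The following standalone sketch illustrates that mismatch; the `RowsetId` struct and the object-key layout are simplified stand-ins for illustration, not the real Doris definitions.

```cpp
// Sketch only: why flipping a high bit in `hi` after the segments are uploaded
// produces orphan objects. Struct and path layout are assumptions.
#include <cstdint>
#include <iostream>
#include <string>

struct RowsetId {
    int8_t version = 2;
    int64_t hi = 0;
    int64_t mi = 0;
    int64_t lo = 0;
};

// Illustrative object-key layout: data/<tablet_id>/<hi>-<mi>-<lo>_<segment_id>.dat
std::string segment_path(int64_t tablet_id, const RowsetId& id, int segment_id) {
    return "data/" + std::to_string(tablet_id) + "/" + std::to_string(id.hi) + "-" +
           std::to_string(id.mi) + "-" + std::to_string(id.lo) + "_" +
           std::to_string(segment_id) + ".dat";
}

int main() {
    RowsetId written{2, 123, 0, 42};      // id used while writing the segments
    RowsetId committed = written;
    committed.hi += (int64_t(1) << 56);   // same perturbation as the debug point

    std::cout << "objects exist at:       " << segment_path(10001, written, 0) << '\n';
    std::cout << "meta-service points to: " << segment_path(10001, committed, 0) << '\n';
    // do_check() sees a rowset whose segments are missing (data loss), and
    // do_inverted_check() sees objects that belong to no visible rowset (a leak).
    return 0;
}
```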
cloud/src/common/bvars.cpp (3 additions, 0 deletions)
@@ -193,3 +193,6 @@ BvarStatusWithTag<long> g_bvar_checker_check_cost_s("checker", "check_cost_secon
BvarStatusWithTag<long> g_bvar_checker_enqueue_cost_s("checker", "enqueue_cost_seconds");
BvarStatusWithTag<long> g_bvar_checker_last_success_time_ms("checker", "last_success_time_ms");
BvarStatusWithTag<long> g_bvar_checker_instance_volume("checker", "instance_volume");
BvarStatusWithTag<long> g_bvar_inverted_checker_num_scanned("checker", "num_inverted_scanned");
BvarStatusWithTag<long> g_bvar_inverted_checker_num_check_failed("checker",
"num_inverted_check_failed");
cloud/src/common/bvars.h (2 additions, 0 deletions)
@@ -247,3 +247,5 @@ extern BvarStatusWithTag<long> g_bvar_checker_check_cost_s;
extern BvarStatusWithTag<long> g_bvar_checker_enqueue_cost_s;
extern BvarStatusWithTag<long> g_bvar_checker_last_success_time_ms;
extern BvarStatusWithTag<long> g_bvar_checker_instance_volume;
extern BvarStatusWithTag<long> g_bvar_inverted_checker_num_scanned;
extern BvarStatusWithTag<long> g_bvar_inverted_checker_num_check_failed;
cloud/src/recycler/checker.cpp (48 additions, 22 deletions)
@@ -169,10 +169,20 @@ int Checker::start() {
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
ret = checker->do_check();

if (config::enable_inverted_check) {
if (checker->do_inverted_check() != 0) ret = -1;
if (ret == 0) {
ret = checker->do_inverted_check();
}
}

if (ret < 0) {
// If ret < 0, a temporary error occurred during the check process.
// The check job should not be considered finished, and the next round of the
// check job should be retried as soon as possible.
return;
}
if (ret == -1) return;

// If instance checker has been aborted, don't finish this job
if (!checker->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
@@ -446,9 +456,10 @@ int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance
int InstanceChecker::do_check() {
TEST_SYNC_POINT("InstanceChecker.do_check");
LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
int check_ret = 0;
long num_scanned = 0;
long num_scanned_with_segment = 0;
long num_check_failed = 0;
long num_rowset_loss = 0;
long instance_volume = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
@@ -457,11 +468,11 @@ int InstanceChecker::do_check() {
LOG(INFO) << "check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
<< " num_scanned_with_segment=" << num_scanned_with_segment
<< " num_check_failed=" << num_check_failed
<< " num_rowset_loss=" << num_rowset_loss
<< " instance_volume=" << instance_volume;
g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
g_bvar_checker_num_check_failed.put(instance_id_, num_check_failed);
g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
// FIXME(plat1ko): What if some list operation failed?
g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
@@ -492,15 +503,15 @@ int InstanceChecker::do_check() {
.tag("resource_id", rs_meta.resource_id())
.tag("tablet_id", rs_meta.tablet_id())
.tag("rowset_id", rs_meta.rowset_id_v2());
++num_check_failed;
check_ret = -1;
return;
}

std::unique_ptr<ListIterator> list_iter;
int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
&list_iter);
if (ret != 0) { // No need to log, because S3Accessor has logged this error
++num_check_failed;
check_ret = -1;
return;
}

@@ -512,6 +523,7 @@ int InstanceChecker::do_check() {
instance_volume += tablet_volume;
}

bool data_loss = false;
for (int i = 0; i < rs_meta.num_segments(); ++i) {
auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
if (tablet_files_cache.files.contains(path)) {
@@ -520,13 +532,16 @@ int InstanceChecker::do_check() {

if (1 == key_exist(txn_kv_.get(), key)) {
// The rowset has already been deleted, so the missing object is not data loss
continue;
break;
}

++num_check_failed;
data_loss = true;
TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
LOG(WARNING) << "object not exist, path=" << path << " key=" << hex(key);
}

if (data_loss) {
++num_rowset_loss;
}
};

// scan visible rowsets
Expand Down Expand Up @@ -555,15 +570,16 @@ int InstanceChecker::do_check() {

doris::RowsetMetaCloudPB rs_meta;
if (!rs_meta.ParseFromArray(v.data(), v.size())) {
++num_check_failed;
++num_rowset_loss;
LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
continue;
}
check_rowset_objects(rs_meta, k);
}
start_key.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more() && !stopped());
return num_check_failed == 0 ? 0 : -2;

return num_rowset_loss > 0 ? 1 : check_ret;
}

int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
@@ -601,15 +617,18 @@ int InstanceChecker::do_inverted_check() {
}

LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
int check_ret = 0;
long num_scanned = 0;
long num_check_failed = 0;
long num_file_leak = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_file_leak);
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG(INFO) << "inverted check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
<< " num_check_failed=" << num_check_failed;
<< " num_file_leak=" << num_file_leak;
});

struct TabletRowsets {
@@ -618,6 +637,7 @@ int InstanceChecker::do_inverted_check() {
};
TabletRowsets tablet_rowsets_cache;

// Return 0 if the check succeeds, 1 if the file is leaked garbage data, and negative if an error occurred
auto check_segment_file = [&](const std::string& obj_key) {
std::vector<std::string> str;
butil::SplitString(obj_key, '/', &str);
@@ -685,12 +705,13 @@ int InstanceChecker::do_inverted_check() {
}
}
} while (it->more() && !stopped());
if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
return 0;
} else {
LOG(WARNING) << "rowset not exists, key=" << obj_key;
return -1;

if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
// Garbage data leak
LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
return 1;
}

return 0;
};

@@ -707,10 +728,15 @@ int InstanceChecker::do_inverted_check() {

for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
++num_scanned;
if (check_segment_file(file->path) != 0) {
int ret = check_segment_file(file->path);
if (ret != 0) {
LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
<< " path=" << file->path;
++num_check_failed;
if (ret == 1) {
++num_file_leak;
} else {
check_ret = -1;
}
}
}

@@ -719,7 +745,7 @@ int InstanceChecker::do_inverted_check() {
return -1;
}
}
return num_check_failed == 0 ? 0 : -1;
return num_file_leak > 0 ? 1 : check_ret;
}

} // namespace doris::cloud
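To make the new return convention of the `check_segment_file` lambda concrete, here is a compact, self-contained sketch of the same idea: parse the rowset id out of an object key and report 1 (leak) when it does not belong to any known rowset of the tablet. The key layout and the in-memory rowset set are assumptions for illustration; the real code looks the tablet's rowsets up from the meta KV store and handles many more edge cases.

```cpp
// Sketch only (assumed key layout: data/<tablet_id>/<rowset_id>_<segment_id>.dat).
// Return 0 if the segment belongs to a known rowset, 1 if it looks like leaked
// garbage data, and -1 if the key cannot be parsed.
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_set>
#include <vector>

int check_segment_file(const std::string& obj_key,
                       const std::unordered_set<std::string>& known_rowset_ids) {
    std::vector<std::string> parts;
    std::istringstream in(obj_key);
    for (std::string part; std::getline(in, part, '/');) parts.push_back(part);
    if (parts.size() < 3) return -1;                        // malformed key
    const std::string& file = parts.back();                 // "<rowset_id>_<seg>.dat"
    std::string rowset_id = file.substr(0, file.find('_'));
    if (known_rowset_ids.count(rowset_id) == 0) {
        std::cout << "rowset should be recycled, key=" << obj_key << '\n';
        return 1;                                           // garbage data leak
    }
    return 0;
}

int main() {
    std::unordered_set<std::string> known = {"02000000000000012345"};
    std::cout << check_segment_file("data/10001/02000000000000012345_0.dat", known) << '\n'; // 0
    std::cout << check_segment_file("data/10001/020000000000000abcde_0.dat", known) << '\n'; // 1
    return 0;
}
```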
cloud/src/recycler/checker.h (7 additions, 6 deletions)
@@ -77,13 +77,14 @@ class InstanceChecker {
explicit InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id);
// Return 0 if success, otherwise error
int init(const InstanceInfoPB& instance);
// Check whether the objects in the object store of the instance belong to the visible rowsets.
// This function is used to verify that there is no garbage data leakage, should only be called in recycler test.
// Return 0 if success, otherwise failed
// Return 0 if success.
// Return 1 if a data leak is identified.
// Return negative if a temporary error occurred during the check process.
int do_inverted_check();
// Return 0 if success, the definition of success is the absence of S3 access errors and data loss
// Return -1 if encountering the situation that need to abort checker.
// Return -2 if having S3 access errors or data loss

// Return 0 if success.
// Return 1 if data loss is identified.
// Return negative if a temporary error occurred during the check process.
int do_check();
// If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
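The updated header comments define a three-way contract shared by do_check() and do_inverted_check(): 0 means clean, 1 means loss or leakage was detected, and a negative value means a temporary error. The sketch below is not the actual Doris code (the checks are stand-in lambdas); it only shows how a caller such as Checker::start() composes the two results under that contract.

```cpp
// Sketch only: composing the two checks under the documented return contract.
#include <functional>
#include <iostream>

int run_checks(const std::function<int()>& do_check,
               const std::function<int()>& do_inverted_check,
               bool enable_inverted_check) {
    int ret = do_check();
    if (enable_inverted_check && ret == 0) {
        // Only run the inverted check when the forward check was clean.
        ret = do_inverted_check();
    }
    // ret < 0: temporary error, leave the check job unfinished so it retries soon.
    // ret == 1: data loss or leaked files were found.
    // ret == 0: everything checked out.
    return ret;
}

int main() {
    std::cout << run_checks([] { return 0; }, [] { return 1; }, true) << '\n';  // 1
    std::cout << run_checks([] { return -1; }, [] { return 0; }, true) << '\n'; // -1
    return 0;
}
```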