From 5232cea1536eacd317becc3a2a8098cf2314712c Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Sat, 2 Nov 2024 00:35:14 +0800 Subject: [PATCH] 1 --- be/src/olap/compaction.cpp | 12 ++++++ cloud/src/common/bvars.cpp | 3 ++ cloud/src/common/bvars.h | 2 + cloud/src/recycler/checker.cpp | 70 +++++++++++++++++++++++----------- cloud/src/recycler/checker.h | 13 ++++--- 5 files changed, 72 insertions(+), 28 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index a40e28669e90cc..9b46782f84f1d2 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1127,6 +1127,18 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { RETURN_IF_ERROR(merge_input_rowsets()); + DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_rowset_id", { + DCHECK(compaction_type() == ReaderType::READER_FULL_COMPACTION); + RowsetId id; + id.version = 2; + id.hi = _output_rowset->rowset_meta()->rowset_id().hi + ((int64_t)(1) << 56); + id.mi = _output_rowset->rowset_meta()->rowset_id().mi; + id.lo = _output_rowset->rowset_meta()->rowset_id().lo; + _output_rowset->rowset_meta()->set_rowset_id(id); + LOG(INFO) << "[Debug wrong rowset id]:" + << _output_rowset->rowset_meta()->rowset_id().to_string(); + }) + RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get())); // 4. modify rowsets in memory diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index f9b11aa85b4897..2e2e312d0c1413 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -193,3 +193,6 @@ BvarStatusWithTag g_bvar_checker_check_cost_s("checker", "check_cost_secon BvarStatusWithTag g_bvar_checker_enqueue_cost_s("checker", "enqueue_cost_seconds"); BvarStatusWithTag g_bvar_checker_last_success_time_ms("checker", "last_success_time_ms"); BvarStatusWithTag g_bvar_checker_instance_volume("checker", "instance_volume"); +BvarStatusWithTag g_bvar_inverted_checker_num_scanned("checker", "num_inverted_scanned"); +BvarStatusWithTag g_bvar_inverted_checker_num_check_failed("checker", + "num_inverted_check_failed"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 4848ec4b456cef..c2e21c66daad0d 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -247,3 +247,5 @@ extern BvarStatusWithTag g_bvar_checker_check_cost_s; extern BvarStatusWithTag g_bvar_checker_enqueue_cost_s; extern BvarStatusWithTag g_bvar_checker_last_success_time_ms; extern BvarStatusWithTag g_bvar_checker_instance_volume; +extern BvarStatusWithTag g_bvar_inverted_checker_num_scanned; +extern BvarStatusWithTag g_bvar_inverted_checker_num_check_failed; diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp index c3e9f69ed9d6bc..e2ae2e4646d6e6 100644 --- a/cloud/src/recycler/checker.cpp +++ b/cloud/src/recycler/checker.cpp @@ -169,10 +169,20 @@ int Checker::start() { duration_cast(system_clock::now().time_since_epoch()).count(); g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s); ret = checker->do_check(); + if (config::enable_inverted_check) { - if (checker->do_inverted_check() != 0) ret = -1; + if (ret == 0) { + ret = checker->do_inverted_check(); + } + } + + if (ret < 0) { + // If ret < 0, it means that a temporary error occurred during the check process. + // The check job should not be considered finished, and the next round of check job + // should be retried as soon as possible. + return; } - if (ret == -1) return; + // If instance checker has been aborted, don't finish this job if (!checker->stopped()) { finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(), @@ -446,9 +456,10 @@ int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance int InstanceChecker::do_check() { TEST_SYNC_POINT("InstanceChecker.do_check"); LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_; + int check_ret = 0; long num_scanned = 0; long num_scanned_with_segment = 0; - long num_check_failed = 0; + long num_rowset_loss = 0; long instance_volume = 0; using namespace std::chrono; auto start_time = steady_clock::now(); @@ -457,11 +468,11 @@ int InstanceChecker::do_check() { LOG(INFO) << "check instance objects finished, cost=" << cost << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned << " num_scanned_with_segment=" << num_scanned_with_segment - << " num_check_failed=" << num_check_failed + << " num_rowset_loss=" << num_rowset_loss << " instance_volume=" << instance_volume; g_bvar_checker_num_scanned.put(instance_id_, num_scanned); g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment); - g_bvar_checker_num_check_failed.put(instance_id_, num_check_failed); + g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss); g_bvar_checker_check_cost_s.put(instance_id_, static_cast(cost)); // FIXME(plat1ko): What if some list operation failed? g_bvar_checker_instance_volume.put(instance_id_, instance_volume); @@ -492,7 +503,7 @@ int InstanceChecker::do_check() { .tag("resource_id", rs_meta.resource_id()) .tag("tablet_id", rs_meta.tablet_id()) .tag("rowset_id", rs_meta.rowset_id_v2()); - ++num_check_failed; + check_ret = -1; return; } @@ -500,7 +511,7 @@ int InstanceChecker::do_check() { int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()), &list_iter); if (ret != 0) { // No need to log, because S3Accessor has logged this error - ++num_check_failed; + check_ret = -1; return; } @@ -512,6 +523,7 @@ int InstanceChecker::do_check() { instance_volume += tablet_volume; } + bool data_loss = false; for (int i = 0; i < rs_meta.num_segments(); ++i) { auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i); if (tablet_files_cache.files.contains(path)) { @@ -520,13 +532,16 @@ int InstanceChecker::do_check() { if (1 == key_exist(txn_kv_.get(), key)) { // Rowset has been deleted instead of data loss - continue; + break; } - - ++num_check_failed; + data_loss = true; TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path); LOG(WARNING) << "object not exist, path=" << path << " key=" << hex(key); } + + if (data_loss) { + ++num_rowset_loss; + } }; // scan visible rowsets @@ -555,7 +570,7 @@ int InstanceChecker::do_check() { doris::RowsetMetaCloudPB rs_meta; if (!rs_meta.ParseFromArray(v.data(), v.size())) { - ++num_check_failed; + ++num_rowset_loss; LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v); continue; } @@ -563,7 +578,8 @@ int InstanceChecker::do_check() { } start_key.push_back('\x00'); // Update to next smallest key for iteration } while (it->more() && !stopped()); - return num_check_failed == 0 ? 0 : -2; + + return num_rowset_loss > 0 ? 1 : check_ret; } int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) { @@ -601,15 +617,18 @@ int InstanceChecker::do_inverted_check() { } LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_; + int check_ret = 0; long num_scanned = 0; - long num_check_failed = 0; + long num_file_leak = 0; using namespace std::chrono; auto start_time = steady_clock::now(); std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { + g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned); + g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_check_failed); auto cost = duration(steady_clock::now() - start_time).count(); LOG(INFO) << "inverted check instance objects finished, cost=" << cost << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned - << " num_check_failed=" << num_check_failed; + << " num_file_leak=" << num_file_leak; }); struct TabletRowsets { @@ -618,6 +637,7 @@ int InstanceChecker::do_inverted_check() { }; TabletRowsets tablet_rowsets_cache; + // Return 0 if check success, return 1 if file is garbage data, negative if error occurred auto check_segment_file = [&](const std::string& obj_key) { std::vector str; butil::SplitString(obj_key, '/', &str); @@ -685,12 +705,13 @@ int InstanceChecker::do_inverted_check() { } } } while (it->more() && !stopped()); - if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) { - return 0; - } else { - LOG(WARNING) << "rowset not exists, key=" << obj_key; - return -1; + + if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) { + // Garbage data leak + LOG(WARNING) << "rowset should be recycled, key=" << obj_key; + return 1; } + return 0; }; @@ -707,10 +728,15 @@ int InstanceChecker::do_inverted_check() { for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) { ++num_scanned; - if (check_segment_file(file->path) != 0) { + int ret = check_segment_file(file->path); + if (ret != 0) { LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri() << " path=" << file->path; - ++num_check_failed; + if (ret == 1) { + ++num_file_leak; + } else { + check_ret = -1; + } } } @@ -719,7 +745,7 @@ int InstanceChecker::do_inverted_check() { return -1; } } - return num_check_failed == 0 ? 0 : -1; + return num_file_leak > 0 ? 1 : check_ret; } } // namespace doris::cloud diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h index 4cd851d521847b..03717a69b5ea20 100644 --- a/cloud/src/recycler/checker.h +++ b/cloud/src/recycler/checker.h @@ -77,13 +77,14 @@ class InstanceChecker { explicit InstanceChecker(std::shared_ptr txn_kv, const std::string& instance_id); // Return 0 if success, otherwise error int init(const InstanceInfoPB& instance); - // Check whether the objects in the object store of the instance belong to the visible rowsets. - // This function is used to verify that there is no garbage data leakage, should only be called in recycler test. - // Return 0 if success, otherwise failed + // Return 0 if success. + // Return 1 if data leak is identified. + // Return negative if a temporary error occurred during the check process. int do_inverted_check(); - // Return 0 if success, the definition of success is the absence of S3 access errors and data loss - // Return -1 if encountering the situation that need to abort checker. - // Return -2 if having S3 access errors or data loss + + // Return 0 if success. + // Return 1 if data loss is identified. + // Return negative if a temporary error occurred during the check process. int do_check(); // If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e. // all accessors are HdfsAccessor), return INT64_MAX.