Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Nov 13, 2024
1 parent 7b2547c commit 5232cea
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 28 deletions.
12 changes: 12 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,18 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_rowset_id", {
DCHECK(compaction_type() == ReaderType::READER_FULL_COMPACTION);
RowsetId id;
id.version = 2;
id.hi = _output_rowset->rowset_meta()->rowset_id().hi + ((int64_t)(1) << 56);
id.mi = _output_rowset->rowset_meta()->rowset_id().mi;
id.lo = _output_rowset->rowset_meta()->rowset_id().lo;
_output_rowset->rowset_meta()->set_rowset_id(id);
LOG(INFO) << "[Debug wrong rowset id]:"
<< _output_rowset->rowset_meta()->rowset_id().to_string();
})

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

// 4. modify rowsets in memory
Expand Down
3 changes: 3 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,6 @@ BvarStatusWithTag<long> g_bvar_checker_check_cost_s("checker", "check_cost_secon
BvarStatusWithTag<long> g_bvar_checker_enqueue_cost_s("checker", "enqueue_cost_seconds");
BvarStatusWithTag<long> g_bvar_checker_last_success_time_ms("checker", "last_success_time_ms");
BvarStatusWithTag<long> g_bvar_checker_instance_volume("checker", "instance_volume");
BvarStatusWithTag<long> g_bvar_inverted_checker_num_scanned("checker", "num_inverted_scanned");
BvarStatusWithTag<long> g_bvar_inverted_checker_num_check_failed("checker",
"num_inverted_check_failed");
2 changes: 2 additions & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,5 @@ extern BvarStatusWithTag<long> g_bvar_checker_check_cost_s;
extern BvarStatusWithTag<long> g_bvar_checker_enqueue_cost_s;
extern BvarStatusWithTag<long> g_bvar_checker_last_success_time_ms;
extern BvarStatusWithTag<long> g_bvar_checker_instance_volume;
extern BvarStatusWithTag<long> g_bvar_inverted_checker_num_scanned;
extern BvarStatusWithTag<long> g_bvar_inverted_checker_num_check_failed;
70 changes: 48 additions & 22 deletions cloud/src/recycler/checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,20 @@ int Checker::start() {
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
ret = checker->do_check();

if (config::enable_inverted_check) {
if (checker->do_inverted_check() != 0) ret = -1;
if (ret == 0) {
ret = checker->do_inverted_check();
}
}

if (ret < 0) {
// If ret < 0, it means that a temporary error occurred during the check process.
// The check job should not be considered finished, and the next round of check job
// should be retried as soon as possible.
return;
}
if (ret == -1) return;

// If instance checker has been aborted, don't finish this job
if (!checker->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
Expand Down Expand Up @@ -446,9 +456,10 @@ int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance
int InstanceChecker::do_check() {
TEST_SYNC_POINT("InstanceChecker.do_check");
LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
int check_ret = 0;
long num_scanned = 0;
long num_scanned_with_segment = 0;
long num_check_failed = 0;
long num_rowset_loss = 0;
long instance_volume = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
Expand All @@ -457,11 +468,11 @@ int InstanceChecker::do_check() {
LOG(INFO) << "check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
<< " num_scanned_with_segment=" << num_scanned_with_segment
<< " num_check_failed=" << num_check_failed
<< " num_rowset_loss=" << num_rowset_loss
<< " instance_volume=" << instance_volume;
g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
g_bvar_checker_num_check_failed.put(instance_id_, num_check_failed);
g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
// FIXME(plat1ko): What if some list operation failed?
g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
Expand Down Expand Up @@ -492,15 +503,15 @@ int InstanceChecker::do_check() {
.tag("resource_id", rs_meta.resource_id())
.tag("tablet_id", rs_meta.tablet_id())
.tag("rowset_id", rs_meta.rowset_id_v2());
++num_check_failed;
check_ret = -1;
return;
}

std::unique_ptr<ListIterator> list_iter;
int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
&list_iter);
if (ret != 0) { // No need to log, because S3Accessor has logged this error
++num_check_failed;
check_ret = -1;
return;
}

Expand All @@ -512,6 +523,7 @@ int InstanceChecker::do_check() {
instance_volume += tablet_volume;
}

bool data_loss = false;
for (int i = 0; i < rs_meta.num_segments(); ++i) {
auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
if (tablet_files_cache.files.contains(path)) {
Expand All @@ -520,13 +532,16 @@ int InstanceChecker::do_check() {

if (1 == key_exist(txn_kv_.get(), key)) {
// Rowset has been deleted instead of data loss
continue;
break;
}

++num_check_failed;
data_loss = true;
TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
LOG(WARNING) << "object not exist, path=" << path << " key=" << hex(key);
}

if (data_loss) {
++num_rowset_loss;
}
};

// scan visible rowsets
Expand Down Expand Up @@ -555,15 +570,16 @@ int InstanceChecker::do_check() {

doris::RowsetMetaCloudPB rs_meta;
if (!rs_meta.ParseFromArray(v.data(), v.size())) {
++num_check_failed;
++num_rowset_loss;
LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
continue;
}
check_rowset_objects(rs_meta, k);
}
start_key.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more() && !stopped());
return num_check_failed == 0 ? 0 : -2;

return num_rowset_loss > 0 ? 1 : check_ret;
}

int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
Expand Down Expand Up @@ -601,15 +617,18 @@ int InstanceChecker::do_inverted_check() {
}

LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
int check_ret = 0;
long num_scanned = 0;
long num_check_failed = 0;
long num_file_leak = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_check_failed);
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG(INFO) << "inverted check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
<< " num_check_failed=" << num_check_failed;
<< " num_file_leak=" << num_file_leak;
});

struct TabletRowsets {
Expand All @@ -618,6 +637,7 @@ int InstanceChecker::do_inverted_check() {
};
TabletRowsets tablet_rowsets_cache;

// Return 0 if check success, return 1 if file is garbage data, negative if error occurred
auto check_segment_file = [&](const std::string& obj_key) {
std::vector<std::string> str;
butil::SplitString(obj_key, '/', &str);
Expand Down Expand Up @@ -685,12 +705,13 @@ int InstanceChecker::do_inverted_check() {
}
}
} while (it->more() && !stopped());
if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
return 0;
} else {
LOG(WARNING) << "rowset not exists, key=" << obj_key;
return -1;

if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
// Garbage data leak
LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
return 1;
}

return 0;
};

Expand All @@ -707,10 +728,15 @@ int InstanceChecker::do_inverted_check() {

for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
++num_scanned;
if (check_segment_file(file->path) != 0) {
int ret = check_segment_file(file->path);
if (ret != 0) {
LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
<< " path=" << file->path;
++num_check_failed;
if (ret == 1) {
++num_file_leak;
} else {
check_ret = -1;
}
}
}

Expand All @@ -719,7 +745,7 @@ int InstanceChecker::do_inverted_check() {
return -1;
}
}
return num_check_failed == 0 ? 0 : -1;
return num_file_leak > 0 ? 1 : check_ret;
}

} // namespace doris::cloud
13 changes: 7 additions & 6 deletions cloud/src/recycler/checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ class InstanceChecker {
explicit InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id);
// Return 0 if success, otherwise error
int init(const InstanceInfoPB& instance);
// Check whether the objects in the object store of the instance belong to the visible rowsets.
// This function is used to verify that there is no garbage data leakage, should only be called in recycler test.
// Return 0 if success, otherwise failed
// Return 0 if success.
// Return 1 if data leak is identified.
// Return negative if a temporary error occurred during the check process.
int do_inverted_check();
// Return 0 if success, the definition of success is the absence of S3 access errors and data loss
// Return -1 if encountering the situation that need to abort checker.
// Return -2 if having S3 access errors or data loss

// Return 0 if success.
// Return 1 if data loss is identified.
// Return negative if a temporary error occurred during the check process.
int do_check();
// If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
Expand Down

0 comments on commit 5232cea

Please sign in to comment.