Skip to content

Commit

Permalink
11
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Oct 22, 2024
1 parent 0a873f5 commit 61a13a3
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 13 deletions.
28 changes: 27 additions & 1 deletion cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2209,7 +2209,33 @@ MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string cloud_un
}

// step 3: check new data
st = check_tablet_stats_data(tablet_stat_shared_ptr_vec, txn_kv_, instance_id, table_id);
std::vector<std::shared_ptr<TabletStatsPB>> conflict_tablet_stat_shared_ptr_vec;
st = check_tablet_stats_data(tablet_stat_shared_ptr_vec, txn_kv_, instance_id, table_id,
conflict_tablet_stat_shared_ptr_vec);
if (st.code() != MetaServiceCode::OK) {
return st;
}

// step 4: deal with unchaged tablet stat due to conflict txn
size_t retry = 0;
while (!conflict_tablet_stat_shared_ptr_vec.empty() && retry < 5) {
retry++;
std::ostringstream oss;
std::transform(conflict_tablet_stat_shared_ptr_vec.begin(),
conflict_tablet_stat_shared_ptr_vec.end(),
std::ostream_iterator<std::string>(oss, ","),
[](const std::shared_ptr<TabletStatsPB>& s) {
return std::to_string(s->idx().tablet_id());
});
LOG(INFO) << fmt::format("retry time: {}, conflict tablet stat vec: [{}]", retry,
oss.str());
st = deal_with_conflict(conflict_tablet_stat_shared_ptr_vec, txn_kv_, instance_id,
table_id);
if (st.code() != MetaServiceCode::OK) {
return st;
}
}

st.set_code(MetaServiceCode::OK);
st.set_msg("");
return st;
Expand Down
101 changes: 89 additions & 12 deletions cloud/src/meta-service/meta_service_tablet_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,17 +458,13 @@ MetaServiceResponseStatus fix_tablet_stats_data(
}
begin_key = it->next_begin_key();
} while (it->more());
st = commit_batch_final(txn, committed, tablet_cnt);
if (st.code() != MetaServiceCode::OK) {
return st;
}

return st;
return commit_batch_final(txn, committed, tablet_cnt);
}

MetaServiceResponseStatus check_tablet_stats(std::shared_ptr<TabletStatsPB> tablet_stat_ptr,
std::unique_ptr<Transaction>& txn,
const std::string& instance_id, int64_t tablet_cnt) {
MetaServiceResponseStatus check_tablet_stats(
std::shared_ptr<TabletStatsPB> tablet_stat_ptr, std::unique_ptr<Transaction>& txn,
const std::string& instance_id, int64_t tablet_cnt,
std::vector<std::shared_ptr<TabletStatsPB>>& conflict_tablet_stat_shared_ptr_vec) {
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);
TxnErrorCode err;
Expand All @@ -493,6 +489,7 @@ MetaServiceResponseStatus check_tablet_stats(std::shared_ptr<TabletStatsPB> tabl
"get tabletPB {}",
tablet_cnt, tablet_stat_ptr->idx().tablet_id(), tablet_stat_check.DebugString());
if (tablet_stat_check.DebugString() != tablet_stat_ptr->DebugString()) {
conflict_tablet_stat_shared_ptr_vec.emplace_back(tablet_stat_ptr);
LOG(WARNING) << fmt::format(
"[Sub txn id {} Tablet id {} fix tablet stats phase 5]: check "
"correctness get tabletPB failed, tablet_stat {}, tablet_stat_check {}",
Expand Down Expand Up @@ -555,7 +552,8 @@ MetaServiceResponseStatus check_data_size(std::shared_ptr<TabletStatsPB> tablet_

MetaServiceResponseStatus check_tablet_stats_data(
std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec,
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, int64_t table_id) {
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, int64_t table_id,
std::vector<std::shared_ptr<TabletStatsPB>>& conflict_tablet_stat_shared_ptr_vec) {
std::string msg;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);
Expand All @@ -574,7 +572,8 @@ MetaServiceResponseStatus check_tablet_stats_data(
}
// =======================================================
// phase 5: get tabletPB to check correctness
st = check_tablet_stats(tablet_stat_ptr, txn, instance_id, tablet_cnt);
st = check_tablet_stats(tablet_stat_ptr, txn, instance_id, tablet_cnt,
conflict_tablet_stat_shared_ptr_vec);
if (st.code() != MetaServiceCode::OK) {
return st;
}
Expand All @@ -592,11 +591,89 @@ MetaServiceResponseStatus check_tablet_stats_data(
}
}

return commit_batch_final(txn, committed, tablet_cnt);
}

MetaServiceResponseStatus deal_with_conflict(
std::vector<std::shared_ptr<TabletStatsPB>>& conflict_tablet_stat_shared_ptr_vec,
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, int64_t table_id) {
std::string msg;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);
bool committed = true;
int64_t tablet_cnt = 0;

std::unique_ptr<Transaction> txn;
std::vector<std::shared_ptr<TabletStatsPB>> tablet_stat_shared_ptr_vec;

for (const auto& conflict_ptr : conflict_tablet_stat_shared_ptr_vec) {
tablet_cnt++;
auto err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::CREATE>(err));
st.set_msg(msg);
return st;
}
std::string tablet_stat_key;
std::string tablet_stat_value;
tablet_stat_key = stats_tablet_key(
{instance_id, conflict_ptr->idx().table_id(), conflict_ptr->idx().index_id(),
conflict_ptr->idx().partition_id(), conflict_ptr->idx().tablet_id()});
if (!conflict_ptr->SerializeToString(&tablet_stat_value)) {
st.set_code(MetaServiceCode::PROTOBUF_SERIALIZE_ERR);
st.set_msg("failed to serialize tablet stat");
return st;
}
txn->put(tablet_stat_key, tablet_stat_value);
st = commit_batch(txn, committed, tablet_cnt);
if (st.code() != MetaServiceCode::OK) {
return st;
}
tablet_stat_shared_ptr_vec.emplace_back(conflict_ptr);
}
st = commit_batch_final(txn, committed, tablet_cnt);
if (st.code() != MetaServiceCode::OK) {
return st;
}
return st;

conflict_tablet_stat_shared_ptr_vec.clear();

tablet_cnt = 0;
for (const auto& tablet_stat_ptr : tablet_stat_shared_ptr_vec) {
std::string tablet_stat_key;
std::string tablet_stat_value;
tablet_stat_key = stats_tablet_key(
{instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(),
tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()});
auto err = txn->get(tablet_stat_key, &tablet_stat_value);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::READ>(err));

LOG(INFO) << fmt::format(
"[Tablet id {} deal with conflict]: get tablet "
"stats failed, err {}",
tablet_stat_ptr->idx().tablet_id(), err);
}
TabletStatsPB tablet_stat_check;
tablet_stat_check.ParseFromArray(tablet_stat_value.data(), tablet_stat_value.size());
LOG(INFO) << fmt::format(
"[Sub txn id {} Tablet id {} deal with conflict]: check correctness "
"get tabletPB {}",
tablet_stat_ptr->idx().tablet_id(), tablet_stat_check.DebugString());
if (tablet_stat_check.DebugString() != tablet_stat_ptr->DebugString()) {
conflict_tablet_stat_shared_ptr_vec.emplace_back(tablet_stat_ptr);
LOG(WARNING) << fmt::format(
"[Tablet id {} deal with conflict]: check "
"correctness get tabletPB failed, tablet_stat {}, tablet_stat_check {}",
tablet_stat_ptr->idx().tablet_id(), tablet_stat_ptr->DebugString(),
tablet_stat_check.DebugString());
}
st = commit_batch(txn, committed, tablet_cnt);
if (st.code() != MetaServiceCode::OK) {
return st;
}
}
return commit_batch_final(txn, committed, tablet_cnt);
}

} // namespace doris::cloud
5 changes: 5 additions & 0 deletions cloud/src/meta-service/meta_service_tablet_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ MetaServiceResponseStatus fix_tablet_stats_data(

MetaServiceResponseStatus check_tablet_stats_data(
std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec,
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, int64_t table_id,
std::vector<std::shared_ptr<TabletStatsPB>>& conflict_tablet_stat_shared_ptr_vec);

MetaServiceResponseStatus deal_with_conflict(
std::vector<std::shared_ptr<TabletStatsPB>>& conflict_tablet_stat_shared_ptr_vec,
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, int64_t table_id);

} // namespace doris::cloud

0 comments on commit 61a13a3

Please sign in to comment.