Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick](branch-3.0) Pick "[Enhancement](MS) Add fix tablet data size api for meta service (#41782)" #43460

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_compaction_data_size", {
compaction_job->set_size_input_rowsets(1);
compaction_job->set_size_output_rowsets(10000001);
})
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
Expand Down
50 changes: 50 additions & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2188,4 +2188,54 @@ std::pair<MetaServiceCode, std::string> MetaServiceImpl::get_instance_info(
return {code, std::move(msg)};
}

std::pair<std::string, std::string> init_key_pair(std::string instance_id, int64_t table_id) {
std::string begin_key = stats_tablet_key({instance_id, table_id, 0, 0, 0});
std::string end_key = stats_tablet_key({instance_id, table_id + 1, 0, 0, 0});
return std::make_pair(begin_key, end_key);
}

MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string cloud_unique_id_str,
std::string table_id_str) {
// parse params
int64_t table_id;
std::string instance_id;
MetaServiceResponseStatus st = parse_fix_tablet_stats_param(
resource_mgr_, table_id_str, cloud_unique_id_str, table_id, instance_id);
if (st.code() != MetaServiceCode::OK) {
return st;
}

std::pair<std::string, std::string> key_pair = init_key_pair(instance_id, table_id);
std::string old_begin_key;
while (old_begin_key < key_pair.first) {
// get tablet stats
std::vector<std::shared_ptr<TabletStatsPB>> tablet_stat_shared_ptr_vec_batch;
old_begin_key = key_pair.first;

// fix tablet stats
size_t retry = 0;
do {
st = fix_tablet_stats_internal(txn_kv_, key_pair, tablet_stat_shared_ptr_vec_batch,
instance_id);
if (st.code() != MetaServiceCode::OK) {
LOG_WARNING("failed to fix tablet stats")
.tag("err", st.msg())
.tag("table id", table_id)
.tag("retry time", retry);
}
retry++;
} while (st.code() != MetaServiceCode::OK && retry < 3);
if (st.code() != MetaServiceCode::OK) {
return st;
}

// Check tablet stats
st = check_new_tablet_stats(txn_kv_, instance_id, tablet_stat_shared_ptr_vec_batch);
if (st.code() != MetaServiceCode::OK) {
return st;
}
}
return st;
}

} // namespace doris::cloud
7 changes: 7 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class Transaction;

constexpr std::string_view BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault";

void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
const std::string& instance_id, int64_t tablet_id, MetaServiceCode& code,
std::string& msg, GetRowsetResponse* response);

class MetaServiceImpl : public cloud::MetaService {
public:
MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, std::shared_ptr<ResourceManager> resource_mgr,
Expand Down Expand Up @@ -298,6 +302,9 @@ class MetaServiceImpl : public cloud::MetaService {
const std::string& cloud_unique_id,
InstanceInfoPB* instance);

MetaServiceResponseStatus fix_tablet_stats(std::string cloud_unique_id_str,
std::string table_id_str);

private:
std::pair<MetaServiceCode, std::string> alter_instance(
const AlterInstanceRequest* request,
Expand Down
13 changes: 13 additions & 0 deletions cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,16 @@ static HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Con
return http_text_reply(resp.status(), body);
}

static HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id");
std::string_view table_id = http_query(uri, "table_id");

MetaServiceResponseStatus st =
service->fix_tablet_stats(std::string(cloud_unique_id), std::string(table_id));
return http_text_reply(st, st.DebugString());
}

static HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) {
GetStageRequest req;
PARSE_MESSAGE_OR_RETURN(ctrl, req);
Expand Down Expand Up @@ -575,13 +585,16 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"get_value", process_get_value},
{"show_meta_ranges", process_show_meta_ranges},
{"txn_lazy_commit", process_txn_lazy_commit},
{"fix_tablet_stats", process_fix_tablet_stats},
{"v1/decode_key", process_decode_key},
{"v1/encode_key", process_encode_key},
{"v1/get_value", process_get_value},
{"v1/show_meta_ranges", process_show_meta_ranges},
{"v1/txn_lazy_commit", process_txn_lazy_commit},
// for get
{"get_instance", process_get_instance_info},
// for get
{"get_instance", process_get_instance_info},
{"get_obj_store_info", process_get_obj_store_info},
{"get_cluster", process_get_cluster},
{"get_tablet_stats", process_get_tablet_stats},
Expand Down
245 changes: 245 additions & 0 deletions cloud/src/meta-service/meta_service_tablet_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

#include "meta-service/meta_service_tablet_stats.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'meta-service/meta_service_tablet_stats.h' file not found [clang-diagnostic-error]

#include "meta-service/meta_service_tablet_stats.h"
         ^


#include <fmt/core.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>

#include "common/logging.h"
#include "common/util.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"

namespace doris::cloud {

Expand Down Expand Up @@ -156,4 +165,240 @@ void internal_get_tablet_stats(MetaServiceCode& code, std::string& msg, Transact
merge_tablet_stats(stats, detached_stats);
}

MetaServiceResponseStatus parse_fix_tablet_stats_param(
std::shared_ptr<ResourceManager> resource_mgr, const std::string& table_id_str,
const std::string& cloud_unique_id_str, int64_t& table_id, std::string& instance_id) {
MetaServiceCode code = MetaServiceCode::OK;
std::string msg;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);

// parse params
try {
table_id = std::stoll(table_id_str);
} catch (...) {
st.set_code(MetaServiceCode::INVALID_ARGUMENT);
st.set_msg("Invalid table_id, table_id: " + table_id_str);
return st;
}

instance_id = get_instance_id(resource_mgr, cloud_unique_id_str);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id_str;
st.set_code(code);
st.set_msg(msg);
return st;
}
return st;
}

MetaServiceResponseStatus fix_tablet_stats_internal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'fix_tablet_stats_internal' exceeds recommended size/complexity thresholds [readability-function-size]

MetaServiceResponseStatus fix_tablet_stats_internal(
                          ^
Additional context

cloud/src/meta-service/meta_service_tablet_stats.cpp:196: 125 lines including whitespace and comments (threshold 80)

MetaServiceResponseStatus fix_tablet_stats_internal(
                          ^

std::shared_ptr<TxnKv> txn_kv, std::pair<std::string, std::string>& key_pair,
std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec_batch,
const std::string& instance_id, size_t batch_size) {
std::unique_ptr<Transaction> txn;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);
MetaServiceCode code = MetaServiceCode::OK;
std::unique_ptr<RangeGetIterator> it;
std::vector<std::shared_ptr<TabletStatsPB>> tmp_tablet_stat_vec;

TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::CREATE>(err));
st.set_msg("failed to create txn");
return st;
}

// read tablet stats
err = txn->get(key_pair.first, key_pair.second, &it, true);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::READ>(err));
st.set_msg(fmt::format("failed to get tablet stats, err={} ", err));
return st;
}

size_t tablet_cnt = 0;
while (it->has_next() && tablet_cnt < batch_size) {
auto [k, v] = it->next();
key_pair.first = k;
auto k1 = k;
k1.remove_prefix(1);
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
decode_key(&k1, &out);

// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} -> TabletStatsPB
if (out.size() == 7) {
tablet_cnt++;
TabletStatsPB tablet_stat;
tablet_stat.ParseFromArray(v.data(), v.size());
tmp_tablet_stat_vec.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat));
}
}
if (it->has_next()) {
key_pair.first = it->next().first;
}

for (const auto& tablet_stat_ptr : tmp_tablet_stat_vec) {
GetRowsetResponse resp;
std::string msg;
// get rowsets in tablet and accumulate disk size
internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id,
tablet_stat_ptr->idx().tablet_id(), code, msg, &resp);
if (code != MetaServiceCode::OK) {
st.set_code(code);
st.set_msg(msg);
return st;
}
int64_t total_disk_size = 0;
for (const auto& rs_meta : resp.rowset_meta()) {
total_disk_size += rs_meta.total_disk_size();
}

// set new disk size to tabletPB and write it back
TabletStatsPB tablet_stat;
tablet_stat.CopyFrom(*tablet_stat_ptr);
tablet_stat.set_data_size(total_disk_size);
// record tablet stats batch
tablet_stat_shared_ptr_vec_batch.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat));
std::string tablet_stat_key;
std::string tablet_stat_value;
tablet_stat_key = stats_tablet_key(
{instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(),
tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()});
if (!tablet_stat.SerializeToString(&tablet_stat_value)) {
st.set_code(MetaServiceCode::PROTOBUF_SERIALIZE_ERR);
st.set_msg("failed to serialize tablet stat");
return st;
}
txn->put(tablet_stat_key, tablet_stat_value);

// read num segs
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "num_segs" -> int64
std::string tablet_stat_num_segs_key;
stats_tablet_num_segs_key(
{instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(),
tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()},
&tablet_stat_num_segs_key);
int64_t tablet_stat_num_segs = 0;
std::string tablet_stat_num_segs_value(sizeof(tablet_stat_num_segs), '\0');
err = txn->get(tablet_stat_num_segs_key, &tablet_stat_num_segs_value);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
st.set_code(cast_as<ErrCategory::READ>(err));
}
if (tablet_stat_num_segs_value.size() != sizeof(tablet_stat_num_segs)) [[unlikely]] {
LOG(WARNING) << " malformed tablet stats value v.size="
<< tablet_stat_num_segs_value.size()
<< " value=" << hex(tablet_stat_num_segs_value);
}
std::memcpy(&tablet_stat_num_segs, tablet_stat_num_segs_value.data(),
sizeof(tablet_stat_num_segs));
if constexpr (std::endian::native == std::endian::big) {
tablet_stat_num_segs = bswap_64(tablet_stat_num_segs);
}

if (tablet_stat_num_segs > 0) {
// set tablet stats data size = 0
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} "data_size" -> int64
std::string tablet_stat_data_size_key;
stats_tablet_data_size_key(
{instance_id, tablet_stat.idx().table_id(), tablet_stat.idx().index_id(),
tablet_stat.idx().partition_id(), tablet_stat.idx().tablet_id()},
&tablet_stat_data_size_key);
int64_t tablet_stat_data_size = 0;
std::string tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0');
memcpy(tablet_stat_data_size_value.data(), &tablet_stat_data_size,
sizeof(tablet_stat_data_size));
txn->put(tablet_stat_data_size_key, tablet_stat_data_size_value);
}
}

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::COMMIT>(err));
st.set_msg("failed to commit txn");
return st;
}
return st;
}

MetaServiceResponseStatus check_new_tablet_stats(
std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id,
const std::vector<std::shared_ptr<TabletStatsPB>>& tablet_stat_shared_ptr_vec_batch) {
std::unique_ptr<Transaction> txn;
MetaServiceResponseStatus st;
st.set_code(MetaServiceCode::OK);

TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
st.set_code(cast_as<ErrCategory::CREATE>(err));
st.set_msg("failed to create txn");
return st;
}

for (const auto& tablet_stat_ptr : tablet_stat_shared_ptr_vec_batch) {
// check tablet stats
std::string tablet_stat_key;
std::string tablet_stat_value;
tablet_stat_key = stats_tablet_key(
{instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(),
tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()});
err = txn->get(tablet_stat_key, &tablet_stat_value);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
st.set_code(cast_as<ErrCategory::READ>(err));
return st;
}
TabletStatsPB tablet_stat_check;
tablet_stat_check.ParseFromArray(tablet_stat_value.data(), tablet_stat_value.size());
if (tablet_stat_check.DebugString() != tablet_stat_ptr->DebugString() &&
// If anyone data size of tablet_stat_check and tablet_stat_ptr is twice bigger than another,
// we need to rewrite it this tablet_stat.
(tablet_stat_check.data_size() > 2 * tablet_stat_ptr->data_size() ||
tablet_stat_ptr->data_size() > 2 * tablet_stat_check.data_size())) {
LOG_WARNING("[fix tablet stats]:tablet stats check failed")
.tag("tablet stat", tablet_stat_ptr->DebugString())
.tag("check tabelt stat", tablet_stat_check.DebugString());
}

// check data size
std::string tablet_stat_data_size_key;
stats_tablet_data_size_key(
{instance_id, tablet_stat_ptr->idx().table_id(), tablet_stat_ptr->idx().index_id(),
tablet_stat_ptr->idx().partition_id(), tablet_stat_ptr->idx().tablet_id()},
&tablet_stat_data_size_key);
int64_t tablet_stat_data_size = 0;
std::string tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0');
err = txn->get(tablet_stat_data_size_key, &tablet_stat_data_size_value);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
st.set_code(cast_as<ErrCategory::READ>(err));
return st;
}
int64_t tablet_stat_data_size_check;

if (tablet_stat_data_size_value.size() != sizeof(tablet_stat_data_size_check))
[[unlikely]] {
LOG(WARNING) << " malformed tablet stats value v.size="
<< tablet_stat_data_size_value.size()
<< " value=" << hex(tablet_stat_data_size_value);
}
std::memcpy(&tablet_stat_data_size_check, tablet_stat_data_size_value.data(),
sizeof(tablet_stat_data_size_check));
if constexpr (std::endian::native == std::endian::big) {
tablet_stat_data_size_check = bswap_64(tablet_stat_data_size_check);
}
if (tablet_stat_data_size_check != tablet_stat_data_size &&
// ditto
(tablet_stat_data_size_check > 2 * tablet_stat_data_size ||
tablet_stat_data_size > 2 * tablet_stat_data_size_check)) {
LOG_WARNING("[fix tablet stats]:data size check failed")
.tag("data size", tablet_stat_data_size)
.tag("check data size", tablet_stat_data_size_check);
}
}

return st;
}

} // namespace doris::cloud
Loading
Loading