Skip to content

Commit

Permalink
[fix](cloud) Check instance_id valid when use cloud_unique_id degrade…
Browse files Browse the repository at this point in the history
… format (apache#43253)
  • Loading branch information
deardeng authored Nov 12, 2024
1 parent 2218e09 commit d05487c
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 29 deletions.
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,6 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000");

// Max aborted txn num for the same label name
CONF_mInt64(max_num_aborted_txn, "100");

// Whether to verify that the instance_id parsed from a degraded-format
// cloud_unique_id ("${version}:${instance_id}:${unique_id}") belongs to a
// registered instance before it is accepted. Enabled by default.
CONF_Bool(enable_check_instance_id, "true");
} // namespace doris::cloud::config
56 changes: 32 additions & 24 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,46 +88,54 @@ std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
std::vector<NodeInfo> nodes;
std::string err = rc_mgr->get_node(cloud_unique_id, &nodes);
{ TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); }
std::string instance_id;
if (!err.empty()) {
// The cache has no entry for cloud_unique_id, so fall back to parsing the id itself.
// Degraded cloud_unique_id encoding: ${version}:${instance_id}:${unique_id}
// Validate it by splitting on ':'.
auto vec = split(cloud_unique_id, ':');
std::stringstream ss;
for (int i = 0; i < vec.size(); ++i) {
ss << "idx " << i << "= [" << vec[i] << "] ";
}
LOG(INFO) << "degraded to get instance_id, cloud_unique_id: " << cloud_unique_id
<< "after split: " << ss.str();
if (vec.size() != 3) {
LOG(WARNING) << "cloud unique id is not degraded format, failed to check instance "
"info, cloud_unique_id="
<< cloud_unique_id << " , err=" << err;
auto [valid, id] = ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
if (!valid) {
LOG(WARNING) << "use degraded format cloud_unique_id, but cloud_unique_id not degrade "
"format, cloud_unique_id="
<< cloud_unique_id;
return "";
}
// version: vec[0], instance_id: vec[1], unique_id: vec[2]
switch (std::atoi(vec[0].c_str())) {
case 1:
// just return instance id;
return vec[1];
default:
LOG(WARNING) << "cloud unique id degraded state, but version not eq configure, "

// check instance_id valid by get fdb
if (config::enable_check_instance_id && !rc_mgr->is_instance_id_registered(id)) {
LOG(WARNING) << "use degraded format cloud_unique_id, but check instance failed, "
"cloud_unique_id="
<< cloud_unique_id << ", err=" << err;
<< cloud_unique_id;
return "";
}
return id;
}

std::string instance_id;
for (auto& i : nodes) {
if (!instance_id.empty() && instance_id != i.instance_id) {
for (auto& node : nodes) {
if (!instance_id.empty() && instance_id != node.instance_id) {
LOG(WARNING) << "cloud_unique_id is one-to-many instance_id, "
<< " cloud_unique_id=" << cloud_unique_id
<< " current_instance_id=" << instance_id
<< " later_instance_id=" << i.instance_id;
<< " later_instance_id=" << node.instance_id;
}
instance_id = node.instance_id; // The last wins
// check cache unique_id
std::string cloud_unique_id_in_cache = node.node_info.cloud_unique_id();
auto [valid, id] =
ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id_in_cache);
if (!valid) {
continue;
}

if (id != node.instance_id || id != instance_id) {
LOG(WARNING) << "in cache, node=" << node.node_info.DebugString()
<< ", cloud_unique_id=" << cloud_unique_id
<< " current_instance_id=" << instance_id
<< ", later_instance_id=" << node.instance_id;
continue;
}
instance_id = i.instance_id; // The last wins
}

return instance_id;
}

Expand Down
18 changes: 14 additions & 4 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,16 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = request->has_instance_id() ? request->instance_id() : "";
if (!cloud_unique_id.empty() && instance_id.empty()) {
auto [is_degraded_format, id] =
ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
if (config::enable_check_instance_id && is_degraded_format &&
!resource_mgr_->is_instance_id_registered(id)) {
msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
cloud_unique_id;
LOG(WARNING) << msg;
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
Expand Down Expand Up @@ -1992,7 +2002,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::ADD_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}
std::vector<NodeInfo> to_add;
Expand All @@ -2016,7 +2026,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::DROP_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}
std::vector<NodeInfo> to_add;
Expand All @@ -2039,7 +2049,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::DECOMMISSION_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}

Expand Down Expand Up @@ -2101,7 +2111,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::NOTIFY_DECOMMISSIONED: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}

Expand Down
34 changes: 33 additions & 1 deletion cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sstream>

#include "common/logging.h"
#include "common/string_util.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
Expand Down Expand Up @@ -159,6 +160,16 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
int master_num = 0;
int follower_num = 0;
for (auto& n : cluster.nodes()) {
// check here cloud_unique_id
std::string cloud_unique_id = n.cloud_unique_id();
auto [is_degrade_format, instance_id] = get_instance_id_by_cloud_unique_id(cloud_unique_id);
if (config::enable_check_instance_id && is_degrade_format &&
!is_instance_id_registered(instance_id)) {
ss << "node=" << n.DebugString()
<< " cloud_unique_id use degrade format, but check instance failed";
*err = ss.str();
return false;
}
if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && n.edit_log_port() &&
n.has_node_type() &&
(n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
Expand Down Expand Up @@ -199,6 +210,27 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
return no_err;
}

// Parses a degraded-format cloud_unique_id, "${version}:${instance_id}:${unique_id}",
// and extracts the instance_id.
//
// Only version 1 is recognised, and the version field must consist entirely of a
// number equal to 1.
//
// @param cloud_unique_id the id to inspect
// @return {true, instance_id} when the id has exactly three ':'-separated fields and
//         the version is 1; {false, ""} otherwise
std::pair<bool, std::string> ResourceManager::get_instance_id_by_cloud_unique_id(
        const std::string& cloud_unique_id) {
    auto v = split(cloud_unique_id, ':');
    if (v.size() != 3) return {false, ""};

    // std::atoi ignores trailing garbage ("1abc" -> 1) and is undefined when the value
    // does not fit in an int (e.g. "4294967297" may wrap to 1). Parse with strtol and
    // require that the whole version field was consumed.
    const char* begin = v[0].c_str();
    char* end = nullptr;
    const long version = std::strtol(begin, &end, 10);
    // On overflow strtol saturates to LONG_MAX/LONG_MIN, which can never equal 1.
    const bool fully_parsed = (end != begin) && (end == begin + v[0].size());
    if (!fully_parsed || version != 1) return {false, ""};

    return {true, v[1]};
}

// Reports whether `instance_id` names a registered instance.
//
// The answer is derived solely from the status returned by get_instance(); any
// status other than TxnErrorCode::TXN_OK is logged as a warning and treated as
// "not registered".
bool ResourceManager::is_instance_id_registered(const std::string& instance_id) {
    // Only the status matters here, so no InstanceInfoPB output is requested.
    auto [code, info] = get_instance(nullptr, instance_id, nullptr);
    // Test hook: allows unit tests to overwrite the lookup status.
    { TEST_SYNC_POINT_CALLBACK("is_instance_id_registered", &code); }

    if (code == TxnErrorCode::TXN_OK) {
        return true;
    }

    LOG(WARNING) << "failed to check instance instance_id=" << instance_id
                 << ", code=" << format_as(code) << ", info=" << info;
    return false;
}

std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::string& instance_id,
const ClusterInfo& cluster) {
std::string msg;
Expand Down Expand Up @@ -624,7 +656,7 @@ std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_p
return ec;
}

if (!inst_pb->ParseFromString(val)) {
if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) {
code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
msg = "failed to parse InstanceInfoPB";
return ec;
Expand Down
19 changes: 19 additions & 0 deletions cloud/src/resource-manager/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,25 @@ class ResourceManager {
bool check_cluster_params_valid(const ClusterPB& cluster, std::string* err,
bool check_master_num);

/**
 * Checks whether cloud_unique_id is in degraded format and, if so, extracts the
 * instance_id from it.
 * Degraded format: "${version}:${instance_id}:${unique_id}" (only version 1 is accepted)
 *
 * @param cloud_unique_id the cloud_unique_id to parse
 *
 * @return a <is_degraded_format, instance_id> pair: when is_degraded_format == true,
 *         instance_id is the value parsed from cloud_unique_id; when
 *         is_degraded_format == false, instance_id is ""
 */
static std::pair<bool, std::string> get_instance_id_by_cloud_unique_id(
const std::string& cloud_unique_id);

/**
 * Checks whether instance_id refers to a registered instance by looking it up
 * in the fdb kv store.
 *
 * @param instance_id the instance id to look up
 *
 * @return true if the lookup succeeds (instance_id is present in fdb kv),
 *         false otherwise
 */
bool is_instance_id_registered(const std::string& instance_id);

/**
* Refreshes the cache of given instance. This process removes the instance in cache
* and then replaces it with persisted instance state read from underlying KV storage.
Expand Down
1 change: 1 addition & 0 deletions cloud/test/fdb_injection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ int main(int argc, char** argv) {
cloud::config::txn_store_retry_base_intervals_ms = 1;
cloud::config::fdb_cluster_file_path = "fdb.cluster";
cloud::config::write_schema_kv = true;
cloud::config::enable_check_instance_id = false;

auto sp = SyncPoint::get_instance();
sp->enable_processing();
Expand Down
16 changes: 16 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ int main(int argc, char** argv) {
config::enable_txn_store_retry = true;
config::txn_store_retry_base_intervals_ms = 1;
config::txn_store_retry_times = 20;
config::enable_check_instance_id = false;

if (!doris::cloud::init_glog("meta_service_test")) {
std::cerr << "failed to init glog" << std::endl;
Expand Down Expand Up @@ -264,6 +265,21 @@ TEST(MetaServiceTest, GetInstanceIdTest) {
"12345678901:ALBJLH4Q:m-n3qdpyal27rh8iprxx");
ASSERT_EQ(instance_id, "");

config::enable_check_instance_id = true;
auto ms = get_meta_service(false);
instance_id =
get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
ASSERT_EQ(instance_id, "");

sp->set_call_back("is_instance_id_registered", [&](auto&& args) {
TxnErrorCode* c0 = try_any_cast<TxnErrorCode*>(args[0]);
*c0 = TxnErrorCode::TXN_OK;
});
instance_id =
get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
ASSERT_EQ(instance_id, "ALBJLH4Q-check-invalid");
config::enable_check_instance_id = false;

sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
Expand Down

0 comments on commit d05487c

Please sign in to comment.