diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 04b0b4a3382671..b593b71263b057 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -220,4 +220,6 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000"); // Max aborted txn num for the same label name CONF_mInt64(max_num_aborted_txn, "100"); + +CONF_Bool(enable_check_instance_id, "true"); } // namespace doris::cloud::config diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 876a8817b85f0e..c98db9b41b531d 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -88,46 +88,54 @@ std::string get_instance_id(const std::shared_ptr& rc_mgr, std::vector nodes; std::string err = rc_mgr->get_node(cloud_unique_id, &nodes); { TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); } + std::string instance_id; if (!err.empty()) { // cache can't find cloud_unique_id, so degraded by parse cloud_unique_id // cloud_unique_id encode: ${version}:${instance_id}:${unique_id} // check it split by ':' c - auto vec = split(cloud_unique_id, ':'); - std::stringstream ss; - for (int i = 0; i < vec.size(); ++i) { - ss << "idx " << i << "= [" << vec[i] << "] "; - } - LOG(INFO) << "degraded to get instance_id, cloud_unique_id: " << cloud_unique_id - << "after split: " << ss.str(); - if (vec.size() != 3) { - LOG(WARNING) << "cloud unique id is not degraded format, failed to check instance " - "info, cloud_unique_id=" - << cloud_unique_id << " , err=" << err; + auto [valid, id] = ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id); + if (!valid) { + LOG(WARNING) << "use degraded format cloud_unique_id, but cloud_unique_id not degrade " + "format, cloud_unique_id=" + << cloud_unique_id; return ""; } - // version: vec[0], instance_id: vec[1], unique_id: vec[2] - switch (std::atoi(vec[0].c_str())) { - case 1: - // just return instance id; - return vec[1]; - default: - LOG(WARNING) << "cloud unique id degraded state, but version not eq configure, " + + // check instance_id valid by get fdb + if (config::enable_check_instance_id && !rc_mgr->is_instance_id_registered(id)) { + LOG(WARNING) << "use degraded format cloud_unique_id, but check instance failed, " "cloud_unique_id=" - << cloud_unique_id << ", err=" << err; + << cloud_unique_id; return ""; } + return id; } - std::string instance_id; - for (auto& i : nodes) { - if (!instance_id.empty() && instance_id != i.instance_id) { + for (auto& node : nodes) { + if (!instance_id.empty() && instance_id != node.instance_id) { LOG(WARNING) << "cloud_unique_id is one-to-many instance_id, " << " cloud_unique_id=" << cloud_unique_id << " current_instance_id=" << instance_id - << " later_instance_id=" << i.instance_id; + << " later_instance_id=" << node.instance_id; + } + instance_id = node.instance_id; // The last wins + // check cache unique_id + std::string cloud_unique_id_in_cache = node.node_info.cloud_unique_id(); + auto [valid, id] = + ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id_in_cache); + if (!valid) { + continue; + } + + if (id != node.instance_id || id != instance_id) { + LOG(WARNING) << "in cache, node=" << node.node_info.DebugString() + << ", cloud_unique_id=" << cloud_unique_id + << " current_instance_id=" << instance_id + << ", later_instance_id=" << node.instance_id; + continue; } - instance_id = i.instance_id; // The last wins } + return instance_id; } diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 5d4a4d6922771b..92020005c3a5d0 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -1943,6 +1943,16 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; instance_id = request->has_instance_id() ? request->instance_id() : ""; if (!cloud_unique_id.empty() && instance_id.empty()) { + auto [is_degraded_format, id] = + ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id); + if (config::enable_check_instance_id && is_degraded_format && + !resource_mgr_->is_instance_id_registered(id)) { + msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" + + cloud_unique_id; + LOG(WARNING) << msg; + code = MetaServiceCode::INVALID_ARGUMENT; + return; + } instance_id = get_instance_id(resource_mgr_, cloud_unique_id); if (instance_id.empty()) { code = MetaServiceCode::INVALID_ARGUMENT; @@ -1992,7 +2002,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, case AlterClusterRequest::ADD_NODE: { resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false); if (msg != "") { - LOG(INFO) << msg; + LOG(WARNING) << msg; break; } std::vector to_add; @@ -2016,7 +2026,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, case AlterClusterRequest::DROP_NODE: { resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false); if (msg != "") { - LOG(INFO) << msg; + LOG(WARNING) << msg; break; } std::vector to_add; @@ -2039,7 +2049,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, case AlterClusterRequest::DECOMMISSION_NODE: { resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false); if (msg != "") { - LOG(INFO) << msg; + LOG(WARNING) << msg; break; } @@ -2101,7 +2111,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, case AlterClusterRequest::NOTIFY_DECOMMISSIONED: { resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false); if (msg != "") { - LOG(INFO) << msg; + LOG(WARNING) << msg; break; } diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index 43f0a7368d812b..9c37d781765510 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -23,6 +23,7 @@ #include #include "common/logging.h" +#include "common/string_util.h" #include "common/util.h" #include "cpp/sync_point.h" #include "meta-service/keys.h" @@ -159,6 +160,16 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std:: int master_num = 0; int follower_num = 0; for (auto& n : cluster.nodes()) { + // check here cloud_unique_id + std::string cloud_unique_id = n.cloud_unique_id(); + auto [is_degrade_format, instance_id] = get_instance_id_by_cloud_unique_id(cloud_unique_id); + if (config::enable_check_instance_id && is_degrade_format && + !is_instance_id_registered(instance_id)) { + ss << "node=" << n.DebugString() + << " cloud_unique_id use degrade format, but check instance failed"; + *err = ss.str(); + return false; + } if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && n.edit_log_port() && n.has_node_type() && (n.node_type() == NodeInfoPB_NodeType_FE_MASTER || @@ -199,6 +210,27 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std:: return no_err; } +std::pair ResourceManager::get_instance_id_by_cloud_unique_id( + const std::string& cloud_unique_id) { + auto v = split(cloud_unique_id, ':'); + if (v.size() != 3) return {false, ""}; + // degraded format check it + int version = std::atoi(v[0].c_str()); + if (version != 1) return {false, ""}; + return {true, v[1]}; +} + +bool ResourceManager::is_instance_id_registered(const std::string& instance_id) { + // check kv + auto [c0, m0] = get_instance(nullptr, instance_id, nullptr); + { TEST_SYNC_POINT_CALLBACK("is_instance_id_registered", &c0); } + if (c0 != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to check instance instance_id=" << instance_id + << ", code=" << format_as(c0) << ", info=" + m0; + } + return c0 == TxnErrorCode::TXN_OK; +} + std::pair ResourceManager::add_cluster(const std::string& instance_id, const ClusterInfo& cluster) { std::string msg; @@ -624,7 +656,7 @@ std::pair ResourceManager::get_instance(std::shared_p return ec; } - if (!inst_pb->ParseFromString(val)) { + if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) { code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR; msg = "failed to parse InstanceInfoPB"; return ec; diff --git a/cloud/src/resource-manager/resource_manager.h b/cloud/src/resource-manager/resource_manager.h index 5000764dee8a0b..9e6f4548d244b7 100644 --- a/cloud/src/resource-manager/resource_manager.h +++ b/cloud/src/resource-manager/resource_manager.h @@ -114,6 +114,25 @@ class ResourceManager { bool check_cluster_params_valid(const ClusterPB& cluster, std::string* err, bool check_master_num); + /** + * Check cloud_unique_id is degraded format, and get instance_id from cloud_unique_id + * degraded format : "${version}:${instance_id}:${unique_id}" + * @param degraded cloud_unique_id + * + * @return a pair, if is_degraded_format == true , instance_id, if is_degraded_format == false, instance_id="" + */ + static std::pair get_instance_id_by_cloud_unique_id( + const std::string& cloud_unique_id); + + /** + * check instance_id is a valid instance, check by get fdb kv + * + * @param instance_id + * + * @return true, instance_id in fdb kv + */ + bool is_instance_id_registered(const std::string& instance_id); + /** * Refreshes the cache of given instance. This process removes the instance in cache * and then replaces it with persisted instance state read from underlying KV storage. diff --git a/cloud/test/fdb_injection_test.cpp b/cloud/test/fdb_injection_test.cpp index 125ae2b6b04170..08ba3e50e5253f 100644 --- a/cloud/test/fdb_injection_test.cpp +++ b/cloud/test/fdb_injection_test.cpp @@ -70,6 +70,7 @@ int main(int argc, char** argv) { cloud::config::txn_store_retry_base_intervals_ms = 1; cloud::config::fdb_cluster_file_path = "fdb.cluster"; cloud::config::write_schema_kv = true; + cloud::config::enable_check_instance_id = false; auto sp = SyncPoint::get_instance(); sp->enable_processing(); diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index e6b9e9dddeed7c..c67b49aac3f6f7 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -56,6 +56,7 @@ int main(int argc, char** argv) { config::enable_txn_store_retry = true; config::txn_store_retry_base_intervals_ms = 1; config::txn_store_retry_times = 20; + config::enable_check_instance_id = false; if (!doris::cloud::init_glog("meta_service_test")) { std::cerr << "failed to init glog" << std::endl; @@ -264,6 +265,21 @@ TEST(MetaServiceTest, GetInstanceIdTest) { "12345678901:ALBJLH4Q:m-n3qdpyal27rh8iprxx"); ASSERT_EQ(instance_id, ""); + config::enable_check_instance_id = true; + auto ms = get_meta_service(false); + instance_id = + get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx"); + ASSERT_EQ(instance_id, ""); + + sp->set_call_back("is_instance_id_registered", [&](auto&& args) { + TxnErrorCode* c0 = try_any_cast(args[0]); + *c0 = TxnErrorCode::TXN_OK; + }); + instance_id = + get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx"); + ASSERT_EQ(instance_id, "ALBJLH4Q-check-invalid"); + config::enable_check_instance_id = false; + sp->clear_all_call_backs(); sp->clear_trace(); sp->disable_processing();