diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 43a426a89593d5..02f106f90d9a14 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -225,4 +225,7 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000"); CONF_mInt64(max_num_aborted_txn, "100"); CONF_Bool(enable_check_instance_id, "true"); + +// Check if ip eq 127.0.0.1, ms/recycler exit +CONF_Bool(enable_loopback_address_for_ms, "false"); } // namespace doris::cloud::config diff --git a/cloud/src/common/network_util.cpp b/cloud/src/common/network_util.cpp index 9f1b085163d772..d31ca7ad937ae7 100644 --- a/cloud/src/common/network_util.cpp +++ b/cloud/src/common/network_util.cpp @@ -29,6 +29,7 @@ #include #include +#include "common/config.h" #include "common/logging.h" namespace doris::cloud { @@ -160,6 +161,16 @@ static bool get_hosts_v4(std::vector* hosts) { std::string get_local_ip(const std::string& priority_networks) { std::string localhost_str = butil::my_ip_cstr(); + std::unique_ptr> defer((int*)0x01, [&localhost_str](int*) { + // Check if ip eq 127.0.0.1, ms/recycler exit + LOG(INFO) << "get the IP for ms is " << localhost_str; + if (config::enable_loopback_address_for_ms || localhost_str != "127.0.0.1") return; + LOG(WARNING) << "localhost IP is loopback address (127.0.0.1), " + << "there may be multiple NICs for use, " + << "please set priority_network with a CIDR expression in doris_cloud.conf " + << "to choose a non-loopback address accordingly"; + exit(-1); + }); if (priority_networks == "") { LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << localhost_str; return localhost_str; diff --git a/cloud/src/meta-service/injection_point_http.cpp b/cloud/src/meta-service/injection_point_http.cpp index 80d1bcfdf2e4d8..18b2ddcebb07fd 100644 --- a/cloud/src/meta-service/injection_point_http.cpp +++ b/cloud/src/meta-service/injection_point_http.cpp @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license 
agreements. See the NOTICE file // distributed with this work for additional information @@ -18,6 +17,7 @@ #include #include +#include #include "common/config.h" #include "common/logging.h" @@ -34,6 +34,11 @@ namespace doris::cloud { std::map> suite_map; std::once_flag register_suites_once; +// define a struct to store value only +struct TypedValue { + std::variant value; +}; + inline std::default_random_engine make_random_engine() { return std::default_random_engine( static_cast(std::chrono::steady_clock::now().time_since_epoch().count())); @@ -88,6 +93,108 @@ static void register_suites() { }); } +bool url_decode(const std::string& in, std::string* out) { + out->clear(); + out->reserve(in.size()); + + for (size_t i = 0; i < in.size(); ++i) { + if (in[i] == '%') { + if (i + 3 <= in.size()) { + int value = 0; + std::istringstream is(in.substr(i + 1, 2)); + + if (is >> std::hex >> value) { + (*out) += static_cast(value); + i += 2; + } else { + return false; + } + } else { + return false; + } + } else if (in[i] == '+') { + (*out) += ' '; + } else { + (*out) += in[i]; + } + } + + return true; +} + +HttpResponse set_value(const std::string& point, const brpc::URI& uri) { + std::string value_str(http_query(uri, "value")); + std::string decoded_value; + if (!url_decode(value_str, &decoded_value)) { + auto msg = fmt::format("failed to decode value: {}", value_str); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + rapidjson::Document doc; + if (doc.Parse(decoded_value.c_str()).HasParseError()) { + auto msg = fmt::format("invalid json value: {}", decoded_value); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + if (!doc.IsArray()) { + auto msg = "value must be a json array"; + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + // use vector to keep order + std::vector parsed_values; + + for (const auto& value : doc.GetArray()) { + 
TypedValue typed_value; + try { + if (value.IsBool()) { + typed_value.value = value.GetBool(); + } else if (value.IsInt64()) { + typed_value.value = value.GetInt64(); + } else if (value.IsString()) { + typed_value.value = value.GetString(); + } else { + auto msg = "value must be boolean, integer or string"; + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + parsed_values.push_back(std::move(typed_value)); + } catch (const std::exception& e) { + auto msg = fmt::format("failed to parse value"); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + } + + auto sp = SyncPoint::get_instance(); + sp->set_call_back(point, [point, parsed_values = std::move(parsed_values)](auto&& args) { + LOG(INFO) << "injection point hit, point=" << point; + for (size_t i = 0; i < parsed_values.size(); i++) { + const auto& typed_value = parsed_values[i]; + std::visit( + [&](const auto& v) { + LOG(INFO) << "index=" << i << " value=" << v + << " type=" << typeid(v).name(); + if constexpr (std::is_same_v, int64_t>) { + // process int64_t + *try_any_cast(args[i]) = v; + } else if constexpr (std::is_same_v, bool>) { + // process bool + *try_any_cast(args[i]) = v; + } else if constexpr (std::is_same_v, + std::string>) { + // process string + *try_any_cast(args[i]) = v; + } + }, + typed_value.value); + } + }); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) { std::string duration_str(http_query(uri, "duration")); int64_t duration = 0; @@ -136,6 +243,8 @@ HttpResponse handle_set(const brpc::URI& uri) { return set_sleep(point, uri); } else if (behavior == "return") { return set_return(point, uri); + } else if (behavior == "change_args") { + return set_value(point, uri); } return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown behavior: " + behavior); @@ -202,6 +311,15 @@ HttpResponse handle_disable(const brpc::URI& uri) { 
// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set // &name=${injection_point_name}&behavior=return" # return void + +// ATTN: change_args use uri encode, see example test_ms_api.groovy +// use inject point in cpp file +// bool testBool = false; +// std::string testString = "world"; +// TEST_SYNC_POINT_CALLBACK("resource_manager::set_safe_drop_time", &exceed_time, &testBool, &testString); + +// curl http://175.40.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&o +// p=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=%5B-1%2Ctrue%2C%22hello%22%5D // ``` HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) { diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index cc459c090bfd28..a8611df3631b85 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -2273,6 +2273,14 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, code = MetaServiceCode::UNDEFINED_ERR; } + // ugly but easy to repair + // not change cloud.proto add err_code + if (request->op() == AlterClusterRequest::DROP_NODE && + msg.find("not found") != std::string::npos) { + // see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return http status code 404 + code = MetaServiceCode::CLUSTER_NOT_FOUND; + } + if (code != MetaServiceCode::OK) return; auto f = new std::function([instance_id = request->instance_id(), txn_kv = txn_kv_] { @@ -2386,6 +2394,7 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller, response->mutable_cluster()->CopyFrom(instance.clusters()); LOG_EVERY_N(INFO, 100) << "get all cluster info, " << msg; } else { + bool is_instance_changed = false; for (int i = 0; i < instance.clusters_size(); ++i) { auto& c = instance.clusters(i); std::set mysql_users; @@ -2401,6 +2410,24 @@ void 
MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller, << " cluster=" << msg; } } + if (is_instance_changed) { + val = instance.SerializeAsString(); + if (val.empty()) { + msg = "failed to serialize"; + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + return; + } + + txn->put(key, val); + LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key) + << " json=" << proto_to_json(instance); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = fmt::format("failed to commit kv txn, err={}", err); + LOG(WARNING) << msg; + } + } } if (response->cluster().empty()) { diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index 9c37d781765510..3addfecdb85069 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -183,7 +183,7 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std:: continue; } ss << "check cluster params failed, edit_log_port is required for frontends while " - "heatbeat_port is required for banckens, node : " + "heatbeat_port is required for banckends, node : " << proto_to_json(n); *err = ss.str(); no_err = false; @@ -191,19 +191,11 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std:: } if (check_master_num && ClusterPB::SQL == cluster.type()) { - no_err = false; - if (master_num > 0 && follower_num > 0) { - ss << "cluster is SQL type, and use multi follower mode, cant set master node, master " - "count: " - << master_num << " follower count: " << follower_num; - } else if (!follower_num && master_num != 1) { - ss << "cluster is SQL type, must have only one master node, now master count: " - << master_num; - } else { - // followers mode - // 1. followers 2. 
observers + followers - no_err = true; - ss << ""; + if (master_num == 0 && follower_num == 0) { + ss << "cluster is SQL type, but not set master and follower node, master count=" + << master_num << " follower count=" << follower_num + << " so sql cluster can't get a Master node"; + no_err = false; } *err = ss.str(); } @@ -231,6 +223,85 @@ bool ResourceManager::is_instance_id_registered(const std::string& instance_id) return c0 == TxnErrorCode::TXN_OK; } +/** + * Gets addr and port from NodeInfoPB + * @param node + * @return + */ +static std::pair get_node_endpoint_from_cluster(const ClusterPB::Type& type, + const NodeInfoPB& node) { + std::string addr = node.has_host() ? node.host() : (node.has_ip() ? node.ip() : ""); + int32_t port = (ClusterPB::SQL == type) + ? node.edit_log_port() + : (ClusterPB::COMPUTE == type ? node.heartbeat_port() : -1); + return std::make_pair(addr, port); +} + +/** + * Gets nodes endpoint from InstanceInfoPB which are registered + * @param instance + * @return + */ +static std::pair, std::set> get_nodes_endpoint_registered( + const InstanceInfoPB& instance) { + std::set instance_sql_node_endpoints; + std::set instance_compute_node_endpoints; + for (auto& instance_cluster : instance.clusters()) { + for (auto& node : instance_cluster.nodes()) { + const auto& [addr, port] = + get_node_endpoint_from_cluster(instance_cluster.type(), node); + if (ClusterPB::SQL == instance_cluster.type()) { + instance_sql_node_endpoints.insert(addr + ":" + std::to_string(port)); + } else if (ClusterPB::COMPUTE == instance_cluster.type()) { + instance_compute_node_endpoints.insert(addr + ":" + std::to_string(port)); + } + } + } + return std::make_pair(instance_sql_node_endpoints, instance_compute_node_endpoints); +} + +/** + * When add_cluster or add_node, check its node has been registered + * @param cluster type, for check port + * @param node, which node to add + * @param registered_fes, that fes has been registered in kv + * @param registered_bes, that bes 
has been registered in kv + * @return , if error_code == OK, check pass + */ +static std::pair check_node_has_been_registered( + const ClusterPB::Type& type, const NodeInfoPB& node, + std::set fe_endpoints_registered, + std::set be_endpoints_registered) { + const auto& [addr, port] = get_node_endpoint_from_cluster(type, node); + std::stringstream ss; + std::string msg; + if (addr == "" || port == -1) { + ss << "add node input args node invalid, cluster=" << proto_to_json(node); + LOG(WARNING) << ss.str(); + msg = ss.str(); + return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + std::string node_endpoint = addr + ":" + std::to_string(port); + + if (type == ClusterPB::SQL) { + if (fe_endpoints_registered.count(node_endpoint)) { + ss << "sql node endpoint has been added, registered fe node=" << node_endpoint; + LOG(WARNING) << ss.str(); + msg = ss.str(); + return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg); + } + } else if (type == ClusterPB::COMPUTE) { + if (be_endpoints_registered.count(node_endpoint)) { + ss << "compute node endpoint has been added, registered be node=" << node_endpoint; + LOG(WARNING) << ss.str(); + msg = ss.str(); + return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg); + } + } + return std::make_pair(MetaServiceCode::OK, ""); +} + std::pair ResourceManager::add_cluster(const std::string& instance_id, const ClusterInfo& cluster) { std::string msg; @@ -294,35 +365,49 @@ std::pair ResourceManager::add_cluster(const std:: return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg); } - LOG(INFO) << "cluster to add json=" << proto_to_json(cluster.cluster); + auto& req_cluster = cluster.cluster; + LOG(INFO) << "cluster to add json=" << proto_to_json(req_cluster); LOG(INFO) << "json=" << proto_to_json(instance); // Check id and name, they need to be unique // One cluster id per name, name is alias of cluster id - for (auto& i : instance.clusters()) { - if (i.cluster_id() == cluster.cluster.cluster_id()) { + for 
(auto& instance_cluster : instance.clusters()) { + if (instance_cluster.cluster_id() == req_cluster.cluster_id()) { ss << "try to add a existing cluster id," - << " existing_cluster_id=" << i.cluster_id(); + << " existing_cluster_id=" << instance_cluster.cluster_id(); msg = ss.str(); return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg); } - if (i.cluster_name() == cluster.cluster.cluster_name()) { + if (instance_cluster.cluster_name() == req_cluster.cluster_name()) { ss << "try to add a existing cluster name," - << " existing_cluster_name=" << i.cluster_name(); + << " existing_cluster_name=" << instance_cluster.cluster_name(); msg = ss.str(); return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg); } } - // TODO(gavin): Check duplicated nodes, one node cannot deploy on multiple clusters + // modify cluster's node info auto now_time = std::chrono::system_clock::now(); uint64_t time = std::chrono::duration_cast(now_time.time_since_epoch()).count(); - for (auto& n : cluster.cluster.nodes()) { + + const auto& [fe_endpoints_registered, be_endpoints_registered] = + get_nodes_endpoint_registered(instance); + + for (auto& n : req_cluster.nodes()) { auto& node = const_cast&>(n); node.set_ctime(time); node.set_mtime(time); + // Check duplicated nodes, one node cannot deploy on multiple clusters + // diff instance_cluster's nodes and req_cluster's nodes + for (auto& n : req_cluster.nodes()) { + const auto& [c1, m1] = check_node_has_been_registered( + req_cluster.type(), n, fe_endpoints_registered, be_endpoints_registered); + if (c1 != MetaServiceCode::OK) { + return std::make_pair(c1, m1); + } + } } auto to_add_cluster = instance.add_clusters(); @@ -358,6 +443,27 @@ std::pair ResourceManager::add_cluster(const std:: return std::make_pair(MetaServiceCode::OK, ""); } +/** + * The current implementation is to add fe clusters through HTTP API, + * such as follower nodes `ABC` in the cluster, and then immediately drop follower node `A`, while fe is not yet pulled up, 
+ * which may result in the formation of a multi master fe cluster + * This function provides a simple protection mechanism that does not allow dropping the fe node within 5 minutes after adding it through the API(add_cluster/add_node). + * If you bypass this protection and do the behavior described above, god bless you. + * @param node, which fe node to drop + * @return true, can drop. false , within ctime 5 mins, can't drop + */ +static bool is_sql_node_exceeded_safe_drop_time(const NodeInfoPB& node) { + int64_t ctime = node.ctime(); + // protect time 5mins + int64_t exceed_time = 5 * 60; + TEST_SYNC_POINT_CALLBACK("resource_manager::set_safe_drop_time", &exceed_time); + exceed_time = ctime + exceed_time; + auto now_time = std::chrono::system_clock::now(); + int64_t current_time = + std::chrono::duration_cast(now_time.time_since_epoch()).count(); + return current_time > exceed_time; +} + std::pair ResourceManager::drop_cluster( const std::string& instance_id, const ClusterInfo& cluster) { std::stringstream ss; @@ -400,12 +506,27 @@ std::pair ResourceManager::drop_cluster( bool found = false; int idx = -1; + std::string cache_err_help_msg = + "Ms nodes memory cache may be inconsistent, pls check registry key may be contain " + "127.0.0.1, and call get_instance api get instance info from fdb"; ClusterPB to_del; - // Check id and name, they need to be unique + // Check id, they need to be unique // One cluster id per name, name is alias of cluster id for (auto& i : instance.clusters()) { ++idx; if (i.cluster_id() == cluster.cluster.cluster_id()) { + if (i.type() == ClusterPB::SQL) { + for (auto& fe_node : i.nodes()) { + // check drop fe cluster + if (!is_sql_node_exceeded_safe_drop_time(fe_node)) { + ss << "drop fe cluster not in safe time, try later, cluster=" + << i.DebugString(); + msg = ss.str(); + LOG(WARNING) << msg; + return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg); + } + } + } to_del.CopyFrom(i); LOG(INFO) << "found a cluster to drop," << " 
instance_id=" << instance_id << " cluster_id=" << i.cluster_id() @@ -418,7 +539,8 @@ std::pair ResourceManager::drop_cluster( if (!found) { ss << "failed to find cluster to drop," << " instance_id=" << instance_id << " cluster_id=" << cluster.cluster.cluster_id() - << " cluster_name=" << cluster.cluster.cluster_name(); + << " cluster_name=" << cluster.cluster.cluster_name() + << " help Msg=" << cache_err_help_msg; msg = ss.str(); return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg); } @@ -666,27 +788,26 @@ std::pair ResourceManager::get_instance(std::shared_p } // check instance pb is valid +// check in drop nodes +// In the mode of managing cluster nodes through SQL, FE knows the master node of the current system, so it cannot delete the current master node (SQL error) +// However, in the mode of managing cluster nodes through HTTP, MS cannot know which node in the current FE cluster is the master node, so some SOP operations are required for cloud control +// 1. First, check the status of the nodes in the current FE cluster +// 2. Drop non master nodes +// 3. 
If you want to drop the master node, you need to find a way to switch the master node to another follower node and then drop it again bool is_instance_valid(const InstanceInfoPB& instance) { // check has fe node for (auto& c : instance.clusters()) { if (c.has_type() && c.type() == ClusterPB::SQL) { int master = 0; int follower = 0; - std::string mode = "multi-followers"; for (auto& n : c.nodes()) { if (n.node_type() == NodeInfoPB::FE_MASTER) { - mode = "master-observers"; master++; } else if (n.node_type() == NodeInfoPB::FE_FOLLOWER) { follower++; } } - // if master/observers mode , not have master or have multi master, return false - if (mode == "master-observers" && master != 1) { - return false; - } - // if multi followers mode, not have follower, return false - if (mode == "multi-followers" && !follower) { + if (master > 1 || (master == 0 && follower == 0)) { return false; } return true; @@ -734,7 +855,24 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, } LOG(INFO) << "instance json=" << proto_to_json(instance); - std::vector> vec; + if (!to_add.empty()) { + // add nodes + // Check duplicated nodes, one node cannot deploy on multiple clusters + // diff instance's nodes and to_add nodes + const auto& [fe_endpoints_registered, be_endpoints_registered] = + get_nodes_endpoint_registered(instance); + for (auto& add_node : to_add) { + const ClusterPB::Type type = + add_node.role == Role::SQL_SERVER ? 
ClusterPB::SQL : ClusterPB::COMPUTE; + const auto& [c1, m1] = check_node_has_been_registered( + type, add_node.node_info, fe_endpoints_registered, be_endpoints_registered); + if (c1 != MetaServiceCode::OK) { + return m1; + } + } + } + // a vector save (origin_cluster , changed_cluster), to update ms mem + std::vector> change_from_to_clusters; using modify_impl_func = std::function; using check_func = std::function; auto modify_func = [&](const NodeInfo& node, check_func check, @@ -764,6 +902,7 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, return ""; }; + // check in ms mem cache check_func check_to_add = [&](const NodeInfo& n) -> std::string { std::string err; std::stringstream s; @@ -787,6 +926,7 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, return ""; }; + // modify kv modify_impl_func modify_to_add = [&](const ClusterPB& c, const NodeInfo& n) -> std::string { std::string err; std::stringstream s; @@ -838,7 +978,8 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, auto& change_cluster = const_cast&>(c); change_cluster.add_nodes()->CopyFrom(node); copied_cluster.CopyFrom(change_cluster); - vec.emplace_back(std::move(copied_original_cluster), std::move(copied_cluster)); + change_from_to_clusters.emplace_back(std::move(copied_original_cluster), + std::move(copied_cluster)); return ""; }; @@ -850,18 +991,22 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, } } + std::string cache_err_help_msg = + "Ms nodes memory cache may be inconsistent, pls check registry key may be contain " + "127.0.0.1, and call get_instance api get instance info from fdb"; + + // check in ms mem cache check_func check_to_del = [&](const NodeInfo& n) -> std::string { std::string err; std::stringstream s; auto [start, end] = node_info_.equal_range(n.node_info.cloud_unique_id()); if (start == node_info_.end() || start->first != n.node_info.cloud_unique_id()) { - s << "cloud_unique_id 
can not find to drop node," + s << "can not find to drop nodes by cloud_unique_id=" << n.node_info.cloud_unique_id() << " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name - << " cluster_id=" << n.cluster_id - << " cloud_unique_id=" << n.node_info.cloud_unique_id(); + << " cluster_id=" << n.cluster_id << " help Msg=" << cache_err_help_msg; err = s.str(); LOG(WARNING) << err; - return err; + return std::string("not found ,") + err; } bool found = false; @@ -905,15 +1050,16 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, if (!found) { s << "cloud_unique_id can not find to drop node," << " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name - << " cluster_id=" << n.cluster_id - << " cloud_unique_id=" << n.node_info.cloud_unique_id(); + << " cluster_id=" << n.cluster_id << " node_info=" << n.node_info.DebugString() + << " help Msg=" << cache_err_help_msg; err = s.str(); LOG(WARNING) << err; - return err; + return std::string("not found ,") + err; } return ""; }; + // modify kv modify_impl_func modify_to_del = [&](const ClusterPB& c, const NodeInfo& n) -> std::string { std::string err; std::stringstream s; @@ -922,7 +1068,10 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, bool found = false; int idx = -1; + // ni: to drop node const auto& ni = n.node_info; + // c.nodes: cluster registered nodes + NodeInfoPB copy_node; for (auto& cn : c.nodes()) { idx++; if (cn.has_ip() && ni.has_ip()) { @@ -937,6 +1086,7 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, : std::to_string(ni.edit_log_port())); if (ni.cloud_unique_id() == cn.cloud_unique_id() && cn_endpoint == ni_endpoint) { + copy_node.CopyFrom(cn); found = true; break; } @@ -955,6 +1105,7 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, if (ni.cloud_unique_id() == cn.cloud_unique_id() && cn_endpoint_host == ni_endpoint_host) { + copy_node.CopyFrom(cn); found = true; break; } 
@@ -964,17 +1115,26 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, if (!found) { s << "failed to find node to drop," << " instance_id=" << instance.instance_id() << " cluster_id=" << c.cluster_id() - << " cluster_name=" << c.cluster_name() << " cluster=" << proto_to_json(c); + << " cluster_name=" << c.cluster_name() << " cluster=" << proto_to_json(c) + << " help Msg =" << cache_err_help_msg; err = s.str(); LOG(WARNING) << err; - // not found return ok. - return ""; + return std::string("not found ,") + err; + } + + // check drop fe node + if (ClusterPB::SQL == c.type() && !is_sql_node_exceeded_safe_drop_time(copy_node)) { + s << "drop fe node not in safe time, try later, node=" << copy_node.DebugString(); + err = s.str(); + LOG(WARNING) << err; + return err; } copied_original_cluster.CopyFrom(c); auto& change_nodes = const_cast&>(c.nodes()); change_nodes.DeleteSubrange(idx, 1); // Remove it copied_cluster.CopyFrom(c); - vec.emplace_back(std::move(copied_original_cluster), std::move(copied_cluster)); + change_from_to_clusters.emplace_back(std::move(copied_original_cluster), + std::move(copied_cluster)); return ""; }; @@ -982,13 +1142,13 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, msg = modify_func(it, check_to_del, modify_to_del); if (msg != "") { LOG(WARNING) << msg; - // not found, just return OK to cloud control - return ""; + return msg; } } LOG(INFO) << "instance " << instance_id << " info: " << instance.DebugString(); - if (!to_del.empty() && !is_instance_valid(instance)) { + // here, instance has been changed, not save in fdb + if ((!to_add.empty() || !to_del.empty()) && !is_instance_valid(instance)) { msg = "instance invalid, cant modify, plz check"; LOG(WARNING) << msg; return msg; @@ -1014,7 +1174,7 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id, return msg; } - for (auto& it : vec) { + for (auto& it : change_from_to_clusters) { update_cluster_to_index(instance_id, 
it.first, it.second); } diff --git a/cloud/test/fdb_injection_test.cpp b/cloud/test/fdb_injection_test.cpp index 08ba3e50e5253f..60226e7f9521a2 100644 --- a/cloud/test/fdb_injection_test.cpp +++ b/cloud/test/fdb_injection_test.cpp @@ -71,6 +71,7 @@ int main(int argc, char** argv) { cloud::config::fdb_cluster_file_path = "fdb.cluster"; cloud::config::write_schema_kv = true; cloud::config::enable_check_instance_id = false; + cloud::config::enable_loopback_address_for_ms = true; auto sp = SyncPoint::get_instance(); sp->enable_processing(); @@ -92,6 +93,8 @@ int main(int argc, char** argv) { [](auto&& args) { *try_any_cast(args[0]) = 0; }); sp->set_call_back("put_schema_kv:schema_key_exists_return", [](auto&& args) { *try_any_cast(args.back()) = true; }); + sp->set_call_back("resource_manager::set_safe_drop_time", + [](auto&& args) { *try_any_cast(args[0]) = -1; }); meta_service = create_meta_service(); diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index 4360efeb4422a9..dc70f31613ba9c 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -810,11 +810,11 @@ TEST(MetaServiceHttpTest, AlterClusterTest) { req.mutable_cluster()->set_type(ClusterPB::COMPUTE); auto node = req.mutable_cluster()->add_nodes(); node->set_ip("127.0.0.1"); - node->set_heartbeat_port(9999); + node->set_heartbeat_port(9996); node->set_cloud_unique_id("cloud_unique_id"); auto& meta_service = ctx.meta_service_; NodeInfoPB npb; - npb.set_heartbeat_port(9999); + npb.set_heartbeat_port(9996); npb.set_ip("127.0.0.1"); npb.set_cloud_unique_id("cloud_unique_id"); meta_service->resource_mgr()->node_info_.insert( diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp index 516a0b35c0f21f..c8d53408a048e1 100644 --- a/cloud/test/resource_test.cpp +++ b/cloud/test/resource_test.cpp @@ -227,9 +227,11 @@ TEST(ResourceTest, ModifyNodesIpTest) { auto* c = ins.mutable_clusters()->Add(); 
c->set_cluster_name("cluster_name_1"); c->set_cluster_id("cluster_id_1"); + c->set_type(ClusterPB::COMPUTE); auto* c1 = ins.mutable_clusters()->Add(); c1->set_cluster_name("cluster_name_2"); c1->set_cluster_id("cluster_id_2"); + c1->set_type(ClusterPB::COMPUTE); *try_any_cast(args[1]) = ins; }); sp->enable_processing(); @@ -286,9 +288,11 @@ TEST(ResourceTest, ModifyNodesHostTest) { auto* c = ins.mutable_clusters()->Add(); c->set_cluster_name("cluster_name_1"); c->set_cluster_id("cluster_id_1"); + c->set_type(ClusterPB::COMPUTE); auto* c1 = ins.mutable_clusters()->Add(); c1->set_cluster_name("cluster_name_2"); c1->set_cluster_id("cluster_id_2"); + c1->set_type(ClusterPB::COMPUTE); *try_any_cast(args[1]) = ins; }); sp->enable_processing(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index f4c6005a0d828a..b09b3553016906 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -180,13 +180,12 @@ private Cloud.NodeInfoPB getLocalTypeFromMetaService() { .stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList()); helperNodes.clear(); - Optional firstNonObserverNode = allNodes.stream().findFirst(); - if (firstNonObserverNode.isPresent()) { - helperNodes.add(new HostInfo( - Config.enable_fqdn_mode ? firstNonObserverNode.get().getHost() - : firstNonObserverNode.get().getIp(), - firstNonObserverNode.get().getEditLogPort())); - } + Optional firstNonObserverNode = allNodes.stream() + .filter(nodeInfoPB -> nodeInfoPB.getNodeType() != NodeInfoPB.NodeType.FE_OBSERVER).findFirst(); + firstNonObserverNode.ifPresent(nodeInfoPB -> helperNodes.add(new HostInfo( + Config.enable_fqdn_mode ? 
nodeInfoPB.getHost() + : nodeInfoPB.getIp(), + nodeInfoPB.getEditLogPort()))); Preconditions.checkState(helperNodes.size() == 1); Optional local = allNodes.stream().filter(n -> ((Config.enable_fqdn_mode ? n.getHost() : n.getIp()) diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 381075074335c5..e02b3290ab0ba9 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -140,7 +140,7 @@ message ClusterPB { message NodeInfoPB { enum NodeType { UNKNOWN = 0; - // lagacy logic for one-master-multi-observer mode + // lagacy logic for one-master-multi-observer mode FE_MASTER = 1; FE_OBSERVER = 2; FE_FOLLOWER = 3; diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 856b0e76956395..2c00caa3e38b90 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -36,6 +36,7 @@ class ClusterOptions { int feNum = 1 int beNum = 3 + int msNum = 1 Boolean sqlModeNodeMgr = false Boolean beMetaServiceEndpoint = true @@ -57,6 +58,10 @@ class ClusterOptions { 'report_random_wait=false', ] + List msConfigs = [] + + List recycleConfigs = [] + boolean connectToFollower = false // 1. cloudMode = true, only create cloud cluster. 
@@ -295,6 +300,9 @@ class SuiteCluster { if (options.beNum > 0) { cmd += ['--add-be-num', String.valueOf(options.beNum)] } + if (options.msNum > 0) { + cmd += ['--add-ms-num', String.valueOf(options.msNum)] + } // TODO: need escape white space in config if (options.feConfigs != null && options.feConfigs.size() > 0) { cmd += ['--fe-config'] @@ -304,6 +312,14 @@ class SuiteCluster { cmd += ['--be-config'] cmd += options.beConfigs } + if (options.msConfigs != null && options.msConfigs.size() > 0) { + cmd += ['--ms-config'] + cmd += options.msConfigs + } + if (options.recycleConfigs != null && options.recycleConfigs.size() > 0) { + cmd += ['--recycle-config'] + cmd += options.recycleConfigs + } if (options.beDisks != null) { cmd += ['--be-disks'] cmd += options.beDisks @@ -556,6 +572,16 @@ class SuiteCluster { runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) } + // if not specific ms indices, then restart all ms + void restartMs(int... indices) { + runMsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) + } + + // if not specific recycler indices, then restart all recyclers + void restartRecyclers(int... indices) { + runRecyclerCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices) + } + // if not specific fe indices, then drop all frontends void dropFrontends(boolean clean=false, int... indices) { def cmd = 'down' @@ -635,6 +661,16 @@ class SuiteCluster { runCmd(cmd, timeoutSecond) } + private void runMsCmd(int timeoutSecond, String op, int... indices) { + def cmd = op + ' ' + name + ' --ms-id ' + indices.join(' ') + runCmd(cmd, timeoutSecond) + } + + private void runRecyclerCmd(int timeoutSecond, String op, int... 
indices) { + def cmd = op + ' ' + name + ' --recycle-id ' + indices.join(' ') + runCmd(cmd, timeoutSecond) + } + private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception { def fullCmd = String.format('python -W ignore %s %s --output-json', config.dorisComposePath, cmd) logger.info('Run doris compose cmd: {}', fullCmd) diff --git a/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy b/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy new file mode 100644 index 00000000000000..62dfe5d39e269b --- /dev/null +++ b/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy @@ -0,0 +1,1553 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+import groovy.json.JsonOutput +import org.apache.doris.regression.suite.ClusterOptions + +suite('test_ms_api', 'p0, docker') { + if (!isCloudMode()) { + return; + } + + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + + def create_instance_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/create_instance?token=$token" + body request_body + check check_func + } + } + + def get_instance_api = { msHttpPort, instance_id, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/get_instance?token=${token}&instance_id=${instance_id}" + check check_func + } + } + + + def enable_ms_inject_api = { msHttpPort, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=enable" + check check_func + } + } + + // curl "175.43.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=[-1]" + def inject_to_ms_api = { msHttpPort, key, value, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}" + check check_func + } + } + + // curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=clear" + def clear_ms_inject_api = { msHttpPort, key, value, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=clear" + check check_func + } + } + + def drop_cluster_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/drop_cluster?token=$token" + body request_body + check check_func + } + } + + + // drop instance 
+ def drop_instance_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/drop_instance?token=greedisgood9999" + body request_body + check check_func + } + } + + def add_cluster_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/add_cluster?token=$token" + body request_body + check check_func + } + } + + def get_obj_store_info_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/get_obj_store_info?token=$token" + body request_body + check check_func + } + } + + def update_ak_sk_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/update_ak_sk?token=$token" + body request_body + check check_func + } + } + + def add_obj_info_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/add_obj_info?token=$token" + body request_body + check check_func + } + } + + def get_cluster_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/get_cluster?token=$token" + body request_body + check check_func + } + } + + def rename_node_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/rename_cluster?token=$token" + body request_body + check check_func + } + } + + def add_node_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/add_node?token=$token" + body request_body + check check_func + } + } + + def drop_node_api = { msHttpPort, request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/drop_node?token=$token" + body request_body + check check_func + } + } + + // old case + docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + logger.info("ms1 addr={}, port={}, 
ms endpoint={}", ms.host, ms.httpPort, msHttpPort) + + // Inventory function test + def token = "greedisgood9999" + def instance_id = "instance_id_test_in_docker" + def name = "user_1" + def user_id = "10000" + + // create instance + /* + curl -X GET '127.0.0.1:5000/MetaService/http/create_instance?token=greedisgood9999' -d '{ + "instance_id": "instance_id_deadbeef", + "name": "user_1", + "user_id": "10000", + "obj_info": { + "ak": "test-ak1", + "sk": "test-sk1", + "bucket": "test-bucket", + "prefix": "test-prefix", + "endpoint": "test-endpoint", + "region": "test-region", + "provider" : "BOS", + "external_endpoint" : "endpoint" + } + }' + */ + def jsonOutput = new JsonOutput() + def s3 = [ + ak: "test-ak1", + sk : "test-sk1", + bucket : "test-bucket", + prefix: "test-prefix", + endpoint: "test-endpoint", + region: "test-region", + provider : "BOS", + 'external_endpoint': "test-external-endpoint" + ] + def map = [instance_id: "${instance_id}", name: "${name}", user_id: "${user_id}", obj_info: s3] + def instance_body = jsonOutput.toJson(map) + + create_instance_api.call(msHttpPort, instance_body) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + def otherInstancemap = [instance_id: "instance_id_test_in_docker_other", name: "${name}", user_id: "${user_id}", obj_info: s3] + def otherInstanceBody = jsonOutput.toJson(otherInstancemap) + + create_instance_api.call(msHttpPort, otherInstanceBody) { + respCode, body -> + log.info("create other instance http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // create again failed + create_instance_api.call(msHttpPort, otherInstanceBody) { + respCode, body -> + log.info("create other instance again http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + 
assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED")) + } + + jsonOutput = new JsonOutput() + def instance = [instance_id: "instance_id_test_in_docker_other"] + def dropInstanceBody = jsonOutput.toJson(instance) + + drop_instance_api.call(msHttpPort, dropInstanceBody) { + respCode, body -> + log.info("drop instance http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // add compute group 1 + def clusterName = "compute_name_1" + def clusterId = "compute_id_1" + def cloudUniqueId = "1:${instance_id}:xxxxx" + def compute_ip1 = "182.0.0.1" + def heartbeatPort = 9050 + + // add no be cluster + /* + curl -X GET http://127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999 -d '{ + "instance_id": "instance_id_deadbeef", + "cluster": { + "cluster_name": "cluster_name1", + "cluster_id": "cluster_id1", + "type": "COMPUTE", + "nodes": [] + } + }' + */ + def nodeList = [] + def clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def addEmptyComputeGroup = jsonOutput.toJson(instance) + add_cluster_api.call(msHttpPort, addEmptyComputeGroup) { + respCode, body -> + log.info("add empty compute group http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + jsonOutput = new JsonOutput() + def dropEmptyComputeGroup = jsonOutput.toJson(instance) + // drop empty cluster + /* + curl -X GET http://127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999 -d '{ + "instance_id": "instance_id_deadbeef", + "cluster": { + "cluster_name": "cluster_name1", + "cluster_id": "cluster_id1" + } + }' + */ + drop_cluster_api.call(msHttpPort, dropEmptyComputeGroup) { + respCode, body -> + log.info("drop empty compute group http cli result: ${body} ${respCode}".toString()) + 
def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + def nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: "${compute_ip1}", heartbeat_port: "${heartbeatPort}"] + nodeList = [nodeMap] + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + def addComputeGroupBody = jsonOutput.toJson(instance) + // add_cluster has one node + /* + curl '127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999' -d '{ + "instance_id":"instance_id_deadbeef", + "cluster":{ + "cluster_name":"cluster_name1", + "cluster_id":"cluster_id1", + "type" : "COMPUTE", + "nodes":[ + { + "cloud_unique_id":"cloud_unique_id_compute_node0", + "ip":"172.0.0.10", + "heartbeat_port":9050 + } + ] + } + }' + */ + add_cluster_api.call(msHttpPort, addComputeGroupBody) { + respCode, body -> + log.info("add one compute node http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // add sql group + // see Config.java + // public static String cloud_sql_server_cluster_id = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"; + // public static String cloud_sql_server_cluster_name = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER"; + def feClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER" + def feClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER" + def ip1 = "162.0.0.1" + def ip2 = "162.0.0.2" + def ip3 = "162.0.0.3" + def edit_log_port = 8050 + def feNodeMap1 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip1}", edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"] + def feNodeMap2 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip2}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"] + def feNodeMap3 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip3}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"] + def feNodeList = [feNodeMap1, feNodeMap2, feNodeMap3] + def feClusterMap = 
[cluster_name: "${feClusterName}", cluster_id:"${feClusterId}", type:"SQL", nodes:feNodeList] + instance = [instance_id: "${instance_id}", cluster: feClusterMap] + jsonOutput = new JsonOutput() + def addSqlGroupBody = jsonOutput.toJson(instance) + + add_cluster_api.call(msHttpPort, addSqlGroupBody) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + def json + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + json.result + + + // get instance's s3 info + /* + curl '127.0.0.1:5000/MetaService/http/get_obj_store_info?token=greedisgood9999' -d '{ + "cloud_unique_id":"cloud_unique_id_compute_node0" + }' + */ + def get_obj_store_info_api_body = [cloud_unique_id:"${cloudUniqueId}"] + jsonOutput = new JsonOutput() + def getObjStoreInfo = jsonOutput.toJson(get_obj_store_info_api_body) + + get_obj_store_info_api.call(msHttpPort, getObjStoreInfo) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // update instance's s3 info + /* + curl '127.0.0.1:5000/MetaService/http/update_ak_sk?token=greedisgood9999' -d '{ + "instance_id": "cloud_unique_id_compute_node0", + "internal_bucket_user": [ + "user_id": "1-userid", + "ak": "test-ak1-updated", + "sk": "test-sk1-updated" + ], + }' + */ + def internal_bucket_user = [[user_id:"1-userid", ak:"test-ak1-updated", sk:"test-sk1-updated"]] + def update_ak_sk_api_body = [instance_id:"${instance_id}", internal_bucket_user:internal_bucket_user] + jsonOutput = new JsonOutput() + upDateAKSKBody = jsonOutput.toJson(update_ak_sk_api_body) + + + update_ak_sk_api.call(msHttpPort, upDateAKSKBody) { + respCode, body -> + log.info("http cli result: ${body} 
${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // add s3 info to instance + /* + curl '127.0.0.1:5000/MetaService/http/add_obj_info?token=greedisgood9999' -d '{ + "cloud_unique_id": "cloud_unique_id_compute_node0", + "obj": { + "ak": "test-ak2", + "sk": "test-sk2", + "bucket": "test-bucket", + "prefix": "test-prefix", + "endpoint": "test-endpoint", + "region": "test-region", + "provider": "COS" + } + }' + */ + + def add_obj_info_api_body = [cloud_unique_id:"${cloudUniqueId}", + obj:[ak:"test-ak2", sk:"test-sk2", bucket:"test-bucket", + prefix: "test-prefix", endpoint: "test-endpoint", region:"test-region", provider:"COS"]] + jsonOutput = new JsonOutput() + addObjInfoBody = jsonOutput.toJson(add_obj_info_api_body) + + + add_obj_info_api.call(msHttpPort, addObjInfoBody) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // add again, failed + add_cluster_api.call(msHttpPort, addComputeGroupBody) { + respCode, body -> + log.info("add again http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED")) + assertTrue(json.msg.startsWith("try to add a existing cluster id")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // get_cluster by cluster name + /* + curl '127.0.0.1:5000/MetaService/http/get_cluster?token=greedisgood9999' -d '{ + "cluster_name": "cluster_name1", + "cloud_unique_id": "cloud_unique_id_compute_node0" + }' + */ + def get_cluster_by_name = [cluster_name: "${clusterName}", cloud_unique_id: "${cloudUniqueId}"] + jsonOutput = new JsonOutput() + def getClusterByNameBody = jsonOutput.toJson(get_cluster_by_name) + + get_cluster_api.call(msHttpPort, 
getClusterByNameBody) { + respCode, body -> + json = parseJson(body) + log.info("http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // get_cluster by cluster id + /* + curl '127.0.0.1:5000/MetaService/http/get_cluster?token=greedisgood9999' -d '{ + "cluster_id": "cluster_id1", + "cloud_unique_id": "cloud_unique_id_compute_node0" + }' + */ + def get_cluster_by_id = [cluster_id: "${clusterId}", cloud_unique_id: "${cloudUniqueId}"] + jsonOutput = new JsonOutput() + def getClusterByIdBody = jsonOutput.toJson(get_cluster_by_id) + + get_cluster_api.call(msHttpPort, getClusterByIdBody) { + respCode, body -> + json = parseJson(body) + log.info("http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // add nodes + /* + curl '127.0.0.1:5000/MetaService/http/add_node?token=greedisgood9999' -d '{ + "instance_id": "instance_id_deadbeef", + "cluster": { + "cluster_name": "cluster_name1", + "cluster_id": "cluster_id1", + "type": "COMPUTE", + "nodes": [ + { + "cloud_unique_id": "cloud_unique_id_compute_node1", + "ip": "172.0.0.11", + "heartbeat_port": 9050 + }, + { + "cloud_unique_id": "cloud_unique_id_compute_node2", + "ip": "172.0.0.12", + "heartbeat_port": 9050 + } + ] + } + }' + */ + def compute_ip2 = "182.0.0.2" + def compute_ip3 = "182.0.0.3" + def node1 = [cloud_unique_id: "${cloudUniqueId}", ip : "${compute_ip2}", heartbeat_port: 9050] + def node2 = [cloud_unique_id: "${cloudUniqueId}", ip : "${compute_ip3}", heartbeat_port: 9050] + def add_nodes = [node1, node2] + def add_nodes_cluster = [cluster_name: "${clusterName}", cluster_id: "${clusterId}", type: "COMPUTE", nodes: add_nodes] + def add_nodes_body = [instance_id: "${instance_id}", cluster: add_nodes_cluster] + jsonOutput = new JsonOutput() + def addTwoComputeNodeBody = jsonOutput.toJson(add_nodes_body) + + add_node_api.call(msHttpPort, addTwoComputeNodeBody) { + respCode, body -> + json = 
parseJson(body) + log.info("add two compute nodes http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // drop nodes + /* + curl '127.0.0.1:5000/MetaService/http/drop_node?token=greedisgood9999' -d '{ + "instance_id": "instance_id_deadbeef", + "cluster": { + "cluster_name": "cluster_name1", + "cluster_id": "cluster_id1", + "type": "COMPUTE", + "nodes": [ + { + "cloud_unique_id": "cloud_unique_id_compute_node1", + "ip": "172.0.0.11", + "heartbeat_port": 9050 + }, + { + "cloud_unique_id": "cloud_unique_id_compute_node2", + "ip": "172.0.0.12", + "heartbeat_port": 9050 + } + ] + } + }' + */ + def del_nodes = [node1, node2] + def del_nodes_cluster = [cluster_name: "${clusterName}", cluster_id: "${clusterId}", type: "COMPUTE", nodes: del_nodes] + def del_nodes_body = [instance_id: "${instance_id}", cluster: del_nodes_cluster] + jsonOutput = new JsonOutput() + def delTwoNodesBody = jsonOutput.toJson(del_nodes_body) + + + drop_node_api.call(msHttpPort, delTwoNodesBody) { + respCode, body -> + json = parseJson(body) + log.info("drop two nodes http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // rename cluster + /* + curl '127.0.0.1:5000/MetaService/http/rename_cluster?token=greedisgood9999' -d '{ + "instance_id":"instance_id_deadbeef", + "cluster":{ + "cluster_name":"compute_name_1", + "cluster_id":"compute_id_1" + } + }' + */ + + clusterMap = [cluster_name: "compute_name_1_renamed", cluster_id:"${clusterId}"] + instance = 
[instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def renmaeClusterBody = jsonOutput.toJson(instance) + rename_node_api.call(msHttpPort, renmaeClusterBody) { + respCode, body -> + json = parseJson(body) + log.info("rename cluster http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}"] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def renameClusterBackBody = jsonOutput.toJson(instance) + rename_node_api.call(msHttpPort, renameClusterBackBody) { + respCode, body -> + json = parseJson(body) + log.info("rename cluster back http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + def clusterName2 = "cluster_name2" + def clusterId2 = "cluster_id2" + def nodeList1 = [node1] + clusterMap1 = [cluster_name: "${clusterName2}", cluster_id:"${clusterId2}", type:"COMPUTE", nodes:nodeList1] + instance = [instance_id: "${instance_id}", cluster: clusterMap1] + jsonOutput = new JsonOutput() + def addNewComputeGroupBody = jsonOutput.toJson(instance) + // add_cluster has one node + /* + curl '127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999' -d '{ + "instance_id":"instance_id_deadbeef", + "cluster":{ + "cluster_name":"cluster_name2", + "cluster_id":"cluster_id2", + "type" : "COMPUTE", + "nodes":[ + { + "cloud_unique_id":"cloud_unique_id_compute_node0", + "ip":"172.0.0.11", + "heartbeat_port":9050 + } + ] + } + }' + */ + + add_cluster_api.call(msHttpPort, addNewComputeGroupBody) { + respCode, body -> + log.info("add new compute group http cli result: ${body} ${respCode}".toString()) + 
json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // "code": "INVALID_ARGUMENT", + // "msg": "failed to drop instance, instance has clusters" + def dropInstanceFailedDueHasCuster = addNewComputeGroupBody + drop_instance_api.call(msHttpPort, dropInstanceFailedDueHasCuster) { + respCode, body -> + log.info("drop instance failed due to has cluster http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT")) + } + + // failed, failed to rename cluster, a cluster with the same name already exists in this instance + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId2}"] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def renameFailedBody = jsonOutput.toJson(instance) + rename_node_api.call(msHttpPort, renameFailedBody) { + respCode, body -> + json = parseJson(body) + log.info("rename failed http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT")) + assertTrue(json.msg.contains("failed to rename cluster, a cluster with the same name already exists in this instance")) + } + + // get cluster status + /* + curl '127.0.0.1:5000/MetaService/http/get_cluster_status?token=greedisgood9999' -d '{ + "instance_ids":["instance_id_deadbeef"] + }' + */ + def get_cluster_status = { request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/get_cluster_status?token=$token" + body request_body + check check_func + } + } + + // set cluster status + /* + curl '127.0.0.1:5000/MetaService/http/set_cluster_status?token=greedisgood9999' -d '{ + "instance_id": "instance_id_deadbeef", + "cluster": { + "cluster_id": "test_cluster_1_id1", + "cluster_status":"STOPPED" + } + }' + */ + def set_cluster_status = { request_body, check_func -> + httpTest { + endpoint msHttpPort + uri "/MetaService/http/set_cluster_status?token=$token" + body 
request_body + check check_func + } + } + + def getClusterInstance = [instance_ids: ["${instance_id}"]] + jsonOutput = new JsonOutput() + def getClusterStatusBody = jsonOutput.toJson(getClusterInstance) + get_cluster_status.call(getClusterStatusBody) { + respCode, body -> + json = parseJson(body) + log.info("get cluster status http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + clusterMap = [cluster_id:"${clusterId2}", cluster_status:"SUSPENDED"] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def setClusterStatusBody = jsonOutput.toJson(instance) + set_cluster_status.call(setClusterStatusBody) { + respCode, body -> + json = parseJson(body) + log.info("set cluster status http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // failed to set cluster status, status eq original status, original cluster is NORMAL + set_cluster_status.call(setClusterStatusBody) { + respCode, body -> + json = parseJson(body) + log.info("set cluster status again failed http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT")) + } + + // failed to set cluster status, original cluster is SUSPENDED and want set UNKNOWN + clusterMap = [cluster_id:"${clusterId2}", cluster_status:"UNKNOWN"] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def setClusterInvalidStatusBody = jsonOutput.toJson(instance) + set_cluster_status.call(setClusterInvalidStatusBody) { + respCode, body -> + json = parseJson(body) + log.info("set cluster status invalid status http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT")) + } + + // drop cluster + /* + curl '127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999' -d '{ + "instance_id":"instance_id_deadbeef", + 
"cluster":{ + "cluster_name":"cluster_name1", + "cluster_id":"cluster_id1" + } + }' + */ + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}"] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def dropCluster1Body = jsonOutput.toJson(instance) + drop_cluster_api.call(msHttpPort, dropCluster1Body) { + respCode, body -> + log.info("drop cluster 1 http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // drop cluster_id2 + clusterMap = [cluster_id:"${clusterId2}"] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def dropCluster2Body = jsonOutput.toJson(instance) + drop_cluster_api.call(msHttpPort, dropCluster2Body) { + respCode, body -> + log.info("drop cluster 2 http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // drop not exist cluster, falied + /* + curl '127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999' -d '{ + "instance_id":"instance_id_deadbeef", + "cluster":{ + "cluster_name":"not_exist_cluster_name", + "cluster_id":"not_exist_cluster_name" + } + }' + */ + clusterMap = [cluster_name: "not_exist_cluster_name", cluster_id:"not_exist_cluster_id", nodes:nodeList] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def dropNotExistClusterBody = jsonOutput.toJson(instance) + drop_cluster_api.call(msHttpPort, dropNotExistClusterBody) { + respCode, body -> + log.info("drop not exist cluster http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("NOT_FOUND")) + } + + // after drop, get cluster again, failed + instance = [cloud_unique_id: "${cloudUniqueId}", cluster_id: "${clusterId}"] + jsonOutput = new JsonOutput() + def afterDropAndGetBody = jsonOutput.toJson(instance) + 
get_cluster_api.call(msHttpPort, afterDropAndGetBody) { + respCode, body -> + json = parseJson(body) + log.info("after drop and get again http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("NOT_FOUND")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // add node to another cluster, compute_ip1 not owned by any cluster + /* + curl '127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999' -d '{ + "instance_id":"instance_id_deadbeef", + "cluster":{ + "cluster_name":"cluster_name2", + "cluster_id":"cluster_name2", + "type" : "COMPUTE", + "nodes":[ + { + "cloud_unique_id":"cloud_unique_id_compute_node0", + "ip":"172.0.0.10", + "heartbeat_port":9050 + } + ] + } + }' + */ + clusterName = "compute_group_name2" + clusterId = "compute_group_id2" + nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: "${compute_ip1}", heartbeat_port: "${heartbeatPort}"] + nodeList = [nodeMap] + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}", nodes:nodeList, type:"COMPUTE"] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + jsonOutput = new JsonOutput() + def addNodeToOtherBody = jsonOutput.toJson(instance) + add_cluster_api.call(msHttpPort, addNodeToOtherBody) { + respCode, body -> + log.info("add node to other compute group http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // drop cluster + /* + curl -X GET http://127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999 -d '{ + "instance_id": "instance_id_deadbeef", + "cluster": { + "cluster_name": "cluster_name2", + "cluster_id": "cluster_id2" + } + }' + */ + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}"] + instance = [instance_id: "${instance_id}", cluster: 
clusterMap] + jsonOutput = new JsonOutput() + def dropClusterErrBody = jsonOutput.toJson(instance) + drop_cluster_api.call(msHttpPort, dropClusterErrBody) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("after drop other cluster get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // add back compute group + add_cluster_api.call(msHttpPort, addNewComputeGroupBody) { + respCode, body -> + log.info("add new compute group again again http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + add_cluster_api.call(msHttpPort, addComputeGroupBody) { + respCode, body -> + log.info("add again again http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("after add back fe cluster get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // custom instance PB + /* + { + "code": "OK", + "msg": "", + "result": { + "user_id": "10000", + "instance_id": "instance_id_test_in_docker", + "name": "user_1", + "clusters": [ + { + "cluster_id": "RESERVED_CLUSTER_ID_FOR_SQL_SERVER", + "cluster_name": "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER", + "type": "SQL", + "nodes": [ + { + "cloud_unique_id": "1:instance_id_test_in_docker:xxxxx", + "ip": "162.0.0.1", + "ctime": "1733650571", + "mtime": "1733650571", + "edit_log_port": 8050, + "node_type": "FE_FOLLOWER" + }, + { + "cloud_unique_id": "1:instance_id_test_in_docker:xxxxx", + "ip": "162.0.0.2", + "ctime": "1733650571", + "mtime": "1733650571", + "edit_log_port": 8050, + 
"node_type": "FE_OBSERVER" + }, + { + "cloud_unique_id": "1:instance_id_test_in_docker:xxxxx", + "ip": "162.0.0.3", + "ctime": "1733650571", + "mtime": "1733650571", + "edit_log_port": 8050, + "node_type": "FE_OBSERVER" + } + ] + }, + { + "cluster_id": "cluster_id2", + "cluster_name": "cluster_name2", + "type": "COMPUTE", + "nodes": [ + { + "cloud_unique_id": "1:instance_id_test_in_docker:xxxxx", + "ip": "182.0.0.2", + "ctime": "1733653263", + "mtime": "1733653263", + "heartbeat_port": 9050 + } + ], + "cluster_status": "NORMAL" + }, + { + "cluster_id": "compute_id_1", + "cluster_name": "compute_name_1", + "type": "COMPUTE", + "nodes": [ + { + "cloud_unique_id": "1:instance_id_test_in_docker:xxxxx", + "ip": "182.0.0.1", + "ctime": "1733653263", + "mtime": "1733653263", + "heartbeat_port": 9050 + } + ], + "cluster_status": "NORMAL" + } + ], + "obj_info": [ + { + "ctime": "1733650571", + "mtime": "1733650571", + "id": "1", + "ak": "test-ak1-updated", + "sk": "test-sk1-updated", + "bucket": "test-bucket", + "prefix": "test-prefix", + "endpoint": "test-endpoint", + "region": "test-region", + "provider": "BOS", + "external_endpoint": "test-external-endpoint", + "user_id": "1-userid", + "encryption_info": { + "encryption_method": "AES_256_ECB", + "key_id": "1" + }, + "sse_enabled": false + }, + { + "ctime": "1733650571", + "mtime": "1733650571", + "id": "2", + "ak": "test-ak2", + "sk": "test-sk2", + "bucket": "test-bucket", + "prefix": "test-prefix", + "endpoint": "test-endpoint", + "region": "test-region", + "provider": "COS", + "external_endpoint": "", + "encryption_info": { + "encryption_method": "AES_256_ECB", + "key_id": "1" + }, + "sse_enabled": false + } + ], + "status": "NORMAL", + "iam_user": { + "user_id": "", + "ak": "", + "sk": "", + "external_id": "instance_id_test_in_docker" + }, + "sse_enabled": false, + "enable_storage_vault": false + } + } + */ + } + + def clusterOptions = [ + new ClusterOptions(), + new ClusterOptions(), + ] + + for (def cos in 
clusterOptions) { + cos.cloudMode = true + cos.feNum = 1 + cos.beNum = 1 + cos.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + ] + } + + + for (def i = 0; i < clusterOptions.size(); i++) { + // 1. Test that a node cannot be repeatedly added to multiple clusters + // 1.1 compute node + // 2. Test API supports switching from master observer mode to multi follower mode + docker(clusterOptions[i]) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + logger.info("ms2 addr={}, port={}, ms endpoint={}", ms.host, ms.httpPort, msHttpPort) + + def token = "greedisgood9999" + def instance_id = "instance_id_test_in_docker_1" + def name = "user_1" + def user_id = "10000" + def clusterName = "compute_name_1" + def clusterId = "compute_id_1" + def cloudUniqueId = "1:${instance_id}:xxxxx" + // create instance + def jsonOutput = new JsonOutput() + def s3 = [ + ak: "test-ak1", + sk : "test-sk1", + bucket : "test-bucket", + prefix: "test-prefix", + endpoint: "test-endpoint", + region: "test-region", + provider : "BOS", + 'external_endpoint': "test-external-endpoint" + ] + def map = [instance_id: "${instance_id}", name: "${name}", user_id: "${user_id}", obj_info: s3] + def instance_body = jsonOutput.toJson(map) + + create_instance_api.call(msHttpPort, instance_body) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + def compute_ip1 = "182.0.0.1" + def heartbeatPort = 9050 + def nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: "${compute_ip1}", heartbeat_port: "${heartbeatPort}"] + nodeList = [nodeMap] + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + def addComputeGroupBody = jsonOutput.toJson(instance) + 
add_cluster_api.call(msHttpPort, addComputeGroupBody) { + respCode, body -> + log.info("add one compute node http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // 1. Test that a node cannot be repeatedly added to multiple clusters + // 1.1 compute node + node1 = [cloud_unique_id: "${cloudUniqueId}", ip : "${compute_ip1}", heartbeat_port: 9050] + add_nodes = [node1] + def otherClusterName = "compute_name_1_other" + def otherClusterId = "compute_id_1_other" + add_nodes_cluster = [cluster_name: "${otherClusterName}", cluster_id: "${otherClusterId}", type: "COMPUTE", nodes: add_nodes] + def addNodeToOtherCluster = [instance_id: "${instance_id}", cluster: add_nodes_cluster] + jsonOutput = new JsonOutput() + addNodeToOtherClusterbody = jsonOutput.toJson(addNodeToOtherCluster) + add_cluster_api.call(msHttpPort, addNodeToOtherClusterbody) { + respCode, body -> + log.info("add node to other compute group http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED")) + assertTrue(json.msg.contains("compute node endpoint has been added")) + } + + // 1.2 sql node + def feClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER" + def feClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER" + def ip1 = "162.0.0.1" + def ip2 = "162.0.0.2" + def ip3 = "162.0.0.3" + def edit_log_port = 8050 + def feNodeMap1 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip1}", edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"] + def feNodeMap2 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip2}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"] + def feNodeMap3 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip3}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"] + def feNodeList = [feNodeMap1, feNodeMap2, feNodeMap3] + def feClusterMap = [cluster_name: "${feClusterName}", cluster_id:"${feClusterId}", type:"SQL", 
nodes:feNodeList] + instance = [instance_id: "${instance_id}", cluster: feClusterMap] + jsonOutput = new JsonOutput() + def addSqlGroupBody = jsonOutput.toJson(instance) + + add_cluster_api.call(msHttpPort, addSqlGroupBody) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + node_fe_other = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip3}", edit_log_port: 8050, node_type:"FE_FOLLOWER"] + add_nodes = [node_fe_other] + otherClusterName = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER_OTHER" + otherClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER_OTHER" + add_nodes_cluster = [cluster_name: "${otherClusterName}", cluster_id: "${otherClusterId}", type:"SQL", nodes: add_nodes] + def addNodeToOtherClusterFE = [instance_id: "${instance_id}", cluster: add_nodes_cluster] + jsonOutput = new JsonOutput() + addNodeToOtherFEClusterbody = jsonOutput.toJson(addNodeToOtherClusterFE) + add_cluster_api.call(msHttpPort, addNodeToOtherFEClusterbody) { + respCode, body -> + log.info("add node to other compute group http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED")) + assertTrue(json.msg.contains("sql node endpoint has been added")) + } + + // 2. 
Test API supports switching from master observer mode to multi follower mode + def newTestSQLIP1 = "152.0.0.1" + def newTestSQLIP2 = "152.0.0.2" + def newNodefeTestForUnlimit1Failed = [cloud_unique_id: "${cloudUniqueId}", ip : "${newTestSQLIP1}", edit_log_port: 8050, node_type:"FE_OBSERVER"] + def newNodefeTestForUnlimit2 = [cloud_unique_id: "${cloudUniqueId}", ip : "${newTestSQLIP2}", edit_log_port: 8050, node_type:"FE_OBSERVER"] + def addNodesFailed = [newNodefeTestForUnlimit1Failed, newNodefeTestForUnlimit2] + + // two FE_OBSERVER Failed to add cluster + def addTwoObNodesClusterFailed = [cluster_name: "${otherClusterName}", cluster_id: "${otherClusterId}", type:"SQL", nodes: addNodesFailed] + def addNodesClusterInstanceFailed = [instance_id: "${instance_id}", cluster: addTwoObNodesClusterFailed] + jsonOutput = new JsonOutput() + def addNodesClusterFailedBody = jsonOutput.toJson(addNodesClusterInstanceFailed) + add_cluster_api.call(msHttpPort, addNodesClusterFailedBody) { + respCode, body -> + log.info("add two observer fe failed test http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT")) + assertTrue(json.msg.contains("cluster is SQL type, but not set master and follower node, master count=0 follower count=0 so sql cluster can't get a Master node")) + } + + def ip4 = "162.0.0.4" + def ip5 = "162.0.0.5" + def ip6 = "162.0.0.6" + def feNode4 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip4}", edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"] + def feNode5 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip5}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"] + def feNode6 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip6}", edit_log_port: "${edit_log_port}", node_type:"FE_FOLLOWER"] + def addNodesClusterFailed = [cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: [feNode4, feNode5, feNode6]] + def dropAllFeNodesFailed = 
[cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: [feNodeMap1, feNodeMap2, feNodeMap3, feNode5, feNode6]] + def addNodesClusterSucc = [cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: [feNode5, feNode6]] + def addNodesFailedBody = [instance_id: "${instance_id}", cluster: addNodesClusterFailed] + def dropAllFeNodesClusterBody = [instance_id: "${instance_id}", cluster: dropAllFeNodesFailed] + def addNodesBodySuccBody= [instance_id: "${instance_id}", cluster: addNodesClusterSucc] + jsonOutput = new JsonOutput() + def addSomeFENodesFailed = jsonOutput.toJson(addNodesFailedBody) + def addSomeFENodesSucc = jsonOutput.toJson(addNodesBodySuccBody) + def dropAllFeNodesFailedJson = jsonOutput.toJson(dropAllFeNodesClusterBody) + + add_node_api.call(msHttpPort, addSomeFENodesFailed) { + respCode, body -> + json = parseJson(body) + // failed, due to two master node + // if force_change_to_multi_follower_mode == false, check type not changed, FE_MASTER + log.info("add some fe failed nodes http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR")) + assertTrue(json.msg.contains("instance invalid, cant modify, plz check")) + } + + add_node_api.call(msHttpPort, addSomeFENodesSucc) { + respCode, body -> + json = parseJson(body) + log.info("add some fe nodes http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // inject point, to change MetaServiceImpl_get_cluster_set_config + inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) { + respCode, body -> + log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) + } + + enable_ms_inject_api.call(msHttpPort) { + respCode, body -> + log.info("enable inject resp: ${body} ${respCode}".toString()) + } + + drop_node_api.call(msHttpPort, dropAllFeNodesFailedJson) { + 
respCode, body -> + json = parseJson(body) + log.info("drop all fe nodes failed http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR")) + assertTrue(json.msg.contains("instance invalid, cant modify, plz check")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("add Master-observer mode get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + def result = json.result + def FECluster = result.clusters.find { + it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER" + } + log.info("find it cluster ${FECluster}") + assertNotNull(FECluster) + def checkFENode = FECluster.nodes.find { + it.ip == ip1 + } + def followerFeNode = FECluster.nodes.find { + it.ip == ip6 + } + log.info("find it node fe1: ${checkFENode}") + log.info("find it node fe6: ${followerFeNode}") + assertNotNull(checkFENode) + assertNotNull(followerFeNode) + assertEquals("FE_MASTER", checkFENode.node_type) + assertEquals("FE_FOLLOWER", followerFeNode.node_type) + } + } + } + + // 3. 
fe get cluster, change FE_MASTER to FE_FOLLOWER + // Upgrade compatibility for existing master-observers models + // when call get_cluster api, FE_MASTER type will be changed to FE_FOLLOWER + def optionsForUpgrade = new ClusterOptions() + optionsForUpgrade.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + optionsForUpgrade.setFeNum(1) + optionsForUpgrade.setBeNum(1) + optionsForUpgrade.cloudMode = true + + docker(optionsForUpgrade) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + logger.info("ms3 addr={}, port={}, ms endpoint={}", ms.host, ms.httpPort, msHttpPort) + + def token = "greedisgood9999" + def instance_id = "instance_id_test_in_docker_2" + def name = "user_1" + def user_id = "10000" + def clusterName = "compute_name_1" + def clusterId = "compute_id_1" + def cloudUniqueId = "1:${instance_id}:xxxxx" + // create instance + def jsonOutput = new JsonOutput() + def s3 = [ + ak: "test-ak1", + sk : "test-sk1", + bucket : "test-bucket", + prefix: "test-prefix", + endpoint: "test-endpoint", + region: "test-region", + provider : "BOS", + 'external_endpoint': "test-external-endpoint" + ] + def map = [instance_id: "${instance_id}", name: "${name}", user_id: "${user_id}", obj_info: s3] + def instance_body = jsonOutput.toJson(map) + + create_instance_api.call(msHttpPort, instance_body) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + def feClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER" + def feClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER" + def ip1 = "162.0.0.1" + def ip2 = "162.0.0.2" + def edit_log_port = 8050 + def feNodeMap1 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip1}", edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"] + def feNodeMap2 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip2}", 
edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"] + def feNodeList = [feNodeMap1, feNodeMap2] + def feClusterMap = [cluster_name: "${feClusterName}", cluster_id:"${feClusterId}", type:"SQL", nodes:feNodeList] + instance = [instance_id: "${instance_id}", cluster: feClusterMap] + jsonOutput = new JsonOutput() + def addSqlGroupBody = jsonOutput.toJson(instance) + + add_cluster_api.call(msHttpPort, addSqlGroupBody) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("add Master-observer mode get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + def result = json.result + def FECluster = result.clusters.find { + it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER" + } + log.info("find it cluster ${FECluster}") + assertNotNull(FECluster) + def checkMasterNode = FECluster.nodes.find { + it.ip == ip1 + } + assertNotNull(checkMasterNode) + assertEquals("FE_MASTER", checkMasterNode.node_type) + } + + + enable_ms_inject_api.call(msHttpPort) { + respCode, body -> + log.info("enable inject resp: ${body} ${respCode}".toString()) + } + + def getFEClusterByName = [cluster_name: "${feClusterName}", cluster_id:"${feClusterId}", cloud_unique_id: "${cloudUniqueId}"] + jsonOutput = new JsonOutput() + def getClusterByNameBody = jsonOutput.toJson(getFEClusterByName) + + get_cluster_api.call(msHttpPort, getClusterByNameBody) { + respCode, body -> + json = parseJson(body) + log.info("get FE cluster http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + def result = json.result + def checkFollowerNode = result.nodes.find { + it.ip == ip1 + } + assertNotNull(checkFollowerNode) + assertEquals("FE_MASTER", checkFollowerNode.node_type) + } + + // check instance + 
get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("after get cluster get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + def result = json.result + def FECluster = result.clusters.find { + it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER" + } + log.info("find it cluster ${FECluster}") + assertNotNull(FECluster) + def checkFollowerNode = FECluster.nodes.find { + it.ip == ip1 + } + assertNotNull(checkFollowerNode) + assertEquals("FE_MASTER", checkFollowerNode.node_type) + } + + // 4. Test use HTTP API add fe, drop fe node in protection time failed, excced protection time succ + // test drop fe observer node + def del_nodes = [feNodeMap2] + def del_nodes_cluster = [cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: del_nodes] + def del_nodes_body = [instance_id: "${instance_id}", cluster: del_nodes_cluster] + jsonOutput = new JsonOutput() + def delFeObserverNodesBody = jsonOutput.toJson(del_nodes_body) + + drop_node_api.call(msHttpPort, delFeObserverNodesBody) { + respCode, body -> + json = parseJson(body) + log.info("drop fe observer node http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR")) + assertTrue(json.msg.contains("drop fe node not in safe time, try later")) + } + + // test drop fe cluster, can drop, without protection + // fe checker will drop fe node, + // non-master node will exit, and master node, whill throw error, can't drop itself succ + // feClusterMap = [cluster_name: "${feClusterName}", cluster_id:"${feClusterId}"] + feClusterMap = [cluster_id:"${feClusterId}"] + instance = [instance_id: "${instance_id}", cluster: feClusterMap] + jsonOutput = new JsonOutput() + def dropFeClusterBody = jsonOutput.toJson(instance) + drop_cluster_api.call(msHttpPort, dropFeClusterBody) { + respCode, body -> + log.info("drop fe cluster http cli result: ${body} 
${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("NOT_FOUND")) + assertTrue(json.msg.contains("drop fe cluster not in safe time, try later")) + } + + get_cluster_api.call(msHttpPort, getClusterByNameBody) { + respCode, body -> + json = parseJson(body) + log.info("get FE cluster after drop observer http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + def result = json.result + def checkObserverNotBeenDropedNode = result.nodes.find { + it.ip == ip2 + } + assertNotNull(checkObserverNotBeenDropedNode) + } + + // inject point, to change MetaServiceImpl_get_cluster_set_config + inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) { + respCode, body -> + log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) + } + + enable_ms_inject_api.call(msHttpPort) { + respCode, body -> + log.info("enable inject resp: ${body} ${respCode}".toString()) + } + + // after inject, drop fe node, drop fe cluster all succ + drop_node_api.call(msHttpPort, delFeObserverNodesBody) { + respCode, body -> + json = parseJson(body) + log.info("after inject drop fe observer nodeshttp cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + get_cluster_api.call(msHttpPort, getClusterByNameBody) { + respCode, body -> + json = parseJson(body) + log.info("get FE cluster after drop observer http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) + def result = json.result + def checkObserverBeenDropedNode = result.nodes.find { + it.ip == ip2 + } + assertNull(checkObserverBeenDropedNode) + } + + drop_cluster_api.call(msHttpPort, dropFeClusterBody) { + respCode, body -> + log.info("drop fe cluster http cli result: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + 
+ get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("after get cluster get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + def result = json.result + def FECluster = result.clusters.find { + it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER" + } + assertNull(FECluster) + } + + // 5. Test Drop node, unable to find node message HTTP code return 404 + def compute_ip1 = "182.0.0.1" + def heartbeatPort = 9050 + def nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: "${compute_ip1}", heartbeat_port: "${heartbeatPort}"] + nodeList = [nodeMap] + clusterMap = [cluster_name: "${clusterName}", cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList] + instance = [instance_id: "${instance_id}", cluster: clusterMap] + def addComputeGroupBody = jsonOutput.toJson(instance) + add_cluster_api.call(msHttpPort, addComputeGroupBody) { + respCode, body -> + log.info("http cli result: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + get_instance_api.call(msHttpPort, instance_id) { + respCode, body -> + log.info("after get cluster get instance resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + + // 182.0.0.3 not in instance, can't find to drop, return code 404 + def node1 = [cloud_unique_id: "${cloudUniqueId}", ip : "182.0.0.3", heartbeat_port: 9050] + def del_compute_nodes = [node1] + def del_compute_nodes_cluster = [cluster_name: "${clusterName}", cluster_id: "${clusterId}", type: "COMPUTE", nodes: del_compute_nodes] + def del_compute_nodes_body = [instance_id: "${instance_id}", cluster: del_compute_nodes_cluster] + jsonOutput = new JsonOutput() + def delComputeNodesBody = jsonOutput.toJson(del_compute_nodes_body) + + drop_node_api.call(msHttpPort, delComputeNodesBody) { + respCode, body -> + log.info("drop compute group http cli result: ${body} 
${respCode}".toString()) + assertEquals(404, respCode) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("NOT_FOUND")) + } + } + + // 6. check 127.0.0.1 ms exit + def optionsForMs = new ClusterOptions() + optionsForMs.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + optionsForMs.setFeNum(1) + optionsForMs.setBeNum(1) + optionsForMs.setMsNum(2) + optionsForMs.cloudMode = true + + docker(optionsForMs) { + log.info("in test ms docker env") + def mss = cluster.getAllMetaservices() + cluster.addRWPermToAllFiles() + def ms2 = cluster.getMetaservices().get(1) + assertNotNull(ms2) + // change ms2 conf, and restart it + def confFile = ms2.getConfFilePath() + log.info("ms2 conf file: {}", confFile) + def writer = new PrintWriter(new FileWriter(confFile, true)) // true means open in append mode + writer.println("priority_networks=127.0.0.1/32") + writer.flush() + writer.close() + + cluster.restartMs(ms2.index) + // check ms2 exit, exiting needs some time + sleep(15000) + ms2 = cluster.getMetaservices().get(1) + def ms2Alive = ms2.alive + assertFalse(ms2Alive) + } +} diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy index 7405cb864d889d..b48f09a82f2a90 100644 --- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy +++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy @@ -36,6 +36,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') { ] options.cloudMode = true options.sqlModeNodeMgr = true + options.connectToFollower = true options.waitTimeout = 0 options.feNum = 3 options.feConfigs += ["resource_not_ready_sleep_seconds=1", @@ -58,9 +59,48 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') { clusterOptions[3].beClusterId = false; clusterOptions[3].beMetaServiceEndpoint = false; + def inject_to_ms_api = { msHttpPort, key, value, 
check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}" + check check_func + } + } + + def clear_ms_inject_api = { msHttpPort, key, value, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=clear" + check check_func + } + } + + def enable_ms_inject_api = { msHttpPort, check_func -> + httpTest { + op "get" + endpoint msHttpPort + uri "/MetaService/http/v1/injection_point?token=${token}&op=enable" + check check_func + } + } + for (options in clusterOptions) { docker(options) { logger.info("docker started"); + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + // inject point, to change MetaServiceImpl_get_cluster_set_config + inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) { + respCode, body -> + log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) + } + + enable_ms_inject_api.call(msHttpPort) { + respCode, body -> + log.info("enable inject resp: ${body} ${respCode}".toString()) + } def checkFrontendsAndBackends = { // Check frontends @@ -489,6 +529,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') { logger.info("Successfully decommissioned backend and verified its status") checkClusterStatus(3, 3, 8) + } }