diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index e742417c06bc3e..313076e2ab77e6 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -226,9 +226,6 @@ CONF_mInt64(max_num_aborted_txn, "100"); CONF_Bool(enable_check_instance_id, "true"); -// FE type force change from FE_MASTER to FE_FOLLOWER -CONF_Bool(force_change_to_multi_follower_mode, "true"); - // Check if ip eq 127.0.0.1, ms/recycler exit -CONF_Bool(disable_loopback_address_for_ms, "true"); +CONF_Bool(enable_check_loopback_address_for_ms, "true"); } // namespace doris::cloud::config diff --git a/cloud/src/common/network_util.cpp b/cloud/src/common/network_util.cpp index 0fe0738f22b674..f63d9d042e93d5 100644 --- a/cloud/src/common/network_util.cpp +++ b/cloud/src/common/network_util.cpp @@ -163,7 +163,7 @@ std::string get_local_ip(const std::string& priority_networks) { std::string localhost_str = butil::my_ip_cstr(); std::unique_ptr> defer((int*)0x01, [&localhost_str](int*) { // Check if ip eq 127.0.0.1, ms/recycler exit - if (config::disable_loopback_address_for_ms && "127.0.0.1" == localhost_str) { + if (config::enable_check_loopback_address_for_ms && "127.0.0.1" == localhost_str) { LOG(WARNING) << "enable check prohibit use loopback addr, but localhost=" << localhost_str << ", so exit(-1), please use priority network CIDR to set non-loopback " diff --git a/cloud/src/meta-service/injection_point_http.cpp b/cloud/src/meta-service/injection_point_http.cpp index 146a77a63596e9..18b2ddcebb07fd 100644 --- a/cloud/src/meta-service/injection_point_http.cpp +++ b/cloud/src/meta-service/injection_point_http.cpp @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file // distributed with this work for additional information @@ -18,6 +17,7 @@ #include #include +#include #include "common/config.h" #include "common/logging.h" @@ -34,6 +34,11 @@ namespace doris::cloud { std::map> suite_map; std::once_flag register_suites_once; +// define a struct to store value only +struct TypedValue { + std::variant value; +}; + inline std::default_random_engine make_random_engine() { return std::default_random_engine( static_cast(std::chrono::steady_clock::now().time_since_epoch().count())); @@ -88,21 +93,104 @@ static void register_suites() { }); } +bool url_decode(const std::string& in, std::string* out) { + out->clear(); + out->reserve(in.size()); + + for (size_t i = 0; i < in.size(); ++i) { + if (in[i] == '%') { + if (i + 3 <= in.size()) { + int value = 0; + std::istringstream is(in.substr(i + 1, 2)); + + if (is >> std::hex >> value) { + (*out) += static_cast(value); + i += 2; + } else { + return false; + } + } else { + return false; + } + } else if (in[i] == '+') { + (*out) += ' '; + } else { + (*out) += in[i]; + } + } + + return true; +} + HttpResponse set_value(const std::string& point, const brpc::URI& uri) { std::string value_str(http_query(uri, "value")); - int64_t value = 0; - try { - value = std::stol(value_str); - } catch (const std::exception& e) { - auto msg = fmt::format("invalid value:{}", value_str); + std::string decoded_value; + if (!url_decode(value_str, &decoded_value)) { + auto msg = fmt::format("failed to decode value: {}", value_str); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + rapidjson::Document doc; + if (doc.Parse(decoded_value.c_str()).HasParseError()) { + auto msg = fmt::format("invalid json value: {}", decoded_value); LOG(WARNING) << msg; return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); } + if (!doc.IsArray()) { + auto msg = "value must be a json array"; + LOG(WARNING) << msg; + return 
http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + // use vector to keep order + std::vector parsed_values; + + for (const auto& value : doc.GetArray()) { + TypedValue typed_value; + try { + if (value.IsBool()) { + typed_value.value = value.GetBool(); + } else if (value.IsInt64()) { + typed_value.value = value.GetInt64(); + } else if (value.IsString()) { + typed_value.value = value.GetString(); + } else { + auto msg = "value must be boolean, integer or string"; + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + parsed_values.push_back(std::move(typed_value)); + } catch (const std::exception& e) { + auto msg = fmt::format("failed to parse value"); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + } + auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [point, value](auto&& args) { - LOG(INFO) << "injection point hit, point=" << point << " set value=" << value; - *try_any_cast(args[0]) = value; + sp->set_call_back(point, [point, parsed_values = std::move(parsed_values)](auto&& args) { + LOG(INFO) << "injection point hit, point=" << point; + for (size_t i = 0; i < parsed_values.size(); i++) { + const auto& typed_value = parsed_values[i]; + std::visit( + [&](const auto& v) { + LOG(INFO) << "index=" << i << " value=" << v + << " type=" << typeid(v).name(); + if constexpr (std::is_same_v, int64_t>) { + // process int64_t + *try_any_cast(args[i]) = v; + } else if constexpr (std::is_same_v, bool>) { + // process bool + *try_any_cast(args[i]) = v; + } else if constexpr (std::is_same_v, + std::string>) { + // process string + *try_any_cast(args[i]) = v; + } + }, + typed_value.value); + } }); return http_json_reply(MetaServiceCode::OK, "OK"); } @@ -155,7 +243,7 @@ HttpResponse handle_set(const brpc::URI& uri) { return set_sleep(point, uri); } else if (behavior == "return") { return set_return(point, uri); - } else if (behavior == "set") { + } else if (behavior == 
"change_args") { return set_value(point, uri); } @@ -224,8 +312,14 @@ HttpResponse handle_disable(const brpc::URI& uri) { // curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set // &name=${injection_point_name}&behavior=return" # return void -// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set -// &name=${injection_point_name}&behavior=set&value=10" # set value 10 +// ATTN: the change_args value is a JSON array and must be URL-encoded; see the example in test_ms_api.groovy +// usage of the injection point in a cpp file: +// bool testBool = false; +// std::string testString = "world"; +// TEST_SYNC_POINT_CALLBACK("resource_manager::set_safe_drop_time", &exceed_time, &testBool, &testString); + +// curl "http://175.40.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set +// &name=resource_manager::set_safe_drop_time&behavior=change_args&value=%5B-1%2Ctrue%2C%22hello%22%5D" # value=[-1,true,"hello"] // ``` HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) { diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index b605220a52b867..a8611df3631b85 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -2012,14 +2012,6 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller, for (auto& n : request->cluster().nodes()) { NodeInfo node; node.instance_id = request->instance_id(); - // add fe node, force change type FE_MASTER to FE_FOLLOWER - if (config::force_change_to_multi_follower_mode && n.has_node_type() && - n.node_type() == NodeInfoPB::FE_MASTER) { - auto& mutable_node = const_cast&>(n); - mutable_node.set_node_type(NodeInfoPB::FE_FOLLOWER); - LOG(INFO) << "add fe master node, force change it to follwer type, n=" - << mutable_node.DebugString(); - } node.node_info = n; node.cluster_id = request->cluster().cluster_id(); node.cluster_name = request->cluster().cluster_name(); @@ 
-2412,28 +2404,6 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller, if ((c.has_cluster_name() && c.cluster_name() == cluster_name) || (c.has_cluster_id() && c.cluster_id() == cluster_id) || mysql_users.count(mysql_user_name)) { - // stock master-observers auto change to multi-followers - int64_t config_need_change = - config::force_change_to_multi_follower_mode == true ? 1 : 0; - TEST_SYNC_POINT_CALLBACK("MetaServiceImpl_get_cluster_set_config", - &config_need_change); - if (config_need_change && c.type() == ClusterPB::SQL) { - for (auto& node : c.nodes()) { - if (node.node_type() == NodeInfoPB::FE_MASTER) { - auto& mutable_node = const_cast&>(node); - auto now_time = std::chrono::system_clock::now(); - uint64_t time = std::chrono::duration_cast( - now_time.time_since_epoch()) - .count(); - mutable_node.set_mtime(time); - mutable_node.set_node_type(NodeInfoPB::FE_FOLLOWER); - is_instance_changed = true; - LOG(INFO) << "enable force change master-observers mode to " - "multi-followers, so change FE type to FE_FOLLOWER, node=" - << mutable_node.DebugString(); - } - } - } // just one cluster response->add_cluster()->CopyFrom(c); LOG_EVERY_N(INFO, 100) << "found a cluster, instance_id=" << instance.instance_id() diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index f864302fa5c6b8..3addfecdb85069 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -399,14 +399,6 @@ std::pair ResourceManager::add_cluster(const std:: auto& node = const_cast&>(n); node.set_ctime(time); node.set_mtime(time); - // add cluster, force modify FE_MASTER to FE_FOLLOWER, - // Incremental logic modification, stock compatibility - if (config::force_change_to_multi_follower_mode && req_cluster.has_type() && - ClusterPB::SQL == req_cluster.type() && node.has_node_type() && - NodeInfoPB::FE_MASTER == node.node_type()) { - LOG(INFO) << "change to 
multi follower mode, force modify FE_MASTER to FE_FOLLOWER"; - node.set_node_type(NodeInfoPB::FE_FOLLOWER); - } // Check duplicated nodes, one node cannot deploy on multiple clusters // diff instance_cluster's nodes and req_cluster's nodes for (auto& n : req_cluster.nodes()) { diff --git a/cloud/test/fdb_injection_test.cpp b/cloud/test/fdb_injection_test.cpp index a767da19f5c814..e647d015b1146e 100644 --- a/cloud/test/fdb_injection_test.cpp +++ b/cloud/test/fdb_injection_test.cpp @@ -71,7 +71,7 @@ int main(int argc, char** argv) { cloud::config::fdb_cluster_file_path = "fdb.cluster"; cloud::config::write_schema_kv = true; cloud::config::enable_check_instance_id = false; - cloud::config::disable_loopback_address_for_ms = false; + cloud::config::enable_check_loopback_address_for_ms = false; auto sp = SyncPoint::get_instance(); sp->enable_processing(); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 48c82eadb4da97..e02b3290ab0ba9 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -140,6 +140,7 @@ message ClusterPB { message NodeInfoPB { enum NodeType { UNKNOWN = 0; + // legacy logic for one-master-multi-observer mode FE_MASTER = 1; FE_OBSERVER = 2; FE_FOLLOWER = 3; diff --git a/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy b/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy index 31da1d1acb0c18..62dfe5d39e269b 100644 --- a/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy +++ b/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy @@ -60,12 +60,12 @@ suite('test_ms_api', 'p0, docker') { } } - // curl "175.43.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set&name=resource_manager::set_safe_drop_time&behavior=set&value=60000" + // curl "175.43.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=%5B-1%5D" def inject_to_ms_api = { msHttpPort, key, value, check_func -> httpTest 
{ op "get" endpoint msHttpPort - uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=set&value=${value}" + uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}" check check_func } } @@ -1049,12 +1049,6 @@ suite('test_ms_api', 'p0, docker') { ] } - clusterOptions[0].msConfigs += [ - 'force_change_to_multi_follower_mode=false', - ] - clusterOptions[1].msConfigs += [ - 'force_change_to_multi_follower_mode=true', - ] for (def i = 0; i < clusterOptions.size(); i++) { // 1. Test that a node cannot be repeatedly added to multiple clusters @@ -1192,7 +1186,7 @@ suite('test_ms_api', 'p0, docker') { def feNode5 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip5}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"] def feNode6 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip6}", edit_log_port: "${edit_log_port}", node_type:"FE_FOLLOWER"] def addNodesClusterFailed = [cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: [feNode4, feNode5, feNode6]] - def dropAllFeNodesFailed = [cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: [feNodeMap1, feNodeMap2, feNodeMap3, feNode4, feNode5, feNode6]] + def dropAllFeNodesFailed = [cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: [feNodeMap1, feNodeMap2, feNodeMap3, feNode5, feNode6]] def addNodesClusterSucc = [cluster_name: "${feClusterName}", cluster_id: "${feClusterId}", type: "SQL", nodes: [feNode5, feNode6]] def addNodesFailedBody = [instance_id: "${instance_id}", cluster: addNodesClusterFailed] def dropAllFeNodesClusterBody = [instance_id: "${instance_id}", cluster: dropAllFeNodesFailed] @@ -1205,33 +1199,24 @@ suite('test_ms_api', 'p0, docker') { add_node_api.call(msHttpPort, addSomeFENodesFailed) { respCode, body -> json = parseJson(body) - if (i == 0) { - // failed, due to two master node - // if 
force_change_to_multi_follower_mode == false, check type not changed, FE_MASTER - log.info("add some fe failed nodes http cli result: ${body} ${respCode} ${json}".toString()) - assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR")) - assertTrue(json.msg.contains("instance invalid, cant modify, plz check")) - } else if (i == 1) { - // if force_change_to_multi_follower_mode == true, check type changed, FE_FOLLOWER - // in add node api params, fe4 node type has been changed from FE_MASTER to FE_FOLLOWER - assertTrue(json.code.equalsIgnoreCase("OK")) - } + // fails because the cluster would end up with two master nodes + // FE_MASTER is no longer force-changed to FE_FOLLOWER, so the node type stays FE_MASTER + log.info("add some fe failed nodes http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR")) + assertTrue(json.msg.contains("instance invalid, cant modify, plz check")) } - if (i == 0) { - // i == 1, line 1209 has succ added - add_node_api.call(msHttpPort, addSomeFENodesSucc) { - respCode, body -> - json = parseJson(body) - log.info("add some fe nodes http cli result: ${body} ${respCode} ${json}".toString()) - assertTrue(json.code.equalsIgnoreCase("OK")) - } + add_node_api.call(msHttpPort, addSomeFENodesSucc) { + respCode, body -> + json = parseJson(body) + log.info("add some fe nodes http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("OK")) } // inject point, to change MetaServiceImpl_get_cluster_set_config - inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", -1) { - respCode, body -> - log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) + inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) { + respCode, body -> + log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) } enable_ms_inject_api.call(msHttpPort) { @@ -1239,14 +1224,12 
@@ suite('test_ms_api', 'p0, docker') { log.info("enable inject resp: ${body} ${respCode}".toString()) } - if (i == 1) { - drop_node_api.call(msHttpPort, dropAllFeNodesFailedJson) { - respCode, body -> - json = parseJson(body) - log.info("drop all fe nodes failed http cli result: ${body} ${respCode} ${json}".toString()) - assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR")) - assertTrue(json.msg.contains("instance invalid, cant modify, plz check")) - } + drop_node_api.call(msHttpPort, dropAllFeNodesFailedJson) { + respCode, body -> + json = parseJson(body) + log.info("drop all fe nodes failed http cli result: ${body} ${respCode} ${json}".toString()) + assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR")) + assertTrue(json.msg.contains("instance invalid, cant modify, plz check")) } get_instance_api.call(msHttpPort, instance_id) { @@ -1270,17 +1253,8 @@ suite('test_ms_api', 'p0, docker') { log.info("find it node fe6: ${followerFeNode}") assertNotNull(checkFENode) assertNotNull(followerFeNode) - if (i == 0) { - // if force_change_to_multi_follower_mode == false, check type not changed, FE_MASTER - assertEquals("FE_MASTER", checkFENode.node_type) - assertEquals("FE_FOLLOWER", followerFeNode.node_type) - } else if (i == 1) { - // if force_change_to_multi_follower_mode == true, check type changed, FE_FOLLOWER - // check FE_MASTER type has been force changed to FE_FOLLOWER - // registe FE_MASTER - assertEquals("FE_FOLLOWER", checkFENode.node_type) - assertEquals("FE_FOLLOWER", followerFeNode.node_type) - } + assertEquals("FE_MASTER", checkFENode.node_type) + assertEquals("FE_FOLLOWER", followerFeNode.node_type) } } } @@ -1297,9 +1271,6 @@ suite('test_ms_api', 'p0, docker') { optionsForUpgrade.setFeNum(1) optionsForUpgrade.setBeNum(1) optionsForUpgrade.cloudMode = true - optionsForUpgrade.msConfigs += [ - 'force_change_to_multi_follower_mode=false', - ] docker(optionsForUpgrade) { def ms = cluster.getAllMetaservices().get(0) @@ -1371,11 +1342,6 @@ 
suite('test_ms_api', 'p0, docker') { assertEquals("FE_MASTER", checkMasterNode.node_type) } - // inject point, to change MetaServiceImpl_get_cluster_set_config - inject_to_ms_api.call(msHttpPort, "MetaServiceImpl_get_cluster_set_config", 1) { - respCode, body -> - log.info("inject MetaServiceImpl_get_cluster_set_config resp: ${body} ${respCode}".toString()) - } enable_ms_inject_api.call(msHttpPort) { respCode, body -> @@ -1396,7 +1362,7 @@ suite('test_ms_api', 'p0, docker') { it.ip == ip1 } assertNotNull(checkFollowerNode) - assertEquals("FE_FOLLOWER", checkFollowerNode.node_type) + assertEquals("FE_MASTER", checkFollowerNode.node_type) } // check instance @@ -1415,7 +1381,7 @@ suite('test_ms_api', 'p0, docker') { it.ip == ip1 } assertNotNull(checkFollowerNode) - assertEquals("FE_FOLLOWER", checkFollowerNode.node_type) + assertEquals("FE_MASTER", checkFollowerNode.node_type) } // 4. Test use HTTP API add fe, drop fe node in protection time failed, excced protection time succ @@ -1463,14 +1429,14 @@ suite('test_ms_api', 'p0, docker') { } // inject point, to change MetaServiceImpl_get_cluster_set_config - inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", -1) { - respCode, body -> - log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) + inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) { + respCode, body -> + log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) } enable_ms_inject_api.call(msHttpPort) { respCode, body -> - log.info("enable inject resp: ${body} ${respCode}".toString()) + log.info("enable inject resp: ${body} ${respCode}".toString()) } // after inject, drop fe node, drop fe cluster all succ @@ -1562,19 +1528,17 @@ suite('test_ms_api', 'p0, docker') { optionsForMs.setBeNum(1) optionsForMs.setMsNum(2) optionsForMs.cloudMode = true - optionsForMs.msConfigs += [ - 
'disable_loopback_address_for_ms=false', - ] + docker(optionsForMs) { log.info("in test ms docker env") def mss = cluster.getAllMetaservices() cluster.addRWPermToAllFiles() def ms2 = cluster.getMetaservices().get(1) + assertNotNull(ms2) // change ms2 conf, and restart it def confFile = ms2.getConfFilePath() log.info("ms2 conf file: {}", confFile) def writer = new PrintWriter(new FileWriter(confFile, true)) // true 表示 append 模式 - writer.println("disable_loopback_address_for_ms=true") writer.println("priority_networks=127.0.0.1/32") writer.flush() writer.close() diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy index 41bd3777a8c1e6..b48f09a82f2a90 100644 --- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy +++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy @@ -63,7 +63,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') { httpTest { op "get" endpoint msHttpPort - uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=set&value=${value}" + uri "/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}" check check_func } } @@ -92,7 +92,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') { def ms = cluster.getAllMetaservices().get(0) def msHttpPort = ms.host + ":" + ms.httpPort // inject point, to change MetaServiceImpl_get_cluster_set_config - inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", -1) { + inject_to_ms_api.call(msHttpPort, "resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) { respCode, body -> log.info("inject resource_manager::set_safe_drop_time resp: ${body} ${respCode}".toString()) }