Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng committed Dec 27, 2024
1 parent 93ac740 commit 920dbac
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 126 deletions.
5 changes: 1 addition & 4 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,6 @@ CONF_mInt64(max_num_aborted_txn, "100");

CONF_Bool(enable_check_instance_id, "true");

// FE type force change from FE_MASTER to FE_FOLLOWER
CONF_Bool(force_change_to_multi_follower_mode, "true");

// Check if ip eq 127.0.0.1, ms/recycler exit
CONF_Bool(disable_loopback_address_for_ms, "true");
CONF_Bool(enable_check_loopback_address_for_ms, "true");
} // namespace doris::cloud::config
2 changes: 1 addition & 1 deletion cloud/src/common/network_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ std::string get_local_ip(const std::string& priority_networks) {
std::string localhost_str = butil::my_ip_cstr();
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&localhost_str](int*) {
// Check if ip eq 127.0.0.1, ms/recycler exit
if (config::disable_loopback_address_for_ms && "127.0.0.1" == localhost_str) {
if (config::enable_check_loopback_address_for_ms && "127.0.0.1" == localhost_str) {
LOG(WARNING) << "enable check prohibit use loopback addr, but localhost="
<< localhost_str
<< ", so exit(-1), please use priority network CIDR to set non-loopback "
Expand Down
118 changes: 106 additions & 12 deletions cloud/src/meta-service/injection_point_http.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand All @@ -18,6 +17,7 @@

#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
#include <rapidjson/document.h>

#include "common/config.h"
#include "common/logging.h"
Expand All @@ -34,6 +34,11 @@ namespace doris::cloud {
std::map<std::string, std::function<void()>> suite_map;
std::once_flag register_suites_once;

// define a struct to store value only
struct TypedValue {
std::variant<int64_t, bool, std::string> value;
};

inline std::default_random_engine make_random_engine() {
return std::default_random_engine(
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
Expand Down Expand Up @@ -88,21 +93,104 @@ static void register_suites() {
});
}

bool url_decode(const std::string& in, std::string* out) {
out->clear();
out->reserve(in.size());

for (size_t i = 0; i < in.size(); ++i) {
if (in[i] == '%') {
if (i + 3 <= in.size()) {
int value = 0;
std::istringstream is(in.substr(i + 1, 2));

if (is >> std::hex >> value) {
(*out) += static_cast<char>(value);
i += 2;
} else {
return false;
}
} else {
return false;
}
} else if (in[i] == '+') {
(*out) += ' ';
} else {
(*out) += in[i];
}
}

return true;
}

HttpResponse set_value(const std::string& point, const brpc::URI& uri) {
std::string value_str(http_query(uri, "value"));
int64_t value = 0;
try {
value = std::stol(value_str);
} catch (const std::exception& e) {
auto msg = fmt::format("invalid value:{}", value_str);
std::string decoded_value;
if (!url_decode(value_str, &decoded_value)) {
auto msg = fmt::format("failed to decode value: {}", value_str);
LOG(WARNING) << msg;
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
}
rapidjson::Document doc;
if (doc.Parse(decoded_value.c_str()).HasParseError()) {
auto msg = fmt::format("invalid json value: {}", decoded_value);
LOG(WARNING) << msg;
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
}

if (!doc.IsArray()) {
auto msg = "value must be a json array";
LOG(WARNING) << msg;
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
}

// use vector to keep order
std::vector<TypedValue> parsed_values;

for (const auto& value : doc.GetArray()) {
TypedValue typed_value;
try {
if (value.IsBool()) {
typed_value.value = value.GetBool();
} else if (value.IsInt64()) {
typed_value.value = value.GetInt64();
} else if (value.IsString()) {
typed_value.value = value.GetString();
} else {
auto msg = "value must be boolean, integer or string";
LOG(WARNING) << msg;
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
}
parsed_values.push_back(std::move(typed_value));
} catch (const std::exception& e) {
auto msg = fmt::format("failed to parse value");
LOG(WARNING) << msg;
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
}
}

auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [point, value](auto&& args) {
LOG(INFO) << "injection point hit, point=" << point << " set value=" << value;
*try_any_cast<int64_t*>(args[0]) = value;
sp->set_call_back(point, [point, parsed_values = std::move(parsed_values)](auto&& args) {
LOG(INFO) << "injection point hit, point=" << point;
for (size_t i = 0; i < parsed_values.size(); i++) {
const auto& typed_value = parsed_values[i];
std::visit(
[&](const auto& v) {
LOG(INFO) << "index=" << i << " value=" << v
<< " type=" << typeid(v).name();
if constexpr (std::is_same_v<std::decay_t<decltype(v)>, int64_t>) {
// process int64_t
*try_any_cast<int64_t*>(args[i]) = v;
} else if constexpr (std::is_same_v<std::decay_t<decltype(v)>, bool>) {
// process bool
*try_any_cast<bool*>(args[i]) = v;
} else if constexpr (std::is_same_v<std::decay_t<decltype(v)>,
std::string>) {
// process string
*try_any_cast<std::string*>(args[i]) = v;
}
},
typed_value.value);
}
});
return http_json_reply(MetaServiceCode::OK, "OK");
}
Expand Down Expand Up @@ -155,7 +243,7 @@ HttpResponse handle_set(const brpc::URI& uri) {
return set_sleep(point, uri);
} else if (behavior == "return") {
return set_return(point, uri);
} else if (behavior == "set") {
} else if (behavior == "change_args") {
return set_value(point, uri);
}

Expand Down Expand Up @@ -224,8 +312,14 @@ HttpResponse handle_disable(const brpc::URI& uri) {
// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
// &name=${injection_point_name}&behavior=return" # return void

// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
// &name=${injection_point_name}&behavior=set&value=10" # set value 10
// ATTN: change_args use uri encode, see example test_ms_api.groovy
// use inject point in cpp file
// bool testBool = false;
// std::string testString = "world";
// TEST_SYNC_POINT_CALLBACK("resource_manager::set_safe_drop_time", &exceed_time, &testBool, &testString);

// curl http://175.40.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&o
// p=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=%5B-1%2Ctrue%2C%22hello%22%5D
// ```

HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) {
Expand Down
30 changes: 0 additions & 30 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2012,14 +2012,6 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
for (auto& n : request->cluster().nodes()) {
NodeInfo node;
node.instance_id = request->instance_id();
// add fe node, force change type FE_MASTER to FE_FOLLOWER
if (config::force_change_to_multi_follower_mode && n.has_node_type() &&
n.node_type() == NodeInfoPB::FE_MASTER) {
auto& mutable_node = const_cast<std::decay_t<decltype(n)>&>(n);
mutable_node.set_node_type(NodeInfoPB::FE_FOLLOWER);
LOG(INFO) << "add fe master node, force change it to follwer type, n="
<< mutable_node.DebugString();
}
node.node_info = n;
node.cluster_id = request->cluster().cluster_id();
node.cluster_name = request->cluster().cluster_name();
Expand Down Expand Up @@ -2412,28 +2404,6 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
if ((c.has_cluster_name() && c.cluster_name() == cluster_name) ||
(c.has_cluster_id() && c.cluster_id() == cluster_id) ||
mysql_users.count(mysql_user_name)) {
// stock master-observers auto change to multi-followers
int64_t config_need_change =
config::force_change_to_multi_follower_mode == true ? 1 : 0;
TEST_SYNC_POINT_CALLBACK("MetaServiceImpl_get_cluster_set_config",
&config_need_change);
if (config_need_change && c.type() == ClusterPB::SQL) {
for (auto& node : c.nodes()) {
if (node.node_type() == NodeInfoPB::FE_MASTER) {
auto& mutable_node = const_cast<std::decay_t<decltype(node)>&>(node);
auto now_time = std::chrono::system_clock::now();
uint64_t time = std::chrono::duration_cast<std::chrono::seconds>(
now_time.time_since_epoch())
.count();
mutable_node.set_mtime(time);
mutable_node.set_node_type(NodeInfoPB::FE_FOLLOWER);
is_instance_changed = true;
LOG(INFO) << "enable force change master-observers mode to "
"multi-followers, so change FE type to FE_FOLLOWER, node="
<< mutable_node.DebugString();
}
}
}
// just one cluster
response->add_cluster()->CopyFrom(c);
LOG_EVERY_N(INFO, 100) << "found a cluster, instance_id=" << instance.instance_id()
Expand Down
8 changes: 0 additions & 8 deletions cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,6 @@ std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::
auto& node = const_cast<std::decay_t<decltype(n)>&>(n);
node.set_ctime(time);
node.set_mtime(time);
// add cluster, force modify FE_MASTER to FE_FOLLOWER,
// Incremental logic modification, stock compatibility
if (config::force_change_to_multi_follower_mode && req_cluster.has_type() &&
ClusterPB::SQL == req_cluster.type() && node.has_node_type() &&
NodeInfoPB::FE_MASTER == node.node_type()) {
LOG(INFO) << "change to multi follower mode, force modify FE_MASTER to FE_FOLLOWER";
node.set_node_type(NodeInfoPB::FE_FOLLOWER);
}
// Check duplicated nodes, one node cannot deploy on multiple clusters
// diff instance_cluster's nodes and req_cluster's nodes
for (auto& n : req_cluster.nodes()) {
Expand Down
2 changes: 1 addition & 1 deletion cloud/test/fdb_injection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ int main(int argc, char** argv) {
cloud::config::fdb_cluster_file_path = "fdb.cluster";
cloud::config::write_schema_kv = true;
cloud::config::enable_check_instance_id = false;
cloud::config::disable_loopback_address_for_ms = false;
cloud::config::enable_check_loopback_address_for_ms = false;

auto sp = SyncPoint::get_instance();
sp->enable_processing();
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ message ClusterPB {
message NodeInfoPB {
enum NodeType {
UNKNOWN = 0;
// lagacy logic for one-master-multi-observer mode
FE_MASTER = 1;
FE_OBSERVER = 2;
FE_FOLLOWER = 3;
Expand Down
Loading

0 comments on commit 920dbac

Please sign in to comment.