Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](cloud) Cloud enable fe deploy mode from master-observers to multi-followers #45255

Merged
merged 7 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,7 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000");
CONF_mInt64(max_num_aborted_txn, "100");

CONF_Bool(enable_check_instance_id, "true");

// When false (the default), ms/recycler exits if the detected local IP is the loopback address 127.0.0.1
CONF_Bool(enable_loopback_address_for_ms, "false");
} // namespace doris::cloud::config
11 changes: 11 additions & 0 deletions cloud/src/common/network_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <sstream>
#include <vector>

#include "common/config.h"
#include "common/logging.h"

namespace doris::cloud {
Expand Down Expand Up @@ -160,6 +161,16 @@ static bool get_hosts_v4(std::vector<InetAddress>* hosts) {

std::string get_local_ip(const std::string& priority_networks) {
std::string localhost_str = butil::my_ip_cstr();
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&localhost_str](int*) {
deardeng marked this conversation as resolved.
Show resolved Hide resolved
// Check if ip eq 127.0.0.1, ms/recycler exit
LOG(INFO) << "get the IP for ms is " << localhost_str;
if (config::enable_loopback_address_for_ms || localhost_str != "127.0.0.1") return;
LOG(WARNING) << "localhost IP is loopback address (127.0.0.1), "
<< "there may be multiple NICs for use, "
<< "please set priority_network with a CIDR expression in doris_cloud.conf "
<< "to choose a non-loopback address accordingly";
exit(-1);
});
if (priority_networks == "") {
LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << localhost_str;
return localhost_str;
Expand Down
120 changes: 119 additions & 1 deletion cloud/src/meta-service/injection_point_http.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand All @@ -18,6 +17,7 @@

#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
#include <rapidjson/document.h>

#include "common/config.h"
#include "common/logging.h"
Expand All @@ -34,6 +34,11 @@ namespace doris::cloud {
std::map<std::string, std::function<void()>> suite_map;
std::once_flag register_suites_once;

// Value-only holder for one injected argument: an integer, a boolean or a string.
// set_value() below builds one TypedValue per element of the JSON array it receives.
struct TypedValue {
    using Storage = std::variant<int64_t, bool, std::string>;

    Storage value;
};

inline std::default_random_engine make_random_engine() {
return std::default_random_engine(
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
Expand Down Expand Up @@ -88,6 +93,108 @@ static void register_suites() {
});
}

// Decodes a URL-encoded (percent-encoded) string.
//
// - "%XY" is replaced by the byte whose value is the two hexadecimal digits XY.
// - '+' is replaced by a space.
// - every other character is copied unchanged.
//
// @param in   the encoded input
// @param out  receives the decoded bytes; it is cleared first. When the function
//             returns false it holds whatever was decoded before the bad escape.
// @return true on success; false when a '%' is not followed by exactly two
//         hexadecimal digits (including a '%' too close to the end of input).
bool url_decode(const std::string& in, std::string* out) {
    // Maps one hexadecimal digit to its numeric value, or -1 if `c` is not a hex digit.
    // Both digits are validated explicitly: stream extraction with std::hex would accept
    // malformed escapes such as "%4G" (parsed as 0x04), "% 1" or "%-1".
    const auto hex_digit = [](char c) -> int {
        if (c >= '0' && c <= '9') return c - '0';
        if (c >= 'a' && c <= 'f') return c - 'a' + 10;
        if (c >= 'A' && c <= 'F') return c - 'A' + 10;
        return -1;
    };

    out->clear();
    out->reserve(in.size());

    for (size_t i = 0; i < in.size(); ++i) {
        const char c = in[i];
        if (c == '%') {
            // An escape needs two more characters after the '%'.
            if (i + 3 > in.size()) {
                return false;
            }
            const int hi = hex_digit(in[i + 1]);
            const int lo = hex_digit(in[i + 2]);
            if (hi < 0 || lo < 0) {
                return false;
            }
            out->push_back(static_cast<char>((hi << 4) | lo));
            i += 2; // skip the two digits just consumed
        } else if (c == '+') {
            out->push_back(' ');
        } else {
            out->push_back(c);
        }
    }

    return true;
}

// Handles `behavior=change_args` for an injection point.
//
// Reads the `value` query parameter, URL-decodes it and parses it as a JSON array whose
// elements must each be a boolean, an integer or a string. It then registers a callback on
// `point` that, when the point is hit, writes the i-th array element through the i-th
// callback argument (cast to int64_t*, bool* or std::string* according to the element type).
//
// @param point  name of the injection point to attach the callback to
// @param uri    request URI carrying the `value` query parameter
// @return an INVALID_ARGUMENT reply when the value cannot be decoded, is not valid JSON,
//         is not an array, or contains an unsupported element type; otherwise an OK reply.
HttpResponse set_value(const std::string& point, const brpc::URI& uri) {
    std::string value_str(http_query(uri, "value"));
    std::string decoded_value;
    if (!url_decode(value_str, &decoded_value)) {
        auto msg = fmt::format("failed to decode value: {}", value_str);
        LOG(WARNING) << msg;
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
    }
    rapidjson::Document doc;
    if (doc.Parse(decoded_value.c_str()).HasParseError()) {
        auto msg = fmt::format("invalid json value: {}", decoded_value);
        LOG(WARNING) << msg;
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
    }

    if (!doc.IsArray()) {
        auto msg = "value must be a json array";
        LOG(WARNING) << msg;
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
    }

    // use vector to keep order
    std::vector<TypedValue> parsed_values;

    for (const auto& value : doc.GetArray()) {
        TypedValue typed_value;
        try {
            // Bool is tested before integer so JSON true/false never ends up as a number.
            if (value.IsBool()) {
                typed_value.value = value.GetBool();
            } else if (value.IsInt64()) {
                typed_value.value = value.GetInt64();
            } else if (value.IsString()) {
                // Build the std::string explicitly. Assigning the raw `const char*` to the
                // variant can select the `bool` alternative on standard libraries that do not
                // implement P0608, and would also stop at an embedded NUL character.
                typed_value.value = std::string(value.GetString(), value.GetStringLength());
            } else {
                auto msg = "value must be boolean, integer or string";
                LOG(WARNING) << msg;
                return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            parsed_values.push_back(std::move(typed_value));
        } catch (const std::exception&) {
            auto msg = fmt::format("failed to parse value");
            LOG(WARNING) << msg;
            return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
        }
    }

    auto sp = SyncPoint::get_instance();
    // `point` and the parsed values are captured by value: the callback outlives this request.
    sp->set_call_back(point, [point, parsed_values = std::move(parsed_values)](auto&& args) {
        LOG(INFO) << "injection point hit, point=" << point;
        for (size_t i = 0; i < parsed_values.size(); i++) {
            // The number of values comes from the HTTP request, the number of arguments from
            // the injection point in the code; never index past what the point provides.
            // NOTE(review): assumes `args` is a container exposing size() — confirm against
            // the SyncPoint callback signature.
            if (i >= args.size()) {
                LOG(WARNING) << "injection point " << point << " received "
                             << parsed_values.size() << " values but only provides "
                             << args.size() << " arguments, extra values are ignored";
                break;
            }
            std::visit(
                    [&](const auto& v) {
                        // T is int64_t, bool or std::string, i.e. the active alternative.
                        using T = std::decay_t<decltype(v)>;
                        LOG(INFO) << "index=" << i << " value=" << v
                                  << " type=" << typeid(v).name();
                        // NOTE(review): presumably the injection point must pass a T* at this
                        // position; what try_any_cast does on a type mismatch is not visible
                        // here — verify.
                        *try_any_cast<T*>(args[i]) = v;
                    },
                    parsed_values[i].value);
        }
    });
    return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) {
std::string duration_str(http_query(uri, "duration"));
int64_t duration = 0;
Expand Down Expand Up @@ -136,6 +243,8 @@ HttpResponse handle_set(const brpc::URI& uri) {
return set_sleep(point, uri);
} else if (behavior == "return") {
return set_return(point, uri);
} else if (behavior == "change_args") {
return set_value(point, uri);
}

return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown behavior: " + behavior);
Expand Down Expand Up @@ -202,6 +311,15 @@ HttpResponse handle_disable(const brpc::URI& uri) {

// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
// &name=${injection_point_name}&behavior=return" # return void

// ATTN: the `value` of change_args must be a URL-encoded JSON array; see the example in test_ms_api.groovy.
// In the cpp file, declare the injection point and pass pointers to the variables to overwrite:
// bool testBool = false;
// std::string testString = "world";
// TEST_SYNC_POINT_CALLBACK("resource_manager::set_safe_drop_time", &exceed_time, &testBool, &testString);

// The value below is the URL-encoded form of [-1,true,"hello"]:
// curl "http://175.40.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
//     &name=resource_manager::set_safe_drop_time&behavior=change_args&value=%5B-1%2Ctrue%2C%22hello%22%5D"
// ```

HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) {
Expand Down
27 changes: 27 additions & 0 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,14 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
code = MetaServiceCode::UNDEFINED_ERR;
}

// Ugly but simple: detect a missing node by matching "not found" in the error message,
// so that cloud.proto does not have to be changed to add a new error code.
if (request->op() == AlterClusterRequest::DROP_NODE &&
msg.find("not found") != std::string::npos) {
// see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return http status code 404
code = MetaServiceCode::CLUSTER_NOT_FOUND;
}

if (code != MetaServiceCode::OK) return;

auto f = new std::function<void()>([instance_id = request->instance_id(), txn_kv = txn_kv_] {
Expand Down Expand Up @@ -2386,6 +2394,7 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
response->mutable_cluster()->CopyFrom(instance.clusters());
LOG_EVERY_N(INFO, 100) << "get all cluster info, " << msg;
} else {
bool is_instance_changed = false;
for (int i = 0; i < instance.clusters_size(); ++i) {
auto& c = instance.clusters(i);
std::set<std::string> mysql_users;
Expand All @@ -2401,6 +2410,24 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
<< " cluster=" << msg;
}
}
if (is_instance_changed) {
val = instance.SerializeAsString();
if (val.empty()) {
msg = "failed to serialize";
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
return;
}

txn->put(key, val);
LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key)
<< " json=" << proto_to_json(instance);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}
}
}

if (response->cluster().empty()) {
Expand Down
Loading
Loading