Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](cloud) Cloud enable fe deploy mode from master-observers to multi-followers #45255

Merged
merged 7 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,10 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000");
CONF_mInt64(max_num_aborted_txn, "100");

CONF_Bool(enable_check_instance_id, "true");

// FE type force change from FE_MASTER to FE_FOLLOWER
CONF_Bool(force_change_to_multi_follower_mode, "true");
deardeng marked this conversation as resolved.
Show resolved Hide resolved

// Check if ip eq 127.0.0.1, ms/recycler exit
CONF_Bool(prohibit_use_loopback_addresses, "true");
deardeng marked this conversation as resolved.
Show resolved Hide resolved
} // namespace doris::cloud::config
14 changes: 14 additions & 0 deletions cloud/src/common/network_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <sstream>
#include <vector>

#include "common/config.h"
#include "common/logging.h"

namespace doris::cloud {
Expand Down Expand Up @@ -158,8 +159,21 @@ static bool get_hosts_v4(std::vector<InetAddress>* hosts) {
return true;
}

// Check if ip eq 127.0.0.1, ms/recycler exit
static void check_is_loopback(const std::string& localhost_str) {
if (config::prohibit_use_loopback_addresses && "127.0.0.1" == localhost_str) {
LOG(WARNING) << "enable check prohibit use loopback addr, but localhost=" << localhost_str
<< ", so exit(-1)";
exit(-1);
}
}

std::string get_local_ip(const std::string& priority_networks) {
std::string localhost_str = butil::my_ip_cstr();
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&localhost_str](int*) {
deardeng marked this conversation as resolved.
Show resolved Hide resolved
check_is_loopback(localhost_str);
LOG(INFO) << "at last, localhost=" << localhost_str;
});
if (priority_networks == "") {
LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << localhost_str;
return localhost_str;
Expand Down
24 changes: 24 additions & 0 deletions cloud/src/meta-service/injection_point_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,25 @@ static void register_suites() {
});
}

HttpResponse set_value(const std::string& point, const brpc::URI& uri) {
std::string value_str(http_query(uri, "value"));
int64_t value = 0;
try {
value = std::stol(value_str);
} catch (const std::exception& e) {
auto msg = fmt::format("invalid value:{}", value_str);
LOG(WARNING) << msg;
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
}

auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [point, value](auto&& args) {
LOG(INFO) << "injection point hit, point=" << point << " set value=" << value;
*try_any_cast<int64_t*>(args[0]) = value;
});
return http_json_reply(MetaServiceCode::OK, "OK");
}

HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) {
std::string duration_str(http_query(uri, "duration"));
int64_t duration = 0;
Expand Down Expand Up @@ -136,6 +155,8 @@ HttpResponse handle_set(const brpc::URI& uri) {
return set_sleep(point, uri);
} else if (behavior == "return") {
return set_return(point, uri);
} else if (behavior == "set") {
deardeng marked this conversation as resolved.
Show resolved Hide resolved
return set_value(point, uri);
}

return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown behavior: " + behavior);
Expand Down Expand Up @@ -202,6 +223,9 @@ HttpResponse handle_disable(const brpc::URI& uri) {

// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
// &name=${injection_point_name}&behavior=return" # return void

// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
// &name=${injection_point_name}&behavior=set&value=10" # set value 10
// ```

HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) {
Expand Down
57 changes: 57 additions & 0 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,14 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
for (auto& n : request->cluster().nodes()) {
NodeInfo node;
node.instance_id = request->instance_id();
// add fe node, force change type FE_MASTER to FE_FOLLOWER
if (config::force_change_to_multi_follower_mode && n.has_node_type() &&
n.node_type() == NodeInfoPB::FE_MASTER) {
auto& mutable_node = const_cast<std::decay_t<decltype(n)>&>(n);
mutable_node.set_node_type(NodeInfoPB::FE_FOLLOWER);
LOG(INFO) << "add fe master node, force change it to follwer type, n="
<< mutable_node.DebugString();
}
node.node_info = n;
node.cluster_id = request->cluster().cluster_id();
node.cluster_name = request->cluster().cluster_name();
Expand Down Expand Up @@ -2273,6 +2281,14 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
code = MetaServiceCode::UNDEFINED_ERR;
}

// ugly but easy to repair
// not change cloud.proto add err_code
if (request->op() == AlterClusterRequest::DROP_NODE &&
msg.find("not found") != std::string::npos) {
// see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return http status code 404
code = MetaServiceCode::CLUSTER_NOT_FOUND;
}

if (code != MetaServiceCode::OK) return;

auto f = new std::function<void()>([instance_id = request->instance_id(), txn_kv = txn_kv_] {
Expand Down Expand Up @@ -2386,6 +2402,7 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
response->mutable_cluster()->CopyFrom(instance.clusters());
LOG_EVERY_N(INFO, 100) << "get all cluster info, " << msg;
} else {
bool is_instance_changed = false;
for (int i = 0; i < instance.clusters_size(); ++i) {
auto& c = instance.clusters(i);
std::set<std::string> mysql_users;
Expand All @@ -2395,12 +2412,52 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
if ((c.has_cluster_name() && c.cluster_name() == cluster_name) ||
(c.has_cluster_id() && c.cluster_id() == cluster_id) ||
mysql_users.count(mysql_user_name)) {
// stock master-observers auto change to multi-followers
int64_t config_need_change =
config::force_change_to_multi_follower_mode == true ? 1 : 0;
TEST_SYNC_POINT_CALLBACK("MetaServiceImpl_get_cluster_set_config",
&config_need_change);
if (config_need_change && c.type() == ClusterPB::SQL) {
for (auto& node : c.nodes()) {
if (node.node_type() == NodeInfoPB::FE_MASTER) {
auto& mutable_node = const_cast<std::decay_t<decltype(node)>&>(node);
auto now_time = std::chrono::system_clock::now();
uint64_t time = std::chrono::duration_cast<std::chrono::seconds>(
now_time.time_since_epoch())
.count();
mutable_node.set_mtime(time);
mutable_node.set_node_type(NodeInfoPB::FE_FOLLOWER);
is_instance_changed = true;
LOG(INFO) << "enable force change master-observers mode to "
"multi-followers, so change FE type to FE_FOLLOWER, node="
<< mutable_node.DebugString();
}
}
}
// just one cluster
response->add_cluster()->CopyFrom(c);
LOG_EVERY_N(INFO, 100) << "found a cluster, instance_id=" << instance.instance_id()
<< " cluster=" << msg;
}
}
if (is_instance_changed) {
val = instance.SerializeAsString();
if (val.empty()) {
msg = "failed to serialize";
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
return;
}

txn->put(key, val);
LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key)
<< " json=" << proto_to_json(instance);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}
}
}

if (response->cluster().empty()) {
Expand Down
Loading
Loading