diff --git a/.github/workflows/hqps-db-ci.yml b/.github/workflows/hqps-db-ci.yml index 13b16950763b..7a0fb5686ec4 100644 --- a/.github/workflows/hqps-db-ci.yml +++ b/.github/workflows/hqps-db-ci.yml @@ -163,6 +163,13 @@ jobs: bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./engine_config_test.yaml python sed -i 's/default_graph: modern_graph/default_graph: ldbc/g' ./engine_config_test.yaml + - name: Proxy Server test + env: + INTERACTIVE_WORKSPACE: /tmp/interactive_workspace + run: | + cd ${GITHUB_WORKSPACE}/flex/tests/hqps + bash hqps_proxy_server_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml + - name: Sample Query test env: GS_TEST_DIR: ${{ github.workspace }}/gstest diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index a4a8ded53e4c..c04a477a589e 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -15,6 +15,7 @@ option(BUILD_TEST "Whether to build test" ON) option(BUILD_DOC "Whether to build doc" OFF) option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF) option(USE_PTHASH "Whether to use pthash" OFF) +option(BUILD_PROXY "Whether to build proxy" ON) #print options message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}") diff --git a/flex/bin/CMakeLists.txt b/flex/bin/CMakeLists.txt index 6e9ea1266127..7e599622566b 100644 --- a/flex/bin/CMakeLists.txt +++ b/flex/bin/CMakeLists.txt @@ -34,6 +34,12 @@ if(BUILD_HQPS) install(PROGRAMS load_plan_and_gen.sh DESTINATION bin) endif() +if (BUILD_PROXY) + add_executable(proxy_server proxy_server.cc) + target_link_libraries(proxy_server flex_utils flex_server ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES}) + install_without_export_flex_target(proxy_server) +endif() + include_directories(${Boost_INCLUDE_DIRS}) add_executable(bulk_loader bulk_loader.cc) target_link_libraries(bulk_loader flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES}) diff --git a/flex/bin/proxy_server.cc b/flex/bin/proxy_server.cc new file mode 100644 index 
000000000000..89997a15f318
--- /dev/null
+++ b/flex/bin/proxy_server.cc
@@ -0,0 +1,156 @@
+/** Copyright 2020 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdint>
+#include <iostream>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+#include "stdlib.h"
+
+#include "flex/engines/hqps_db/core/utils/hqps_utils.h"
+#include "flex/engines/http_server/service/proxy_service.h"
+#include "flex/utils/service_utils.h"
+
+#include <boost/program_options.hpp>
+
+#include <glog/logging.h>
+
+namespace bpo = boost::program_options;
+
+namespace gs {
+// Parses a comma-separated list of "host:port" endpoints and appends them to
+// `endpoints`.
+//
+// An entry without a ':' is reported on stderr and skipped. An entry with an
+// empty host, or a port that is not a decimal number in [1, 65535], makes the
+// whole parse fail.
+//
+// Returns false on the first invalid host or port, true otherwise.
+bool parse_endpoints(const std::string& input_string,
+                     std::vector<std::pair<std::string, uint16_t>>& endpoints) {
+  std::istringstream iss(input_string);
+  std::string endpoint;
+
+  while (std::getline(iss, endpoint, ',')) {
+    // Split the endpoint into host and port using ':'
+    size_t delimiter_pos = endpoint.find(':');
+    if (delimiter_pos == std::string::npos) {
+      std::cerr << "Invalid endpoint: " << endpoint << ", missing delimiter ':'"
+                << std::endl;
+      continue;
+    }
+
+    std::string host = endpoint.substr(0, delimiter_pos);
+    std::string port_str = endpoint.substr(delimiter_pos + 1);
+    if (host.empty()) {
+      LOG(ERROR) << "Invalid endpoint: " << endpoint << ", host is empty";
+      return false;
+    }
+    // Accept plain decimal digits only; std::stoul on its own would also take
+    // a sign, leading whitespace or trailing garbage.
+    if (port_str.empty() ||
+        port_str.find_first_not_of("0123456789") != std::string::npos) {
+      LOG(ERROR) << "Invalid port: " << port_str << ", must be a number";
+      return false;
+    }
+
+    // Parse into a wide type: assigning straight to uint16_t would wrap
+    // out-of-range values and make the range check below meaningless.
+    unsigned long port = 0;
+    try {
+      port = std::stoul(port_str);
+    } catch (const std::out_of_range&) {
+      port = 0;  // does not even fit unsigned long; rejected just below
+    }
+
+    // Check for valid port range
+    if (port < 1 || port > 65535) {
+      LOG(ERROR) << "Invalid port: " << port_str
+                 << ", must be between 1 and 65535";
+      return false;
+    }
+    endpoints.emplace_back(std::move(host), static_cast<uint16_t>(port));
+  }
+  return true;
+}
+}  // namespace gs
+
+/**
+ * The main entrance for ProxyServer.
+ */
+int main(int argc, char** argv) {
+  bpo::options_description desc("Usage:");
+  desc.add_options()("help,h", "Display help messages")(
+      "endpoints,e", bpo::value<std::string>()->required(),
+      "The endpoints of the proxy server, e.g., {ip}:{port},{ip}:{port},...")(
+      "heartbeat-interval,i", bpo::value<int32_t>()->default_value(1),
+      "The interval of heartbeat check in seconds");
+
+  setenv("TZ", "Asia/Shanghai", 1);
+  tzset();
+
+  bpo::variables_map vm;
+  try {
+    bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
+    // Handle --help before notify(): notify() throws when the required
+    // --endpoints option is absent, which would make a bare --help fail.
+    if (vm.count("help")) {
+      std::cout << desc << std::endl;
+      return 0;
+    }
+    bpo::notify(vm);
+  } catch (const bpo::error& e) {
+    std::cerr << "Invalid arguments: " << e.what() << std::endl;
+    std::cerr << desc << std::endl;
+    return 1;
+  }
+
+  if (!vm.count("endpoints")) {
+    LOG(FATAL) << "endpoints is not specified";
+    return 0;
+  }
+  std::vector<std::pair<std::string, uint16_t>> endpoints;
+  if (!gs::parse_endpoints(vm["endpoints"].as<std::string>(), endpoints)) {
+    LOG(FATAL) << "Failed to parse endpoints";
+    return 0;
+  }
+
+  LOG(INFO) << "got endpoints of size: " << endpoints.size()
+            << ", :" << gs::to_string(endpoints);
+
+  // Read the interval so that --heartbeat-interval actually reaches the
+  // service instead of being parsed and dropped.
+  const int32_t heart_beat_interval = vm["heartbeat-interval"].as<int32_t>();
+  if (heart_beat_interval <= 0) {
+    LOG(FATAL) << "heartbeat-interval must be a positive number of seconds";
+    return 0;
+  }
+
+  uint32_t shard_num = 1;
+  uint16_t http_port = 9999;
+
+  if (!server::ProxyService::get()
+           .init(shard_num, http_port, endpoints, heart_beat_interval)
+           .ok()) {
+    LOG(FATAL) << "Failed to init ProxyService";
+    return 0;
+  }
+  server::ProxyService::get().run_and_wait_for_exit();
+
+  return 0;
+}
diff --git a/flex/engines/hqps_db/core/utils/hqps_utils.h b/flex/engines/hqps_db/core/utils/hqps_utils.h
index 5cae552776e0..781693e74e49 100644
--- a/flex/engines/hqps_db/core/utils/hqps_utils.h
+++ b/flex/engines/hqps_db/core/utils/hqps_utils.h
@@ -803,6 +803,13 @@ struct to_string_impl {
   }
 };
 
+template <>
+struct to_string_impl<uint16_t> {
+  static inline std::string to_string(const uint16_t& empty) {
+    return std::to_string(static_cast<int32_t>(empty));
+  }
+};
+
 template <>
 struct to_string_impl<int64_t> {
   static inline std::string to_string(const int64_t& empty) {
diff --git a/flex/engines/http_server/CMakeLists.txt
b/flex/engines/http_server/CMakeLists.txt index 07cd7034d813..a7c80d3d1419 100644 --- a/flex/engines/http_server/CMakeLists.txt +++ b/flex/engines/http_server/CMakeLists.txt @@ -11,6 +11,10 @@ if (Hiactor_FOUND) list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*") endif () + if (NOT BUILD_PROXY) + list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*proxy.*") + endif () + # get all .cc files in current directory, except for generated/ file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") list(FILTER SERVER_FILES EXCLUDE REGEX ".*generated.*") @@ -22,6 +26,14 @@ if (Hiactor_FOUND) list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*") endif () + if (NOT BUILD_PROXY) + list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_actor*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_http*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_service*") + endif () + + message(STATUS "SERVER_FILES: ${SERVER_FILES}") + add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files}) add_dependencies(flex_server server_actor_autogen) target_compile_options (flex_server diff --git a/flex/engines/http_server/actor/proxy_actor.act.cc b/flex/engines/http_server/actor/proxy_actor.act.cc new file mode 100644 index 000000000000..60d27e3d2244 --- /dev/null +++ b/flex/engines/http_server/actor/proxy_actor.act.cc @@ -0,0 +1,81 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "flex/engines/http_server/actor/proxy_actor.act.h"
+#include "flex/engines/http_server/service/proxy_service.h"
+
+#include "nlohmann/json.hpp"
+
+#include <seastar/core/print.hh>  // NOTE(review): header name was missing from the patch text; confirm
+#include <stdexcept>
+
+namespace server {
+
+proxy_actor::~proxy_actor() {
+  // finalization
+  // ...
+}
+
+proxy_actor::proxy_actor(hiactor::actor_base* exec_ctx,
+                         const hiactor::byte_t* addr)
+    : hiactor::actor(exec_ctx, addr) {
+  set_max_concurrency(1);  // set max concurrency for task reentrancy (stateful)
+  // initialization
+  // ...
+}
+
+// Forwards the request to the interactive servers and resolves to the body
+// of the first reply with status 200. The future fails when no reply was
+// received or when every reply carried a non-200 status.
+seastar::future<query_result> proxy_actor::do_query(
+    proxy_request&& request_payload) {
+  auto& request = request_payload.content;
+  VLOG(10) << "proxy_actor::forward_request, method: " << request->_method
+           << ", path: " << request->_url << ", query: " << request->content;
+
+  return ProxyService::get()
+      .get_client()
+      .forward_request(request->_url, request->_method, request->content,
+                       request->_headers)
+      .then([](HttpForwardingResponses&& content) {
+        if (content.empty()) {
+          return seastar::make_exception_future<query_result>(
+              std::runtime_error("Got no responses when forwarding request "
+                                 "to interactive servers."));
+        }
+        // `i` indexes the replies, which may skip endpoints that are marked
+        // down, so it is reported as a reply index rather than an endpoint.
+        for (size_t i = 0; i < content.size(); ++i) {
+          auto& response = content[i];
+          if (response.first == 200) {
+            // Select the first successful response as the final result.
+            seastar::sstring res_string;
+            res_string = response.second;
+            return seastar::make_ready_future<query_result>(
+                query_result{std::move(res_string)});
+          }
+          LOG(ERROR) << "Got error response when forwarding request "
+                        "to interactive servers, reply index: "
+                     << i << ", status: " << response.first
+                     << ", msg: " << response.second;
+        }
+        // Nothing succeeded: report it instead of an empty "OK" body.
+        return seastar::make_exception_future<query_result>(
+            std::runtime_error("All interactive servers returned an error "
+                               "for the forwarded request."));
+      });
+}
+
+}  // namespace server
diff --git a/flex/engines/http_server/actor/proxy_actor.act.h b/flex/engines/http_server/actor/proxy_actor.act.h
new file mode 100644
index 000000000000..826c2b44bc47
--- /dev/null
+++ b/flex/engines/http_server/actor/proxy_actor.act.h
@@ -0,0 +1,45 @@
+/** Copyright 2020 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_
+#define ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_
+
+
+#include "flex/engines/http_server/types.h"
+
+#include <hiactor/core/actor-template.hh>  // NOTE(review): header name was missing from the patch text; confirm
+#include <hiactor/util/data_type.hh>  // NOTE(review): header name was missing from the patch text; confirm
+#include <seastar/http/httpd.hh>  // NOTE(review): header name was missing from the patch text; confirm
+
+namespace server {
+
+class ANNOTATION(actor:impl) proxy_actor : public hiactor::actor {
+
+ public:
+  proxy_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr);
+  ~proxy_actor() override;
+
+  seastar::future<query_result> ANNOTATION(actor:method) do_query(proxy_request&& param);
+
+  // DECLARE_RUN_QUERIES;
+  /// Declare `do_work` func here, no need to implement.
+ ACTOR_DO_WORK() + + private: + int32_t your_private_members_ = 0; +}; +} + +#endif // ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_ \ No newline at end of file diff --git a/flex/engines/http_server/handler/admin_http_handler.cc b/flex/engines/http_server/handler/admin_http_handler.cc index a26dda870d58..6139385b69d2 100644 --- a/flex/engines/http_server/handler/admin_http_handler.cc +++ b/flex/engines/http_server/handler/admin_http_handler.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "flex/engines/http_server/generated/actor/admin_actor_ref.act.autogen.h" #include "flex/engines/http_server/types.h" diff --git a/flex/engines/http_server/handler/admin_http_handler.h b/flex/engines/http_server/handler/admin_http_handler.h index 14465e459a95..c7774b56934f 100644 --- a/flex/engines/http_server/handler/admin_http_handler.h +++ b/flex/engines/http_server/handler/admin_http_handler.h @@ -16,13 +16,15 @@ #ifndef ENGINES_HTTP_SERVER_HANDLER_ADMIN_HTTP_HANDLER_H_ #define ENGINES_HTTP_SERVER_HANDLER_ADMIN_HTTP_HANDLER_H_ -#include -#include #include #include "flex/engines/http_server/handler/http_utils.h" #include "flex/engines/http_server/types.h" #include "flex/utils/service_utils.h" +#include +#include +#include + namespace server { class InteractiveAdminService; diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 36e9d0bc29ff..b729a62ef24d 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -56,6 +56,21 @@ class HttpTextMapCarrier namespace server { +hqps_heartbeat_handler::hqps_heartbeat_handler() {} + +hqps_heartbeat_handler::~hqps_heartbeat_handler() = default; + +// TODO: return snapshot_id. 
+seastar::future> +hqps_heartbeat_handler::handle(const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) { + rep->write_body("bin", seastar::sstring{"OK"}); + rep->done(); + return seastar::make_ready_future>( + std::move(rep)); +} + hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency) @@ -377,6 +392,7 @@ hqps_http_handler::hqps_http_handler(uint16_t http_port) ic_adhoc_group_id, codegen_group_id, max_group_id, group_inc_step, shard_adhoc_concurrency); exit_handler_ = new hqps_exit_handler(); + heart_beat_handler_ = new hqps_heartbeat_handler(); } hqps_http_handler::~hqps_http_handler() { @@ -469,6 +485,8 @@ seastar::future<> hqps_http_handler::set_routes() { r.add(seastar::httpd::operation_type::POST, seastar::httpd::url("/interactive/adhoc_query"), adhoc_query_handler_); + r.add(seastar::httpd::operation_type::GET, + seastar::httpd::url("/heartbeat"), heart_beat_handler_); r.add(seastar::httpd::operation_type::POST, seastar::httpd::url("/interactive/exit"), exit_handler_); return seastar::make_ready_future<>(); diff --git a/flex/engines/http_server/handler/hqps_http_handler.h b/flex/engines/http_server/handler/hqps_http_handler.h index 13c3a8f02a1f..1f90ba3f508a 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.h +++ b/flex/engines/http_server/handler/hqps_http_handler.h @@ -24,6 +24,19 @@ namespace server { +class hqps_heartbeat_handler : public seastar::httpd::handler_base { + public: + hqps_heartbeat_handler(); + ~hqps_heartbeat_handler() override; + + seastar::future> handle( + const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) override; + + private: +}; + class hqps_ic_handler : public seastar::httpd::handler_base { public: hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id, @@ -117,6 +130,7 @@ class hqps_http_handler { hqps_ic_handler* ic_handler_; hqps_adhoc_query_handler* adhoc_query_handler_; 
hqps_exit_handler* exit_handler_;
+  hqps_heartbeat_handler* heart_beat_handler_;
 };
 
 }  // namespace server
diff --git a/flex/engines/http_server/handler/http_proxy.cc b/flex/engines/http_server/handler/http_proxy.cc
new file mode 100644
index 000000000000..8cd9063078f2
--- /dev/null
+++ b/flex/engines/http_server/handler/http_proxy.cc
@@ -0,0 +1,232 @@
+/** Copyright 2020 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "flex/engines/http_server/handler/http_proxy.h"
+#include "flex/engines/hqps_db/core/utils/hqps_utils.h"
+
+#include <seastar/core/when_all.hh>
+
+namespace server {
+
+// Builds a checker over `clients`/`endpoints`; both are kept by reference and
+// must outlive this object. Every endpoint starts out marked as alive.
+// `heart_beat_interval` is the polling period in seconds; a non-positive
+// value falls back to DEFAULT_HEART_BEAT_INTERVAL.
+HeartBeatChecker::HeartBeatChecker(
+    std::vector<httplib::Client>& clients,
+    const std::vector<std::pair<std::string, uint16_t>>& endpoints,
+    int32_t heart_beat_interval)
+    : running_(false),
+      heart_beat_interval_(heart_beat_interval > 0
+                               ? heart_beat_interval
+                               : DEFAULT_HEART_BEAT_INTERVAL),
+      clients_(clients),
+      endpoints_(endpoints) {
+  endpoint_status_.resize(clients.size(), true);
+}
+
+HeartBeatChecker::~HeartBeatChecker() {
+  if (running_) {
+    stop();
+  }
+}
+
+// Starts the background polling thread; a second call while the thread is
+// still joinable is a no-op.
+gs::Status HeartBeatChecker::start() {
+  if (heartbeat_thread_.joinable()) {
+    return gs::Status::OK();
+  }
+  running_ = true;
+  heartbeat_thread_ = std::thread(&HeartBeatChecker::check_heartbeat, this);
+  VLOG(10) << "HeartBeatChecker started";
+  return gs::Status::OK();
+}
+
+// Asks the polling thread to exit and joins it. Safe to call when the thread
+// was never started or has already been joined.
+gs::Status HeartBeatChecker::stop() {
+  running_ = false;
+  VLOG(10) << "Stopping HeartBeatChecker";
+  if (heartbeat_thread_.joinable()) {
+    heartbeat_thread_.join();
+  }
+  VLOG(10) << "HeartBeatChecker stopped";
+  return gs::Status::OK();
+}
+
+// Thread body: probes every endpoint, records the outcome, then sleeps for
+// the configured interval until stop() clears `running_`.
+// NOTE(review): endpoint_status_ is written here and read through
+// get_endpoint_status() without any locking visible in this file; confirm
+// whether the readers run on another thread.
+void HeartBeatChecker::check_heartbeat() {
+  while (running_) {
+    for (size_t i = 0; i < clients_.size(); ++i) {
+      httplib::Client client(endpoints_[i].first, endpoints_[i].second);
+      // Probe the same route that HttpProxy::init() uses for its startup check.
+      auto res = client.Get("/heartbeat");
+      if (!res) {
+        LOG(ERROR) << "Failed to connect to endpoint at index: " << i;
+        endpoint_status_[i] = false;
+      } else {
+        VLOG(10) << "Heartbeat check to " << i << " is OK";
+        endpoint_status_[i] = true;
+      }
+    }
+    std::this_thread::sleep_for(std::chrono::seconds(heart_beat_interval_));
+  }
+}
+
+const std::vector<bool>& HeartBeatChecker::get_endpoint_status() const {
+  return endpoint_status_;
+}
+
+// Utils functions
+
+// Converts an httplib result into a (status code, body) pair; a transport
+// failure is reported as status 500.
+HttpForwardingResponse to_response(const httplib::Result& res) {
+  if (res.error() != httplib::Error::Success) {
+    LOG(ERROR) << "Failed to send request: " << res.error();
+    return std::make_pair(500, "Failed to send request");
+  }
+  return std::make_pair(res->status, res->body);
+}
+
+// Copies the incoming headers into an httplib::Headers container, leaving out
+// the ones the forwarding client has to generate itself.
+httplib::Headers to_httplib_headers(const seastar_http_headers_t& headers) {
+  httplib::Headers httplib_headers;
+  for (auto& header : headers) {
+    // Those headers should not be forwarded, otherwise will cause error.
+    if (header.first == "Host" || header.first == "User-Agent" ||
+        header.first == "Content-Length") {
+      continue;
+    }
+    httplib_headers.emplace(std::string(header.first.c_str()),
+                            std::string(header.second.c_str()));
+  }
+  return httplib_headers;
+}
+
+HttpProxy::HttpProxy() : initialized_(false) {}
+
+HttpProxy::~HttpProxy() { close(); }
+
+void HttpProxy::close() {
+  if (initialized_) {
+    heartbeat_checker_->stop();
+    for (auto& client : clients_) {
+      client.stop();
+    }
+    initialized_ = false;
+  }
+}
+
+// Creates one client per endpoint, verifies each endpoint answers
+// GET /heartbeat, and starts the heartbeat checker with
+// `heart_beat_interval` (seconds).
+gs::Status HttpProxy::init(
+    const std::vector<std::pair<std::string, uint16_t>>& endpoints,
+    int32_t heart_beat_interval) {
+  endpoints_ = endpoints;
+  if (endpoints_.empty()) {
+    return gs::Status(gs::StatusCode::InValidArgument, "No endpoint provided");
+  }
+  clients_.reserve(endpoints_.size());
+  for (auto& endpoint : endpoints_) {
+    httplib::Client client(endpoint.first, endpoint.second);
+    client.set_connection_timeout(CONNECTION_TIMEOUT, 0);  // 5s
+    client.set_read_timeout(READ_TIMEOUT, 0);              // 10s
+    client.set_write_timeout(WRITE_TIMEOUT, 0);            // 10s
+    clients_.emplace_back(std::move(client));
+  }
+  // test connection
+  for (size_t i = 0; i < clients_.size(); ++i) {
+    auto res = clients_[i].Get("/heartbeat");
+    if (!res) {
+      LOG(ERROR) << "Failed to connect to endpoint " << endpoints_[i].first
+                 << ":" << endpoints_[i].second;
+      return gs::Status(gs::StatusCode::InternalError,
+                        "Failed to connect to endpoint");
+    }
+  }
+  // start heart beat check, honoring the caller-supplied interval
+  heartbeat_checker_ = std::make_unique<HeartBeatChecker>(
+      clients_, endpoints_, heart_beat_interval);
+  RETURN_IF_NOT_OK(heartbeat_checker_->start());
+  initialized_ = true;
+  return gs::Status::OK();
+}
+
+// Sends the request to every endpoint the last heartbeat round marked alive
+// and resolves to the collected replies (empty when not initialized).
+seastar::future<HttpForwardingResponses> HttpProxy::forward_request(
+    const std::string& path, const std::string& method, const std::string& body,
+    const seastar_http_headers_t& headers) {
+  if (!initialized_) {
+    return seastar::make_ready_future<HttpForwardingResponses>(
+        HttpForwardingResponses{});
+  }
+  std::vector<seastar::future<HttpForwardingResponse>> reply_futs;
+  // Get the status of the endpoints from last heartbeat check
+  const auto& endpoint_status = heartbeat_checker_->get_endpoint_status();
+  for (size_t i = 0; i < clients_.size(); ++i) {
+    if (!endpoint_status[i]) {
+      LOG(WARNING) << "Endpoint at index " << i << " is not available";
+      continue;
+    }
+    reply_futs.emplace_back(
+        do_send_request(path, method, body, headers, clients_[i]));
+  }
+
+  return seastar::when_all_succeed(reply_futs.begin(), reply_futs.end())
+      .then([](std::vector<HttpForwardingResponse>&& replies) {
+        LOG(INFO) << "Received " << replies.size()
+                  << " replies: " << gs::to_string(replies);
+        return seastar::make_ready_future<HttpForwardingResponses>(
+            std::move(replies));
+      });
+}
+
+// Issues one request through `client` using the httplib call that matches
+// `method` (GET, POST, DELETE or PUT) and wraps the outcome in a ready future.
+// Any other method is answered locally with status 405.
+seastar::future<HttpForwardingResponse> HttpProxy::do_send_request(
+    const std::string& path, const std::string& method, const std::string& body,
+    const seastar_http_headers_t& headers, httplib::Client& client) {
+  if (method == "GET") {
+    VLOG(10) << "Forwarding GET request to " << path;
+    return seastar::make_ready_future<HttpForwardingResponse>(
+        to_response(client.Get(path.c_str(), to_httplib_headers(headers))));
+  } else if (method == "POST") {
+    return seastar::make_ready_future<HttpForwardingResponse>(
+        to_response(client.Post(path.c_str(), to_httplib_headers(headers), body,
+                                "application/json")));
+  } else if (method == "DELETE") {
+    return seastar::make_ready_future<HttpForwardingResponse>(
+        to_response(client.Delete(path.c_str(), to_httplib_headers(headers))));
+  } else if (method == "PUT") {
+    return seastar::make_ready_future<HttpForwardingResponse>(
+        to_response(client.Put(path.c_str(), to_httplib_headers(headers), body,
+                               "application/json")));
+  } else {
+    LOG(WARNING) << "Unsupported method for forwarding: " << method;
+    return seastar::make_ready_future<HttpForwardingResponse>(
+        HttpForwardingResponse{405, "Unsupported method: " + method});
+  }
+}
+}  // namespace server
\ No newline at end of file
diff --git a/flex/engines/http_server/handler/http_proxy.h b/flex/engines/http_server/handler/http_proxy.h
new file mode 100644
index 000000000000..b604b4a1a293
--- /dev/null
+++ b/flex/engines/http_server/handler/http_proxy.h
@@ -0,0 +1,101 @@
+/** Copyright 2020 Alibaba Group Holding Limited.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef ENGINES_HTTP_SERVER_HANDLER_FORWARD_HTTP_CLIENT_H_ +#define ENGINES_HTTP_SERVER_HANDLER_FORWARD_HTTP_CLIENT_H_ + +#include + +#include "flex/third_party/httplib.h" +#include "flex/utils/result.h" + +#include +#include + +namespace server { + +class HeartBeatChecker { + public: + static constexpr int32_t DEFAULT_HEART_BEAT_INTERVAL = 2; // 2s + HeartBeatChecker( + std::vector& clients, + const std::vector>& endpoints, + int32_t heart_beat_interval = DEFAULT_HEART_BEAT_INTERVAL); + ~HeartBeatChecker(); + + gs::Status start(); + + gs::Status stop(); + + const std::vector& get_endpoint_status() const; + + private: + void check_heartbeat(); + + std::atomic running_; + int32_t heart_beat_interval_; + std::vector& clients_; + const std::vector>& endpoints_; + std::vector endpoint_status_; // to mark whether the endpoint is alive + std::thread heartbeat_thread_; +}; + +using HttpForwardingResponse = std::pair; +using HttpForwardingResponses = std::vector; +using seastar_http_headers_t = + std::unordered_map; + +// A wrapped http client which will send request to multiple endpoints and +// return the summary of the responses. +// It will do heartbeat check to the endpoints to make sure the endpoints are +// available. +// TODO(zhanglei): need to distinguish between read request and write request. 
+// For read request, we just need to send request to one endpoint, but for write +// request, we need to send request to all endpoints. +class HttpProxy { + public: + static constexpr int32_t CONNECTION_TIMEOUT = 5; // 5s + static constexpr int32_t READ_TIMEOUT = 10; // 5s + static constexpr int32_t WRITE_TIMEOUT = 10; // 10s + HttpProxy(); + ~HttpProxy(); + + gs::Status init( + const std::vector>& endpoints, + int32_t heart_beat_interval); + + void close(); + + seastar::future forward_request( + const std::string& path, const std::string& method, + const std::string& body, const seastar_http_headers_t& headers); + + private: + seastar::future do_send_request( + const std::string& path, const std::string& method, + const std::string& body, const seastar_http_headers_t& headers, + httplib::Client& client); + + std::vector> endpoints_; // ip and ports + std::atomic initialized_; + std::vector clients_; + + std::unique_ptr heartbeat_checker_; +}; + +} // namespace server + +#endif // ENGINES_HTTP_SERVER_HANDLER_FORWARD_HTTP_CLIENT_H_ diff --git a/flex/engines/http_server/handler/http_utils.h b/flex/engines/http_server/handler/http_utils.h index e303beb68d35..12cb5ee2eda6 100644 --- a/flex/engines/http_server/handler/http_utils.h +++ b/flex/engines/http_server/handler/http_utils.h @@ -12,9 +12,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include "flex/engines/http_server/types.h" #include "flex/utils/result.h" -#include "seastar/http/reply.hh" #ifndef ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ #define ENGINES_HTTP_SERVER_HANDLER_HTTP_UTILS_H_ diff --git a/flex/engines/http_server/handler/proxy_http_handler.cc b/flex/engines/http_server/handler/proxy_http_handler.cc new file mode 100644 index 000000000000..d3819fd50c18 --- /dev/null +++ b/flex/engines/http_server/handler/proxy_http_handler.cc @@ -0,0 +1,93 @@ +/** Copyright 2020 Alibaba Group Holding Limited. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/http_server/handler/proxy_http_handler.h" + +#include "flex/engines/http_server/executor_group.actg.h" +#include "flex/engines/http_server/options.h" + +#include "flex/engines/http_server/types.h" + +namespace server { + +proxy_http_forward_handler::proxy_http_forward_handler( + uint32_t group_id, uint32_t shard_concurrency) + : executor_idx_(0), shard_concurrency_(shard_concurrency) { + executor_refs_.reserve(shard_concurrency_); + hiactor::scope_builder builder; + builder.set_shard(hiactor::local_shard_id()) + .enter_sub_scope(hiactor::scope(0)) + .enter_sub_scope(hiactor::scope(group_id)); + for (unsigned i = 0; i < shard_concurrency_; ++i) { + executor_refs_.emplace_back(builder.build_ref(i)); + } +} + +seastar::future> +proxy_http_forward_handler::handle(const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) { + auto dst_executor = executor_idx_; + executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; + + return executor_refs_[dst_executor] + .do_query(proxy_request{std::move(req)}) + .then_wrapped( + [rep = std::move(rep)](seastar::future&& fut) mutable { + if (__builtin_expect(fut.failed(), false)) { + return seastar::make_exception_future< + std::unique_ptr>(fut.get_exception()); + } + auto result = fut.get0(); + rep->write_body("bin", std::move(result.content)); + rep->done(); + return seastar::make_ready_future< + 
std::unique_ptr>(std::move(rep)); + }); +} + +proxy_http_handler::proxy_http_handler(uint16_t http_port) + : http_port_(http_port) {} + +void proxy_http_handler::start() { + auto fut = seastar::alien::submit_to( + *seastar::alien::internal::default_instance, 0, [this] { + return server_.start() + .then([this] { return set_routes(); }) + .then([this] { return server_.listen(http_port_); }) + .then([this] { + fmt::print("Http handler is listening on port {} ...\n", + http_port_); + }); + }); + fut.wait(); +} + +void proxy_http_handler::stop() { + auto fut = + seastar::alien::submit_to(*seastar::alien::internal::default_instance, 0, + [this] { return server_.stop(); }); + fut.wait(); +} + +seastar::future<> proxy_http_handler::set_routes() { + return server_.set_routes([](seastar::httpd::routes& r) { + r.add_default_handler(new proxy_http_forward_handler( + proxy_group_id, shard_proxy_concurrency)); + return seastar::make_ready_future<>(); + }); +} + +} // namespace server \ No newline at end of file diff --git a/flex/engines/http_server/handler/proxy_http_handler.h b/flex/engines/http_server/handler/proxy_http_handler.h new file mode 100644 index 000000000000..b360f9390608 --- /dev/null +++ b/flex/engines/http_server/handler/proxy_http_handler.h @@ -0,0 +1,63 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef ENGINES_HTTP_SERVER_HANDLER_PROXY_HTTP_HANDLER_H_ +#define ENGINES_HTTP_SERVER_HANDLER_PROXY_HTTP_HANDLER_H_ + +#include +#include +#include +#include + +#include "flex/engines/http_server/generated/actor/proxy_actor_ref.act.autogen.h" + +namespace server { + +class proxy_http_forward_handler : public seastar::httpd::handler_base { + public: + proxy_http_forward_handler(uint32_t group_id, uint32_t shard_concurrency); + + ~proxy_http_forward_handler() = default; + + seastar::future> handle( + const seastar::sstring& path, + std::unique_ptr req, + std::unique_ptr rep) override; + + private: + uint32_t executor_idx_; + const uint32_t shard_concurrency_; + std::vector executor_refs_; +}; + +// TODO: How to distinguish between read requests and write requests? +class proxy_http_handler { + public: + proxy_http_handler(uint16_t http_port); + + void start(); + void stop(); + + private: + seastar::future<> set_routes(); + + private: + const uint16_t http_port_; + seastar::httpd::http_server_control server_; +}; + +} // namespace server + +#endif // ENGINES_HTTP_SERVER_HANDLER_PROXY_HTTP_HANDLER_H_ \ No newline at end of file diff --git a/flex/engines/http_server/options.cc b/flex/engines/http_server/options.cc index 2f7c0441acf3..372a1465743b 100644 --- a/flex/engines/http_server/options.cc +++ b/flex/engines/http_server/options.cc @@ -25,5 +25,6 @@ uint32_t shard_admin_procedure_concurrency = 1; uint32_t shard_admin_node_concurrency = 1; uint32_t shard_admin_job_concurrency = 1; uint32_t shard_admin_service_concurrency = 1; +uint32_t shard_proxy_concurrency = 16; // same as shard_query_concurrency } // namespace server diff --git a/flex/engines/http_server/options.h b/flex/engines/http_server/options.h index 732e19eecd33..3948b8d1b6be 100644 --- a/flex/engines/http_server/options.h +++ b/flex/engines/http_server/options.h @@ -27,6 +27,7 @@ const uint32_t ic_query_group_id = 2; const uint32_t ic_update_group_id = 3; const uint32_t ic_adhoc_group_id = 4; 
const uint32_t codegen_group_id = 5;
+const uint32_t proxy_group_id = 6;  // group id handed to proxy_http_forward_handler
 
 const uint32_t max_group_id = std::numeric_limits::max();
 const uint32_t group_inc_step =
@@ -41,6 +42,7 @@ extern uint32_t shard_admin_node_concurrency;
 extern uint32_t shard_admin_service_concurrency;
 extern uint32_t shard_admin_job_concurrency;
 extern uint32_t shard_admin_procedure_concurrency;
+extern uint32_t shard_proxy_concurrency;  // defined in options.cc
 
 }  // namespace server
 
diff --git a/flex/engines/http_server/service/proxy_service.cc b/flex/engines/http_server/service/proxy_service.cc
new file mode 100644
index 000000000000..ecbb3cec7010
--- /dev/null
+++ b/flex/engines/http_server/service/proxy_service.cc
@@ -0,0 +1,60 @@
+/** Copyright 2020 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "flex/engines/http_server/service/proxy_service.h" + +namespace server { + +gs::Status ProxyService::init( + uint32_t num_shards, uint16_t http_port, + const std::vector>& endpoints, + int32_t heart_beat_interval) { + proxy_port_ = http_port; + endpoints_ = endpoints; + actor_sys_ = std::make_unique(num_shards, false); + http_hdl_ = std::make_unique(http_port); + if (!client.init(endpoints, heart_beat_interval).ok()) { + LOG(ERROR) << "Failed to init HttpProxy"; + return gs::Status(gs::StatusCode::InternalError, + "Failed to init HttpProxy"); + } + return gs::Status::OK(); +} + +void ProxyService::run_and_wait_for_exit() { + if (!actor_sys_ || !http_hdl_) { + std::cerr << "GraphDB service has not been inited!" << std::endl; + return; + } + actor_sys_->launch(); + http_hdl_->start(); + running_.store(true); + while (running_.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + http_hdl_->stop(); + actor_sys_->terminate(); +} + +const std::vector>& +ProxyService::get_endpoints() const { + return endpoints_; +} + +HttpProxy& ProxyService::get_client() { return client; } + +void ProxyService::set_exit_state() { running_.store(false); } + +} // namespace server diff --git a/flex/engines/http_server/service/proxy_service.h b/flex/engines/http_server/service/proxy_service.h new file mode 100644 index 000000000000..a758cf595a7a --- /dev/null +++ b/flex/engines/http_server/service/proxy_service.h @@ -0,0 +1,64 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ENGINES_HTTP_SERVER_SERVICE_PROXY_SERVICE_H_
+#define ENGINES_HTTP_SERVER_SERVICE_PROXY_SERVICE_H_
+
+#include
+#include
+#include
+#include
+
+#include "flex/engines/http_server/actor_system.h"
+#include "flex/engines/http_server/handler/http_proxy.h"
+#include "flex/engines/http_server/handler/proxy_http_handler.h"
+#include "flex/utils/result.h"
+#include "flex/utils/service_utils.h"
+
+namespace server {
+// Process-wide proxy service: owns the actor system, the HTTP handler and the
+// HttpProxy client. Obtain the single instance through get(); the constructor
+// is private.
+class ProxyService {
+ public:
+  // Returns the single instance, created on first use.
+  static ProxyService& get() {
+    static ProxyService instance;
+    return instance;
+  }
+
+  ~ProxyService() = default;
+
+  // Stores the configuration and creates the members; must succeed before
+  // run_and_wait_for_exit() will do anything.
+  gs::Status init(
+      uint32_t num_shards, uint16_t http_port,
+      const std::vector>& endpoints,
+      int32_t heart_beat_interval =
+          HeartBeatChecker::DEFAULT_HEART_BEAT_INTERVAL);
+  // Runs the service and blocks until set_exit_state() is called.
+  void run_and_wait_for_exit();
+  const std::vector>& get_endpoints() const;
+  // Clears the running flag polled by run_and_wait_for_exit().
+  void set_exit_state();
+
+  HttpProxy& get_client();
+
+ private:
+  ProxyService() = default;
+
+ private:
+  uint32_t proxy_port_;
+  std::vector> endpoints_;
+  std::unique_ptr actor_sys_;
+  std::unique_ptr http_hdl_;
+  // Loop flag for run_and_wait_for_exit(); false until the service is running.
+  std::atomic running_{false};
+  HttpProxy client;
+};
+}  // namespace server
+
+#endif  // ENGINES_HTTP_SERVER_SERVICE_PROXY_SERVICE_H_
diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h
index f43cd6d4fe08..e514f8d83059 100644
--- a/flex/engines/http_server/types.h
+++ b/flex/engines/http_server/types.h
@@ -20,8 +20,10 @@
 #include
 #include
 #include
+#include
 
 #include "flex/utils/service_utils.h"
+#include
 #include
 
 namespace server {
@@ -54,6 +56,7 @@ struct payload {
 };
 
 using query_param
= payload;
+using proxy_request = payload>;
 using query_result = payload;
 using admin_query_result = payload>;
 // url_path, query_param
diff --git a/flex/tests/hqps/hqps_proxy_server_test.sh b/flex/tests/hqps/hqps_proxy_server_test.sh
new file mode 100644
index 000000000000..fd7be06a7cc1
--- /dev/null
+++ b/flex/tests/hqps/hqps_proxy_server_test.sh
@@ -0,0 +1,174 @@
+#!/bin/bash
+# Copyright 2020 Alibaba Group Holding Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Smoke test for the proxy server: starts two interactive_server workers,
+# starts proxy_server in front of them, then checks that the proxy's
+# /heartbeat answers OK while at least one worker is alive and no longer
+# answers OK once both workers have been killed.
+set -e
+
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+NC='\033[0m' # No Color
+# Print a red, timestamped error message to stderr.
+err() {
+  echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] -ERROR- $* ${NC}" >&2
+}
+
+# Print a green, timestamped info message to stdout.
+info() {
+  echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] -INFO- $* ${NC}"
+}
+
+SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
+FLEX_HOME=${SCRIPT_DIR}/../../
+INTERACTIVE_SERVER_BIN=${FLEX_HOME}/build/bin/interactive_server
+PROXY_SERVER_BIN=${FLEX_HOME}/build/bin/proxy_server
+GIE_HOME=${FLEX_HOME}/../interactive_engine/
+ENGINE_CONFIG_PATH_WORKER1=/tmp/interactive_engine_config_worker1.yaml
+ENGINE_CONFIG_PATH_WORKER2=/tmp/interactive_engine_config_worker2.yaml
+# Heartbeat endpoint of the proxy launched by start_proxy.
+PROXY_HEARTBEAT_URL=http://localhost:9999/heartbeat
+
+if [ $# -ne 2 ]; then
+  echo "Receives: $# args, need 2 args"
+  echo "Usage: $0 INTERACTIVE_WORKSPACE ENGINE_CONFIG_PATH"
+  exit 1
+fi
+
+INTERACTIVE_WORKSPACE=$1
+ENGINE_CONFIG_PATH=$2
+info "INTERACTIVE_WORKSPACE: ${INTERACTIVE_WORKSPACE}"
+info "ENGINE_CONFIG_PATH: ${ENGINE_CONFIG_PATH}"
+
+# Best-effort kill of every process this test may have started. `grep -v grep`
+# keeps the grep helper itself out of the PID list; `|| true` stops `set -e`
+# from aborting the script when nothing matches.
+kill_service(){
+  info "Kill Service first"
+  ps -ef | grep "com.alibaba.graphscope.GraphServer" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
+  ps -ef | grep "interactive_server" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
+  ps -ef | grep "/bin/proxy_server" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
+  sleep 3
+  # nothing is verified here; the sleep only gives the processes time to exit
+  info "Kill Service success"
+}
+
+# always clean up on exit, including early exits caused by `set -e`
+trap kill_service EXIT
+
+# Create two copies of the engine config, one per worker. In worker2's copy
+# every port is moved up by one so both workers can run on the same host.
+prepare_engine_config() {
+  rm -f "${ENGINE_CONFIG_PATH_WORKER1}" "${ENGINE_CONFIG_PATH_WORKER2}"
+  cp "${ENGINE_CONFIG_PATH}" "${ENGINE_CONFIG_PATH_WORKER1}"
+  cp "${ENGINE_CONFIG_PATH}" "${ENGINE_CONFIG_PATH_WORKER2}"
+  # modify the engine config
+  sed -i "s/localhost:10000/localhost:10001/g" "${ENGINE_CONFIG_PATH_WORKER2}"
+  sed -i "s/port: 7687/port: 7688/g" "${ENGINE_CONFIG_PATH_WORKER2}"
+  sed -i "s/port: 8182/port: 8183/g" "${ENGINE_CONFIG_PATH_WORKER2}"
+  sed -i "s/admin_port: 7777/admin_port: 7778/g" "${ENGINE_CONFIG_PATH_WORKER2}"
+  sed -i "s/query_port: 10000/query_port: 10001/g" "${ENGINE_CONFIG_PATH_WORKER2}"
+}
+
+# Start both interactive_server workers in the background, then verify that
+# exactly two interactive_server processes are running.
+start_worker() {
+  info "start worker1"
+  graph_yaml=${INTERACTIVE_WORKSPACE}/data/modern_graph/graph.yaml
+  indices_path=${INTERACTIVE_WORKSPACE}/data/modern_graph/indices
+  base_cmd="${INTERACTIVE_SERVER_BIN} -g ${graph_yaml}"
+  base_cmd=" ${base_cmd} --data-path ${indices_path}"
+  cmd1=" ${base_cmd} -c ${ENGINE_CONFIG_PATH_WORKER1}"
+  cmd2=" ${base_cmd} -c ${ENGINE_CONFIG_PATH_WORKER2}"
+  info "Start worker1 with command: ${cmd1}"
+  ${cmd1} &
+  sleep 2
+  info "Start worker2 with command: ${cmd2}"
+  ${cmd2} &
+  sleep 2
+  # check whether interactive_server has two process running
+  cnt=$(ps -ef | grep "bin/interactive_server" | grep -v grep | wc -l)
+  if [ "${cnt}" -ne 2 ]; then
+    err "Start worker failed, expect 2 interactive_server process, but got ${cnt}"
+    exit 1
+  fi
+  info "Start worker success"
+}
+
+# Start proxy_server in front of the two workers and verify it is running.
+start_proxy() {
+  info "start proxy server"
+  cmd="${PROXY_SERVER_BIN} -e localhost:10000,localhost:10001"
+  info "Start proxy server with command: ${cmd}"
+  ${cmd} &
+  sleep 2
+  # check whether proxy_server is running
+  cnt=$(ps -ef | grep "bin/proxy_server" | grep -v grep | wc -l)
+  if [ "${cnt}" -ne 1 ]; then
+    err "Start proxy server failed, expect 1 proxy_server process, but got ${cnt}"
+    exit 1
+  fi
+  info "Start proxy server success"
+}
+
+
+# Query the proxy's heartbeat with both workers up, after killing worker2,
+# and after killing worker1; only the last answer must differ from OK.
+test_proxy() {
+  # First check whether proxy server is running, if not, exit
+  cnt=$(ps -ef | grep "bin/proxy_server" | grep -v grep | wc -l)
+  if [ "${cnt}" -ne 1 ]; then
+    err "Proxy server is not running, got cnt ${cnt}, expect 1"
+    exit 1
+  fi
+  # test proxy server
+  info "Test proxy server"
+  # check heart beat
+  res=$(curl -X GET "${PROXY_HEARTBEAT_URL}")
+  if [ "${res}" != "OK" ]; then
+    err "Test proxy server failed, expect OK, but got ${res}"
+    exit 1
+  fi
+  # now kill worker2, and check whether proxy server can still work
+  ps -ef | grep "bin/interactive_server" | grep -v grep | grep "${ENGINE_CONFIG_PATH_WORKER2}" | awk '{print $2}' | xargs kill -9
+  sleep 2
+  res=$(curl -X GET "${PROXY_HEARTBEAT_URL}")
+  # should still be OK
+  if [ "${res}" != "OK" ]; then
+    err "Test proxy server failed, expect OK, but got ${res}"
+    exit 1
+  fi
+  # now kill worker1, and check whether proxy server can still work
+  ps -ef | grep "bin/interactive_server" | grep -v grep | grep "${ENGINE_CONFIG_PATH_WORKER1}" | awk '{print $2}' | xargs kill -9
+  sleep 2
+  res=$(curl -X GET "${PROXY_HEARTBEAT_URL}")
+  # should no longer be OK
+  if [ "${res}" == "OK" ]; then
+    err "Test proxy server failed, expect not OK, but got ${res}"
+    exit 1
+  fi
+  info "Test proxy server success"
+}
+
+
+kill_service
+
+prepare_engine_config
+start_worker # start interactive worker first
+start_proxy # start the proxy server
+test_proxy
+
+kill_service