Skip to content

Commit

Permalink
introduce proxy_server
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 committed May 9, 2024
1 parent 5b2712c commit 396d941
Show file tree
Hide file tree
Showing 23 changed files with 1,061 additions and 3 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/hqps-db-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ jobs:
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./engine_config_test.yaml python
sed -i 's/default_graph: modern_graph/default_graph: ldbc/g' ./engine_config_test.yaml
- name: Proxy Server test
env:
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
bash hqps_proxy_server_test.sh ${INTERACTIVE_WORKSPACE} ./engine_config_test.yaml
- name: Sample Query test
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest
Expand Down
1 change: 1 addition & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" OFF)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF)
option(USE_PTHASH "Whether to use pthash" OFF)
option(BUILD_PROXY "Whether to build proxy" ON)

#print options
message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}")
Expand Down
6 changes: 6 additions & 0 deletions flex/bin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ if(BUILD_HQPS)
install(PROGRAMS load_plan_and_gen.sh DESTINATION bin)
endif()

if (BUILD_PROXY)
add_executable(proxy_server proxy_server.cc)
target_link_libraries(proxy_server flex_utils flex_server ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
install_without_export_flex_target(proxy_server)
endif()

include_directories(${Boost_INCLUDE_DIRS})
add_executable(bulk_loader bulk_loader.cc)
target_link_libraries(bulk_loader flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES})
Expand Down
115 changes: 115 additions & 0 deletions flex/bin/proxy_server.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <filesystem>
#include <iostream>
#include "stdlib.h"

#include "flex/engines/hqps_db/core/utils/hqps_utils.h"
#include "flex/engines/http_server/service/proxy_service.h"
#include "flex/utils/service_utils.h"

#include <boost/program_options.hpp>

#include <glog/logging.h>

namespace bpo = boost::program_options;

namespace gs {
// Function to parse endpoints from a string
bool parse_endpoints(const std::string& input_string,
std::vector<std::pair<std::string, uint16_t>>& endpoints) {
std::istringstream iss(input_string);
std::string endpoint;

while (std::getline(iss, endpoint, ',')) {
// Split the endpoint into host and port using ':'
size_t delimiter_pos = endpoint.find(':');
if (delimiter_pos == std::string::npos) {
std::cerr << "Invalid endpoint: " << endpoint << ", missing delimiter ':'"
<< std::endl;
continue;
}

std::string host = endpoint.substr(0, delimiter_pos);
std::string port_str = endpoint.substr(delimiter_pos + 1);
uint16_t port;
try {
port = std::stoull(port_str);
} catch (const std::invalid_argument& e) {
LOG(ERROR) << "Invalid port: " << port_str << ", must be a number"
<< std::endl;
return false;
}

// Check for valid port range
if (port < 1 || port > 65535) {
LOG(ERROR) << "Invalid port: " << port << ", must be between 1 and 65535"
<< std::endl;
return false;
}
endpoints.push_back({host, port});
}
return true;
}
} // namespace gs

/**
* The main entrance for ProxyServer.
*/
int main(int argc, char** argv) {
bpo::options_description desc("Usage:");
desc.add_options()("help,h", "Display help messages")(
"endpoints,e", bpo::value<std::string>()->required(),
"The endpoints of the proxy server, e.g., {ip}:{port},{ip}:{port},...")(
"heartbeat-interval,i", bpo::value<int>()->default_value(1),
"The interval of heartbeat check in seconds");

setenv("TZ", "Asia/Shanghai", 1);
tzset();

bpo::variables_map vm;
bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
bpo::notify(vm);

if (vm.count("help")) {
std::cout << desc << std::endl;
return 0;
}

if (!vm.count("endpoints")) {
LOG(FATAL) << "endpoints is not specified";
return 0;
}
std::vector<std::pair<std::string, uint16_t>> endpoints;
if (!gs::parse_endpoints(vm["endpoints"].as<std::string>(), endpoints)) {
LOG(FATAL) << "Failed to parse endpoints";
return 0;
}

LOG(INFO) << "got endpoints of size: " << endpoints.size()
<< ", :" << gs::to_string(endpoints);

uint32_t shard_num = 1;
uint16_t http_port = 9999;

if (!server::ProxyService::get().init(shard_num, http_port, endpoints).ok()) {
LOG(FATAL) << "Failed to init ProxyService";
return 0;
}
server::ProxyService::get().run_and_wait_for_exit();

return 0;
}
7 changes: 7 additions & 0 deletions flex/engines/hqps_db/core/utils/hqps_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,13 @@ struct to_string_impl<uint8_t> {
}
};

template <>
struct to_string_impl<uint16_t> {
static inline std::string to_string(const uint16_t& empty) {
return std::to_string((int32_t) empty);
}
};

template <>
struct to_string_impl<int64_t> {
static inline std::string to_string(const int64_t& empty) {
Expand Down
12 changes: 12 additions & 0 deletions flex/engines/http_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ if (Hiactor_FOUND)
list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*")
endif ()

if (NOT BUILD_PROXY)
list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*proxy.*")
endif ()

# get all .cc files in current directory, except for generated/
file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*generated.*")
Expand All @@ -22,6 +26,14 @@ if (Hiactor_FOUND)
list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*")
endif ()

if (NOT BUILD_PROXY)
list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_actor*")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_http*")
list(FILTER SERVER_FILES EXCLUDE REGEX ".*proxy_service*")
endif ()

message(STATUS "SERVER_FILES: ${SERVER_FILES}")

add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files})
add_dependencies(flex_server server_actor_autogen)
target_compile_options (flex_server
Expand Down
81 changes: 81 additions & 0 deletions flex/engines/http_server/actor/proxy_actor.act.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/http_server/actor/proxy_actor.act.h"
#include "flex/engines/http_server/service/proxy_service.h"

#include "nlohmann/json.hpp"

#include <seastar/core/print.hh>

namespace server {

proxy_actor::~proxy_actor() {
// finalization
// ...
}

proxy_actor::proxy_actor(hiactor::actor_base* exec_ctx,
const hiactor::byte_t* addr)
: hiactor::actor(exec_ctx, addr) {
set_max_concurrency(1); // set max concurrency for task reentrancy (stateful)
// initialization
// ...
}

seastar::future<query_result> proxy_actor::do_query(
proxy_request&& request_payload) {
auto& request = request_payload.content;
VLOG(10) << "proxy_actor::forward_request, method: " << request->_method
<< ", path: " << request->_url << ", query: " << request->content;

// recover the old url with paramters in request
auto& proxy_service = ProxyService::get();
auto& client = proxy_service.get_client();
return client
.forward_request(request->_url, request->_method, request->content,
request->_headers)
.then([&proxy_service](HttpForwardingResponses&& content) {
if (content.size() == 0) {
return seastar::make_exception_future<query_result>(
std::runtime_error("Got no responses when forwarding request "
"to interactive servers."));
}
// Check all responses are ok, if not ok, return error
seastar::sstring res_string;
for (size_t i = 0; i < content.size(); ++i) {
auto& response = content[i];
if (response.first != 200) {
LOG(ERROR) << "Got error response when forwarding request "
"to interactive servers at index: "
<< std::to_string(i) << ", endpoint: "
<< proxy_service.get_endpoints()[i].first + ":"
<< std::to_string(
proxy_service.get_endpoints()[i].second)
<< std::to_string(response.first) + ", msg:"
<< response.second;
} else {
// Select the first/last response as the final result? or check
// whether all responses are the same?
res_string = response.second;
break;
}
}
return seastar::make_ready_future<query_result>(
query_result{std::move(res_string)});
});
}

} // namespace server
45 changes: 45 additions & 0 deletions flex/engines/http_server/actor/proxy_actor.act.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_
#define ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_


#include "flex/engines/http_server/types.h"

#include <hiactor/core/actor-template.hh>
#include <hiactor/util/data_type.hh>
#include <seastar/http/httpd.hh>

namespace server {

class ANNOTATION(actor:impl) proxy_actor : public hiactor::actor {

Check notice on line 29 in flex/engines/http_server/actor/proxy_actor.act.h

View check run for this annotation

codefactor.io / CodeFactor

flex/engines/http_server/actor/proxy_actor.act.h#L29

Redundant blank line at the start of a code block should be deleted. (whitespace/blank_line)
public:
proxy_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr);
~proxy_actor() override;

seastar::future<query_result> ANNOTATION(actor:method) do_query(proxy_request&& param);

// DECLARE_RUN_QUERIES;
/// Declare `do_work` func here, no need to implement.
ACTOR_DO_WORK()

private:
int32_t your_private_members_ = 0;
};
}

#endif // ENGINES_HTTP_SERVER_ACTOR_PROXY_ACTOR_H_
1 change: 1 addition & 0 deletions flex/engines/http_server/handler/admin_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <seastar/core/alien.hh>
#include <seastar/core/print.hh>
#include <seastar/http/common.hh>
#include <seastar/http/handlers.hh>
#include "flex/engines/http_server/generated/actor/admin_actor_ref.act.autogen.h"
#include "flex/engines/http_server/types.h"
Expand Down
6 changes: 4 additions & 2 deletions flex/engines/http_server/handler/admin_http_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
#ifndef ENGINES_HTTP_SERVER_HANDLER_ADMIN_HTTP_HANDLER_H_
#define ENGINES_HTTP_SERVER_HANDLER_ADMIN_HTTP_HANDLER_H_

#include <boost/property_tree/json_parser.hpp>
#include <seastar/http/httpd.hh>
#include <string>
#include "flex/engines/http_server/handler/http_utils.h"
#include "flex/engines/http_server/types.h"
#include "flex/utils/service_utils.h"

#include <boost/property_tree/json_parser.hpp>
#include <seastar/http/common.hh>
#include <seastar/http/httpd.hh>

namespace server {

class InteractiveAdminService;
Expand Down
18 changes: 18 additions & 0 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ class HttpTextMapCarrier

namespace server {

hqps_heartbeat_handler::hqps_heartbeat_handler() {}

hqps_heartbeat_handler::~hqps_heartbeat_handler() = default;

// TODO: return snapshot_id.
seastar::future<std::unique_ptr<seastar::httpd::reply>>
hqps_heartbeat_handler::handle(const seastar::sstring& path,
std::unique_ptr<seastar::httpd::request> req,
std::unique_ptr<seastar::httpd::reply> rep) {
rep->write_body("bin", seastar::sstring{"OK"});
rep->done();
return seastar::make_ready_future<std::unique_ptr<seastar::httpd::reply>>(
std::move(rep));
}

hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
uint32_t group_inc_step,
uint32_t shard_concurrency)
Expand Down Expand Up @@ -377,6 +392,7 @@ hqps_http_handler::hqps_http_handler(uint16_t http_port)
ic_adhoc_group_id, codegen_group_id, max_group_id, group_inc_step,
shard_adhoc_concurrency);
exit_handler_ = new hqps_exit_handler();
heart_beat_handler_ = new hqps_heartbeat_handler();
}

hqps_http_handler::~hqps_http_handler() {
Expand Down Expand Up @@ -469,6 +485,8 @@ seastar::future<> hqps_http_handler::set_routes() {
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/interactive/adhoc_query"),
adhoc_query_handler_);
r.add(seastar::httpd::operation_type::GET,
seastar::httpd::url("/heartbeat"), heart_beat_handler_);
r.add(seastar::httpd::operation_type::POST,
seastar::httpd::url("/interactive/exit"), exit_handler_);
return seastar::make_ready_future<>();
Expand Down
Loading

0 comments on commit 396d941

Please sign in to comment.