Skip to content

Commit

Permalink
[fix](arrow-flight-sql) Fix conf public_host and `arrow_flight_sql_…
Browse files Browse the repository at this point in the history
…proxy_port` (apache#44177)

### What problem does this PR solve?

Problem Summary:

Rename `public_access_ip` to `public_host` and `public_access_port` to
`arrow_flight_sql_proxy_port`, they do not have to be used together.
  • Loading branch information
xinyiZzz committed Nov 19, 2024
1 parent 49ce106 commit 0e5c2fd
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 15 deletions.
25 changes: 23 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,29 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "-1");

DEFINE_mString(public_access_ip, "");
DEFINE_Int32(public_access_port, "-1");
// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network.
// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be
// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx.
// 2. if priority_networks is an internal network IP, and BE node has its own independent external IP,
// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP.
DEFINE_mString(public_host, "");

// If the BE node is connected to the external network through a reverse proxy like Nginx
// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy
// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example:
// upstream arrowflight {
// server 10.16.10.8:8069;
// server 10.16.10.8:8068;
//}
// server {
// listen 8167 http2;
// listen [::]:8167 http2;
// server_name doris.arrowflight.com;
// }
DEFINE_Int32(arrow_flight_sql_proxy_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down
28 changes: 23 additions & 5 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// If priority_networks is incorrect but cannot be modified, set public_access_ip as BE’s real IP.
// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip.
DECLARE_mString(public_access_ip);
DECLARE_Int32(public_access_port);
// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network.
// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be
// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx.
// 2. if priority_networks is an internal network IP, and BE node has its own independent external IP,
// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP.
DECLARE_mString(public_host);

// If the BE node is connected to the external network through a reverse proxy like Nginx
// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy
// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example:
// upstream arrowflight {
// server 10.16.10.8:8069;
// server 10.16.10.8:8068;
//}
// server {
// listen 8167 http2;
// listen [::]:8167 http2;
// server_name doris.arrowflight.com;
// }
DECLARE_Int32(arrow_flight_sql_proxy_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down
8 changes: 5 additions & 3 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -932,9 +932,11 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
st = serialize_arrow_schema(&schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
if (!config::public_access_ip.empty() && config::public_access_port != -1) {
result->set_be_arrow_flight_ip(config::public_access_ip);
result->set_be_arrow_flight_port(config::public_access_port);
if (!config::public_host.empty()) {
result->set_be_arrow_flight_ip(config::public_host);
}
if (config::arrow_flight_sql_proxy_port != -1) {
result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port);
}
}
st.to_protobuf(result->mutable_status());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,15 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
connectContext.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,11 @@ public Schema fetchArrowFlightSchema(int timeoutMs) {
throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus));
}
if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) {
publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8();
publicAccessAddr.port = pResult.getBeArrowFlightPort();
if (pResult.hasBeArrowFlightIp()) {
publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
}
if (pResult.hasBeArrowFlightPort()) {
publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
}
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
Expand Down

0 comments on commit 0e5c2fd

Please sign in to comment.