Skip to content

Commit

Permalink
[streamload](redirect) Support redirect-policy for streamload which i…
Browse files Browse the repository at this point in the history
…s used by audit plugin (apache#35840)

Audit log plugin uses streamload to save audit data, in cloud mode, the
streamload initiated by the audit-log component may be forwarded to a
Load Balancer (LB) attached to a Backend (BE). This can cause the
request to fail due to network issues.

This PR adds new a streamlaod redirect policy for to get rid the LB
issue.
  • Loading branch information
gavinchou authored and dataroaring committed Jun 7, 2024
1 parent a6a30f0 commit e0001bc
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 27 deletions.
3 changes: 1 addition & 2 deletions be/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@ struct AuthInfo {

template <class T>
void set_request_auth(T* req, const AuthInfo& auth) {
req->user = auth.user; // always set user, because it may be used by FE
if (auth.auth_code != -1) {
// if auth_code is set, no need to set other info
req->__set_auth_code(auth.auth_code);
// user name and passwd is unused, but they are required field.
// so they have to be set.
req->user = "";
req->passwd = "";
} else if (auth.token != "") {
req->__isset.token = true;
req->token = auth.token;
} else {
req->user = auth.user;
req->passwd = auth.passwd;
if (!auth.cluster.empty()) {
req->__set_cluster(auth.cluster);
Expand Down
35 changes: 18 additions & 17 deletions be/src/http/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,30 +77,31 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
const auto& token = req.header("token");
const auto& auth_code = req.header(HTTP_AUTH_CODE);

std::tuple<std::string, std::string, std::string> tmp;
auto& [user, pass, cluster] = tmp;
bool valid_basic_auth = parse_basic_auth(req, &user, &pass);
if (valid_basic_auth) { // always set the basic auth, the user may be useful
auto pos = user.find('@');
if (pos != std::string::npos) {
cluster.assign(user.c_str() + pos + 1);
user.assign(user.c_str(), pos); // user is updated
}
auth->user = user;
auth->passwd = pass;
auth->cluster = cluster;
}

if (!token.empty()) {
auth->token = token;
} else if (!auth_code.empty()) {
auth->auth_code = std::stoll(auth_code);
} else {
std::string full_user;
if (!parse_basic_auth(req, &full_user, &auth->passwd)) {
return false;
}
auto pos = full_user.find('@');
if (pos != std::string::npos) {
auth->user.assign(full_user.data(), pos);
auth->cluster.assign(full_user.data() + pos + 1);
} else {
auth->user = full_user;
}
} else if (!valid_basic_auth) {
return false;
}

// set user ip
if (req.remote_host() != nullptr) {
auth->user_ip.assign(req.remote_host());
} else {
auth->user_ip.assign("");
}
auth->user_ip.assign(req.remote_host() != nullptr ? req.remote_host() : "");

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

import java.net.InetAddress;
import java.net.URI;
import java.util.Enumeration;
import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
Expand All @@ -69,6 +70,11 @@ public class LoadAction extends RestBaseController {

public static final String SUB_LABEL_NAME_PARAM = "sub_label";

public static final String HEADER_REDIRECT_POLICY = "redirect-policy";

public static final String REDIRECT_POLICY_PUBLIC_PRIVATE = "public-private";
public static final String REDIRECT_POLICY_RANDOM_BE = "random-be";

private ExecuteEnv execEnv = ExecuteEnv.getInstance();

private int lastSelectedBackendIndex = 0;
Expand All @@ -94,6 +100,7 @@ public Object load(HttpServletRequest request, HttpServletResponse response,
public Object streamLoad(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
Expand Down Expand Up @@ -209,6 +216,7 @@ private String[] parseDbAndTb(String sql) throws Exception {
public Object streamLoad2PC(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
LOG.info("streamload action 2PC, db: {}, headers: {}", db, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -222,6 +230,7 @@ public Object streamLoad2PC_table(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand Down Expand Up @@ -348,8 +357,7 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea
if (Strings.isNullOrEmpty(cloudClusterName)) {
throw new LoadException("No cloud cluster name selected.");
}
String reqHostStr = request.getHeader(HttpHeaderNames.HOST.toString());
return selectCloudRedirectBackend(cloudClusterName, reqHostStr, groupCommit);
return selectCloudRedirectBackend(cloudClusterName, request, groupCommit);
} else {
return selectLocalRedirectBackend(groupCommit);
}
Expand Down Expand Up @@ -391,10 +399,18 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws L
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}

private TNetworkAddress selectCloudRedirectBackend(String clusterName, String reqHostStr, boolean groupCommit)
private TNetworkAddress selectCloudRedirectBackend(String clusterName, HttpServletRequest req, boolean groupCommit)
throws LoadException {
Backend backend = StreamLoadHandler.selectBackend(clusterName, groupCommit);

String redirectPolicy = req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
// User specified redirect policy
if (redirectPolicy != null && redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE)) {
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
? Config.streamload_redirect_policy : redirectPolicy;

Pair<String, Integer> publicHostPort = null;
Pair<String, Integer> privateHostPort = null;
try {
Expand All @@ -413,6 +429,7 @@ private TNetworkAddress selectCloudRedirectBackend(String clusterName, String re
throw new LoadException(e.getMessage());
}

String reqHostStr = req.getHeader(HttpHeaderNames.HOST.toString());
reqHostStr = reqHostStr.replaceAll("\\s+", "");
if (reqHostStr.isEmpty()) {
LOG.info("Invalid header host: {}", reqHostStr);
Expand All @@ -430,8 +447,8 @@ private TNetworkAddress selectCloudRedirectBackend(String clusterName, String re
throw new LoadException("Invalid header host: " + reqHost);
}

if (Config.streamload_redirect_policy.equalsIgnoreCase("public-private")) {
// ip
if (redirectPolicy != null && redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC_PRIVATE)) {
// redirect with ip
if (InetAddressValidator.getInstance().isValid(reqHost)) {
InetAddress addr;
try {
Expand All @@ -451,7 +468,7 @@ private TNetworkAddress selectCloudRedirectBackend(String clusterName, String re
}
}

// domin
// redirect with domain
if (publicHostPort != null && reqHost.toLowerCase().contains("public")) {
return new TNetworkAddress(publicHostPort.first, publicHostPort.second);
} else if (privateHostPort != null) {
Expand Down Expand Up @@ -585,4 +602,15 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
ConnectContext.remove();
}
}

private String getAllHeaders(HttpServletRequest request) {
StringBuilder headers = new StringBuilder();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
String headerValue = request.getHeader(headerName);
headers.append(headerName).append(":").append(headerValue).append(", ");
}
return headers.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public StreamLoadHandler(TStreamLoadPutRequest request, AtomicInteger indexId,
this.clientAddr = clientAddr;
}

/**
* Select a random backend in the given cloud cluster.
*
* @param clusterName cloud cluster name
* @param groupCommit if this selection is for group commit
* @throws LoadException if there is no available backend
*/
public static Backend selectBackend(String clusterName, boolean groupCommit) throws LoadException {
List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getBackendsByClusterName(clusterName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
conn.setRequestProperty("token", clusterToken);
conn.setRequestProperty("Authorization", "Basic ");
conn.setRequestProperty("Authorization", "Basic YWRtaW46"); // admin
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
Expand All @@ -67,6 +67,7 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus
conn.addRequestProperty("columns",
InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect(
Collectors.joining(",")));
conn.addRequestProperty("redirect-policy", "random-be");
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
Expand All @@ -75,13 +76,14 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus
private String toCurl(HttpURLConnection conn) {
StringBuilder sb = new StringBuilder("curl -v ");
sb.append("-X ").append(conn.getRequestMethod()).append(" \\\n ");
sb.append("-H \"").append("Authorization\":").append("\"Basic ").append("\" \\\n ");
sb.append("-H \"").append("Authorization\":").append("\"Basic YWRtaW46").append("\" \\\n ");
sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n ");
sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n ");
sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n ");
sb.append("-H \"").append("columns\":")
.append("\"" + InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect(
Collectors.joining(",")) + "\" \\\n ");
sb.append("-H \"").append("redirect-policy\":").append("\"random-be").append("\" \\\n ");
sb.append("\"").append(conn.getURL()).append("\"");
return sb.toString();
}
Expand Down

0 comments on commit e0001bc

Please sign in to comment.