Skip to content

Commit

Permalink
[improve] Improve Httpclient Connection (apache#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Mar 19, 2024
1 parent d840ccc commit e5bba52
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.doris.flink.sink;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;

import java.util.concurrent.TimeUnit;

/** util to build http client. */
public class HttpUtil {
private final HttpClientBuilder httpClientBuilder =
Expand All @@ -34,7 +35,9 @@ public class HttpUtil {
protected boolean isRedirectable(String method) {
return true;
}
});
})
.evictExpiredConnections()
.evictIdleConnections(60, TimeUnit.SECONDS);

public CloseableHttpClient getHttpClient() {
return httpClientBuilder.build();
Expand All @@ -48,17 +51,21 @@ public CloseableHttpClient getHttpClient() {
.setSocketTimeout(9 * 60 * 1000)
.build();

public CloseableHttpClient getHttpClientForBatch() {
return httpClientBuilder.setDefaultRequestConfig(requestConfig).build();
public HttpClientBuilder getHttpClientBuilderForBatch() {
return HttpClients.custom()
.setRedirectStrategy(
new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
})
.setDefaultRequestConfig(requestConfig);
}

private final HttpClientBuilder httpClientBuilderWithTimeout =
HttpClients.custom().setDefaultRequestConfig(requestConfig);

public CloseableHttpClient getHttpClientWithTimeout() {
return httpClientBuilderWithTimeout
// fix failed to respond for commit copy
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
.build();
public HttpClientBuilder getHttpClientBuilderForCopyBatch() {
return HttpClients.custom()
.disableRedirectHandling()
.setDefaultRequestConfig(requestConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,7 +91,7 @@ public class DorisBatchStreamLoad implements Serializable {
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
private CloseableHttpClient httpClient = new HttpUtil().getHttpClientForBatch();
private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch();
private BackendUtil backendUtil;

public DorisBatchStreamLoad(
Expand Down Expand Up @@ -274,32 +275,35 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
LOG.info("stream load started for {} on host {}", label, hostPort);
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
RespContent respContent =
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(), respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
} else {
return;
try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
RespContent respContent =
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
} else {
return;
}
}
LOG.error(
"stream load failed with {}, reason {}, to retry",
hostPort,
response.getStatusLine().toString());
} catch (Exception ex) {
if (retry == executionOptions.getMaxRetries()) {
throw new DorisBatchLoadException("stream load error: ", ex);
}
LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);
}
LOG.error(
"stream load failed with {}, reason {}, to retry",
hostPort,
response.getStatusLine().toString());
} catch (Exception ex) {
if (retry == executionOptions.getMaxRetries()) {
throw new DorisBatchLoadException("stream load error: ", ex);
}
LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);
}
retry++;
// get available backend retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -84,7 +85,7 @@ public class BatchStageLoad implements Serializable {
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
private CloseableHttpClient httpClient = new HttpUtil().getHttpClientWithTimeout();
private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForCopyBatch();

public BatchStageLoad(
DorisOptions dorisOptions,
Expand Down Expand Up @@ -290,33 +291,38 @@ public String uploadToInternalStage(String address, ByteBuffer data)
BackoffAndRetryUtils.backoffAndRetry(
BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE,
() -> {
try (CloseableHttpResponse response =
httpClient.execute(httpPut)) {
final int statusCode =
response.getStatusLine().getStatusCode();
String requestId = getRequestId(response.getAllHeaders());
if (statusCode == 200 && response.getEntity() != null) {
String loadResult =
EntityUtils.toString(response.getEntity());
if (loadResult == null || loadResult.isEmpty()) {
// upload finished
return requestId;
try (CloseableHttpClient httpClient =
httpClientBuilder.build()) {
try (CloseableHttpResponse response =
httpClient.execute(httpPut)) {
final int statusCode =
response.getStatusLine().getStatusCode();
String requestId =
getRequestId(response.getAllHeaders());
if (statusCode == 200 && response.getEntity() != null) {
String loadResult =
EntityUtils.toString(response.getEntity());
if (loadResult == null || loadResult.isEmpty()) {
// upload finished
return requestId;
}
LOG.error(
"upload file failed, requestId is {}, response result: {}",
requestId,
loadResult);
throw new CopyLoadException(
"upload file failed: "
+ response.getStatusLine()
.toString()
+ ", with requestId "
+ requestId);
}
LOG.error(
"upload file failed, requestId is {}, response result: {}",
requestId,
loadResult);
throw new CopyLoadException(
"upload file failed: "
"upload file error: "
+ response.getStatusLine().toString()
+ ", with requestId "
+ requestId);
}
throw new CopyLoadException(
"upload file error: "
+ response.getStatusLine().toString()
+ ", with requestId "
+ requestId);
}
});
return String.valueOf(result);
Expand Down Expand Up @@ -361,27 +367,33 @@ public String getUploadAddress(String fileName) throws CopyLoadException {
BackoffAndRetryUtils.backoffAndRetry(
BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS,
() -> {
try (CloseableHttpResponse execute =
httpClient.execute(putBuilder.build())) {
int statusCode = execute.getStatusLine().getStatusCode();
String reason = execute.getStatusLine().getReasonPhrase();
if (statusCode == 307) {
Header location = execute.getFirstHeader("location");
String uploadAddress = location.getValue();
return uploadAddress;
} else {
HttpEntity entity = execute.getEntity();
String result =
entity == null
? null
: EntityUtils.toString(entity);
LOG.error(
"Failed to get internalStage address, status {}, reason {}, response {}",
statusCode,
reason,
result);
throw new CopyLoadException(
"Failed get internalStage address");
try (CloseableHttpClient httpClient =
httpClientBuilder.build()) {
try (CloseableHttpResponse execute =
httpClient.execute(putBuilder.build())) {
int statusCode =
execute.getStatusLine().getStatusCode();
String reason =
execute.getStatusLine().getReasonPhrase();
if (statusCode == 307) {
Header location =
execute.getFirstHeader("location");
String uploadAddress = location.getValue();
return uploadAddress;
} else {
HttpEntity entity = execute.getEntity();
String result =
entity == null
? null
: EntityUtils.toString(entity);
LOG.error(
"Failed to get internalStage address, status {}, reason {}, response {}",
statusCode,
reason,
result);
throw new CopyLoadException(
"Failed get internalStage address");
}
}
}
});
Expand Down
Loading

0 comments on commit e5bba52

Please sign in to comment.