From 8264c45ca75d7881bda5a43581734a9e4a2a4d98 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 18 Mar 2024 14:42:14 +0800 Subject: [PATCH] improve httpclient --- .../org/apache/doris/flink/sink/HttpUtil.java | 21 ++-- .../sink/batch/DorisBatchStreamLoad.java | 54 +++++----- .../doris/flink/sink/copy/BatchStageLoad.java | 98 +++++++++++-------- .../flink/sink/copy/DorisCopyCommitter.java | 64 ++++++------ .../sink/copy/TestDorisCopyCommitter.java | 5 +- 5 files changed, 127 insertions(+), 115 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java index 43e2bea8c..45f2b0e19 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java @@ -18,12 +18,13 @@ package org.apache.doris.flink.sink; import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.NoConnectionReuseStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; +import java.util.concurrent.TimeUnit; + /** util to build http client. */ public class HttpUtil { private final HttpClientBuilder httpClientBuilder = @@ -34,7 +35,9 @@ public class HttpUtil { protected boolean isRedirectable(String method) { return true; } - }); + }) + .evictExpiredConnections() + .evictIdleConnections(60, TimeUnit.SECONDS); public CloseableHttpClient getHttpClient() { return httpClientBuilder.build(); @@ -48,17 +51,7 @@ public CloseableHttpClient getHttpClient() { .setSocketTimeout(9 * 60 * 1000) .build(); - public CloseableHttpClient getHttpClientForBatch() { - return httpClientBuilder.setDefaultRequestConfig(requestConfig).build(); - } - - private final HttpClientBuilder httpClientBuilderWithTimeout = - HttpClients.custom().setDefaultRequestConfig(requestConfig); - - public CloseableHttpClient getHttpClientWithTimeout() { - return httpClientBuilderWithTimeout - // fix failed to respond for commit copy - .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE) - .build(); + public HttpClientBuilder getHttpClientBuilderForBatch() { + return httpClientBuilder.setDefaultRequestConfig(requestConfig); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 0971be081..ad4ccdede 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -35,6 +35,7 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,7 @@ public class DorisBatchStreamLoad implements Serializable { private final AtomicBoolean started; private volatile boolean loadThreadAlive = false; private AtomicReference exception = new AtomicReference<>(null); - private CloseableHttpClient httpClient = new HttpUtil().getHttpClientForBatch(); + private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); private BackendUtil backendUtil; public DorisBatchStreamLoad( @@ -274,32 +275,35 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { int retry = 0; while (retry <= executionOptions.getMaxRetries()) { LOG.info("stream load started for {} on host {}", label, hostPort); - try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) { - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode == 200 && response.getEntity() != null) { - String loadResult = EntityUtils.toString(response.getEntity()); - LOG.info("load Result {}", loadResult); - RespContent respContent = - OBJECT_MAPPER.readValue(loadResult, RespContent.class); - if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg = - String.format( - "stream load error: %s, see more in %s", - respContent.getMessage(), respContent.getErrorURL()); - throw new DorisBatchLoadException(errMsg); - } else { - return; + try (CloseableHttpClient httpClient = httpClientBuilder.build()) { + try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200 && response.getEntity() != null) { + String loadResult = EntityUtils.toString(response.getEntity()); + LOG.info("load Result {}", loadResult); + RespContent respContent = + OBJECT_MAPPER.readValue(loadResult, RespContent.class); + if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { + String errMsg = + String.format( + "stream load error: %s, see more in %s", + respContent.getMessage(), + respContent.getErrorURL()); + throw new DorisBatchLoadException(errMsg); + } else { + return; + } } + LOG.error( + "stream load failed with {}, reason {}, to retry", + hostPort, + response.getStatusLine().toString()); + } catch (Exception ex) { + if (retry == executionOptions.getMaxRetries()) { + throw new DorisBatchLoadException("stream load error: ", ex); + } + LOG.error("stream load error with {}, to retry, cause by", hostPort, ex); } - LOG.error( - "stream load failed with {}, reason {}, to retry", - hostPort, - response.getStatusLine().toString()); - } catch (Exception ex) { - if (retry == executionOptions.getMaxRetries()) { - throw new DorisBatchLoadException("stream load error: ", ex); - } - LOG.error("stream load error with {}, to retry, cause by", hostPort, ex); } retry++; // get available backend retry diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java index 97e217dea..e81ed7cd3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java @@ -35,6 +35,7 @@ import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +85,7 @@ public class BatchStageLoad implements Serializable { private final AtomicBoolean started; private volatile boolean loadThreadAlive = false; private AtomicReference exception = new AtomicReference<>(null); - private CloseableHttpClient httpClient = new HttpUtil().getHttpClientWithTimeout(); + private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); public BatchStageLoad( DorisOptions dorisOptions, @@ -290,33 +291,38 @@ public String uploadToInternalStage(String address, ByteBuffer data) BackoffAndRetryUtils.backoffAndRetry( BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE, () -> { - try (CloseableHttpResponse response = - httpClient.execute(httpPut)) { - final int statusCode = - response.getStatusLine().getStatusCode(); - String requestId = getRequestId(response.getAllHeaders()); - if (statusCode == 200 && response.getEntity() != null) { - String loadResult = - EntityUtils.toString(response.getEntity()); - if (loadResult == null || loadResult.isEmpty()) { - // upload finished - return requestId; + try (CloseableHttpClient httpClient = + httpClientBuilder.build()) { + try (CloseableHttpResponse response = + httpClient.execute(httpPut)) { + final int statusCode = + response.getStatusLine().getStatusCode(); + String requestId = + getRequestId(response.getAllHeaders()); + if (statusCode == 200 && response.getEntity() != null) { + String loadResult = + EntityUtils.toString(response.getEntity()); + if (loadResult == null || loadResult.isEmpty()) { + // upload finished + return requestId; + } + LOG.error( + "upload file failed, requestId is {}, response result: {}", + requestId, + loadResult); + throw new CopyLoadException( + "upload file failed: " + + response.getStatusLine() + .toString() + + ", with requestId " + + requestId); } - LOG.error( - "upload file failed, requestId is {}, response result: {}", - requestId, - loadResult); throw new CopyLoadException( - "upload file failed: " + "upload file error: " + response.getStatusLine().toString() + ", with requestId " + requestId); } - throw new CopyLoadException( - "upload file error: " - + response.getStatusLine().toString() - + ", with requestId " - + requestId); } }); return String.valueOf(result); @@ -361,27 +367,33 @@ public String getUploadAddress(String fileName) throws CopyLoadException { BackoffAndRetryUtils.backoffAndRetry( BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS, () -> { - try (CloseableHttpResponse execute = - httpClient.execute(putBuilder.build())) { - int statusCode = execute.getStatusLine().getStatusCode(); - String reason = execute.getStatusLine().getReasonPhrase(); - if (statusCode == 307) { - Header location = execute.getFirstHeader("location"); - String uploadAddress = location.getValue(); - return uploadAddress; - } else { - HttpEntity entity = execute.getEntity(); - String result = - entity == null - ? null - : EntityUtils.toString(entity); - LOG.error( - "Failed to get internalStage address, status {}, reason {}, response {}", - statusCode, - reason, - result); - throw new CopyLoadException( - "Failed get internalStage address"); + try (CloseableHttpClient httpClient = + httpClientBuilder.build()) { + try (CloseableHttpResponse execute = + httpClient.execute(putBuilder.build())) { + int statusCode = + execute.getStatusLine().getStatusCode(); + String reason = + execute.getStatusLine().getReasonPhrase(); + if (statusCode == 307) { + Header location = + execute.getFirstHeader("location"); + String uploadAddress = location.getValue(); + return uploadAddress; + } else { + HttpEntity entity = execute.getEntity(); + String result = + entity == null + ? null + : EntityUtils.toString(entity); + LOG.error( + "Failed to get internalStage address, status {}, reason {}, response {}", + statusCode, + reason, + result); + throw new CopyLoadException( + "Failed get internalStage address"); + } } } }); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java index 16be357f6..e746fcf7c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java @@ -31,6 +31,7 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +48,20 @@ public class DorisCopyCommitter implements Committer, Clos private static final int SUCCESS = 0; private static final String FAIL = "1"; private ObjectMapper objectMapper = new ObjectMapper(); - private final CloseableHttpClient httpClient; private final DorisOptions dorisOptions; + private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); int maxRetry; public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry) { - this(dorisOptions, maxRetry, new HttpUtil().getHttpClientWithTimeout()); + this.dorisOptions = dorisOptions; + this.maxRetry = maxRetry; } - public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry, CloseableHttpClient client) { + public DorisCopyCommitter( + DorisOptions dorisOptions, int maxRetry, HttpClientBuilder httpClientBuilder) { this.dorisOptions = dorisOptions; this.maxRetry = maxRetry; - this.httpClient = client; + this.httpClientBuilder = httpClientBuilder; } @Override @@ -88,31 +91,32 @@ private void commitTransaction(DorisCopyCommittable committable) throws IOExcept .setUrl(String.format(commitPattern, hostPort)) .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword()) .setEntity(new StringEntity(objectMapper.writeValueAsString(params))); - - try (CloseableHttpResponse response = httpClient.execute(postBuilder.build())) { - statusCode = response.getStatusLine().getStatusCode(); - reasonPhrase = response.getStatusLine().getReasonPhrase(); - if (statusCode != 200) { - LOG.warn( - "commit failed with status {} {}, reason {}", - statusCode, - hostPort, - reasonPhrase); - } else if (response.getEntity() != null) { - loadResult = EntityUtils.toString(response.getEntity()); - success = handleCommitResponse(loadResult); - if (success) { - LOG.info( - "commit success cost {}ms, response is {}", - System.currentTimeMillis() - start, - loadResult); - break; - } else { - LOG.warn("commit failed, retry again"); + try (CloseableHttpClient httpClient = httpClientBuilder.build()) { + try (CloseableHttpResponse response = httpClient.execute(postBuilder.build())) { + statusCode = response.getStatusLine().getStatusCode(); + reasonPhrase = response.getStatusLine().getReasonPhrase(); + if (statusCode != 200) { + LOG.warn( + "commit failed with status {} {}, reason {}", + statusCode, + hostPort, + reasonPhrase); + } else if (response.getEntity() != null) { + loadResult = EntityUtils.toString(response.getEntity()); + success = handleCommitResponse(loadResult); + if (success) { + LOG.info( + "commit success cost {}ms, response is {}", + System.currentTimeMillis() - start, + loadResult); + break; + } else { + LOG.warn("commit failed, retry again"); + } } + } catch (IOException e) { + LOG.error("commit error : ", e); } - } catch (IOException e) { - LOG.error("commit error : ", e); } } @@ -158,9 +162,5 @@ public boolean handleCommitResponse(String loadResult) throws IOException { } @Override - public void close() throws IOException { - if (httpClient != null) { - httpClient.close(); - } - } + public void close() throws IOException {} } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java index 7a7f32673..23399aca4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java @@ -26,6 +26,7 @@ import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.message.BasicStatusLine; import org.junit.Assert; import org.junit.Before; @@ -48,14 +49,16 @@ public class TestDorisCopyCommitter { public void setUp() throws Exception { DorisOptions dorisOptions = OptionUtils.buildDorisOptions(); copyCommittable = new DorisCopyCommittable("127.0.0.1:8710", "copy into sql"); + HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class); CloseableHttpClient httpClient = mock(CloseableHttpClient.class); + when(httpClientBuilder.build()).thenReturn(httpClient); entityMock = new HttpEntityMock(); CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class); StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, ""); when(httpClient.execute(any())).thenReturn(httpResponse); when(httpResponse.getStatusLine()).thenReturn(normalLine); when(httpResponse.getEntity()).thenReturn(entityMock); - copyCommitter = new DorisCopyCommitter(dorisOptions, 1, httpClient); + copyCommitter = new DorisCopyCommitter(dorisOptions, 1, httpClientBuilder); } @Test