diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java index 44f6c9fe3..da1a79998 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java @@ -117,6 +117,10 @@ public HttpPutBuilder setLabel(String label) { return this; } + public String getLabel() { + return header.get("label"); + } + public HttpPut build() { Preconditions.checkNotNull(url); Preconditions.checkNotNull(httpEntity); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 3240dafe4..42b832076 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; +import org.apache.doris.flink.sink.LoadStatus; import org.apache.doris.flink.sink.writer.LabelGenerator; import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -450,25 +451,22 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { if (enableGroupCommit) { LOG.info("stream load started with group commit on host {}", hostPort); } else { - LOG.info("stream load started for {} on host {}", label, hostPort); + LOG.info( + "stream load started for {} on host {}", + putBuilder.getLabel(), + hostPort); } try (CloseableHttpClient httpClient = httpClientBuilder.build()) { try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) { int statusCode = response.getStatusLine().getStatusCode(); + String reason = response.getStatusLine().toString(); if (statusCode == 200 && response.getEntity() != null) { String loadResult = EntityUtils.toString(response.getEntity()); LOG.info("load Result {}", loadResult); RespContent respContent = OBJECT_MAPPER.readValue(loadResult, RespContent.class); - if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg = - String.format( - "stream load error: %s, see more in %s", - respContent.getMessage(), - respContent.getErrorURL()); - throw new DorisBatchLoadException(errMsg); - } else { + if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { long cacheByteBeforeFlush = currentCacheBytes.getAndAdd(-respContent.getLoadBytes()); LOG.info( @@ -482,16 +480,26 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { lock.unlock(); } return; + } else if (LoadStatus.LABEL_ALREADY_EXIST.equals( + respContent.getStatus())) { + // todo: need to abort transaction when JobStatus not finished + putBuilder.setLabel(label + "_" + retry); + reason = respContent.getMessage(); + } else { + String errMsg = + String.format( + "stream load error: %s, see more in %s", + respContent.getMessage(), + respContent.getErrorURL()); + throw new DorisBatchLoadException(errMsg); } } LOG.error( "stream load failed with {}, reason {}, to retry", hostPort, - response.getStatusLine().toString()); + reason); if (retry == executionOptions.getMaxRetries()) { - resEx = - new DorisRuntimeException( - "stream load failed with: " + response.getStatusLine()); + resEx = new DorisRuntimeException("stream load failed with: " + reason); } } catch (Exception ex) { resEx = ex;