diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 3747257d1..5a32949e9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -33,7 +33,6 @@ import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; -import org.apache.doris.flink.sink.LoadStatus; import org.apache.doris.flink.sink.writer.LabelGenerator; import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -481,11 +480,6 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { lock.unlock(); } return; - } else if (LoadStatus.LABEL_ALREADY_EXIST.equals( - respContent.getStatus())) { - // todo: need to abort transaction when JobStatus not finished - putBuilder.setLabel(label + "_" + retry); - reason = respContent.getMessage(); } else { String errMsg = null; if (StringUtils.isBlank(respContent.getMessage()) @@ -522,6 +516,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { // get available backend retry refreshLoadUrl(buffer.getDatabase(), buffer.getTable()); putBuilder.setUrl(loadUrl); + putBuilder.setLabel(label + "_" + retry); } buffer.clear(); buffer = null;