Skip to content

Commit

Permalink
[improvement] fix batch streamload when label already exist (apache#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Aug 15, 2024
1 parent 411020a commit 16b01d0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public HttpPutBuilder setLabel(String label) {
return this;
}

public String getLabel() {
return header.get("label");
}

public HttpPut build() {
Preconditions.checkNotNull(url);
Preconditions.checkNotNull(httpEntity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -450,25 +451,22 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
if (enableGroupCommit) {
LOG.info("stream load started with group commit on host {}", hostPort);
} else {
LOG.info("stream load started for {} on host {}", label, hostPort);
LOG.info(
"stream load started for {} on host {}",
putBuilder.getLabel(),
hostPort);
}

try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
String reason = response.getStatusLine().toString();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
RespContent respContent =
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
} else {
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
long cacheByteBeforeFlush =
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
LOG.info(
Expand All @@ -482,16 +480,26 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
lock.unlock();
}
return;
} else if (LoadStatus.LABEL_ALREADY_EXIST.equals(
respContent.getStatus())) {
// todo: need to abort transaction when JobStatus not finished
putBuilder.setLabel(label + "_" + retry);
reason = respContent.getMessage();
} else {
String errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
}
}
LOG.error(
"stream load failed with {}, reason {}, to retry",
hostPort,
response.getStatusLine().toString());
reason);
if (retry == executionOptions.getMaxRetries()) {
resEx =
new DorisRuntimeException(
"stream load failed with: " + response.getStatusLine());
resEx = new DorisRuntimeException("stream load failed with: " + reason);
}
} catch (Exception ex) {
resEx = ex;
Expand Down

0 comments on commit 16b01d0

Please sign in to comment.