Skip to content

Commit

Permalink
fix batch streamload when label already exist
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Aug 13, 2024
1 parent eb90905 commit 4c06029
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public HttpPutBuilder setLabel(String label) {
return this;
}

public String getLabel() {
return header.get("label");
}

public HttpPut build() {
Preconditions.checkNotNull(url);
Preconditions.checkNotNull(httpEntity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -302,36 +303,43 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
if (enableGroupCommit) {
LOG.info("stream load started with group commit on host {}", hostPort);
} else {
LOG.info("stream load started for {} on host {}", label, hostPort);
LOG.info(
"stream load started for {} on host {}",
putBuilder.getLabel(),
hostPort);
}

try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
String reason = response.getStatusLine().toString();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
RespContent respContent =
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
return;
} else if (LoadStatus.LABEL_ALREADY_EXIST.equals(
respContent.getStatus())) {
// todo: need to abort transaction when JobStatus not finished
putBuilder.setLabel(label + "_" + retry);
reason = respContent.getMessage();
} else {
String errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
} else {
return;
}
}
LOG.error(
"stream load failed with {}, reason {}, to retry",
hostPort,
response.getStatusLine().toString());
reason);
if (retry == executionOptions.getMaxRetries()) {
resEx =
new DorisRuntimeException(
"stream load failed with: " + response.getStatusLine());
resEx = new DorisRuntimeException("stream load failed with: " + reason);
}
} catch (Exception ex) {
resEx = ex;
Expand Down

0 comments on commit 4c06029

Please sign in to comment.