From a1d35250bb2cb82fce392c677ea6456f66cb6179 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 26 Apr 2024 16:02:01 +0800 Subject: [PATCH] pick pr 140 102 --- .../flink/sink/writer/DorisStreamLoad.java | 2 +- .../doris/flink/sink/writer/DorisWriter.java | 29 +++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index bd29d3410..f364d9437 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -78,7 +78,7 @@ public class DorisStreamLoad implements Serializable { private final boolean enableDelete; private final Properties streamLoadProp; private final RecordStream recordStream; - private Future pendingLoadFuture; + private volatile Future pendingLoadFuture; private final CloseableHttpClient httpClient; private final ExecutorService executorService; private boolean loadBatchFirstRecord; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index aa11df579..2df66e936 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -167,19 +167,24 @@ private void checkDone() { LOG.debug("not loading, skip timer checker"); return; } - // TODO: introduce cache for reload instead of throwing exceptions. - String errorMsg; - try { - RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get()); - errorMsg = content.getMessage(); - } catch (Exception e) { - errorMsg = e.getMessage(); + // double check to interrupt when loading is true and dorisStreamLoad.getPendingLoadFuture().isDone + // fix issue #139 + if (dorisStreamLoad.getPendingLoadFuture() != null + && dorisStreamLoad.getPendingLoadFuture().isDone()) { + // TODO: introduce cache for reload instead of throwing exceptions. + String errorMsg; + try { + RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get()); + errorMsg = content.getMessage(); + } catch (Exception e) { + errorMsg = e.getMessage(); + } + + loadException = new StreamLoadException(errorMsg); + LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg); + // set the executor thread interrupted in case blocking in write data. + executorThread.interrupt(); } - - loadException = new StreamLoadException(errorMsg); - LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg); - // set the executor thread interrupted in case blocking in write data. - executorThread.interrupt(); } }