From 1911b2d7ec38cf2e854d9942b2801601d66c1c43 Mon Sep 17 00:00:00 2001 From: wudi <> Date: Tue, 21 Nov 2023 21:31:13 +0800 Subject: [PATCH] fix stop load bug --- .../org/apache/doris/flink/sink/writer/DorisWriter.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 8550a214e..c5ce847d5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -209,13 +209,15 @@ public Collection prepareCommit() throws IOException, Interrup } // disable exception checker before stop load. globalLoading = false; - // clean loadingMap - loadingMap.clear(); // submit stream load http request List committableList = new ArrayList<>(); for(Map.Entry streamLoader : dorisStreamLoadMap.entrySet()){ String tableIdentifier = streamLoader.getKey(); + if(!loadingMap.getOrDefault(tableIdentifier, false)){ + LOG.debug("skip table {}, no data need to load.", tableIdentifier); + continue; + } DorisStreamLoad dorisStreamLoad = streamLoader.getValue(); LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); @@ -229,6 +231,8 @@ public Collection prepareCommit() throws IOException, Interrup committableList.add(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); } } + // clean loadingMap + loadingMap.clear(); return committableList; }