diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index d049a2bb9..ca42f0ef3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -169,6 +169,7 @@ public void abortPreCommit(String labelPrefix, long chkID) throws Exception { // Currently, it can only be aborted based on txnid, so we must // first request a streamload based on the label to get the txnid. String label = labelGenerator.generateTableLabel(startChkID); + LOG.info("start a check label {} to load.", label); HttpPutBuilder builder = new HttpPutBuilder(); builder.setUrl(loadUrlStr) .baseAuth(user, passwd) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 54facc704..2b4dbc02f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -43,8 +43,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -133,6 +135,7 @@ public void initializeLoad(Collection state) { private void abortLingeringTransactions(Collection recoveredStates) throws Exception { List alreadyAborts = new ArrayList<>(); + Set alreadyAbortTables = new HashSet<>(); // abort label in state for (DorisWriterState state : recoveredStates) { // Todo: When the sink parallelism is reduced, @@ -152,6 +155,7 @@ private void abortLingeringTransactions(Collection recoveredSt DorisStreamLoad streamLoader = getStreamLoader(key); streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); alreadyAborts.add(state.getLabelPrefix()); + alreadyAbortTables.add(key); } // TODO: In a multi-table scenario, if do not restore from checkpoint, @@ -162,6 +166,19 @@ private void abortLingeringTransactions(Collection recoveredSt // abort current labelPrefix DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier()); streamLoader.abortPreCommit(labelPrefix, curCheckpointId); + alreadyAbortTables.add(dorisOptions.getTableIdentifier()); + } + + LOG.info( + "doris stream loader size is {}, already abort table size is {}", + dorisStreamLoadMap.size(), + alreadyAbortTables.size()); + if (dorisStreamLoadMap.size() != alreadyAbortTables.size()) { + Set diff = new HashSet<>(dorisStreamLoadMap.keySet()); + diff.removeAll(alreadyAbortTables); + LOG.warn( + "there are some tables in dorisStreamLoadMap, but they are not aborted: {}", + diff); } }