From 000f2b57d4245df722d0efcd1947523a0ee3b2b0 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 10 Sep 2024 17:06:51 +0800 Subject: [PATCH] update --- .../flink/source/reader/DorisSourceSplitReader.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index ce863377b..a0337b9f5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -66,7 +66,7 @@ public RecordsWithSplitIds fetch() throws IOException { } private void checkSplitOrStartNext() throws IOException, DorisException { - if (valueReader != null && valueReader.hasNext()) { + if (valueReader != null) { return; } final DorisSourceSplit nextSplit = splits.poll(); @@ -74,14 +74,23 @@ private void checkSplitOrStartNext() throws IOException, DorisException { throw new IOException("Cannot fetch from another split - no split remaining"); } currentSplitId = nextSplit.splitId(); - LOG.info("currentSplitId {}, split {}", currentSplitId, nextSplit); + LOG.info("Fetch a new split {}", nextSplit); valueReader = ValueReader.createReader( nextSplit.getPartitionDefinition(), options, readOptions, LOG); } private DorisSplitRecords finishSplit() { + if (valueReader != null) { + try { + valueReader.close(); + } catch (Exception e) { + LOG.warn("Error while closing value reader: {}", e.getMessage()); + } + valueReader = null; + } LOG.info("Finished split {}", currentSplitId); + final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId); currentSplitId = null; LOG.info("After Finished split {}, {} ", currentSplitId, finishRecords.finishedSplits());