diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index ce863377b..a0337b9f5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -66,7 +66,7 @@ public RecordsWithSplitIds fetch() throws IOException { } private void checkSplitOrStartNext() throws IOException, DorisException { - if (valueReader != null && valueReader.hasNext()) { + if (valueReader != null) { return; } final DorisSourceSplit nextSplit = splits.poll(); @@ -74,14 +74,23 @@ private void checkSplitOrStartNext() throws IOException, DorisException { throw new IOException("Cannot fetch from another split - no split remaining"); } currentSplitId = nextSplit.splitId(); - LOG.info("currentSplitId {}, split {}", currentSplitId, nextSplit); + LOG.info("Fetch a new split {}", nextSplit); valueReader = ValueReader.createReader( nextSplit.getPartitionDefinition(), options, readOptions, LOG); } private DorisSplitRecords finishSplit() { + if (valueReader != null) { + try { + valueReader.close(); + } catch (Exception e) { + LOG.warn("Error while closing value reader: {}", e.getMessage()); + } + valueReader = null; + } LOG.info("Finished split {}", currentSplitId); + final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId); currentSplitId = null; LOG.info("After Finished split {}, {} ", currentSplitId, finishRecords.finishedSplits());