diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index 6faed87df..1b05453ad 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -125,6 +125,7 @@ public SplitEnumerator restoreEnumera SplitEnumeratorContext context, PendingSplitsCheckpoint checkpoint) throws Exception { Collection splits = checkpoint.getSplits(); + LOG.info("Restore splits from checkpoint, size {}, splits {}", splits.size(), splits); DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(splits); return new DorisSourceEnumerator(context, splitAssigner); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java index ee96f6873..7948244ca 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -52,6 +52,7 @@ public void addSplits(Collection splits) { @Override public PendingSplitsCheckpoint snapshotState(long checkpointId) { + LOG.info("Snapshot splits {} for checkpoint {}", splits, checkpointId); return new PendingSplitsCheckpoint(splits); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java index 65fcc6fa7..be93f39dc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java @@ -85,7 +85,7 @@ public void addSplitsBack(List splits, int subtaskId) { @Override public void addReader(int subtaskId) { - // do nothing + LOG.info("add reader: {}", subtaskId); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java index 7e2a72087..dce23ee20 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java @@ -27,6 +27,8 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.source.split.DorisSourceSplit; import org.apache.doris.flink.source.split.DorisSourceSplitState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; @@ -36,6 +38,8 @@ public class DorisSourceReader extends SingleThreadMultiplexSourceReaderBase< List, T, DorisSourceSplit, DorisSourceSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(DorisSourceReader.class); + public DorisSourceReader( DorisOptions options, DorisReadOptions readOptions, @@ -64,6 +68,7 @@ protected void onSplitFinished(Map finishedSplitI @Override protected DorisSourceSplitState initializedState(DorisSourceSplit split) { + LOG.info("Initialized reader state for split: {}", split); return new DorisSourceSplitState(split); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index c9ed6f9ce..39187f87c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -74,12 +74,22 @@ private void checkSplitOrStartNext() throws IOException, DorisException { throw new IOException("Cannot fetch from another split - no split remaining"); } currentSplitId = nextSplit.splitId(); + LOG.info("Fetch a new split {}", nextSplit); valueReader = ValueReader.createReader( nextSplit.getPartitionDefinition(), options, readOptions, LOG); } private DorisSplitRecords finishSplit() { + if (valueReader != null) { + try { + valueReader.close(); + } catch (Exception e) { + LOG.warn("Error while closing value reader: {}", e.getMessage()); + } + valueReader = null; + } + final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId); currentSplitId = null; return finishRecords; @@ -87,7 +97,7 @@ private DorisSplitRecords finishSplit() { @Override public void handleSplitsChanges(SplitsChange splitsChange) { - LOG.debug("Handling split change {}", splitsChange); + LOG.info("Handling split change {}", splitsChange); splits.addAll(splitsChange.splits()); }