diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index e2b9fc91277d5..4604fcb797be6 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -332,6 +332,17 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con
lifetime of consumers.
{{< /hint >}}
+You can set `consumer.use-legacy-mode` to `false` to use a consumer implementation based on
+[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), which provides
+more capabilities, such as watermark alignment.
+
+{{< hint warning >}}
+1. When no watermark is defined, the consumer in non-legacy mode cannot forward the watermark
+recorded in the snapshot to downstream operators.
+2. Since the implementations of legacy mode and non-legacy mode are completely different, their Flink
+state is incompatible, and a job cannot be restored from state when switching modes.
+{{< /hint >}}
+
You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.
{{< hint info >}}
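To make the option switch concrete, here is a minimal sketch; the consumer id, table name, and the SQL hint in the comments are hypothetical illustrations, not part of this change:

```java
// Sketch only: the string option keys documented above; values are examples.
import java.util.HashMap;
import java.util.Map;

public class ConsumerModeOptions {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("consumer-id", "myid"); // track reading progress under this id
        options.put("consumer.use-legacy-mode", "false"); // switch to the FLIP-27 source
        // In Flink SQL the same pair would typically travel as dynamic table options, e.g.
        // SELECT * FROM t /*+ OPTIONS('consumer-id'='myid', 'consumer.use-legacy-mode'='false') */;
        System.out.println(options);
    }
}
```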
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fba0ce19d9386..2cfaeae8d6ce5 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@
            <td>Duration</td>
            <td>The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value.</td>
        </tr>
+        <tr>
+            <td><h5>consumer.use-legacy-mode</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the Flink legacy source interface and does not support FLIP-27-related features. To keep state compatibility for existing jobs, the default value is true.</td>
+        </tr>
        <tr>
            <td><h5>continuous.discovery-interval</h5></td>
            <td style="word-wrap: break-word;">10 s</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 643aee75fc54a..5751f4ff5ba55 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -705,6 +705,13 @@ public class CoreOptions implements Serializable {
"The expiration interval of consumer files. A consumer file will be expired if "
+ "it's lifetime after last modification is over this value.");
+ public static final ConfigOption<Boolean> CONSUMER_WITH_LEGACY_MODE =
+ key("consumer.use-legacy-mode")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the Flink legacy source interface and does not support FLIP-27-related features. To keep state compatibility for existing jobs, the default value is true.");
+
public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
@@ -1303,6 +1310,10 @@ public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}
+ public boolean consumerWithLegacyMode() {
+ return options.get(CONSUMER_WITH_LEGACY_MODE);
+ }
+
public boolean partitionedTableInMetastore() {
return options.get(METASTORE_PARTITIONED_TABLE);
}
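For the typed API, a sketch of the accessor round trip, assuming Paimon's `Options#set(ConfigOption, T)` behaves like Flink's `Configuration` (the `consumerWithLegacyMode()` accessor is the one added above):

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class ConsumerModeRoundTrip {
    public static void main(String[] args) {
        Options options = new Options();
        // Assumed setter shape; the option defaults to true when unset.
        options.set(CoreOptions.CONSUMER_WITH_LEGACY_MODE, false);
        CoreOptions coreOptions = new CoreOptions(options);
        // Prints false: FlinkSourceBuilder will take the FLIP-27 source path.
        System.out.println(coreOptions.consumerWithLegacyMode());
    }
}
```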
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index b5ba3d839ca90..fb55ed05224f4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -22,6 +22,7 @@
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -47,7 +48,11 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
+import java.util.TreeMap;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -71,6 +76,12 @@ public class ContinuousFileSplitEnumerator
protected final SplitAssigner splitAssigner;
+ private final TreeMap<Long, Long> minNextSnapshotPerCheckpoint;
+
+ private final Map<Integer, Long> assignedSnapshotPerReader;
+
+ private final Map<Integer, Long> consumingSnapshotPerReader;
+
@Nullable protected Long nextSnapshotId;
protected boolean finished = false;
@@ -93,6 +104,10 @@ public ContinuousFileSplitEnumerator(
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
addSplits(remainSplits);
+
+ this.minNextSnapshotPerCheckpoint = new TreeMap<>();
+ this.assignedSnapshotPerReader = new HashMap<>(context.currentParallelism());
+ this.consumingSnapshotPerReader = new HashMap<>(context.currentParallelism());
}
@VisibleForTesting
@@ -140,7 +155,13 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
- LOG.error("Received unrecognized event: {}", sourceEvent);
+ if (sourceEvent instanceof ReaderConsumeProgressEvent) {
+ long currentSnapshotId =
+ ((ReaderConsumeProgressEvent) sourceEvent).lastConsumeSnapshotId();
+ consumingSnapshotPerReader.put(subtaskId, currentSnapshotId);
+ } else {
+ LOG.error("Received unrecognized event: {}", sourceEvent);
+ }
}
@Override
@@ -156,9 +177,23 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception
new PendingSplitsCheckpoint(splits, nextSnapshotId);
LOG.debug("Source Checkpoint is {}", checkpoint);
+
+ computeMinNextSnapshotId()
+ .ifPresent(
+ minNextSnapshotId ->
+ minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
return checkpoint;
}
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ NavigableMap<Long, Long> nextSnapshots =
+ minNextSnapshotPerCheckpoint.headMap(checkpointId, true);
+ OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
+ max.ifPresent(scan::notifyCheckpointComplete);
+ nextSnapshots.clear();
+ }
+
// ------------------------------------------------------------------------
// this need to be synchronized because scan object is not thread safe. handleSplitRequest and
@@ -218,6 +253,8 @@ protected synchronized void assignSplits() {
List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
if (splits.size() > 0) {
assignment.put(task, splits);
+ TableScanUtils.getSnapshotId(splits.get(0))
+ .ifPresent(snapshotId -> assignedSnapshotPerReader.put(task, snapshotId));
}
}
@@ -250,6 +287,38 @@ protected boolean noMoreSplits() {
return finished;
}
+ /** Computes the minimum next snapshot id still to be consumed across all readers. */
+ private Optional<Long> computeMinNextSnapshotId() {
+ long globalMinSnapshotId = Long.MAX_VALUE;
+ for (int subtask = 0; subtask < context.currentParallelism(); subtask++) {
+ // 1. if the reader is in the waiting list, it means that all allocated splits have been
+ // consumed, and the next snapshotId is calculated from splitAssigner.
+ //
+ // 2. if the reader is not in the waiting list, the larger value between the consumption
+ // progress reported by the reader and the most recently assigned snapshot id is used.
+ Long snapshotIdForTask;
+ if (readersAwaitingSplit.contains(subtask)) {
+ snapshotIdForTask = splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId);
+ } else {
+ Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask);
+ Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask);
+ if (consumingSnapshotId != null && assignedSnapshotId != null) {
+ snapshotIdForTask = Math.max(consumingSnapshotId, assignedSnapshotId);
+ } else {
+ snapshotIdForTask =
+ consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId;
+ }
+ }
+
+ if (snapshotIdForTask != null) {
+ globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForTask);
+ } else {
+ return Optional.empty();
+ }
+ }
+ return Optional.of(globalMinSnapshotId);
+ }
+
/** The result of scan. */
protected static class PlanWithNextSnapshotId {
private final TableScan.Plan plan;
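The enumerator-side bookkeeping above boils down to a small protocol: record the global minimum "next snapshot" at each checkpoint, and on checkpoint completion release the newest recorded value at or below that checkpoint to the consumer. A self-contained sketch of that bookkeeping (class and method names are illustrative, not PR code):

```java
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.function.LongConsumer;

public class ConsumerProgressTracker {
    // checkpointId -> minimum next snapshot id across all readers at that checkpoint
    private final TreeMap<Long, Long> minNextSnapshotPerCheckpoint = new TreeMap<>();

    public void onCheckpoint(long checkpointId, long minNextSnapshotId) {
        minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId);
    }

    public void onCheckpointComplete(long checkpointId, LongConsumer notifyConsumer) {
        // Every entry at or below the completed checkpoint is now durable.
        NavigableMap<Long, Long> done = minNextSnapshotPerCheckpoint.headMap(checkpointId, true);
        OptionalLong max = done.values().stream().mapToLong(Long::longValue).max();
        max.ifPresent(notifyConsumer); // advance the consumer to the newest safe snapshot
        done.clear(); // headMap is a live view, so this also prunes the tracker
    }
}
```

A job restarted from the consumer then resumes from the last snapshot id passed to `notifyConsumer`, which is exactly what `scan.notifyCheckpointComplete` persists.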
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 155fbd5da9c1c..ed44ac67effc9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -20,6 +20,7 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
+import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.SourceReader;
@@ -33,6 +34,7 @@
import javax.annotation.Nullable;
import java.util.Map;
+import java.util.Optional;
/** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */
public class FileStoreSourceReader
@@ -40,6 +42,7 @@ public class FileStoreSourceReader
RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {
@Nullable private final IOManager ioManager;
+ private long lastConsumeSnapshotId = Long.MIN_VALUE;
public FileStoreSourceReader(
SourceReaderContext readerContext,
@@ -93,6 +96,20 @@ protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSp
if (getNumberOfCurrentlyAssignedSplits() == 0) {
context.sendSplitRequest();
}
+
+ long maxFinishedSnapshotId =
+ finishedSplitIds.values().stream()
+ .map(splitState -> TableScanUtils.getSnapshotId(splitState.toSourceSplit()))
+ .filter(Optional::isPresent)
+ .mapToLong(Optional::get)
+ .max()
+ .orElse(Long.MIN_VALUE);
+
+ if (lastConsumeSnapshotId < maxFinishedSnapshotId) {
+ lastConsumeSnapshotId = maxFinishedSnapshotId;
+ context.sendSourceEventToCoordinator(
+ new ReaderConsumeProgressEvent(lastConsumeSnapshotId));
+ }
}
@Override
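On the reader side, `onSplitFinished` reduces to: take the largest snapshot id among the just-finished splits and report it only when it advances past the last reported value. A minimal sketch of that reduction (names are illustrative, not PR code):

```java
import java.util.Collection;
import java.util.Optional;
import java.util.OptionalLong;

public class ProgressReporter {
    private long lastReported = Long.MIN_VALUE;

    /** Returns the snapshot id to report, or empty when progress did not advance. */
    public OptionalLong onSplitsFinished(Collection<Optional<Long>> finishedSnapshotIds) {
        long max =
                finishedSnapshotIds.stream()
                        .filter(Optional::isPresent)
                        .mapToLong(Optional::get)
                        .max()
                        .orElse(Long.MIN_VALUE);
        if (lastReported < max) {
            lastReported = max;
            return OptionalLong.of(max); // caller wraps this in a ReaderConsumeProgressEvent
        }
        return OptionalLong.empty();
    }
}
```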
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 0b7c835ffafbf..9b57293c06349 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -215,7 +215,8 @@ public DataStream<RowData> build() {
} else {
if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
return buildAlignedContinuousFileSource();
- } else if (conf.contains(CoreOptions.CONSUMER_ID)) {
+ } else if (conf.contains(CoreOptions.CONSUMER_ID)
+ && conf.get(CoreOptions.CONSUMER_WITH_LEGACY_MODE)) {
return buildContinuousStreamOperator();
} else {
return buildContinuousFileSource();
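The builder change narrows the legacy path: a consumer id alone no longer forces the legacy operator. A compact restatement of the selection order (enum and method are illustrative, not PR code):

```java
public class SourceSelection {
    enum Kind { ALIGNED, LEGACY_CONSUMER_OPERATOR, FLIP27_CONTINUOUS }

    static Kind choose(boolean alignEnabled, boolean hasConsumerId, boolean legacyMode) {
        if (alignEnabled) {
            return Kind.ALIGNED; // checkpoint alignment takes precedence
        } else if (hasConsumerId && legacyMode) {
            return Kind.LEGACY_CONSUMER_OPERATOR; // previous default behavior
        } else {
            return Kind.FLIP27_CONTINUOUS; // consumer-id + use-legacy-mode=false lands here
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(false, true, false)); // FLIP27_CONTINUOUS
    }
}
```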
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ReaderConsumeProgressEvent.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ReaderConsumeProgressEvent.java
new file mode 100644
index 0000000000000..f83d4b1814313
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ReaderConsumeProgressEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * Event sent from {@link FileStoreSourceReader} to {@link ContinuousFileSplitEnumerator} to
+ * describe the current consumption progress.
+ */
+public class ReaderConsumeProgressEvent implements SourceEvent {
+
+ private static final long serialVersionUID = -1L;
+
+ private final long lastConsumeSnapshotId;
+
+ public ReaderConsumeProgressEvent(long lastConsumeSnapshotId) {
+ this.lastConsumeSnapshotId = lastConsumeSnapshotId;
+ }
+
+ public long lastConsumeSnapshotId() {
+ return lastConsumeSnapshotId;
+ }
+
+ @Override
+ public String toString() {
+ return "ReaderConsumeProgressEvent{"
+ + "lastConsumeSnapshotId="
+ + lastConsumeSnapshotId
+ + '}';
+ }
+}
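Since Flink's `SourceEvent` extends `Serializable` and this event crosses the reader-to-coordinator RPC boundary, it must survive Java serialization. A quick round-trip sketch (test scaffolding only, not part of the PR):

```java
import org.apache.paimon.flink.source.ReaderConsumeProgressEvent;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class EventRoundTrip {
    public static void main(String[] args) throws Exception {
        ReaderConsumeProgressEvent event = new ReaderConsumeProgressEvent(42L);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(event); // serialize as the RPC layer would
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            ReaderConsumeProgressEvent copy = (ReaderConsumeProgressEvent) ois.readObject();
            System.out.println(copy.lastConsumeSnapshotId()); // 42
        }
    }
}
```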
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 379a3ffda7e48..62d0abae30339 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -164,7 +164,7 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception
assignSplits();
}
Preconditions.checkArgument(alignedAssigner.isAligned());
- lastConsumedSnapshotId = alignedAssigner.minRemainingSnapshotId();
+ lastConsumedSnapshotId = alignedAssigner.getNextSnapshotId(0).orElse(null);
alignedAssigner.removeFirst();
currentCheckpointId = checkpointId;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
index bb03fb454be57..eba25a1afaf7e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
@@ -33,6 +33,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* Splits are allocated at the granularity of snapshots. When the splits of the current snapshot are
@@ -98,6 +99,12 @@ public Collection<FileStoreSourceSplit> remainingSplits() {
return remainingSplits;
}
+ @Override
+ public Optional<Long> getNextSnapshotId(int subtask) {
+ PendingSnapshot head = pendingSplitAssignment.peek();
+ return Optional.ofNullable(head != null ? head.snapshotId : null);
+ }
+
public boolean isAligned() {
PendingSnapshot head = pendingSplitAssignment.peek();
return head != null && head.empty();
@@ -114,11 +121,6 @@ public void removeFirst() {
"The head pending splits is not empty. This is a bug, please file an issue.");
}
- public Long minRemainingSnapshotId() {
- PendingSnapshot head = pendingSplitAssignment.peek();
- return head != null ? head.snapshotId : null;
- }
-
private static class PendingSnapshot {
private final long snapshotId;
private final boolean isPlaceHolder;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
index 52cf28f211324..7af226ab015d3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
@@ -28,6 +28,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import java.util.Optional;
+
+import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId;
/**
* Splits are assigned preemptively in the order requested by the task. Only one split is assigned
@@ -65,4 +68,11 @@ public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
public Collection<FileStoreSourceSplit> remainingSplits() {
return new ArrayList<>(pendingSplitAssignment);
}
+
+ @Override
+ public Optional<Long> getNextSnapshotId(int subtask) {
+ return pendingSplitAssignment.isEmpty()
+ ? Optional.empty()
+ : getSnapshotId(pendingSplitAssignment.peekFirst());
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index 63e24026ff35f..118ed109eec9a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -32,8 +32,11 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
+import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId;
+
/**
* Pre-calculate which splits each task should process according to the weight, and then distribute
* the splits fairly.
@@ -104,4 +107,12 @@ private static Map<Integer, LinkedList<FileStoreSourceSplit>> createBatchFairSpl
}
return assignment;
}
+
+ @Override
+ public Optional<Long> getNextSnapshotId(int subtask) {
+ LinkedList<FileStoreSourceSplit> pendingSplits = pendingSplitAssignment.get(subtask);
+ return (pendingSplits == null || pendingSplits.isEmpty())
+ ? Optional.empty()
+ : getSnapshotId(pendingSplits.peekFirst());
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
index e89d9fa63bdea..fecd59c8872ac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
@@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
/**
* The {@code SplitAssigner} is responsible for deciding what splits should be processed next by
@@ -50,4 +51,7 @@ public interface SplitAssigner {
/** Gets the remaining splits that this assigner has pending. */
Collection<FileStoreSourceSplit> remainingSplits();
+
+ /** Gets the snapshot id of the next split that would be assigned to the given subtask. */
+ Optional<Long> getNextSnapshotId(int subtask);
}
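To illustrate the new contract, a toy assigner (illustrative only, not one of the PR's assigners): the method must answer "which snapshot would this subtask get next" without mutating pending state, and return empty when nothing is queued:

```java
import java.util.ArrayDeque;
import java.util.Optional;

class SingleQueueAssigner {
    private final ArrayDeque<Long> pendingSnapshotIds = new ArrayDeque<>();

    void add(long snapshotId) {
        pendingSnapshotIds.add(snapshotId);
    }

    /** Mirrors SplitAssigner#getNextSnapshotId: peek, never poll. */
    Optional<Long> getNextSnapshotId(int subtask) {
        return Optional.ofNullable(pendingSnapshotIds.peekFirst());
    }
}
```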
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index 05486972f7bc7..a5645302f93f3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -19,10 +19,13 @@
package org.apache.paimon.flink.utils;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
import java.util.HashMap;
+import java.util.Optional;
/** Utility methods for {@link TableScan}, such as validating. */
public class TableScanUtils {
@@ -48,4 +51,12 @@ public static void streamingReadingValidate(Table table) {
}
}
}
+
+ /** Gets the snapshot id from a {@link FileStoreSourceSplit}, if its inner split is a DataSplit. */
+ public static Optional<Long> getSnapshotId(FileStoreSourceSplit split) {
+ if (split.split() instanceof DataSplit) {
+ return Optional.of(((DataSplit) split.split()).snapshotId());
+ }
+ return Optional.empty();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index d02cd2b89a33e..c5c71b569d25b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -37,6 +37,7 @@
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -706,6 +707,86 @@ public void testEnumeratorWithCheckpoint() {
assertThat(toDataSplits(state.splits())).containsExactlyElementsOf(expectedResults.get(2L));
}
+ @Test
+ public void testEnumeratorWithConsumer() throws Exception {
+ final TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingAsyncSplitEnumeratorContext<>(3);
+ for (int i = 0; i < 3; i++) {
+ context.registerReader(i, "test-host");
+ }
+
+ // prepare test data
+ TreeMap<Long, TableScan.Plan> dataSplits = new TreeMap<>();
+ for (int i = 1; i <= 2; i++) {
+ dataSplits.put(
+ (long) i,
+ new DataFilePlan(
+ Arrays.asList(
+ createDataSplit(i, 0, Collections.emptyList()),
+ createDataSplit(i, 2, Collections.emptyList()))));
+ }
+ MockScan scan = new MockScan(dataSplits);
+
+ final ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .build();
+ enumerator.start();
+
+ long checkpointId = 1L;
+
+ // request for splits
+ for (int i = 0; i < 3; i++) {
+ enumerator.handleSplitRequest(i, "test-host");
+ }
+
+ // checkpoint is triggered for the first time and no snapshot is found
+ triggerCheckpointAndComplete(enumerator, checkpointId++);
+ assertThat(scan.getNextSnapshotIdForConsumer()).isNull();
+
+ // find a new snapshot and trigger the second checkpoint; no snapshot has been consumed yet
+ scanNextSnapshot(context);
+ triggerCheckpointAndComplete(enumerator, checkpointId++);
+ assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L);
+
+ // subtask-0 has consumed snapshot-1; trigger the next checkpoint
+ enumerator.handleSourceEvent(0, new ReaderConsumeProgressEvent(1L));
+ triggerCheckpointAndComplete(enumerator, checkpointId++);
+ assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L);
+
+ // subtask-2 has consumed snapshot-1; trigger the next checkpoint
+ enumerator.handleSourceEvent(2, new ReaderConsumeProgressEvent(1L));
+ triggerCheckpointAndComplete(enumerator, checkpointId++);
+ assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L);
+
+ // subtask-0 and subtask-2 request the next splits, but there is no new snapshot
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(2, "test-host");
+ triggerCheckpointAndComplete(enumerator, checkpointId++);
+ assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L);
+
+ // find the next snapshot and trigger the next checkpoint; subtask-0 and subtask-2 have been
+ // assigned the new snapshot
+ scanNextSnapshot(context);
+ triggerCheckpointAndComplete(enumerator, checkpointId++);
+ assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L);
+ }
+
+ private void triggerCheckpointAndComplete(
+ ContinuousFileSplitEnumerator enumerator, long checkpointId) throws Exception {
+ enumerator.snapshotState(checkpointId);
+ enumerator.notifyCheckpointComplete(checkpointId);
+ }
+
+ private void scanNextSnapshot(
+ TestingAsyncSplitEnumeratorContext<FileStoreSourceSplit> context) {
+ context.workerExecutor.triggerPeriodicScheduledTasks();
+ context.triggerAlCoordinatorAction();
+ }
+
private static PendingSplitsCheckpoint checkpointWithoutException(
ContinuousFileSplitEnumerator enumerator, long checkpointId) {
try {
@@ -792,10 +873,12 @@ private static class MockScan implements StreamTableScan {
private final TreeMap<Long, TableScan.Plan> results;
private @Nullable Long nextSnapshotId;
private boolean allowEnd = true;
+ private Long nextSnapshotIdForConsumer;
public MockScan(TreeMap<Long, TableScan.Plan> results) {
this.results = results;
this.nextSnapshotId = null;
+ this.nextSnapshotIdForConsumer = null;
}
@Override
@@ -823,7 +906,9 @@ public Long checkpoint() {
}
@Override
- public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {}
+ public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
+ nextSnapshotIdForConsumer = nextSnapshot;
+ }
@Nullable
@Override
@@ -837,6 +922,10 @@ public void restore(Long state) {}
public void allowEnd(boolean allowEnd) {
this.allowEnd = allowEnd;
}
+
+ public Long getNextSnapshotIdForConsumer() {
+ return nextSnapshotIdForConsumer;
+ }
}
private static class TestingAsyncSplitEnumeratorContext
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index b92d30f32f66f..6db17aff61f62 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -27,6 +27,7 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.table.data.RowData;
@@ -36,6 +37,7 @@
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.apache.paimon.flink.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
@@ -104,6 +106,26 @@ public void testAddMultipleSplits() throws Exception {
assertThat(context.getNumSplitRequests()).isEqualTo(2);
}
+ @Test
+ public void testReaderOnSplitFinished() throws Exception {
+ final TestingReaderContext context = new TestingReaderContext();
+ final FileStoreSourceReader reader = createReader(context);
+
+ reader.start();
+ reader.addSplits(Collections.singletonList(createTestFileSplit("id1")));
+ TestingReaderOutput<RowData> output = new TestingReaderOutput<>();
+ while (reader.getNumberOfCurrentlyAssignedSplits() > 0) {
+ reader.pollNext(output);
+ Thread.sleep(10);
+ }
+
+ List<SourceEvent> sourceEvents = context.getSentEvents();
+ assertThat(sourceEvents.size()).isEqualTo(1);
+ assertThat(sourceEvents.get(0)).isExactlyInstanceOf(ReaderConsumeProgressEvent.class);
+ assertThat(((ReaderConsumeProgressEvent) sourceEvents.get(0)))
+ .matches(event -> event.lastConsumeSnapshotId() == 1L);
+ }
+
protected FileStoreSourceReader createReader(TestingReaderContext context) {
return new FileStoreSourceReader(
context,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index 0d38ae3cf0984..37e6260692d63 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -60,6 +61,12 @@ public void testAddMultipleSplits() throws Exception {
assertThat(context.getNumSplitRequests()).isEqualTo(2);
}
+ @Override
+ @Disabled
+ public void testReaderOnSplitFinished() throws Exception {
+ // ignore
+ }
+
@Override
protected FileStoreSourceReader createReader(TestingReaderContext context) {
return new AlignedSourceReader(