From be93f571bfef7cbca54416cb3b792b1d12071b21 Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Mon, 30 Oct 2023 19:37:22 +0800 Subject: [PATCH] [flink] support the feature of ContinuousFileStoreSource using consumer-id. --- docs/content/how-to/querying-tables.md | 11 +++ .../generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 11 +++ .../source/ContinuousFileSplitEnumerator.java | 71 ++++++++++++++- .../flink/source/FileStoreSourceReader.java | 17 ++++ .../flink/source/FlinkSourceBuilder.java | 3 +- .../source/ReaderConsumeProgressEvent.java | 48 ++++++++++ .../AlignedContinuousFileSplitEnumerator.java | 2 +- .../assigners/AlignedSplitAssigner.java | 12 ++- .../source/assigners/FIFOSplitAssigner.java | 10 ++ .../assigners/PreAssignSplitAssigner.java | 11 +++ .../flink/source/assigners/SplitAssigner.java | 4 + .../paimon/flink/utils/TableScanUtils.java | 11 +++ .../ContinuousFileSplitEnumeratorTest.java | 91 ++++++++++++++++++- .../source/FileStoreSourceReaderTest.java | 22 +++++ .../source/align/AlignedSourceReaderTest.java | 7 ++ 16 files changed, 328 insertions(+), 9 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ReaderConsumeProgressEvent.java diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md index e2b9fc91277d5..4604fcb797be6 100644 --- a/docs/content/how-to/querying-tables.md +++ b/docs/content/how-to/querying-tables.md @@ -332,6 +332,17 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con lifetime of consumers. {{< /hint >}} +You can set `consumer.use-legacy-mode` to `false` to use the consumer function implemented based on +[flip-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), which can provide +more capabilities, such as watermark alignment. + +{{< hint warning >}} +1. When there is no watermark definition, the consumer in non-legacy mode cannot provide the ability to pass the +watermark in the snapshot to the downstream. +2. Since the implementation of legacy mode and non-legacy mode are completely different, the state of flink is +incompatible and cannot be restored from the state when switching modes. +{{< /hint >}} + You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID. {{< hint info >}} diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index fba0ce19d9386..2cfaeae8d6ce5 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -110,6 +110,12 @@ Duration The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value. + +
consumer.use-legacy-mode
+ true + Boolean + Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true. +
continuous.discovery-interval
10 s diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 643aee75fc54a..5751f4ff5ba55 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -705,6 +705,13 @@ public class CoreOptions implements Serializable { "The expiration interval of consumer files. A consumer file will be expired if " + "it's lifetime after last modification is over this value."); + public static final ConfigOption CONSUMER_WITH_LEGACY_MODE = + key("consumer.use-legacy-mode") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true."); + public static final ConfigOption DYNAMIC_BUCKET_TARGET_ROW_NUM = key("dynamic-bucket.target-row-num") .longType() @@ -1303,6 +1310,10 @@ public Duration consumerExpireTime() { return options.get(CONSUMER_EXPIRATION_TIME); } + public boolean consumerWithLegacyMode() { + return options.get(CONSUMER_WITH_LEGACY_MODE); + } + public boolean partitionedTableInMetastore() { return options.get(METASTORE_PARTITIONED_TABLE); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index b5ba3d839ca90..fb55ed05224f4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner; import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; import org.apache.paimon.flink.source.assigners.SplitAssigner; +import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; @@ -47,7 +48,11 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; +import java.util.TreeMap; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -71,6 +76,12 @@ public class ContinuousFileSplitEnumerator protected final SplitAssigner splitAssigner; + private final TreeMap minNextSnapshotPerCheckpoint; + + private final Map assignedSnapshotPerReader; + + private final Map consumingSnapshotPerReader; + @Nullable protected Long nextSnapshotId; protected boolean finished = false; @@ -93,6 +104,10 @@ public ContinuousFileSplitEnumerator( this.scan = scan; this.splitAssigner = createSplitAssigner(bucketMode); addSplits(remainSplits); + + this.minNextSnapshotPerCheckpoint = new TreeMap<>(); + this.assignedSnapshotPerReader = new HashMap<>(context.currentParallelism()); + this.consumingSnapshotPerReader = new HashMap<>(context.currentParallelism()); } @VisibleForTesting @@ -140,7 +155,13 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname @Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { - LOG.error("Received unrecognized event: {}", sourceEvent); + if (sourceEvent instanceof ReaderConsumeProgressEvent) { + long currentSnapshotId = + ((ReaderConsumeProgressEvent) sourceEvent).lastConsumeSnapshotId(); + consumingSnapshotPerReader.put(subtaskId, currentSnapshotId); + } else { + LOG.error("Received unrecognized event: {}", sourceEvent); + } } @Override @@ -156,9 +177,23 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception new PendingSplitsCheckpoint(splits, nextSnapshotId); LOG.debug("Source Checkpoint is {}", checkpoint); + + computeMinNextSnapshotId() + .ifPresent( + minNextSnapshotId -> + minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId)); return checkpoint; } + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + NavigableMap nextSnapshots = + minNextSnapshotPerCheckpoint.headMap(checkpointId, true); + OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max(); + max.ifPresent(scan::notifyCheckpointComplete); + nextSnapshots.clear(); + } + // ------------------------------------------------------------------------ // this need to be synchronized because scan object is not thread safe. handleSplitRequest and @@ -218,6 +253,8 @@ protected synchronized void assignSplits() { List splits = splitAssigner.getNext(task, null); if (splits.size() > 0) { assignment.put(task, splits); + TableScanUtils.getSnapshotId(splits.get(0)) + .ifPresent(snapshotId -> assignedSnapshotPerReader.put(task, snapshotId)); } } @@ -250,6 +287,38 @@ protected boolean noMoreSplits() { return finished; } + /** Calculate the minimum snapshot currently being consumed by all readers. */ + private Optional computeMinNextSnapshotId() { + long globalMinSnapshotId = Long.MAX_VALUE; + for (int subtask = 0; subtask < context.currentParallelism(); subtask++) { + // 1. if the reader is in the waiting list, it means that all allocated splits have been + // consumed, and the next snapshotId is calculated from splitAssigner. + // + // 2. if the reader is not in the waiting list, the larger value between the consumption + // progress reported by the reader and the most recently assigned snapshot id is used. + Long snapshotIdForTask; + if (readersAwaitingSplit.contains(subtask)) { + snapshotIdForTask = splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId); + } else { + Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask); + Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask); + if (consumingSnapshotId != null && assignedSnapshotId != null) { + snapshotIdForTask = Math.max(consumingSnapshotId, assignedSnapshotId); + } else { + snapshotIdForTask = + consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId; + } + } + + if (snapshotIdForTask != null) { + globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForTask); + } else { + return Optional.empty(); + } + } + return Optional.of(globalMinSnapshotId); + } + /** The result of scan. */ protected static class PlanWithNextSnapshotId { private final TableScan.Plan plan; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java index 155fbd5da9c1c..ed44ac67effc9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java @@ -20,6 +20,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; +import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.table.source.TableRead; import org.apache.flink.api.connector.source.SourceReader; @@ -33,6 +34,7 @@ import javax.annotation.Nullable; import java.util.Map; +import java.util.Optional; /** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */ public class FileStoreSourceReader @@ -40,6 +42,7 @@ public class FileStoreSourceReader RecordIterator, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> { @Nullable private final IOManager ioManager; + private long lastConsumeSnapshotId = Long.MIN_VALUE; public FileStoreSourceReader( SourceReaderContext readerContext, @@ -93,6 +96,20 @@ protected void onSplitFinished(Map finishedSp if (getNumberOfCurrentlyAssignedSplits() == 0) { context.sendSplitRequest(); } + + long maxFinishedSplits = + finishedSplitIds.values().stream() + .map(splitState -> TableScanUtils.getSnapshotId(splitState.toSourceSplit())) + .filter(Optional::isPresent) + .mapToLong(Optional::get) + .max() + .orElse(Long.MIN_VALUE); + + if (lastConsumeSnapshotId < maxFinishedSplits) { + lastConsumeSnapshotId = maxFinishedSplits; + context.sendSourceEventToCoordinator( + new ReaderConsumeProgressEvent(lastConsumeSnapshotId)); + } } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 0b7c835ffafbf..9b57293c06349 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -215,7 +215,8 @@ public DataStream build() { } else { if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { return buildAlignedContinuousFileSource(); - } else if (conf.contains(CoreOptions.CONSUMER_ID)) { + } else if (conf.contains(CoreOptions.CONSUMER_ID) + && conf.get(CoreOptions.CONSUMER_WITH_LEGACY_MODE)) { return buildContinuousStreamOperator(); } else { return buildContinuousFileSource(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ReaderConsumeProgressEvent.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ReaderConsumeProgressEvent.java new file mode 100644 index 0000000000000..f83d4b1814313 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ReaderConsumeProgressEvent.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * Event sent from {@link FileStoreSourceReader} to {@link ContinuousFileSplitEnumerator} to + * describe the current consumption progress. + */ +public class ReaderConsumeProgressEvent implements SourceEvent { + + private static final long serialVersionUID = -1L; + + private final long lastConsumeSnapshotId; + + public ReaderConsumeProgressEvent(long lastConsumeSnapshotId) { + this.lastConsumeSnapshotId = lastConsumeSnapshotId; + } + + public long lastConsumeSnapshotId() { + return lastConsumeSnapshotId; + } + + @Override + public String toString() { + return "ReaderConsumeProgressEvent{" + + "lastConsumeSnapshotId=" + + lastConsumeSnapshotId + + '}'; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java index 379a3ffda7e48..62d0abae30339 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java @@ -164,7 +164,7 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception assignSplits(); } Preconditions.checkArgument(alignedAssigner.isAligned()); - lastConsumedSnapshotId = alignedAssigner.minRemainingSnapshotId(); + lastConsumedSnapshotId = alignedAssigner.getNextSnapshotId(0).orElse(null); alignedAssigner.removeFirst(); currentCheckpointId = checkpointId; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java index bb03fb454be57..eba25a1afaf7e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java @@ -33,6 +33,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; /** * Splits are allocated at the granularity of snapshots. When the splits of the current snapshot are @@ -98,6 +99,12 @@ public Collection remainingSplits() { return remainingSplits; } + @Override + public Optional getNextSnapshotId(int subtask) { + PendingSnapshot head = pendingSplitAssignment.peek(); + return Optional.ofNullable(head != null ? head.snapshotId : null); + } + public boolean isAligned() { PendingSnapshot head = pendingSplitAssignment.peek(); return head != null && head.empty(); @@ -114,11 +121,6 @@ public void removeFirst() { "The head pending splits is not empty. This is a bug, please file an issue."); } - public Long minRemainingSnapshotId() { - PendingSnapshot head = pendingSplitAssignment.peek(); - return head != null ? head.snapshotId : null; - } - private static class PendingSnapshot { private final long snapshotId; private final boolean isPlaceHolder; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java index 52cf28f211324..7af226ab015d3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java @@ -28,6 +28,9 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; +import java.util.Optional; + +import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId; /** * Splits are assigned preemptively in the order requested by the task. Only one split is assigned @@ -65,4 +68,11 @@ public void addSplitsBack(int subtask, List splits) { public Collection remainingSplits() { return new ArrayList<>(pendingSplitAssignment); } + + @Override + public Optional getNextSnapshotId(int subtask) { + return pendingSplitAssignment.isEmpty() + ? Optional.empty() + : getSnapshotId(pendingSplitAssignment.peekFirst()); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java index 63e24026ff35f..118ed109eec9a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java @@ -32,8 +32,11 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Optional; import java.util.Queue; +import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId; + /** * Pre-calculate which splits each task should process according to the weight, and then distribute * the splits fairly. @@ -104,4 +107,12 @@ private static Map> createBatchFairSpl } return assignment; } + + @Override + public Optional getNextSnapshotId(int subtask) { + LinkedList pendingSplits = pendingSplitAssignment.get(subtask); + return (pendingSplits == null || pendingSplits.isEmpty()) + ? Optional.empty() + : getSnapshotId(pendingSplits.peekFirst()); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java index e89d9fa63bdea..fecd59c8872ac 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; +import java.util.Optional; /** * The {@code SplitAssigner} is responsible for deciding what splits should be processed next by @@ -50,4 +51,7 @@ public interface SplitAssigner { /** Gets the remaining splits that this assigner has pending. */ Collection remainingSplits(); + + /** Gets the snapshot id of the next split. */ + Optional getNextSnapshotId(int subtask); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java index 05486972f7bc7..a5645302f93f3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java @@ -19,10 +19,13 @@ package org.apache.paimon.flink.utils; import org.apache.paimon.CoreOptions; +import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.TableScan; import java.util.HashMap; +import java.util.Optional; /** Utility methods for {@link TableScan}, such as validating. */ public class TableScanUtils { @@ -48,4 +51,12 @@ public static void streamingReadingValidate(Table table) { } } } + + /** Get snapshot id from {@link FileStoreSourceSplit}. */ + public static Optional getSnapshotId(FileStoreSourceSplit split) { + if (split.split() instanceof DataSplit) { + return Optional.of(((DataSplit) split.split()).snapshotId()); + } + return Optional.empty(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java index d02cd2b89a33e..c5c71b569d25b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -706,6 +707,86 @@ public void testEnumeratorWithCheckpoint() { assertThat(toDataSplits(state.splits())).containsExactlyElementsOf(expectedResults.get(2L)); } + @Test + public void testEnumeratorWithConsumer() throws Exception { + final TestingAsyncSplitEnumeratorContext context = + new TestingAsyncSplitEnumeratorContext<>(3); + for (int i = 0; i < 3; i++) { + context.registerReader(i, "test-host"); + } + + // prepare test data + TreeMap dataSplits = new TreeMap<>(); + for (int i = 1; i <= 2; i++) { + dataSplits.put( + (long) i, + new DataFilePlan( + Arrays.asList( + createDataSplit(i, 0, Collections.emptyList()), + createDataSplit(i, 2, Collections.emptyList())))); + } + MockScan scan = new MockScan(dataSplits); + + final ContinuousFileSplitEnumerator enumerator = + new Builder() + .setSplitEnumeratorContext(context) + .setInitialSplits(Collections.emptyList()) + .setDiscoveryInterval(1) + .setScan(scan) + .build(); + enumerator.start(); + + long checkpointId = 1L; + + // request for splits + for (int i = 0; i < 3; i++) { + enumerator.handleSplitRequest(i, "test-host"); + } + + // checkpoint is triggered for the first time and no snapshot is found + triggerCheckpointAndComplete(enumerator, checkpointId++); + assertThat(scan.getNextSnapshotIdForConsumer()).isNull(); + + // find a new snapshot and trigger for the second checkpoint, but no snapshot is consumed + scanNextSnapshot(context); + triggerCheckpointAndComplete(enumerator, checkpointId++); + assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L); + + // subtask-0 has consumed the snapshot-1 and trigger for the next checkpoint + enumerator.handleSourceEvent(0, new ReaderConsumeProgressEvent(1L)); + triggerCheckpointAndComplete(enumerator, checkpointId++); + assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L); + + // subtask-2 has consumed the snapshot-1 and trigger for the next checkpoint + enumerator.handleSourceEvent(2, new ReaderConsumeProgressEvent(1L)); + triggerCheckpointAndComplete(enumerator, checkpointId++); + assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(1L); + + // subtask-0 and subtask-2 request for the next splits but there are no new snapshot + enumerator.handleSplitRequest(0, "test-host"); + enumerator.handleSplitRequest(2, "test-host"); + triggerCheckpointAndComplete(enumerator, checkpointId++); + assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L); + + // find next snapshot and trigger for the next checkpoint, subtask-0 and subtask-2 has been + // assigned new snapshot + scanNextSnapshot(context); + triggerCheckpointAndComplete(enumerator, checkpointId++); + assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L); + } + + private void triggerCheckpointAndComplete( + ContinuousFileSplitEnumerator enumerator, long checkpointId) throws Exception { + enumerator.snapshotState(checkpointId); + enumerator.notifyCheckpointComplete(checkpointId); + } + + private void scanNextSnapshot( + TestingAsyncSplitEnumeratorContext context) { + context.workerExecutor.triggerPeriodicScheduledTasks(); + context.triggerAlCoordinatorAction(); + } + private static PendingSplitsCheckpoint checkpointWithoutException( ContinuousFileSplitEnumerator enumerator, long checkpointId) { try { @@ -792,10 +873,12 @@ private static class MockScan implements StreamTableScan { private final TreeMap results; private @Nullable Long nextSnapshotId; private boolean allowEnd = true; + private Long nextSnapshotIdForConsumer; public MockScan(TreeMap results) { this.results = results; this.nextSnapshotId = null; + this.nextSnapshotIdForConsumer = null; } @Override @@ -823,7 +906,9 @@ public Long checkpoint() { } @Override - public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {} + public void notifyCheckpointComplete(@Nullable Long nextSnapshot) { + nextSnapshotIdForConsumer = nextSnapshot; + } @Nullable @Override @@ -837,6 +922,10 @@ public void restore(Long state) {} public void allowEnd(boolean allowEnd) { this.allowEnd = allowEnd; } + + public Long getNextSnapshotIdForConsumer() { + return nextSnapshotIdForConsumer; + } } private static class TestingAsyncSplitEnumeratorContext diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java index b92d30f32f66f..6db17aff61f62 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.table.data.RowData; @@ -36,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.apache.paimon.flink.source.FileStoreSourceSplitSerializerTest.newSourceSplit; import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row; @@ -104,6 +106,26 @@ public void testAddMultipleSplits() throws Exception { assertThat(context.getNumSplitRequests()).isEqualTo(2); } + @Test + public void testReaderOnSplitFinished() throws Exception { + final TestingReaderContext context = new TestingReaderContext(); + final FileStoreSourceReader reader = createReader(context); + + reader.start(); + reader.addSplits(Collections.singletonList(createTestFileSplit("id1"))); + TestingReaderOutput output = new TestingReaderOutput<>(); + while (reader.getNumberOfCurrentlyAssignedSplits() > 0) { + reader.pollNext(output); + Thread.sleep(10); + } + + List sourceEvents = context.getSentEvents(); + assertThat(sourceEvents.size()).isEqualTo(1); + assertThat(sourceEvents.get(0)).isExactlyInstanceOf(ReaderConsumeProgressEvent.class); + assertThat(((ReaderConsumeProgressEvent) sourceEvents.get(0))) + .matches(event -> event.lastConsumeSnapshotId() == 1L); + } + protected FileStoreSourceReader createReader(TestingReaderContext context) { return new FileStoreSourceReader( context, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java index 0d38ae3cf0984..37e6260692d63 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.table.data.RowData; +import org.junit.Ignore; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -60,6 +61,12 @@ public void testAddMultipleSplits() throws Exception { assertThat(context.getNumSplitRequests()).isEqualTo(2); } + @Override + @Ignore + public void testReaderOnSplitFinished() throws Exception { + // ignore + } + @Override protected FileStoreSourceReader createReader(TestingReaderContext context) { return new AlignedSourceReader(