diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 4604fcb797be..223c6c7e95af 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -332,15 +332,16 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con
lifetime of consumers.
{{< /hint >}}
-You can set `consumer.use-legacy-mode` to `false` to use the consumer function implemented based on
-[flip-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), which can provide
-more capabilities, such as watermark alignment.
+By default, the consumer uses `exactly-once` mode to record consumption progress, which strictly ensures that what is
+recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed. You can set `consumer.mode` to
+`at-least-once` to allow readers consume snapshots at different rates and record the slowest snapshot-id among all
+readers into the consumer. This mode can provide more capabilities, such as watermark alignment.
{{< hint warning >}}
-1. When there is no watermark definition, the consumer in non-legacy mode cannot provide the ability to pass the
+1. When there is no watermark definition, the consumer in `at-least-once` mode cannot provide the ability to pass the
watermark in the snapshot to the downstream.
-2. Since the implementation of legacy mode and non-legacy mode are completely different, the state of flink is
-incompatible and cannot be restored from the state when switching modes.
+2. Since the implementation of `exactly-once` mode and `at-least-once` mode are completely different, the state of
+flink is incompatible and cannot be restored from the state when switching modes.
{{< /hint >}}
You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 79f36207e1f6..d6c533a7fd49 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -111,10 +111,10 @@
The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value. |
- consumer.use-legacy-mode |
- true |
+ consumer.mode |
+ exactly-once |
Boolean |
- Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true. |
+ Specify the consumer consistency mode for table.
Possible values:- "exactly-once": Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed.
- "at-least-once": Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.
|
continuous.discovery-interval |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index f55570db6db0..55057e36eeee 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -709,12 +709,11 @@ public class CoreOptions implements Serializable {
"The expiration interval of consumer files. A consumer file will be expired if "
+ "it's lifetime after last modification is over this value.");
- public static final ConfigOption CONSUMER_WITH_LEGACY_MODE =
- key("consumer.use-legacy-mode")
- .booleanType()
- .defaultValue(true)
- .withDescription(
- "Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true.");
+ public static final ConfigOption CONSUMER_CONSISTENCY_MODE =
+ key("consumer.mode")
+ .enumType(ConsumerMode.class)
+ .defaultValue(ConsumerMode.EXACTLY_ONCE)
+ .withDescription("Specify the consumer consistency mode for table.");
public static final ConfigOption DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
@@ -1314,8 +1313,8 @@ public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}
- public boolean consumerWithLegacyMode() {
- return options.get(CONSUMER_WITH_LEGACY_MODE);
+ public ConsumerMode consumerWithLegacyMode() {
+ return options.get(CONSUMER_CONSISTENCY_MODE);
}
public boolean partitionedTableInMetastore() {
@@ -1962,4 +1961,33 @@ public InlineElement getDescription() {
return text(description);
}
}
+
+ /** Specifies the log consistency mode for table. */
+ public enum ConsumerMode implements DescribedEnum {
+ EXACTLY_ONCE(
+ "exactly-once",
+ "Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed."),
+
+ AT_LEAST_ONCE(
+ "at-least-once",
+ "Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.");
+
+ private final String value;
+ private final String description;
+
+ ConsumerMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java
new file mode 100644
index 000000000000..f88428a2e824
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.flink.utils.TableScanUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+/** Calculator for calculating consumer consumption progress. */
+public class ConsumerProgressCalculator {
+ private final TreeMap minNextSnapshotPerCheckpoint;
+
+ private final Map assignedSnapshotPerReader;
+
+ private final Map consumingSnapshotPerReader;
+
+ public ConsumerProgressCalculator(int parallelism) {
+ this.minNextSnapshotPerCheckpoint = new TreeMap<>();
+ this.assignedSnapshotPerReader = new HashMap<>(parallelism);
+ this.consumingSnapshotPerReader = new HashMap<>(parallelism);
+ }
+
+ public void updateConsumeProgress(int subtaskId, ReaderConsumeProgressEvent event) {
+ consumingSnapshotPerReader.put(subtaskId, event.lastConsumeSnapshotId());
+ }
+
+ public void updateAssignInformation(int subtaskId, FileStoreSourceSplit split) {
+ TableScanUtils.getSnapshotId(split)
+ .ifPresent(snapshotId -> assignedSnapshotPerReader.put(subtaskId, snapshotId));
+ }
+
+ public void notifySnapshotState(
+ long checkpointId,
+ Set readersAwaitingSplit,
+ Function unassignedCalculationFunction,
+ int parallelism) {
+ computeMinNextSnapshotId(readersAwaitingSplit, unassignedCalculationFunction, parallelism)
+ .ifPresent(
+ minNextSnapshotId ->
+ minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
+ }
+
+ public OptionalLong notifyCheckpointComplete(long checkpointId) {
+ NavigableMap nextSnapshots =
+ minNextSnapshotPerCheckpoint.headMap(checkpointId, true);
+ OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
+ nextSnapshots.clear();
+ return max;
+ }
+
+ /** Calculate the minimum snapshot currently being consumed by all readers. */
+ private Optional computeMinNextSnapshotId(
+ Set readersAwaitingSplit,
+ Function unassignedCalculationFunction,
+ int parallelism) {
+ long globalMinSnapshotId = Long.MAX_VALUE;
+ for (int subtask = 0; subtask < parallelism; subtask++) {
+ // 1. if the reader is in the waiting list, it means that all allocated splits have
+ // been
+ // consumed, and the next snapshotId is calculated from splitAssigner.
+ //
+ // 2. if the reader is not in the waiting list, the larger value between the
+ // consumption
+ // progress reported by the reader and the most recently assigned snapshot id is
+ // used.
+ Long snapshotIdForSubtask;
+ if (readersAwaitingSplit.contains(subtask)) {
+ snapshotIdForSubtask = unassignedCalculationFunction.apply(subtask);
+ } else {
+ Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask);
+ Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask);
+ if (consumingSnapshotId != null && assignedSnapshotId != null) {
+ snapshotIdForSubtask = Math.max(consumingSnapshotId, assignedSnapshotId);
+ } else {
+ snapshotIdForSubtask =
+ consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId;
+ }
+ }
+
+ if (snapshotIdForSubtask != null) {
+ globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForSubtask);
+ } else {
+ return Optional.empty();
+ }
+ }
+ return Optional.of(globalMinSnapshotId);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index fb55ed05224f..b1c0d9d3c6ae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -22,7 +22,6 @@
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
-import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -48,11 +47,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Optional;
-import java.util.OptionalLong;
import java.util.Set;
-import java.util.TreeMap;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -76,11 +71,7 @@ public class ContinuousFileSplitEnumerator
protected final SplitAssigner splitAssigner;
- private final TreeMap minNextSnapshotPerCheckpoint;
-
- private final Map assignedSnapshotPerReader;
-
- private final Map consumingSnapshotPerReader;
+ private final ConsumerProgressCalculator consumerProgressCalculator;
@Nullable protected Long nextSnapshotId;
@@ -105,9 +96,8 @@ public ContinuousFileSplitEnumerator(
this.splitAssigner = createSplitAssigner(bucketMode);
addSplits(remainSplits);
- this.minNextSnapshotPerCheckpoint = new TreeMap<>();
- this.assignedSnapshotPerReader = new HashMap<>(context.currentParallelism());
- this.consumingSnapshotPerReader = new HashMap<>(context.currentParallelism());
+ this.consumerProgressCalculator =
+ new ConsumerProgressCalculator(context.currentParallelism());
}
@VisibleForTesting
@@ -156,9 +146,8 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof ReaderConsumeProgressEvent) {
- long currentSnapshotId =
- ((ReaderConsumeProgressEvent) sourceEvent).lastConsumeSnapshotId();
- consumingSnapshotPerReader.put(subtaskId, currentSnapshotId);
+ consumerProgressCalculator.updateConsumeProgress(
+ subtaskId, (ReaderConsumeProgressEvent) sourceEvent);
} else {
LOG.error("Received unrecognized event: {}", sourceEvent);
}
@@ -176,22 +165,21 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception
final PendingSplitsCheckpoint checkpoint =
new PendingSplitsCheckpoint(splits, nextSnapshotId);
- LOG.debug("Source Checkpoint is {}", checkpoint);
+ consumerProgressCalculator.notifySnapshotState(
+ checkpointId,
+ readersAwaitingSplit,
+ subtask -> splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId),
+ context.currentParallelism());
- computeMinNextSnapshotId()
- .ifPresent(
- minNextSnapshotId ->
- minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
+ LOG.debug("Source Checkpoint is {}", checkpoint);
return checkpoint;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- NavigableMap nextSnapshots =
- minNextSnapshotPerCheckpoint.headMap(checkpointId, true);
- OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
- max.ifPresent(scan::notifyCheckpointComplete);
- nextSnapshots.clear();
+ consumerProgressCalculator
+ .notifyCheckpointComplete(checkpointId)
+ .ifPresent(scan::notifyCheckpointComplete);
}
// ------------------------------------------------------------------------
@@ -253,8 +241,7 @@ protected synchronized void assignSplits() {
List splits = splitAssigner.getNext(task, null);
if (splits.size() > 0) {
assignment.put(task, splits);
- TableScanUtils.getSnapshotId(splits.get(0))
- .ifPresent(snapshotId -> assignedSnapshotPerReader.put(task, snapshotId));
+ consumerProgressCalculator.updateAssignInformation(task, splits.get(0));
}
}
@@ -287,38 +274,6 @@ protected boolean noMoreSplits() {
return finished;
}
- /** Calculate the minimum snapshot currently being consumed by all readers. */
- private Optional computeMinNextSnapshotId() {
- long globalMinSnapshotId = Long.MAX_VALUE;
- for (int subtask = 0; subtask < context.currentParallelism(); subtask++) {
- // 1. if the reader is in the waiting list, it means that all allocated splits have been
- // consumed, and the next snapshotId is calculated from splitAssigner.
- //
- // 2. if the reader is not in the waiting list, the larger value between the consumption
- // progress reported by the reader and the most recently assigned snapshot id is used.
- Long snapshotIdForTask;
- if (readersAwaitingSplit.contains(subtask)) {
- snapshotIdForTask = splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId);
- } else {
- Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask);
- Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask);
- if (consumingSnapshotId != null && assignedSnapshotId != null) {
- snapshotIdForTask = Math.max(consumingSnapshotId, assignedSnapshotId);
- } else {
- snapshotIdForTask =
- consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId;
- }
- }
-
- if (snapshotIdForTask != null) {
- globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForTask);
- } else {
- return Optional.empty();
- }
- }
- return Optional.of(globalMinSnapshotId);
- }
-
/** The result of scan. */
protected static class PlanWithNextSnapshotId {
private final TableScan.Plan plan;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 3b9012b11e84..215d2b6c675f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -236,7 +236,8 @@ public DataStream build() {
if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
return buildAlignedContinuousFileSource();
} else if (conf.contains(CoreOptions.CONSUMER_ID)
- && conf.get(CoreOptions.CONSUMER_WITH_LEGACY_MODE)) {
+ && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
+ == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
return buildContinuousStreamOperator();
} else {
return buildContinuousFileSource();