diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md index 4604fcb797be6..223c6c7e95af0 100644 --- a/docs/content/how-to/querying-tables.md +++ b/docs/content/how-to/querying-tables.md @@ -332,15 +332,16 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con lifetime of consumers. {{< /hint >}} -You can set `consumer.use-legacy-mode` to `false` to use the consumer function implemented based on -[flip-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), which can provide -more capabilities, such as watermark alignment. +By default, the consumer uses `exactly-once` mode to record consumption progress, which strictly ensures that what is +recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed. You can set `consumer.mode` to +`at-least-once` to allow readers consume snapshots at different rates and record the slowest snapshot-id among all +readers into the consumer. This mode can provide more capabilities, such as watermark alignment. {{< hint warning >}} -1. When there is no watermark definition, the consumer in non-legacy mode cannot provide the ability to pass the +1. When there is no watermark definition, the consumer in `at-least-once` mode cannot provide the ability to pass the watermark in the snapshot to the downstream. -2. Since the implementation of legacy mode and non-legacy mode are completely different, the state of flink is -incompatible and cannot be restored from the state when switching modes. +2. Since the implementation of `exactly-once` mode and `at-least-once` mode are completely different, the state of +flink is incompatible and cannot be restored from the state when switching modes. {{< /hint >}} You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID. diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 2cfaeae8d6ce5..cd18f0c4e17a0 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -111,10 +111,10 @@ The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value. -
consumer.use-legacy-mode
- true +
consumer.mode
+ exactly-once Boolean - Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true. + Specify the consumer consistency mode for table.

Possible values:
continuous.discovery-interval
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 5751f4ff5ba55..05528f1c3ee3d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -705,12 +705,11 @@ public class CoreOptions implements Serializable { "The expiration interval of consumer files. A consumer file will be expired if " + "it's lifetime after last modification is over this value."); - public static final ConfigOption CONSUMER_WITH_LEGACY_MODE = - key("consumer.use-legacy-mode") - .booleanType() - .defaultValue(true) - .withDescription( - "Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true."); + public static final ConfigOption CONSUMER_CONSISTENCY_MODE = + key("consumer.mode") + .enumType(ConsumerMode.class) + .defaultValue(ConsumerMode.EXACTLY_ONCE) + .withDescription("Specify the consumer consistency mode for table."); public static final ConfigOption DYNAMIC_BUCKET_TARGET_ROW_NUM = key("dynamic-bucket.target-row-num") @@ -1310,8 +1309,8 @@ public Duration consumerExpireTime() { return options.get(CONSUMER_EXPIRATION_TIME); } - public boolean consumerWithLegacyMode() { - return options.get(CONSUMER_WITH_LEGACY_MODE); + public ConsumerMode consumerWithLegacyMode() { + return options.get(CONSUMER_CONSISTENCY_MODE); } public boolean partitionedTableInMetastore() { @@ -1958,4 +1957,33 @@ public InlineElement getDescription() { return text(description); } } + + /** Specifies the log consistency mode for table. */ + public enum ConsumerMode implements DescribedEnum { + EXACTLY_ONCE( + "exactly-once", + "Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed."), + + AT_LEAST_ONCE( + "at-least-once", + "Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer."); + + private final String value; + private final String description; + + ConsumerMode(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java new file mode 100644 index 0000000000000..f88428a2e8243 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ConsumerProgressCalculator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.flink.utils.TableScanUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; + +/** Calculator for calculating consumer consumption progress. */ +public class ConsumerProgressCalculator { + private final TreeMap minNextSnapshotPerCheckpoint; + + private final Map assignedSnapshotPerReader; + + private final Map consumingSnapshotPerReader; + + public ConsumerProgressCalculator(int parallelism) { + this.minNextSnapshotPerCheckpoint = new TreeMap<>(); + this.assignedSnapshotPerReader = new HashMap<>(parallelism); + this.consumingSnapshotPerReader = new HashMap<>(parallelism); + } + + public void updateConsumeProgress(int subtaskId, ReaderConsumeProgressEvent event) { + consumingSnapshotPerReader.put(subtaskId, event.lastConsumeSnapshotId()); + } + + public void updateAssignInformation(int subtaskId, FileStoreSourceSplit split) { + TableScanUtils.getSnapshotId(split) + .ifPresent(snapshotId -> assignedSnapshotPerReader.put(subtaskId, snapshotId)); + } + + public void notifySnapshotState( + long checkpointId, + Set readersAwaitingSplit, + Function unassignedCalculationFunction, + int parallelism) { + computeMinNextSnapshotId(readersAwaitingSplit, unassignedCalculationFunction, parallelism) + .ifPresent( + minNextSnapshotId -> + minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId)); + } + + public OptionalLong notifyCheckpointComplete(long checkpointId) { + NavigableMap nextSnapshots = + minNextSnapshotPerCheckpoint.headMap(checkpointId, true); + OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max(); + nextSnapshots.clear(); + return max; + } + + /** Calculate the minimum snapshot currently being consumed by all readers. */ + private Optional computeMinNextSnapshotId( + Set readersAwaitingSplit, + Function unassignedCalculationFunction, + int parallelism) { + long globalMinSnapshotId = Long.MAX_VALUE; + for (int subtask = 0; subtask < parallelism; subtask++) { + // 1. if the reader is in the waiting list, it means that all allocated splits have + // been + // consumed, and the next snapshotId is calculated from splitAssigner. + // + // 2. if the reader is not in the waiting list, the larger value between the + // consumption + // progress reported by the reader and the most recently assigned snapshot id is + // used. + Long snapshotIdForSubtask; + if (readersAwaitingSplit.contains(subtask)) { + snapshotIdForSubtask = unassignedCalculationFunction.apply(subtask); + } else { + Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask); + Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask); + if (consumingSnapshotId != null && assignedSnapshotId != null) { + snapshotIdForSubtask = Math.max(consumingSnapshotId, assignedSnapshotId); + } else { + snapshotIdForSubtask = + consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId; + } + } + + if (snapshotIdForSubtask != null) { + globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForSubtask); + } else { + return Optional.empty(); + } + } + return Optional.of(globalMinSnapshotId); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index fb55ed05224f4..b1c0d9d3c6ae0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -22,7 +22,6 @@ import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner; import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; import org.apache.paimon.flink.source.assigners.SplitAssigner; -import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; @@ -48,11 +47,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.NavigableMap; -import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; -import java.util.TreeMap; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -76,11 +71,7 @@ public class ContinuousFileSplitEnumerator protected final SplitAssigner splitAssigner; - private final TreeMap minNextSnapshotPerCheckpoint; - - private final Map assignedSnapshotPerReader; - - private final Map consumingSnapshotPerReader; + private final ConsumerProgressCalculator consumerProgressCalculator; @Nullable protected Long nextSnapshotId; @@ -105,9 +96,8 @@ public ContinuousFileSplitEnumerator( this.splitAssigner = createSplitAssigner(bucketMode); addSplits(remainSplits); - this.minNextSnapshotPerCheckpoint = new TreeMap<>(); - this.assignedSnapshotPerReader = new HashMap<>(context.currentParallelism()); - this.consumingSnapshotPerReader = new HashMap<>(context.currentParallelism()); + this.consumerProgressCalculator = + new ConsumerProgressCalculator(context.currentParallelism()); } @VisibleForTesting @@ -156,9 +146,8 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname @Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { if (sourceEvent instanceof ReaderConsumeProgressEvent) { - long currentSnapshotId = - ((ReaderConsumeProgressEvent) sourceEvent).lastConsumeSnapshotId(); - consumingSnapshotPerReader.put(subtaskId, currentSnapshotId); + consumerProgressCalculator.updateConsumeProgress( + subtaskId, (ReaderConsumeProgressEvent) sourceEvent); } else { LOG.error("Received unrecognized event: {}", sourceEvent); } @@ -176,22 +165,21 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception final PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(splits, nextSnapshotId); - LOG.debug("Source Checkpoint is {}", checkpoint); + consumerProgressCalculator.notifySnapshotState( + checkpointId, + readersAwaitingSplit, + subtask -> splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId), + context.currentParallelism()); - computeMinNextSnapshotId() - .ifPresent( - minNextSnapshotId -> - minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId)); + LOG.debug("Source Checkpoint is {}", checkpoint); return checkpoint; } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - NavigableMap nextSnapshots = - minNextSnapshotPerCheckpoint.headMap(checkpointId, true); - OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max(); - max.ifPresent(scan::notifyCheckpointComplete); - nextSnapshots.clear(); + consumerProgressCalculator + .notifyCheckpointComplete(checkpointId) + .ifPresent(scan::notifyCheckpointComplete); } // ------------------------------------------------------------------------ @@ -253,8 +241,7 @@ protected synchronized void assignSplits() { List splits = splitAssigner.getNext(task, null); if (splits.size() > 0) { assignment.put(task, splits); - TableScanUtils.getSnapshotId(splits.get(0)) - .ifPresent(snapshotId -> assignedSnapshotPerReader.put(task, snapshotId)); + consumerProgressCalculator.updateAssignInformation(task, splits.get(0)); } } @@ -287,38 +274,6 @@ protected boolean noMoreSplits() { return finished; } - /** Calculate the minimum snapshot currently being consumed by all readers. */ - private Optional computeMinNextSnapshotId() { - long globalMinSnapshotId = Long.MAX_VALUE; - for (int subtask = 0; subtask < context.currentParallelism(); subtask++) { - // 1. if the reader is in the waiting list, it means that all allocated splits have been - // consumed, and the next snapshotId is calculated from splitAssigner. - // - // 2. if the reader is not in the waiting list, the larger value between the consumption - // progress reported by the reader and the most recently assigned snapshot id is used. - Long snapshotIdForTask; - if (readersAwaitingSplit.contains(subtask)) { - snapshotIdForTask = splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId); - } else { - Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask); - Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask); - if (consumingSnapshotId != null && assignedSnapshotId != null) { - snapshotIdForTask = Math.max(consumingSnapshotId, assignedSnapshotId); - } else { - snapshotIdForTask = - consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId; - } - } - - if (snapshotIdForTask != null) { - globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForTask); - } else { - return Optional.empty(); - } - } - return Optional.of(globalMinSnapshotId); - } - /** The result of scan. */ protected static class PlanWithNextSnapshotId { private final TableScan.Plan plan; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3b9012b11e845..215d2b6c675f4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -236,7 +236,8 @@ public DataStream build() { if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { return buildAlignedContinuousFileSource(); } else if (conf.contains(CoreOptions.CONSUMER_ID) - && conf.get(CoreOptions.CONSUMER_WITH_LEGACY_MODE)) { + && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) + == CoreOptions.ConsumerMode.EXACTLY_ONCE) { return buildContinuousStreamOperator(); } else { return buildContinuousFileSource();