diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 4375347785bc..9177c35653d4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -194,6 +194,12 @@
Enum |
Specify the consumer consistency mode for table.
Possible values:- "exactly-once": Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed.
- "at-least-once": Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.
|
+
+ consumer.snapshot-id |
+ (none) |
+ Long |
+ Specify the consumer startup mode from snapshot-id, if set it would reset snapshot-id firstly and consumer from the snapshot, default from the snapshot-id before. |
+
continuous.discovery-interval |
10 s |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 254164dbb315..d83cfb7ac88f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -969,6 +969,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to ignore consumer progress for the newly started job.");
+ // No default: when unset, a consumer resumes from its recorded progress. The description is
+ // mirrored in the generated core_configuration.html, so keep both in sync.
+ public static final ConfigOption<Long> CONSUMER_SNAPSHOT_ID =
+         key("consumer.snapshot-id")
+                 .longType()
+                 .noDefaultValue()
+                 .withDescription(
+                         "Specify the consumer startup mode from snapshot-id, if set it would reset snapshot-id firstly and consumer from the snapshot, default from the snapshot-id before.");
+
public static final ConfigOption DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
@@ -2075,6 +2082,10 @@ public boolean consumerIgnoreProgress() {
return options.get(CONSUMER_IGNORE_PROGRESS);
}
+ /**
+  * Returns the snapshot id configured through {@link #CONSUMER_SNAPSHOT_ID}
+  * ({@code consumer.snapshot-id}).
+  *
+  * <p>NOTE(review): the option has no default value, so this is presumably {@code null} when
+  * the option is unset - confirm; the streaming scan null-checks the result.
+  */
+ public Long consumerStartupFromSnapshotID() {
+     final Long configuredSnapshotId = options.get(CONSUMER_SNAPSHOT_ID);
+     return configuredSnapshotId;
+ }
+
public boolean partitionedTableInMetastore() {
return options.get(METASTORE_PARTITIONED_TABLE);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 6a8aa9265e5c..7b958873fddf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -48,6 +48,9 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -62,6 +65,8 @@ public abstract class AbstractDataTableScan implements DataTableScan {
private final CoreOptions options;
protected final SnapshotReader snapshotReader;
+ // Used to warn when the snapshot requested via 'consumer.snapshot-id' does not exist.
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class);
+
protected AbstractDataTableScan(CoreOptions options, SnapshotReader snapshotReader) {
this.options = options;
this.snapshotReader = snapshotReader;
@@ -123,6 +128,17 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
String consumerId = options.consumerId();
if (isStreaming && consumerId != null && !options.consumerIgnoreProgress()) {
ConsumerManager consumerManager = snapshotReader.consumerManager();
+ // An explicitly configured 'consumer.snapshot-id' takes precedence over the recorded
+ // progress: the consumer is reset to that snapshot before it is looked up below.
+ Long snapshotId = options.consumerStartupFromSnapshotID();
+ if (snapshotId != null) {
+     if (snapshotManager.snapshotExists(snapshotId)) {
+         consumerManager.resetConsumer(consumerId, new Consumer(snapshotId));
+     } else {
+         // Best effort: a missing snapshot is not fatal, the consumer is left untouched.
+         LOG.warn(
+                 "The snapshot {} configured by 'consumer.snapshot-id' does not exist, "
+                         + "so the consumer progress is not reset.",
+                 snapshotId);
+     }
+ }
Optional consumer = consumerManager.consumer(consumerId);
if (consumer.isPresent()) {
return new ContinuousFromSnapshotStartingScanner(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index ef921ad666ca..a6956959324a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -80,6 +80,22 @@ public void testResetConsumer(String invoker) throws Exception {
changelogRow("+I", 3L, "Paimon")))
.close();
+ // Streaming read with 'consumer.snapshot-id'='2': consumer 'myid' is expected to restart
+ // from snapshot 2, so rows (2, Hello) and (3, Paimon) must be emitted.
+ testStreamingRead(
+ "SELECT * FROM `"
+ + tableName
+ + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h','consumer.snapshot-id'='2') */",
+ Arrays.asList(
+ changelogRow("+I", 2L, "Hello"), changelogRow("+I", 3L, "Paimon")))
+ .close();
+
+ // Same consumer, now with 'consumer.snapshot-id'='3': only row (3, Paimon) is expected.
+ testStreamingRead(
+ "SELECT * FROM `"
+ + tableName
+ + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h','consumer.snapshot-id'='3') */",
+ Arrays.asList(changelogRow("+I", 3L, "Paimon")))
+ .close();
+
Thread.sleep(1000);
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
Optional consumer1 = consumerManager.consumer("myid");