diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 8fa906431cca..6b2d8ac23588 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -346,7 +346,8 @@ SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
When stream read Paimon tables, the next snapshot id to be recorded into the file system. This has several advantages:
1. When previous job is stopped, the newly started job can continue to consume from the previous progress without
- resuming from the state. The newly reading will start reading from next snapshot id found in consumer files.
+ resuming from the state. The newly reading will start reading from next snapshot id found in consumer files.
+ If you don't want this behavior, you can set `'consumer.ignore-progress'` to true.
2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system,
and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration.
3. When there is no watermark definition, the Paimon table will pass the watermark in the snapshot to the downstream
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 16d6376efad5..511cdd7b125d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@
Duration |
The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value. |
+
+ consumer.ignore-progress |
+ false |
+ Boolean |
+ Whether to ignore consumer progress for the newly started job. |
+
consumer.mode |
exactly-once |
@@ -141,16 +147,16 @@
Whether to ignore delete records in deduplicate mode. |
- dynamic-bucket.initial-buckets |
+ dynamic-bucket.assigner-parallelism |
(none) |
Integer |
- Initial buckets for a partition in assigner operator for dynamic bucket mode. |
+ Parallelism of assigner operator for dynamic bucket mode, it is related to the number of initialized bucket, too small will lead to insufficient processing speed of assigner. |
- dynamic-bucket.assigner-parallelism |
+ dynamic-bucket.initial-buckets |
(none) |
Integer |
- Parallelism of assigner operator for dynamic bucket mode, it is related to the number of initialized bucket, too small will lead to insufficient processing speed of assigner. |
+ Initial buckets for a partition in assigner operator for dynamic bucket mode. |
dynamic-bucket.target-row-num |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 550c76c9511e..d09cd32074e8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -767,6 +767,13 @@ public class CoreOptions implements Serializable {
.defaultValue(ConsumerMode.EXACTLY_ONCE)
.withDescription("Specify the consumer consistency mode for table.");
+ public static final ConfigOption CONSUMER_IGNORE_PROGRESS =
+ key("consumer.ignore-progress")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to ignore consumer progress for the newly started job.");
+
public static final ConfigOption DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
@@ -1420,8 +1427,8 @@ public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}
- public ConsumerMode consumerWithLegacyMode() {
- return options.get(CONSUMER_CONSISTENCY_MODE);
+ public boolean consumerIgnoreProgress() {
+ return options.get(CONSUMER_IGNORE_PROGRESS);
}
public boolean partitionedTableInMetastore() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
index 0aad5bd63584..80e5ec33fbc7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
@@ -35,9 +35,6 @@ public class Consumer {
private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";
- private static final int READ_CONSUMER_RETRY_NUM = 3;
- private static final int READ_CONSUMER_RETRY_INTERVAL = 100;
-
private final long nextSnapshot;
@JsonCreator
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 5fad72a2aee9..fd605b64e13b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -100,7 +100,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
// read from consumer id
String consumerId = options.consumerId();
- if (consumerId != null) {
+ if (consumerId != null && !options.consumerIgnoreProgress()) {
ConsumerManager consumerManager = snapshotReader.consumerManager();
Optional consumer = consumerManager.consumer(consumerId);
if (consumer.isPresent()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index ec6f959bedf7..abb07083af74 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -101,6 +101,7 @@
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.paimon.CoreOptions.CONSUMER_IGNORE_PROGRESS;
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_EXECUTION_MODE;
@@ -646,6 +647,18 @@ public void testConsumeId() throws Exception {
assertThat(result)
.containsExactlyInAnyOrder("+1|30|300|binary|varbinary|mapKey:mapVal|multiset");
+ // read again using consume id with ignore progress
+ scan =
+ table.copy(Collections.singletonMap(CONSUMER_IGNORE_PROGRESS.key(), "true"))
+ .newStreamScan();
+ List splits = scan.plan().splits();
+ result = getResult(read, splits, STREAMING_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ "+1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "+1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "+1|30|300|binary|varbinary|mapKey:mapVal|multiset");
+
// test snapshot expiration
for (int i = 3; i <= 8; i++) {
write.write(rowData(1, (i + 1) * 10, (i + 1) * 100L));
@@ -665,6 +678,7 @@ public void testConsumeId() throws Exception {
table.copy(
Collections.singletonMap(
CoreOptions.CONSUMER_EXPIRATION_TIME.key(), "1 s"));
+
// commit to trigger expiration
writeBuilder = table.newStreamWriteBuilder();
write = writeBuilder.newWrite();