diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md
index 46d6fb588c40..80fc68dfb3a3 100644
--- a/docs/content/flink/sql-query.md
+++ b/docs/content/flink/sql-query.md
@@ -53,6 +53,9 @@ SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
-- read tag 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
+
+-- read the snapshot from watermark, will match the first snapshot after the watermark
+SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */;
```
{{< /tab >}}
diff --git a/docs/content/spark/sql-query.md b/docs/content/spark/sql-query.md
index e29f0cf9e18a..fc6c81e8bc18 100644
--- a/docs/content/spark/sql-query.md
+++ b/docs/content/spark/sql-query.md
@@ -52,6 +52,10 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;
-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';
+
+-- read the snapshot from specified watermark. will match the first snapshot after the watermark
+SELECT * FROM t VERSION AS OF 'watermark-1678883047356';
+
```
{{< hint warning >}}
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 5959b6b4f096..8d39919d5dc4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -515,6 +515,12 @@
Long |
Optional timestamp used in case of "from-timestamp" scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen. |
+
+ scan.watermark |
+ (none) |
+ Long |
+ Optional watermark used in case of "from-snapshot" scan mode. If there is no snapshot later than this watermark, will throw an exceptions. |
+
sequence.field |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index cbe5f45c5b50..54ab8b6f3242 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -508,6 +508,14 @@ public class CoreOptions implements Serializable {
"Optional timestamp used in case of \"from-timestamp\" scan mode. "
+ "If there is no snapshot earlier than this time, the earliest snapshot will be chosen.");
+ public static final ConfigOption SCAN_WATERMARK =
+ key("scan.watermark")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional watermark used in case of \"from-snapshot\" scan mode. "
+ + "If there is no snapshot later than this watermark, will throw an exceptions.");
+
public static final ConfigOption SCAN_FILE_CREATION_TIME_MILLIS =
key("scan.file-creation-time-millis")
.longType()
@@ -1450,6 +1458,7 @@ public static StartupMode startupMode(Options options) {
return StartupMode.FROM_TIMESTAMP;
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
|| options.getOptional(SCAN_TAG_NAME).isPresent()
+ || options.getOptional(SCAN_WATERMARK).isPresent()
|| options.getOptional(SCAN_VERSION).isPresent()) {
return StartupMode.FROM_SNAPSHOT;
} else if (options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) {
@@ -1471,6 +1480,10 @@ public Long scanTimestampMills() {
return options.get(SCAN_TIMESTAMP_MILLIS);
}
+ public Long scanWatermark() {
+ return options.get(SCAN_WATERMARK);
+ }
+
public Long scanFileCreationTimeMills() {
return options.get(SCAN_FILE_CREATION_TIME_MILLIS);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3697f18e123b..722e27de968c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -58,6 +58,7 @@
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.apache.paimon.CoreOptions.SCAN_WATERMARK;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
@@ -247,7 +248,11 @@ private static void validateStartupMode(CoreOptions options) {
Collections.singletonList(SCAN_TIMESTAMP_MILLIS));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
checkExactOneOptionExistInMode(
- options, options.startupMode(), SCAN_SNAPSHOT_ID, SCAN_TAG_NAME);
+ options,
+ options.startupMode(),
+ SCAN_SNAPSHOT_ID,
+ SCAN_TAG_NAME,
+ SCAN_WATERMARK);
checkOptionsConflict(
options,
Arrays.asList(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 99ad605df8ef..70e33ddb32f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -50,6 +50,7 @@
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
@@ -77,6 +78,7 @@
abstract class AbstractFileStoreTable implements FileStoreTable {
private static final long serialVersionUID = 1L;
+ private static final String WATERMARK_PREFIX = "watermark-";
protected final FileIO fileIO;
protected final Path path;
@@ -386,6 +388,8 @@ private Optional tryTimeTravel(Options options) {
return travelToVersion(coreOptions.scanVersion(), options);
} else if (coreOptions.scanSnapshotId() != null) {
return travelToSnapshot(coreOptions.scanSnapshotId(), options);
+ } else if (coreOptions.scanWatermark() != null) {
+ return travelToWatermark(coreOptions.scanWatermark(), options);
} else {
return travelToTag(coreOptions.scanTagName(), options);
}
@@ -405,6 +409,10 @@ private Optional travelToVersion(String version, Options options) {
if (tagManager().tagExists(version)) {
options.set(CoreOptions.SCAN_TAG_NAME, version);
return travelToTag(version, options);
+ } else if (version.startsWith(WATERMARK_PREFIX)) {
+ long watermark = Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
+ options.set(CoreOptions.SCAN_WATERMARK, watermark);
+ return travelToWatermark(watermark, options);
} else if (version.chars().allMatch(Character::isDigit)) {
options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
return travelToSnapshot(Long.parseLong(version), options);
@@ -425,6 +433,16 @@ private Optional travelToSnapshot(long snapshotId, Options options)
return Optional.empty();
}
+ private Optional travelToWatermark(long watermark, Options options) {
+ Snapshot snapshot =
+ StaticFromWatermarkStartingScanner.timeTravelToWatermark(
+ snapshotManager(), watermark);
+ if (snapshot != null) {
+ return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
+ }
+ return Optional.empty();
+ }
+
private Optional travelToSnapshot(@Nullable Snapshot snapshot, Options options) {
if (snapshot != null) {
return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 463cd6fdfa63..6601bf548043 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -43,6 +43,7 @@
import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -167,10 +168,16 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
options.changelogLifecycleDecoupled())
: new StaticFromSnapshotStartingScanner(
snapshotManager, options.scanSnapshotId());
- } else {
+ } else if (options.scanWatermark() != null) {
+ checkArgument(!isStreaming, "Cannot scan from watermark in streaming mode.");
+ return new StaticFromWatermarkStartingScanner(
+ snapshotManager, options().scanWatermark());
+ } else if (options.scanTagName() != null) {
checkArgument(!isStreaming, "Cannot scan from tag in streaming mode.");
return new StaticFromTagStartingScanner(
snapshotManager, options().scanTagName());
+ } else {
+ throw new UnsupportedOperationException("Unknown snapshot read mode");
}
case FROM_SNAPSHOT_FULL:
return isStreaming
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
new file mode 100644
index 000000000000..022fdf37faf2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/** {@link StartingScanner} for the {@link CoreOptions#SCAN_WATERMARK} of a batch read. */
+public class StaticFromWatermarkStartingScanner extends AbstractStartingScanner {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class);
+
+ private final long watermark;
+
+ public StaticFromWatermarkStartingScanner(SnapshotManager snapshotManager, long watermark) {
+ super(snapshotManager);
+ this.watermark = watermark;
+ Snapshot snapshot = timeTravelToWatermark(snapshotManager, watermark);
+ if (snapshot != null) {
+ this.startingSnapshotId = snapshot.id();
+ }
+ }
+
+ @Override
+ public ScanMode startingScanMode() {
+ return ScanMode.ALL;
+ }
+
+ @Override
+ public Result scan(SnapshotReader snapshotReader) {
+ if (startingSnapshotId == null) {
+ LOG.warn(
+ "There is currently no snapshot later than or equal to watermark[{}]",
+ watermark);
+ throw new RuntimeException(
+ String.format(
+ "There is currently no snapshot later than or equal to "
+ + "watermark[%d]",
+ watermark));
+ }
+ return StartingScanner.fromPlan(
+ snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ }
+
+ @Nullable
+ public static Snapshot timeTravelToWatermark(SnapshotManager snapshotManager, long watermark) {
+ return snapshotManager.laterOrEqualWatermark(watermark);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 7c5ccd28c629..174b4233cccc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -307,6 +307,63 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return finalSnapshot;
}
+ public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
+ Long earliest = earliestSnapshotId();
+ Long latest = latestSnapshotId();
+ if (earliest == null || latest == null) {
+ return null;
+ }
+ Long earliestWatermark = null;
+ // find the first snapshot with watermark
+ if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
+ while (earliest < latest) {
+ earliest++;
+ earliestWatermark = snapshot(earliest).watermark();
+ if (earliestWatermark != null) {
+ break;
+ }
+ }
+ }
+ if (earliestWatermark == null) {
+ return null;
+ }
+
+ if (earliestWatermark >= watermark) {
+ return snapshot(earliest);
+ }
+ Snapshot finalSnapshot = null;
+
+ while (earliest <= latest) {
+ long mid = earliest + (latest - earliest) / 2; // Avoid overflow
+ Snapshot snapshot = snapshot(mid);
+ Long commitWatermark = snapshot.watermark();
+ if (commitWatermark == null) {
+ // find the first snapshot with watermark
+ while (mid >= earliest) {
+ mid--;
+ commitWatermark = snapshot(mid).watermark();
+ if (commitWatermark != null) {
+ break;
+ }
+ }
+ }
+ if (commitWatermark == null) {
+ earliest = mid + 1;
+ } else {
+ if (commitWatermark > watermark) {
+ latest = mid - 1; // Search in the left half
+ finalSnapshot = snapshot;
+ } else if (commitWatermark < watermark) {
+ earliest = mid + 1; // Search in the right half
+ } else {
+ finalSnapshot = snapshot; // Found the exact match
+ break;
+ }
+ }
+ }
+ return finalSnapshot;
+ }
+
public long snapshotCount() throws IOException {
return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX).count();
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 7c8ffb6d630d..ac67e87dd992 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -25,12 +25,15 @@
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -220,6 +223,36 @@ public void testTimeTravelRead() throws Exception {
.hasRootCauseMessage("Tag 'unknown' doesn't exist.");
}
+ @Test
+ @Timeout(120)
+ public void testTimeTravelReadWithWatermark() throws Exception {
+ streamSqlIter(
+ "CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING,"
+ + " dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')");
+ sql(
+ "CREATE TABLE WT (a STRING, b STRING, c STRING, dt TIMESTAMP, PRIMARY KEY (a) NOT ENFORCED)");
+ CloseableIterator insert = streamSqlIter("INSERT INTO WT SELECT * FROM gen ");
+ List watermarks;
+ while (true) {
+ watermarks =
+ sql("SELECT `watermark` FROM WT$snapshots").stream()
+ .map(r -> (Long) r.getField("watermark"))
+ .collect(Collectors.toList());
+ if (watermarks.size() > 1) {
+ insert.close();
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Long maxWatermark = watermarks.get(watermarks.size() - 1);
+ assertThat(
+ batchSql(
+ String.format(
+ "SELECT * FROM WT /*+ OPTIONS('scan.watermark'='%d') */",
+ maxWatermark)))
+ .isNotEmpty();
+ }
+
@Test
public void testTimeTravelReadWithSnapshotExpiration() throws Exception {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 13cc1aae4180..becefb505578 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -22,12 +22,15 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
@@ -188,6 +191,28 @@ protected static void writeTable(String tableName, GenericRow... rows) throws Ex
commit.close();
}
+ protected static void writeTableWithWatermark(
+ String tableName, Long watermark, GenericRow... rows) throws Exception {
+ FileStoreTable fileStoreTable = getTable(tableName);
+ StreamWriteBuilder streamWriteBuilder = fileStoreTable.newStreamWriteBuilder();
+ StreamTableWrite writer = streamWriteBuilder.newWrite();
+ TableCommitImpl commit = (TableCommitImpl) streamWriteBuilder.newCommit();
+
+ for (GenericRow row : rows) {
+ writer.write(row);
+ }
+ long commitIdentifier = COMMIT_IDENTIFIER.getAndIncrement();
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(commitIdentifier, watermark);
+ List commitMessages = writer.prepareCommit(true, commitIdentifier);
+ for (CommitMessage commitMessage : commitMessages) {
+ manifestCommittable.addFileCommittable(commitMessage);
+ }
+ commit.commit(manifestCommittable);
+ writer.close();
+ commit.close();
+ }
+
protected static void writeTable(String tableName, String... values) {
spark.sql(
String.format(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 8345c9a9dcf1..416efef05bc9 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -299,4 +299,62 @@ public void testTravelToTagWithDigitalName() throws Exception {
assertThat(spark.sql("SELECT * FROM t VERSION AS OF '1'").collectAsList().toString())
.isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
}
+
+ @Test
+ public void testTravelWithWatermark() throws Exception {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ writeTableWithWatermark(
+ "t",
+ 1L,
+ GenericRow.of(1, BinaryString.fromString("Hello")),
+ GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+ // snapshot 3
+ writeTableWithWatermark(
+ "t",
+ null,
+ GenericRow.of(1, BinaryString.fromString("Null")),
+ GenericRow.of(2, BinaryString.fromString("Watermark")));
+
+ // snapshot 3
+ writeTableWithWatermark(
+ "t",
+ 10L,
+ GenericRow.of(3, BinaryString.fromString("Time")),
+ GenericRow.of(4, BinaryString.fromString("Travel")));
+
+ // time travel to watermark '1'
+ assertThat(
+ spark.sql("SELECT * FROM t version as of 'watermark-1'")
+ .collectAsList()
+ .toString())
+ .isEqualTo("[[1,Hello], [2,Paimon]]");
+
+ try {
+ spark.sql("SELECT * FROM t version as of 'watermark-11'").collectAsList();
+ } catch (Exception e) {
+ assertThat(
+ e.getMessage()
+ .equals(
+ "There is currently no snapshot later than or equal to watermark[11]"));
+ }
+
+ // time travel to watermark '9'
+ assertThat(
+ spark.sql("SELECT * FROM t version as of 'watermark-9'")
+ .collectAsList()
+ .toString())
+ .isEqualTo(
+ "[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]");
+
+ // time travel to watermark '10'
+ assertThat(
+ spark.sql("SELECT * FROM t version as of 'watermark-10'")
+ .collectAsList()
+ .toString())
+ .isEqualTo(
+ "[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]");
+ }
}