From ac627323133af132c8878e391a29e46cbbb0843b Mon Sep 17 00:00:00 2001 From: big face cat <731030576@qq.com> Date: Thu, 11 Apr 2024 17:02:41 +0800 Subject: [PATCH] [core] Support time travel to watermark in batch mode (#3199) --- docs/content/flink/sql-query.md | 3 + docs/content/spark/sql-query.md | 4 + .../generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 13 ++++ .../paimon/schema/SchemaValidation.java | 7 +- .../paimon/table/AbstractFileStoreTable.java | 18 +++++ .../table/source/AbstractInnerTableScan.java | 9 ++- .../StaticFromWatermarkStartingScanner.java | 73 +++++++++++++++++++ .../apache/paimon/utils/SnapshotManager.java | 57 +++++++++++++++ .../paimon/flink/BatchFileStoreITCase.java | 33 +++++++++ .../paimon/spark/SparkReadTestBase.java | 25 +++++++ .../paimon/spark/SparkTimeTravelITCase.java | 58 +++++++++++++++ 12 files changed, 304 insertions(+), 2 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md index 46d6fb588c40..80fc68dfb3a3 100644 --- a/docs/content/flink/sql-query.md +++ b/docs/content/flink/sql-query.md @@ -53,6 +53,9 @@ SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */; -- read tag 'my-tag' SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */; + +-- read the snapshot from watermark, will match the first snapshot after the watermark +SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */; ``` {{< /tab >}} diff --git a/docs/content/spark/sql-query.md b/docs/content/spark/sql-query.md index e29f0cf9e18a..fc6c81e8bc18 100644 --- a/docs/content/spark/sql-query.md +++ b/docs/content/spark/sql-query.md @@ -52,6 +52,10 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047; -- read tag 'my-tag' SELECT * FROM t VERSION AS OF 'my-tag'; + +-- read the snapshot from specified watermark. will match the first snapshot after the watermark +SELECT * FROM t VERSION AS OF 'watermark-1678883047356'; + ``` {{< hint warning >}} diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 5959b6b4f096..8d39919d5dc4 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -515,6 +515,12 @@ Long Optional timestamp used in case of "from-timestamp" scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen. + +
scan.watermark
+ (none) + Long + Optional watermark used in case of "from-snapshot" scan mode. If there is no snapshot later than this watermark, will throw an exceptions. +
sequence.field
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index cbe5f45c5b50..54ab8b6f3242 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -508,6 +508,14 @@ public class CoreOptions implements Serializable { "Optional timestamp used in case of \"from-timestamp\" scan mode. " + "If there is no snapshot earlier than this time, the earliest snapshot will be chosen."); + public static final ConfigOption SCAN_WATERMARK = + key("scan.watermark") + .longType() + .noDefaultValue() + .withDescription( + "Optional watermark used in case of \"from-snapshot\" scan mode. " + + "If there is no snapshot later than this watermark, will throw an exceptions."); + public static final ConfigOption SCAN_FILE_CREATION_TIME_MILLIS = key("scan.file-creation-time-millis") .longType() @@ -1450,6 +1458,7 @@ public static StartupMode startupMode(Options options) { return StartupMode.FROM_TIMESTAMP; } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent() || options.getOptional(SCAN_TAG_NAME).isPresent() + || options.getOptional(SCAN_WATERMARK).isPresent() || options.getOptional(SCAN_VERSION).isPresent()) { return StartupMode.FROM_SNAPSHOT; } else if (options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) { @@ -1471,6 +1480,10 @@ public Long scanTimestampMills() { return options.get(SCAN_TIMESTAMP_MILLIS); } + public Long scanWatermark() { + return options.get(SCAN_WATERMARK); + } + public Long scanFileCreationTimeMills() { return options.get(SCAN_FILE_CREATION_TIME_MILLIS); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 3697f18e123b..722e27de968c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -58,6 +58,7 @@ import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME; import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS; +import static org.apache.paimon.CoreOptions.SCAN_WATERMARK; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE; @@ -247,7 +248,11 @@ private static void validateStartupMode(CoreOptions options) { Collections.singletonList(SCAN_TIMESTAMP_MILLIS)); } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) { checkExactOneOptionExistInMode( - options, options.startupMode(), SCAN_SNAPSHOT_ID, SCAN_TAG_NAME); + options, + options.startupMode(), + SCAN_SNAPSHOT_ID, + SCAN_TAG_NAME, + SCAN_WATERMARK); checkOptionsConflict( options, Arrays.asList( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 99ad605df8ef..70e33ddb32f4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -50,6 +50,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; +import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; import org.apache.paimon.tag.TagPreview; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; @@ -77,6 +78,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable { private static final long serialVersionUID = 1L; + private static final String WATERMARK_PREFIX = "watermark-"; protected final FileIO fileIO; protected final Path path; @@ -386,6 +388,8 @@ private Optional tryTimeTravel(Options options) { return travelToVersion(coreOptions.scanVersion(), options); } else if (coreOptions.scanSnapshotId() != null) { return travelToSnapshot(coreOptions.scanSnapshotId(), options); + } else if (coreOptions.scanWatermark() != null) { + return travelToWatermark(coreOptions.scanWatermark(), options); } else { return travelToTag(coreOptions.scanTagName(), options); } @@ -405,6 +409,10 @@ private Optional travelToVersion(String version, Options options) { if (tagManager().tagExists(version)) { options.set(CoreOptions.SCAN_TAG_NAME, version); return travelToTag(version, options); + } else if (version.startsWith(WATERMARK_PREFIX)) { + long watermark = Long.parseLong(version.substring(WATERMARK_PREFIX.length())); + options.set(CoreOptions.SCAN_WATERMARK, watermark); + return travelToWatermark(watermark, options); } else if (version.chars().allMatch(Character::isDigit)) { options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version); return travelToSnapshot(Long.parseLong(version), options); @@ -425,6 +433,16 @@ private Optional travelToSnapshot(long snapshotId, Options options) return Optional.empty(); } + private Optional travelToWatermark(long watermark, Options options) { + Snapshot snapshot = + StaticFromWatermarkStartingScanner.timeTravelToWatermark( + snapshotManager(), watermark); + if (snapshot != null) { + return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap())); + } + return Optional.empty(); + } + private Optional travelToSnapshot(@Nullable Snapshot snapshot, Options options) { if (snapshot != null) { return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap())); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index 463cd6fdfa63..6601bf548043 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -43,6 +43,7 @@ import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; +import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -167,10 +168,16 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { options.changelogLifecycleDecoupled()) : new StaticFromSnapshotStartingScanner( snapshotManager, options.scanSnapshotId()); - } else { + } else if (options.scanWatermark() != null) { + checkArgument(!isStreaming, "Cannot scan from watermark in streaming mode."); + return new StaticFromWatermarkStartingScanner( + snapshotManager, options().scanWatermark()); + } else if (options.scanTagName() != null) { checkArgument(!isStreaming, "Cannot scan from tag in streaming mode."); return new StaticFromTagStartingScanner( snapshotManager, options().scanTagName()); + } else { + throw new UnsupportedOperationException("Unknown snapshot read mode"); } case FROM_SNAPSHOT_FULL: return isStreaming diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java new file mode 100644 index 000000000000..022fdf37faf2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.utils.SnapshotManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** {@link StartingScanner} for the {@link CoreOptions#SCAN_WATERMARK} of a batch read. */ +public class StaticFromWatermarkStartingScanner extends AbstractStartingScanner { + + private static final Logger LOG = + LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class); + + private final long watermark; + + public StaticFromWatermarkStartingScanner(SnapshotManager snapshotManager, long watermark) { + super(snapshotManager); + this.watermark = watermark; + Snapshot snapshot = timeTravelToWatermark(snapshotManager, watermark); + if (snapshot != null) { + this.startingSnapshotId = snapshot.id(); + } + } + + @Override + public ScanMode startingScanMode() { + return ScanMode.ALL; + } + + @Override + public Result scan(SnapshotReader snapshotReader) { + if (startingSnapshotId == null) { + LOG.warn( + "There is currently no snapshot later than or equal to watermark[{}]", + watermark); + throw new RuntimeException( + String.format( + "There is currently no snapshot later than or equal to " + + "watermark[%d]", + watermark)); + } + return StartingScanner.fromPlan( + snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read()); + } + + @Nullable + public static Snapshot timeTravelToWatermark(SnapshotManager snapshotManager, long watermark) { + return snapshotManager.laterOrEqualWatermark(watermark); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 7c5ccd28c629..174b4233cccc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -307,6 +307,63 @@ private Snapshot changelogOrSnapshot(long snapshotId) { return finalSnapshot; } + public @Nullable Snapshot laterOrEqualWatermark(long watermark) { + Long earliest = earliestSnapshotId(); + Long latest = latestSnapshotId(); + if (earliest == null || latest == null) { + return null; + } + Long earliestWatermark = null; + // find the first snapshot with watermark + if ((earliestWatermark = snapshot(earliest).watermark()) == null) { + while (earliest < latest) { + earliest++; + earliestWatermark = snapshot(earliest).watermark(); + if (earliestWatermark != null) { + break; + } + } + } + if (earliestWatermark == null) { + return null; + } + + if (earliestWatermark >= watermark) { + return snapshot(earliest); + } + Snapshot finalSnapshot = null; + + while (earliest <= latest) { + long mid = earliest + (latest - earliest) / 2; // Avoid overflow + Snapshot snapshot = snapshot(mid); + Long commitWatermark = snapshot.watermark(); + if (commitWatermark == null) { + // find the first snapshot with watermark + while (mid >= earliest) { + mid--; + commitWatermark = snapshot(mid).watermark(); + if (commitWatermark != null) { + break; + } + } + } + if (commitWatermark == null) { + earliest = mid + 1; + } else { + if (commitWatermark > watermark) { + latest = mid - 1; // Search in the left half + finalSnapshot = snapshot; + } else if (commitWatermark < watermark) { + earliest = mid + 1; // Search in the right half + } else { + finalSnapshot = snapshot; // Found the exact match + break; + } + } + } + return finalSnapshot; + } + public long snapshotCount() throws IOException { return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX).count(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 7c8ffb6d630d..ac67e87dd992 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -25,12 +25,15 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -220,6 +223,36 @@ public void testTimeTravelRead() throws Exception { .hasRootCauseMessage("Tag 'unknown' doesn't exist."); } + @Test + @Timeout(120) + public void testTimeTravelReadWithWatermark() throws Exception { + streamSqlIter( + "CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING," + + " dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')"); + sql( + "CREATE TABLE WT (a STRING, b STRING, c STRING, dt TIMESTAMP, PRIMARY KEY (a) NOT ENFORCED)"); + CloseableIterator insert = streamSqlIter("INSERT INTO WT SELECT * FROM gen "); + List watermarks; + while (true) { + watermarks = + sql("SELECT `watermark` FROM WT$snapshots").stream() + .map(r -> (Long) r.getField("watermark")) + .collect(Collectors.toList()); + if (watermarks.size() > 1) { + insert.close(); + break; + } + Thread.sleep(1000); + } + Long maxWatermark = watermarks.get(watermarks.size() - 1); + assertThat( + batchSql( + String.format( + "SELECT * FROM WT /*+ OPTIONS('scan.watermark'='%d') */", + maxWatermark))) + .isNotEmpty(); + } + @Test public void testTimeTravelReadWithSnapshotExpiration() throws Exception { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)"); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java index 13cc1aae4180..becefb505578 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java @@ -22,12 +22,15 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; @@ -188,6 +191,28 @@ protected static void writeTable(String tableName, GenericRow... rows) throws Ex commit.close(); } + protected static void writeTableWithWatermark( + String tableName, Long watermark, GenericRow... rows) throws Exception { + FileStoreTable fileStoreTable = getTable(tableName); + StreamWriteBuilder streamWriteBuilder = fileStoreTable.newStreamWriteBuilder(); + StreamTableWrite writer = streamWriteBuilder.newWrite(); + TableCommitImpl commit = (TableCommitImpl) streamWriteBuilder.newCommit(); + + for (GenericRow row : rows) { + writer.write(row); + } + long commitIdentifier = COMMIT_IDENTIFIER.getAndIncrement(); + ManifestCommittable manifestCommittable = + new ManifestCommittable(commitIdentifier, watermark); + List commitMessages = writer.prepareCommit(true, commitIdentifier); + for (CommitMessage commitMessage : commitMessages) { + manifestCommittable.addFileCommittable(commitMessage); + } + commit.commit(manifestCommittable); + writer.close(); + commit.close(); + } + protected static void writeTable(String tableName, String... values) { spark.sql( String.format( diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java index 8345c9a9dcf1..96c8d9eb91ad 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java @@ -299,4 +299,62 @@ public void testTravelToTagWithDigitalName() throws Exception { assertThat(spark.sql("SELECT * FROM t VERSION AS OF '1'").collectAsList().toString()) .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]"); } + + @Test + public void testTravelWithWatermark() throws Exception { + spark.sql("CREATE TABLE t (k INT, v STRING)"); + + // snapshot 1 + writeTableWithWatermark( + "t", + 1L, + GenericRow.of(1, BinaryString.fromString("Hello")), + GenericRow.of(2, BinaryString.fromString("Paimon"))); + + // snapshot 2 + writeTableWithWatermark( + "t", + null, + GenericRow.of(1, BinaryString.fromString("Null")), + GenericRow.of(2, BinaryString.fromString("Watermark"))); + + // snapshot 3 + writeTableWithWatermark( + "t", + 10L, + GenericRow.of(3, BinaryString.fromString("Time")), + GenericRow.of(4, BinaryString.fromString("Travel"))); + + // time travel to watermark '1' + assertThat( + spark.sql("SELECT * FROM t version as of 'watermark-1'") + .collectAsList() + .toString()) + .isEqualTo("[[1,Hello], [2,Paimon]]"); + + try { + spark.sql("SELECT * FROM t version as of 'watermark-11'").collectAsList(); + } catch (Exception e) { + assertThat( + e.getMessage() + .equals( + "There is currently no snapshot later than or equal to watermark[11]")); + } + + // time travel to watermark '9' + assertThat( + spark.sql("SELECT * FROM t version as of 'watermark-9'") + .collectAsList() + .toString()) + .isEqualTo( + "[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]"); + + // time travel to watermark '10' + assertThat( + spark.sql("SELECT * FROM t version as of 'watermark-10'") + .collectAsList() + .toString()) + .isEqualTo( + "[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]"); + } }