Skip to content

Commit

Permalink
[core] Support time travel to watermark in batch mode (#3199)
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng2018 authored Apr 11, 2024
1 parent 5fbf727 commit ac62732
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 2 deletions.
3 changes: 3 additions & 0 deletions docs/content/flink/sql-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- read tag 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

-- read from a watermark: matches the first snapshot whose watermark is later than or equal to the given watermark
SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */;
```
{{< /tab >}}

Expand Down
4 changes: 4 additions & 0 deletions docs/content/spark/sql-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';

-- read from the specified watermark: matches the first snapshot whose watermark is later than or equal to it
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

```

{{< hint warning >}}
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,12 @@
<td>Long</td>
<td>Optional timestamp used in case of "from-timestamp" scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen.</td>
</tr>
<tr>
<td><h5>scan.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Optional watermark used in case of "from-snapshot" scan mode. If there is no snapshot later than this watermark, will throw an exceptions.</td>
</tr>
<tr>
<td><h5>sequence.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
13 changes: 13 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,14 @@ public class CoreOptions implements Serializable {
"Optional timestamp used in case of \"from-timestamp\" scan mode. "
+ "If there is no snapshot earlier than this time, the earliest snapshot will be chosen.");

/**
 * Option {@code scan.watermark}: a {@code Long} watermark to time travel to. It has no default
 * value, so it only takes effect when set explicitly.
 */
public static final ConfigOption<Long> SCAN_WATERMARK =
key("scan.watermark")
.longType()
.noDefaultValue()
.withDescription(
"Optional watermark used in case of \"from-snapshot\" scan mode. "
+ "If there is no snapshot later than this watermark, will throw an exceptions.");

public static final ConfigOption<Long> SCAN_FILE_CREATION_TIME_MILLIS =
key("scan.file-creation-time-millis")
.longType()
Expand Down Expand Up @@ -1450,6 +1458,7 @@ public static StartupMode startupMode(Options options) {
return StartupMode.FROM_TIMESTAMP;
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
|| options.getOptional(SCAN_TAG_NAME).isPresent()
|| options.getOptional(SCAN_WATERMARK).isPresent()
|| options.getOptional(SCAN_VERSION).isPresent()) {
return StartupMode.FROM_SNAPSHOT;
} else if (options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) {
Expand All @@ -1471,6 +1480,10 @@ public Long scanTimestampMills() {
return options.get(SCAN_TIMESTAMP_MILLIS);
}

/** Returns the value of the {@link #SCAN_WATERMARK} option as read from {@code options}. */
public Long scanWatermark() {
return options.get(SCAN_WATERMARK);
}

public Long scanFileCreationTimeMills() {
return options.get(SCAN_FILE_CREATION_TIME_MILLIS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.paimon.CoreOptions.SCAN_WATERMARK;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
Expand Down Expand Up @@ -247,7 +248,11 @@ private static void validateStartupMode(CoreOptions options) {
Collections.singletonList(SCAN_TIMESTAMP_MILLIS));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
checkExactOneOptionExistInMode(
options, options.startupMode(), SCAN_SNAPSHOT_ID, SCAN_TAG_NAME);
options,
options.startupMode(),
SCAN_SNAPSHOT_ID,
SCAN_TAG_NAME,
SCAN_WATERMARK);
checkOptionsConflict(
options,
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -77,6 +78,7 @@
abstract class AbstractFileStoreTable implements FileStoreTable {

private static final long serialVersionUID = 1L;
private static final String WATERMARK_PREFIX = "watermark-";

protected final FileIO fileIO;
protected final Path path;
Expand Down Expand Up @@ -386,6 +388,8 @@ private Optional<TableSchema> tryTimeTravel(Options options) {
return travelToVersion(coreOptions.scanVersion(), options);
} else if (coreOptions.scanSnapshotId() != null) {
return travelToSnapshot(coreOptions.scanSnapshotId(), options);
} else if (coreOptions.scanWatermark() != null) {
return travelToWatermark(coreOptions.scanWatermark(), options);
} else {
return travelToTag(coreOptions.scanTagName(), options);
}
Expand All @@ -405,6 +409,10 @@ private Optional<TableSchema> travelToVersion(String version, Options options) {
if (tagManager().tagExists(version)) {
options.set(CoreOptions.SCAN_TAG_NAME, version);
return travelToTag(version, options);
} else if (version.startsWith(WATERMARK_PREFIX)) {
long watermark = Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
options.set(CoreOptions.SCAN_WATERMARK, watermark);
return travelToWatermark(watermark, options);
} else if (version.chars().allMatch(Character::isDigit)) {
options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
return travelToSnapshot(Long.parseLong(version), options);
Expand All @@ -425,6 +433,16 @@ private Optional<TableSchema> travelToSnapshot(long snapshotId, Options options)
return Optional.empty();
}

/**
 * Resolves the schema to use when time travelling to {@code watermark}.
 *
 * @param watermark watermark used to look up the snapshot
 * @param options dynamic options that are copied into the returned schema
 * @return the schema of the snapshot found for the watermark with {@code options} applied, or
 *     {@link Optional#empty()} when no snapshot was found
 */
private Optional<TableSchema> travelToWatermark(long watermark, Options options) {
    Snapshot matched =
            StaticFromWatermarkStartingScanner.timeTravelToWatermark(
                    snapshotManager(), watermark);
    // Guard clause: nothing to travel to, so the caller keeps its current schema.
    if (matched == null) {
        return Optional.empty();
    }
    return Optional.of(schemaManager().schema(matched.schemaId()).copy(options.toMap()));
}

private Optional<TableSchema> travelToSnapshot(@Nullable Snapshot snapshot, Options options) {
if (snapshot != null) {
return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -167,10 +168,16 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
options.changelogLifecycleDecoupled())
: new StaticFromSnapshotStartingScanner(
snapshotManager, options.scanSnapshotId());
} else {
} else if (options.scanWatermark() != null) {
checkArgument(!isStreaming, "Cannot scan from watermark in streaming mode.");
return new StaticFromWatermarkStartingScanner(
snapshotManager, options().scanWatermark());
} else if (options.scanTagName() != null) {
checkArgument(!isStreaming, "Cannot scan from tag in streaming mode.");
return new StaticFromTagStartingScanner(
snapshotManager, options().scanTagName());
} else {
throw new UnsupportedOperationException("Unknown snapshot read mode");
}
case FROM_SNAPSHOT_FULL:
return isStreaming
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.source.snapshot;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

/**
 * {@link StartingScanner} used by a batch read that is configured with {@link
 * CoreOptions#SCAN_WATERMARK}.
 *
 * <p>The starting snapshot is looked up once, in the constructor. When no snapshot is found the
 * failure is reported lazily, on {@link #scan(SnapshotReader)}.
 */
public class StaticFromWatermarkStartingScanner extends AbstractStartingScanner {

    private static final Logger LOG =
            LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class);

    /** Watermark requested by the reader; kept for log and error messages. */
    private final long watermark;

    /**
     * Creates the scanner and resolves the starting snapshot for {@code watermark}.
     *
     * @param snapshotManager manager used to look up the snapshot
     * @param watermark watermark to time travel to
     */
    public StaticFromWatermarkStartingScanner(SnapshotManager snapshotManager, long watermark) {
        super(snapshotManager);
        this.watermark = watermark;
        Snapshot resolved = timeTravelToWatermark(snapshotManager, watermark);
        // Leave startingSnapshotId untouched when nothing matched; scan() reports the failure.
        if (resolved != null) {
            this.startingSnapshotId = resolved.id();
        }
    }

    /** Always {@link ScanMode#ALL}. */
    @Override
    public ScanMode startingScanMode() {
        return ScanMode.ALL;
    }

    /**
     * Reads the plan of the resolved starting snapshot in {@link ScanMode#ALL} mode.
     *
     * @throws RuntimeException if no snapshot was resolved for the watermark
     */
    @Override
    public Result scan(SnapshotReader snapshotReader) {
        if (startingSnapshotId != null) {
            return StartingScanner.fromPlan(
                    snapshotReader
                            .withMode(ScanMode.ALL)
                            .withSnapshot(startingSnapshotId)
                            .read());
        }

        LOG.warn(
                "There is currently no snapshot later than or equal to watermark[{}]",
                watermark);
        throw new RuntimeException(
                String.format(
                        "There is currently no snapshot later than or equal to "
                                + "watermark[%d]",
                        watermark));
    }

    /**
     * Looks up the snapshot for {@code watermark} by delegating to {@link
     * SnapshotManager#laterOrEqualWatermark(long)}.
     *
     * @return the snapshot returned by the manager, possibly {@code null}
     */
    @Nullable
    public static Snapshot timeTravelToWatermark(SnapshotManager snapshotManager, long watermark) {
        return snapshotManager.laterOrEqualWatermark(watermark);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,63 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return finalSnapshot;
}

/**
 * Finds a snapshot whose watermark is later than or equal to {@code watermark}.
 *
 * <p>The lookup is a binary search over the snapshot id range, so it relies on the watermarks of
 * snapshots that carry one being non-decreasing with the snapshot id. Snapshots without a
 * watermark are skipped and are never returned. On an exact match the search stops immediately,
 * so if several snapshots share that watermark the returned one is not necessarily the earliest
 * of them.
 *
 * @param watermark the watermark to search for
 * @return a snapshot with a watermark {@code >= watermark}; {@code null} if there is no
 *     snapshot, no snapshot carries a watermark, or every watermark is smaller than {@code
 *     watermark}
 */
public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
    Long earliest = earliestSnapshotId();
    Long latest = latestSnapshotId();
    if (earliest == null || latest == null) {
        return null;
    }

    long low = earliest;
    long high = latest;

    // Skip the leading snapshots that carry no watermark.
    Snapshot first = snapshot(low);
    while (first.watermark() == null && low < high) {
        low++;
        first = snapshot(low);
    }
    Long firstWatermark = first.watermark();
    if (firstWatermark == null) {
        return null;
    }
    if (firstWatermark >= watermark) {
        return first;
    }

    Snapshot result = null;
    while (low <= high) {
        long mid = low + (high - low) / 2; // Avoid overflow

        // Walk back from mid to the nearest snapshot that carries a watermark, but never below
        // low: ids below low are outside the search range and may no longer exist.
        long probe = mid;
        Snapshot candidate = snapshot(probe);
        while (candidate.watermark() == null && probe > low) {
            probe--;
            candidate = snapshot(probe);
        }

        Long candidateWatermark = candidate.watermark();
        if (candidateWatermark == null) {
            // Nothing in [low, mid] carries a watermark.
            low = mid + 1;
        } else if (candidateWatermark > watermark) {
            // Remember the snapshot that was actually compared, then search the left half.
            result = candidate;
            high = probe - 1;
        } else if (candidateWatermark < watermark) {
            // Everything in (probe, mid] has no watermark, so continue right of mid. Moving past
            // mid (rather than past probe) guarantees the range shrinks on every iteration.
            low = mid + 1;
        } else {
            result = candidate; // Found the exact match
            break;
        }
    }
    return result;
}

public long snapshotCount() throws IOException {
return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX).count();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -220,6 +223,36 @@ public void testTimeTravelRead() throws Exception {
.hasRootCauseMessage("Tag 'unknown' doesn't exist.");
}

/**
 * Writes to table {@code WT} from a streaming datagen source with a watermark, waits until the
 * {@code WT$snapshots} table lists more than one snapshot, and then checks that a batch read
 * with {@code scan.watermark} set to the last listed watermark returns rows.
 */
@Test
@Timeout(120)
public void testTimeTravelReadWithWatermark() throws Exception {
    streamSqlIter(
            "CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING,"
                    + " dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')");
    sql(
            "CREATE TABLE WT (a STRING, b STRING, c STRING, dt TIMESTAMP, PRIMARY KEY (a) NOT ENFORCED)");

    List<Long> watermarks;
    // try-with-resources: the streaming insert must be closed even when polling throws or the
    // timeout interrupts the sleep; otherwise the job keeps running after the test.
    try (CloseableIterator<Row> insert = streamSqlIter("INSERT INTO WT SELECT * FROM gen ")) {
        while (true) {
            watermarks =
                    sql("SELECT `watermark` FROM WT$snapshots").stream()
                            .map(r -> (Long) r.getField("watermark"))
                            .collect(Collectors.toList());
            if (watermarks.size() > 1) {
                break;
            }
            Thread.sleep(1000);
        }
    }

    Long maxWatermark = watermarks.get(watermarks.size() - 1);
    assertThat(
                    batchSql(
                            String.format(
                                    "SELECT * FROM WT /*+ OPTIONS('scan.watermark'='%d') */",
                                    maxWatermark)))
            .isNotEmpty();
}

@Test
public void testTimeTravelReadWithSnapshotExpiration() throws Exception {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
Expand Down
Loading

0 comments on commit ac62732

Please sign in to comment.