Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyu committed Oct 12, 2024
1 parent 51ef850 commit 6c91e63
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,35 @@
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/** Contains time travel functions. */
/** The util class of resolve snapshot from scan params for time travel. */
public class TimeTravelUtil {

public static String[] scanKeys = {
private static String[] scanKeys = {
CoreOptions.SCAN_SNAPSHOT_ID.key(),
CoreOptions.SCAN_TAG_NAME.key(),
CoreOptions.SCAN_WATERMARK.key(),
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
};

public static Snapshot resolveSnapshotFromOption(
private static final Logger LOG = LoggerFactory.getLogger(TimeTravelUtil.class);

public static Snapshot resolveSnapshotFromOptions(
CoreOptions options, SnapshotManager snapshotManager) {
List<String> scanHandleKey = new ArrayList<>();
List<String> scanHandleKey = new ArrayList<>(1);
for (String key : scanKeys) {
if (options.toConfiguration().containsKey(key)) {
scanHandleKey.add(key);
}
}

if (scanHandleKey.size() == 0) {
LOG.warn("Not set any time travel parameter.");
return null;
}

Expand All @@ -62,36 +68,40 @@ public static Snapshot resolveSnapshotFromOption(
String key = scanHandleKey.get(0);
Snapshot snapshot = null;
if (key.equals(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
snapshot = handleSnapshotId(snapshotManager, options);
snapshot = resolveSnapshotBySnapshotId(snapshotManager, options);
} else if (key.equals(CoreOptions.SCAN_WATERMARK.key())) {
snapshot = handleWatermark(snapshotManager, options);
snapshot = resolveSnapshotByWatermark(snapshotManager, options);
} else if (key.equals(CoreOptions.SCAN_TIMESTAMP_MILLIS.key())) {
snapshot = handleTimestamp(snapshotManager, options);
snapshot = resolveSnapshotByTimestamp(snapshotManager, options);
} else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) {
snapshot = handleTagName(snapshotManager, options);
snapshot = resolveSnapshotByTagName(snapshotManager, options);
}
return snapshot;
}

private static Snapshot handleSnapshotId(SnapshotManager snapshotManager, CoreOptions options) {
private static Snapshot resolveSnapshotBySnapshotId(
SnapshotManager snapshotManager, CoreOptions options) {
Long snapshotId = options.scanSnapshotId();
if (snapshotId != null && snapshotManager.snapshotExists(snapshotId)) {
return snapshotManager.snapshot(snapshotId);
}
return null;
}

private static Snapshot handleTimestamp(SnapshotManager snapshotManager, CoreOptions options) {
private static Snapshot resolveSnapshotByTimestamp(
SnapshotManager snapshotManager, CoreOptions options) {
Long timestamp = options.scanTimestampMills();
return snapshotManager.earlierOrEqualTimeMills(timestamp);
}

private static Snapshot handleWatermark(SnapshotManager snapshotManager, CoreOptions options) {
private static Snapshot resolveSnapshotByWatermark(
SnapshotManager snapshotManager, CoreOptions options) {
Long watermark = options.scanWatermark();
return snapshotManager.laterOrEqualWatermark(watermark);
}

private static Snapshot handleTagName(SnapshotManager snapshotManager, CoreOptions options) {
private static Snapshot resolveSnapshotByTagName(
SnapshotManager snapshotManager, CoreOptions options) {
String tagName = options.scanTagName();
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,22 @@

import java.util.HashMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Tests for {@link TimeTravelUtil}. */
public class TimeTravelUtilsTest extends ScannerTestBase {

@Test
public void testScan() throws Exception {
public void testResolveSnapshotFromOptions() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

write.write(rowData(1, 10, 100L));
commit.commit(0, write.prepareCommit(true, 0));

long ts = System.currentTimeMillis();

write.write(rowData(2, 30, 101L));
Expand All @@ -50,31 +52,32 @@ public void testScan() throws Exception {
write.write(rowData(3, 50, 500L));
commit.commit(2, write.prepareCommit(true, 2));

HashMap<String, String> optMap = new HashMap<>();
HashMap<String, String> optMap = new HashMap<>(4);
optMap.put("scan.snapshot-id", "2");
CoreOptions options = CoreOptions.fromMap(optMap);
Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager);
assertThat(snapshot.id() == 2);
Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager);
assertNotNull(snapshot);
assertTrue(snapshot.id() == 2);

optMap.clear();
optMap.put("scan.timestamp-millis", ts + "");
options = CoreOptions.fromMap(optMap);
snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager);
assertThat(snapshot.id() == 1);
snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager);
assertTrue(snapshot.id() == 1);

table.createTag("tag3", 3);
optMap.clear();
optMap.put("scan.tag-name", "tag3");
options = CoreOptions.fromMap(optMap);
snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager);
assertThat(snapshot.id() == 3);
snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager);
assertTrue(snapshot.id() == 3);

// if contain more scan.xxx config would throw out
optMap.put("scan.snapshot-id", "2");
CoreOptions options1 = CoreOptions.fromMap(optMap);
assertThrows(
IllegalArgumentException.class,
() -> TimeTravelUtil.resolveSnapshotFromOption(options1, snapshotManager),
() -> TimeTravelUtil.resolveSnapshotFromOptions(options1, snapshotManager),
"scan.snapshot-id scan.tag-name scan.watermark and scan.timestamp-millis can contains only one");
write.close();
commit.close();
Expand Down

0 comments on commit 6c91e63

Please sign in to comment.