From 6c91e63a1abb2e02caa28c76c6e9e42e4c7a4905 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 12 Oct 2024 14:17:37 +0800 Subject: [PATCH] address --- .../table/source/snapshot/TimeTravelUtil.java | 34 ++++++++++++------- .../source/snapshot/TimeTravelUtilsTest.java | 23 +++++++------ 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index dd32a44392f6..2ef0d5153f84 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -24,22 +24,27 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.List; -/** Contains time travel functions. */ +/** The util class of resolve snapshot from scan params for time travel. */ public class TimeTravelUtil { - public static String[] scanKeys = { + private static String[] scanKeys = { CoreOptions.SCAN_SNAPSHOT_ID.key(), CoreOptions.SCAN_TAG_NAME.key(), CoreOptions.SCAN_WATERMARK.key(), CoreOptions.SCAN_TIMESTAMP_MILLIS.key() }; - public static Snapshot resolveSnapshotFromOption( + private static final Logger LOG = LoggerFactory.getLogger(TimeTravelUtil.class); + + public static Snapshot resolveSnapshotFromOptions( CoreOptions options, SnapshotManager snapshotManager) { - List scanHandleKey = new ArrayList<>(); + List scanHandleKey = new ArrayList<>(1); for (String key : scanKeys) { if (options.toConfiguration().containsKey(key)) { scanHandleKey.add(key); @@ -47,6 +52,7 @@ public static Snapshot resolveSnapshotFromOption( } if (scanHandleKey.size() == 0) { + LOG.warn("Not set any time travel parameter."); return null; } @@ -62,18 +68,19 @@ public static Snapshot resolveSnapshotFromOption( String key = scanHandleKey.get(0); Snapshot snapshot = null; if (key.equals(CoreOptions.SCAN_SNAPSHOT_ID.key())) { - snapshot = handleSnapshotId(snapshotManager, options); + snapshot = resolveSnapshotBySnapshotId(snapshotManager, options); } else if (key.equals(CoreOptions.SCAN_WATERMARK.key())) { - snapshot = handleWatermark(snapshotManager, options); + snapshot = resolveSnapshotByWatermark(snapshotManager, options); } else if (key.equals(CoreOptions.SCAN_TIMESTAMP_MILLIS.key())) { - snapshot = handleTimestamp(snapshotManager, options); + snapshot = resolveSnapshotByTimestamp(snapshotManager, options); } else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) { - snapshot = handleTagName(snapshotManager, options); + snapshot = resolveSnapshotByTagName(snapshotManager, options); } return snapshot; } - private static Snapshot handleSnapshotId(SnapshotManager snapshotManager, CoreOptions options) { + private static Snapshot resolveSnapshotBySnapshotId( + SnapshotManager snapshotManager, CoreOptions options) { Long snapshotId = options.scanSnapshotId(); if (snapshotId != null && snapshotManager.snapshotExists(snapshotId)) { return snapshotManager.snapshot(snapshotId); @@ -81,17 +88,20 @@ private static Snapshot handleSnapshotId(SnapshotManager snapshotManager, CoreOp return null; } - private static Snapshot handleTimestamp(SnapshotManager snapshotManager, CoreOptions options) { + private static Snapshot resolveSnapshotByTimestamp( + SnapshotManager snapshotManager, CoreOptions options) { Long timestamp = options.scanTimestampMills(); return snapshotManager.earlierOrEqualTimeMills(timestamp); } - private static Snapshot handleWatermark(SnapshotManager snapshotManager, CoreOptions options) { + private static Snapshot resolveSnapshotByWatermark( + SnapshotManager snapshotManager, CoreOptions options) { Long watermark = options.scanWatermark(); return snapshotManager.laterOrEqualWatermark(watermark); } - private static Snapshot handleTagName(SnapshotManager snapshotManager, CoreOptions options) { + private static Snapshot resolveSnapshotByTagName( + SnapshotManager snapshotManager, CoreOptions options) { String tagName = options.scanTagName(); TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java index ee95a12f8cda..4d5ea873f108 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java @@ -28,20 +28,22 @@ import java.util.HashMap; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; /** Tests for {@link TimeTravelUtil}. */ public class TimeTravelUtilsTest extends ScannerTestBase { @Test - public void testScan() throws Exception { + public void testResolveSnapshotFromOptions() throws Exception { SnapshotManager snapshotManager = table.snapshotManager(); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); write.write(rowData(1, 10, 100L)); commit.commit(0, write.prepareCommit(true, 0)); + long ts = System.currentTimeMillis(); write.write(rowData(2, 30, 101L)); @@ -50,31 +52,32 @@ public void testScan() throws Exception { write.write(rowData(3, 50, 500L)); commit.commit(2, write.prepareCommit(true, 2)); - HashMap optMap = new HashMap<>(); + HashMap optMap = new HashMap<>(4); optMap.put("scan.snapshot-id", "2"); CoreOptions options = CoreOptions.fromMap(optMap); - Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager); - assertThat(snapshot.id() == 2); + Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager); + assertNotNull(snapshot); + assertTrue(snapshot.id() == 2); optMap.clear(); optMap.put("scan.timestamp-millis", ts + ""); options = CoreOptions.fromMap(optMap); - snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager); - assertThat(snapshot.id() == 1); + snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager); + assertTrue(snapshot.id() == 1); table.createTag("tag3", 3); optMap.clear(); optMap.put("scan.tag-name", "tag3"); options = CoreOptions.fromMap(optMap); - snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager); - assertThat(snapshot.id() == 3); + snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager); + assertTrue(snapshot.id() == 3); // if contain more scan.xxx config would throw out optMap.put("scan.snapshot-id", "2"); CoreOptions options1 = CoreOptions.fromMap(optMap); assertThrows( IllegalArgumentException.class, - () -> TimeTravelUtil.resolveSnapshotFromOption(options1, snapshotManager), + () -> TimeTravelUtil.resolveSnapshotFromOptions(options1, snapshotManager), "scan.snapshot-id scan.tag-name scan.watermark and scan.timestamp-millis can contains only one"); write.close(); commit.close();