From 69dae826a3200d056d844485d5e6fca924debaf3 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sat, 30 Mar 2024 21:55:24 +0800 Subject: [PATCH] set changelog version to 1 --- .../src/main/java/org/apache/paimon/Changelog.java | 2 +- .../apache/paimon/operation/AbstractFileStoreScan.java | 5 +++++ .../ContinuousFromTimestampStartingScannerTest.java | 9 ++++----- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java index 023be67de840b..dc44ff762faf9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java +++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java @@ -39,7 +39,7 @@ */ public class Changelog extends Snapshot { - private static final int CURRENT_VERSION = 3; + private static final int CURRENT_VERSION = 1; public Changelog( long id, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index a911b084742b4..2982e2a9eb9d5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.Changelog; import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; @@ -386,6 +387,10 @@ private List readManifests(Snapshot snapshot) { case DELTA: return snapshot.deltaManifests(manifestList); case CHANGELOG: + if (snapshot instanceof Changelog) { + return snapshot.changelogManifests(manifestList); + } + if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) { return snapshot.changelogManifests(manifestList); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java index 38ba793ccf85a..4223e0c01a495 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java @@ -21,7 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; -import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; @@ -117,6 +116,7 @@ public void testScanFromChangelog() throws Exception { Options options = new Options(); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); + options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); FileStoreTable table = createFileStoreTable( true, @@ -128,7 +128,6 @@ public void testScanFromChangelog() throws Exception { + "/" + UUID.randomUUID())); SnapshotManager snapshotManager = table.snapshotManager(); - ExpireSnapshots expireSnapshots = table.newExpireSnapshots(); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); @@ -157,20 +156,20 @@ public void testScanFromChangelog() throws Exception { ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(3).timeMillis(), false, false); + snapshotManager, snapshotManager.snapshot(3).timeMillis(), true, true); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(2).timeMillis(), false, false); + snapshotManager, snapshotManager.snapshot(2).timeMillis(), true, true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(2); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.changelog(1).timeMillis(), false, false); + snapshotManager, snapshotManager.changelog(1).timeMillis(), true, true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(1);