Skip to content

Commit

Permalink
set changelog version to 1
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Mar 30, 2024
1 parent 28f6cee commit 69dae82
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
2 changes: 1 addition & 1 deletion paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public class Changelog extends Snapshot {

private static final int CURRENT_VERSION = 3;
private static final int CURRENT_VERSION = 1;

public Changelog(
long id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.Changelog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
Expand Down Expand Up @@ -386,6 +387,10 @@ private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
case DELTA:
return snapshot.deltaManifests(manifestList);
case CHANGELOG:
if (snapshot instanceof Changelog) {
return snapshot.changelogManifests(manifestList);
}

if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) {
return snapshot.changelogManifests(manifestList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
Expand Down Expand Up @@ -117,6 +116,7 @@ public void testScanFromChangelog() throws Exception {
Options options = new Options();
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
FileStoreTable table =
createFileStoreTable(
true,
Expand All @@ -128,7 +128,6 @@ public void testScanFromChangelog() throws Exception {
+ "/"
+ UUID.randomUUID()));
SnapshotManager snapshotManager = table.snapshotManager();
ExpireSnapshots expireSnapshots = table.newExpireSnapshots();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

Expand Down Expand Up @@ -157,20 +156,20 @@ public void testScanFromChangelog() throws Exception {

ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(
snapshotManager, snapshotManager.snapshot(3).timeMillis(), false, false);
snapshotManager, snapshotManager.snapshot(3).timeMillis(), true, true);
StartingScanner.NextSnapshot result =
(StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
assertThat(result.nextSnapshotId()).isEqualTo(3);
scanner =
new ContinuousFromTimestampStartingScanner(
snapshotManager, snapshotManager.snapshot(2).timeMillis(), false, false);
snapshotManager, snapshotManager.snapshot(2).timeMillis(), true, true);

assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId())
.isEqualTo(2);

scanner =
new ContinuousFromTimestampStartingScanner(
snapshotManager, snapshotManager.changelog(1).timeMillis(), false, false);
snapshotManager, snapshotManager.changelog(1).timeMillis(), true, true);
assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId())
.isEqualTo(1);

Expand Down

0 comments on commit 69dae82

Please sign in to comment.