diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index acbbc1dde84f..3ea9b31be5d9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -243,7 +243,6 @@ public int expireUntil(long earliestId, long endExclusiveId) { snapshot.changelogRecordCount(), snapshot.watermark(), null); - commitChangelog(changelog); snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, false); } else { // no changelog @@ -265,9 +264,9 @@ public int expireUntil(long earliestId, long endExclusiveId) { snapshot.changelogRecordCount(), snapshot.watermark(), snapshot.statistics()); - commitChangelog(changelog); snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, true); } + commitChangelog(changelog); } else { snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, true); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 00f47b5449a0..c8c7be7e12e3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -390,6 +390,41 @@ public void testExpireWithUpgradedFile() throws Exception { store.assertCleaned(); } + @Test + public void testChangelogOutLivedSnapshot() throws Exception { + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + commit(10, allData, snapshotPositions); + ExpireSnapshots snapshot = store.newExpire(1, 2, Long.MAX_VALUE, true, true); + ExpireSnapshots changelog = store.newChangelogExpire(1, 3, Long.MAX_VALUE, true); + // expire twice to check for idempotence + snapshot.expire(); + snapshot.expire(); + + int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); + int earliestSnapshotId = snapshotManager.earliestSnapshotId().intValue(); + int latestLongLivedChangelogId = snapshotManager.latestLongLivedChangelogId().intValue(); + int earliestLongLivedChangelogId = + snapshotManager.earliestLongLivedChangelogId().intValue(); + + // 2 snapshot in /snapshot + assertThat(latestSnapshotId - earliestSnapshotId).isEqualTo(1); + assertThat(earliestLongLivedChangelogId).isEqualTo(1); + // The changelog id and snapshot id is continuous + assertThat(earliestSnapshotId - latestLongLivedChangelogId).isEqualTo(1); + + changelog.expire(); + changelog.expire(); + + assertThat(snapshotManager.latestSnapshotId().intValue()).isEqualTo(latestSnapshotId); + assertThat(snapshotManager.earliestSnapshotId().intValue()).isEqualTo(earliestSnapshotId); + assertThat(snapshotManager.latestLongLivedChangelogId()) + .isEqualTo(snapshotManager.earliestSnapshotId() - 1); + assertThat(snapshotManager.earliestLongLivedChangelogId()) + .isEqualTo(snapshotManager.earliestSnapshotId() - 1); + store.assertCleaned(); + } + private TestFileStore createStore() { ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -449,38 +484,4 @@ protected void assertSnapshot( Map actual = store.toKvMap(actualKvs); assertThat(actual).isEqualTo(expected); } - - @Test - public void testChangelogOutLivedSnapshot() throws Exception { - List allData = new ArrayList<>(); - List snapshotPositions = new ArrayList<>(); - commit(10, allData, snapshotPositions); - ExpireSnapshots snapshot = store.newExpire(1, 2, Long.MAX_VALUE, true, true); - ExpireSnapshots changelog = store.newChangelogExpire(1, 3, Long.MAX_VALUE, true); - // expire twice to check for idempotence - snapshot.expire(); - snapshot.expire(); - - int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); - int earliestSnapshotId = snapshotManager.earliestSnapshotId().intValue(); - int latestLongLivedChangelogId = snapshotManager.latestLongLivedChangelogId().intValue(); - int earliestLongLivedChangelogId = - snapshotManager.earliestLongLivedChangelogId().intValue(); - - // 2 snapshot in /snapshot - assertThat(latestSnapshotId - earliestSnapshotId).isEqualTo(1); - assertThat(earliestLongLivedChangelogId).isEqualTo(1); - // The changelog id and snapshot id is continuous - assertThat(earliestSnapshotId - latestLongLivedChangelogId).isEqualTo(1); - - changelog.expire(); - changelog.expire(); - - assertThat(snapshotManager.latestSnapshotId().intValue()).isEqualTo(latestSnapshotId); - assertThat(snapshotManager.earliestSnapshotId().intValue()).isEqualTo(earliestSnapshotId); - assertThat(snapshotManager.latestLongLivedChangelogId()) - .isEqualTo(snapshotManager.earliestSnapshotId() - 1); - assertThat(snapshotManager.earliestLongLivedChangelogId()) - .isEqualTo(snapshotManager.earliestSnapshotId() - 1); - } }