From f1154f502d16362c83893a2ab29586b688e644e4 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Mon, 27 May 2024 10:22:13 +0800 Subject: [PATCH] [core] Introduce ChangelogDeletion to Expiration (#3383) --- .../java/org/apache/paimon/CoreOptions.java | 14 ++ .../apache/paimon/options/ExpireConfig.java | 166 ++++++++++++++++++ .../org/apache/paimon/AbstractFileStore.java | 12 ++ .../java/org/apache/paimon/FileStore.java | 3 + .../paimon/operation/ChangelogDeletion.java | 117 ++++++++++++ .../paimon/operation/FileDeletionBase.java | 134 +++++++++++++- .../paimon/operation/SnapshotDeletion.java | 131 +------------- .../apache/paimon/operation/TagDeletion.java | 2 +- .../paimon/privilege/PrivilegedFileStore.java | 7 + .../paimon/table/AbstractFileStoreTable.java | 26 +-- .../paimon/table/ExpireChangelogImpl.java | 75 ++++---- .../apache/paimon/table/ExpireSnapshots.java | 10 +- .../paimon/table/ExpireSnapshotsImpl.java | 48 ++--- .../java/org/apache/paimon/TestFileStore.java | 57 +++--- .../paimon/operation/ExpireSnapshotsTest.java | 25 ++- .../procedure/ExpireSnapshotsProcedure.java | 8 +- .../procedure/ExpireSnapshotsProcedure.java | 21 ++- .../procedure/ExpireSnapshotsProcedure.java | 15 +- 18 files changed, 596 insertions(+), 275 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/options/ExpireConfig.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 5927d7c1a45a..76f6b7a08231 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -27,6 +27,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.options.description.DescribedEnum; @@ -1371,6 +1372,19 @@ public boolean snapshotExpireCleanEmptyDirectories() { return options.get(SNAPSHOT_EXPIRE_CLEAN_EMPTY_DIRECTORIES); } + public ExpireConfig expireConfig() { + return ExpireConfig.builder() + .snapshotRetainMax(snapshotNumRetainMax()) + .snapshotRetainMin(snapshotNumRetainMin()) + .snapshotTimeRetain(snapshotTimeRetain()) + .snapshotMaxDeletes(snapshotExpireLimit()) + .changelogRetainMax(options.getOptional(CHANGELOG_NUM_RETAINED_MAX).orElse(null)) + .changelogRetainMin(options.getOptional(CHANGELOG_NUM_RETAINED_MIN).orElse(null)) + .changelogTimeRetain(options.getOptional(CHANGELOG_TIME_RETAINED).orElse(null)) + .changelogMaxDeletes(snapshotExpireLimit()) + .build(); + } + public int manifestMergeMinCount() { return options.get(MANIFEST_MERGE_MIN_COUNT); } diff --git a/paimon-common/src/main/java/org/apache/paimon/options/ExpireConfig.java b/paimon-common/src/main/java/org/apache/paimon/options/ExpireConfig.java new file mode 100644 index 000000000000..0de3828db033 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/options/ExpireConfig.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.options; + +import java.time.Duration; + +/** Config of snapshot and changelog expiration. */ +public class ExpireConfig { + private final int snapshotRetainMax; + private final int snapshotRetainMin; + private final Duration snapshotTimeRetain; + private final int snapshotMaxDeletes; + private final int changelogRetainMax; + private final int changelogRetainMin; + private final Duration changelogTimeRetain; + private final int changelogMaxDeletes; + private final boolean changelogDecoupled; + + public ExpireConfig( + int snapshotRetainMax, + int snapshotRetainMin, + Duration snapshotTimeRetain, + int snapshotMaxDeletes, + int changelogRetainMax, + int changelogRetainMin, + Duration changelogTimeRetain, + int changelogMaxDeletes) { + this.snapshotRetainMax = snapshotRetainMax; + this.snapshotRetainMin = snapshotRetainMin; + this.snapshotTimeRetain = snapshotTimeRetain; + this.snapshotMaxDeletes = snapshotMaxDeletes; + this.changelogRetainMax = changelogRetainMax; + this.changelogRetainMin = changelogRetainMin; + this.changelogTimeRetain = changelogTimeRetain; + this.changelogMaxDeletes = changelogMaxDeletes; + this.changelogDecoupled = + changelogRetainMax > snapshotRetainMax + || changelogRetainMin > snapshotRetainMin + || changelogTimeRetain.compareTo(snapshotTimeRetain) > 0; + } + + public int getSnapshotRetainMax() { + return snapshotRetainMax; + } + + public int getSnapshotRetainMin() { + return snapshotRetainMin; + } + + public Duration getSnapshotTimeRetain() { + return snapshotTimeRetain; + } + + public int getChangelogRetainMax() { + return changelogRetainMax; + } + + public int getChangelogRetainMin() { + return changelogRetainMin; + } + + public Duration getChangelogTimeRetain() { + return changelogTimeRetain; + } + + public int getSnapshotMaxDeletes() { + return snapshotMaxDeletes; + } + + public int getChangelogMaxDeletes() { + return changelogMaxDeletes; + } + + public boolean isChangelogDecoupled() { + return changelogDecoupled; + } + + public static Builder builder() { + return new Builder(); + } + + /** The builder for {@link ExpireConfig}. */ + public static final class Builder { + private int snapshotRetainMax = Integer.MAX_VALUE; + private int snapshotRetainMin = 1; + private Duration snapshotTimeRetain = Duration.ofMillis(Long.MAX_VALUE); + private int snapshotMaxDeletes = Integer.MAX_VALUE; + // If changelog config is not set, use the snapshot's by default + private Integer changelogRetainMax = null; + private Integer changelogRetainMin = null; + private Duration changelogTimeRetain = null; + private Integer changelogMaxDeletes = null; + + public static Builder builder() { + return new Builder(); + } + + public Builder snapshotRetainMax(int snapshotRetainMax) { + this.snapshotRetainMax = snapshotRetainMax; + return this; + } + + public Builder snapshotRetainMin(int snapshotRetainMin) { + this.snapshotRetainMin = snapshotRetainMin; + return this; + } + + public Builder snapshotTimeRetain(Duration snapshotTimeRetain) { + this.snapshotTimeRetain = snapshotTimeRetain; + return this; + } + + public Builder snapshotMaxDeletes(int snapshotMaxDeletes) { + this.snapshotMaxDeletes = snapshotMaxDeletes; + return this; + } + + public Builder changelogRetainMax(Integer changelogRetainMax) { + this.changelogRetainMax = changelogRetainMax; + return this; + } + + public Builder changelogRetainMin(Integer changelogRetainMin) { + this.changelogRetainMin = changelogRetainMin; + return this; + } + + public Builder changelogTimeRetain(Duration changelogTimeRetain) { + this.changelogTimeRetain = changelogTimeRetain; + return this; + } + + public Builder changelogMaxDeletes(Integer changelogMaxDeletes) { + this.changelogMaxDeletes = changelogMaxDeletes; + return this; + } + + public ExpireConfig build() { + return new ExpireConfig( + snapshotRetainMax, + snapshotRetainMin, + snapshotTimeRetain, + snapshotMaxDeletes, + changelogRetainMax == null ? snapshotRetainMax : changelogRetainMax, + changelogRetainMin == null ? snapshotRetainMin : changelogRetainMin, + changelogTimeRetain == null ? snapshotTimeRetain : changelogTimeRetain, + changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 7528b393d6e0..75c532a5ca05 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.metastore.AddPartitionTagCallback; import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; @@ -206,6 +207,17 @@ public SnapshotDeletion newSnapshotDeletion() { newStatsFileHandler()); } + @Override + public ChangelogDeletion newChangelogDeletion() { + return new ChangelogDeletion( + fileIO, + pathFactory(), + manifestFileFactory().create(), + manifestListFactory().create(), + newIndexFileHandler(), + newStatsFileHandler()); + } + @Override public TagManager newTagManager() { return new TagManager(fileIO, options.path()); diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 66bf3363de01..e943d38cf5e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -22,6 +22,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.FileStoreWrite; @@ -81,6 +82,8 @@ public interface FileStore extends Serializable { SnapshotDeletion newSnapshotDeletion(); + ChangelogDeletion newChangelogDeletion(); + TagManager newTagManager(); TagDeletion newTagDeletion(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java new file mode 100644 index 000000000000..198c66a1ceac --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.Changelog; +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.stats.StatsFileHandler; +import org.apache.paimon.utils.FileStorePathFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; + +/** Delete changelog files. */ +public class ChangelogDeletion extends FileDeletionBase { + public ChangelogDeletion( + FileIO fileIO, + FileStorePathFactory pathFactory, + ManifestFile manifestFile, + ManifestList manifestList, + IndexFileHandler indexFileHandler, + StatsFileHandler statsFileHandler) { + super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler); + } + + @Override + public void cleanUnusedDataFiles(Changelog changelog, Predicate skipper) { + if (changelog.changelogManifestList() != null) { + deleteAddedDataFiles(changelog.changelogManifestList()); + } + + if (manifestList.exists(changelog.deltaManifestList())) { + cleanUnusedDataFiles(changelog.deltaManifestList(), skipper); + } + } + + @Override + public void cleanUnusedManifests(Changelog changelog, Set skippingSet) { + if (changelog.changelogManifestList() != null) { + cleanUnusedManifestList(changelog.changelogManifestList(), skippingSet); + } + + if (manifestList.exists(changelog.deltaManifestList())) { + cleanUnusedManifestList(changelog.deltaManifestList(), skippingSet); + } + + if (manifestList.exists(changelog.baseManifestList())) { + cleanUnusedManifestList(changelog.baseManifestList(), skippingSet); + } + + // the index and statics manifest list should handle by snapshot deletion. + } + + public Set manifestSkippingSet(List skippingSnapshots) { + Set skippingSet = new HashSet<>(); + + for (Snapshot skippingSnapshot : skippingSnapshots) { + // base manifests + if (manifestList.exists(skippingSnapshot.baseManifestList())) { + skippingSet.add(skippingSnapshot.baseManifestList()); + manifestList.read(skippingSnapshot.baseManifestList()).stream() + .map(ManifestFileMeta::fileName) + .forEach(skippingSet::add); + } + + // delta manifests + if (manifestList.exists(skippingSnapshot.deltaManifestList())) { + skippingSet.add(skippingSnapshot.deltaManifestList()); + manifestList.read(skippingSnapshot.deltaManifestList()).stream() + .map(ManifestFileMeta::fileName) + .forEach(skippingSet::add); + } + + // index manifests + String indexManifest = skippingSnapshot.indexManifest(); + if (indexManifest != null) { + skippingSet.add(indexManifest); + indexFileHandler.readManifest(indexManifest).stream() + .map(IndexManifestEntry::indexFile) + .map(IndexFileMeta::fileName) + .forEach(skippingSet::add); + } + + // statistics + if (skippingSnapshot.statistics() != null) { + skippingSet.add(skippingSnapshot.statistics()); + } + } + + return skippingSet; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 8bd422e53f00..c0b5c289cf7f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -25,6 +25,7 @@ import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; @@ -33,6 +34,8 @@ import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FileUtils; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +60,7 @@ * Base class for file deletion including methods for clean data files, manifest files and empty * data directories. */ -public abstract class FileDeletionBase { +public abstract class FileDeletionBase { private static final Logger LOG = LoggerFactory.getLogger(FileDeletionBase.class); @@ -70,6 +73,13 @@ public abstract class FileDeletionBase { protected final Map> deletionBuckets; protected final Executor ioExecutor; + protected boolean changelogDecoupled; + + /** Used to record which tag is cached in tagged snapshots list. */ + private int cachedTagIndex = -1; + + /** Used to cache data files used by current tag. */ + private final Map>> cachedTagDataFiles = new HashMap<>(); public FileDeletionBase( FileIO fileIO, @@ -95,7 +105,7 @@ public FileDeletionBase( * @param skipper if the test result of a data file is true, it will be skipped when deleting; * else it will be deleted */ - public abstract void cleanUnusedDataFiles(Snapshot snapshot, Predicate skipper); + public abstract void cleanUnusedDataFiles(T snapshot, Predicate skipper); /** * Clean metadata files that will not be used anymore of a snapshot, including data manifests, @@ -104,7 +114,11 @@ public FileDeletionBase( * @param snapshot {@link Snapshot} that will be cleaned * @param skippingSet manifests that should not be deleted */ - public abstract void cleanUnusedManifests(Snapshot snapshot, Set skippingSet); + public abstract void cleanUnusedManifests(T snapshot, Set skippingSet); + + public void setChangelogDecoupled(boolean changelogDecoupled) { + this.changelogDecoupled = changelogDecoupled; + } /** Try to delete data directories that may be empty after data file deletion. */ public void cleanDataDirectories() { @@ -151,6 +165,105 @@ protected void recordDeletionBuckets(ManifestEntry entry) { .add(entry.bucket()); } + public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { + // try read manifests + List manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList)); + List manifestEntries; + // data file path -> (original manifest entry, extra file paths) + Map>> dataFileToDelete = new HashMap<>(); + for (String manifest : manifestFileNames) { + try { + manifestEntries = manifestFile.read(manifest); + } catch (Exception e) { + // cancel deletion if any exception occurs + LOG.warn("Failed to read some manifest files. Cancel deletion.", e); + return; + } + + getDataFileToDelete(dataFileToDelete, manifestEntries); + } + + doCleanUnusedDataFile(dataFileToDelete, skipper); + } + + protected void doCleanUnusedDataFile( + Map>> dataFileToDelete, + Predicate skipper) { + List actualDataFileToDelete = new ArrayList<>(); + dataFileToDelete.forEach( + (path, pair) -> { + ManifestEntry entry = pair.getLeft(); + // check whether we should skip the data file + if (!skipper.test(entry)) { + // delete data files + actualDataFileToDelete.add(path); + actualDataFileToDelete.addAll(pair.getRight()); + + recordDeletionBuckets(entry); + } + }); + deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly); + } + + protected void getDataFileToDelete( + Map>> dataFileToDelete, + List dataFileEntries) { + // we cannot delete a data file directly when we meet a DELETE entry, because that + // file might be upgraded + for (ManifestEntry entry : dataFileEntries) { + Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); + Path dataFilePath = new Path(bucketPath, entry.file().fileName()); + switch (entry.kind()) { + case ADD: + dataFileToDelete.remove(dataFilePath); + break; + case DELETE: + List extraFiles = new ArrayList<>(entry.file().extraFiles().size()); + for (String file : entry.file().extraFiles()) { + extraFiles.add(new Path(bucketPath, file)); + } + dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles)); + break; + default: + throw new UnsupportedOperationException( + "Unknown value kind " + entry.kind().name()); + } + } + } + + /** + * Delete added file in the manifest list files. Added files marked as "ADD" in manifests. + * + * @param manifestListName name of manifest list + */ + public void deleteAddedDataFiles(String manifestListName) { + List manifestFileNames = + readManifestFileNames(tryReadManifestList(manifestListName)); + for (String file : manifestFileNames) { + try { + List manifestEntries = manifestFile.read(file); + deleteAddedDataFiles(manifestEntries); + } catch (Exception e) { + // We want to delete the data file, so just ignore the unavailable files + LOG.info("Failed to read manifest " + file + ". Ignore it.", e); + } + } + } + + private void deleteAddedDataFiles(List manifestEntries) { + List dataFileToDelete = new ArrayList<>(); + for (ManifestEntry entry : manifestEntries) { + if (entry.kind() == FileKind.ADD) { + dataFileToDelete.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + recordDeletionBuckets(entry); + } + } + deleteFiles(dataFileToDelete, fileIO::deleteQuietly); + } + public void cleanUnusedStatisticsManifests(Snapshot snapshot, Set skippingSet) { // clean statistics if (snapshot.statistics() != null && !skippingSet.contains(snapshot.statistics())) { @@ -193,7 +306,7 @@ public void cleanUnusedManifestList(String manifestName, Set skippingSet deleteFiles(toDeleteManifests, manifestFile::delete); } - public void cleanUnusedManifests( + protected void cleanUnusedManifests( Snapshot snapshot, Set skippingSet, boolean deleteChangelog) { cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet); cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet); @@ -204,6 +317,19 @@ public void cleanUnusedManifests( cleanUnusedStatisticsManifests(snapshot, skippingSet); } + public Predicate dataFileSkipper( + List taggedSnapshots, long expiringSnapshotId) throws Exception { + int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId); + // refresh tag data files + if (index >= 0 && cachedTagIndex != index) { + cachedTagIndex = index; + cachedTagDataFiles.clear(); + addMergedDataFiles(cachedTagDataFiles, taggedSnapshots.get(index)); + } + + return entry -> index >= 0 && containsDataFile(cachedTagDataFiles, entry); + } + /** * It is possible that a job was killed during expiration and some manifest files have been * deleted, so if the clean methods need to get manifests of a snapshot to be cleaned, we should diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java index a0c442fcd96d..f7fbdfac20e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java @@ -20,23 +20,16 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; -import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.TagManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,15 +37,7 @@ import java.util.function.Predicate; /** Delete snapshot files. */ -public class SnapshotDeletion extends FileDeletionBase { - - private static final Logger LOG = LoggerFactory.getLogger(SnapshotDeletion.class); - - /** Used to record which tag is cached in tagged snapshots list. */ - private int cachedTagIndex = -1; - - /** Used to cache data files used by current tag. */ - private final Map>> cachedTagDataFiles = new HashMap<>(); +public class SnapshotDeletion extends FileDeletionBase { public SnapshotDeletion( FileIO fileIO, @@ -69,75 +54,9 @@ public void cleanUnusedDataFiles(Snapshot snapshot, Predicate ski cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper); } - public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { - // try read manifests - List manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList)); - List manifestEntries; - // data file path -> (original manifest entry, extra file paths) - Map>> dataFileToDelete = new HashMap<>(); - for (String manifest : manifestFileNames) { - try { - manifestEntries = manifestFile.read(manifest); - } catch (Exception e) { - // cancel deletion if any exception occurs - LOG.warn("Failed to read some manifest files. Cancel deletion.", e); - return; - } - - getDataFileToDelete(dataFileToDelete, manifestEntries); - } - - doCleanUnusedDataFile(dataFileToDelete, skipper); - } - @Override public void cleanUnusedManifests(Snapshot snapshot, Set skippingSet) { - cleanUnusedManifests(snapshot, skippingSet, true); - } - - private void getDataFileToDelete( - Map>> dataFileToDelete, - List dataFileEntries) { - // we cannot delete a data file directly when we meet a DELETE entry, because that - // file might be upgraded - for (ManifestEntry entry : dataFileEntries) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - Path dataFilePath = new Path(bucketPath, entry.file().fileName()); - switch (entry.kind()) { - case ADD: - dataFileToDelete.remove(dataFilePath); - break; - case DELETE: - List extraFiles = new ArrayList<>(entry.file().extraFiles().size()); - for (String file : entry.file().extraFiles()) { - extraFiles.add(new Path(bucketPath, file)); - } - dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles)); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } - } - } - - private void doCleanUnusedDataFile( - Map>> dataFileToDelete, - Predicate skipper) { - List actualDataFileToDelete = new ArrayList<>(); - dataFileToDelete.forEach( - (path, pair) -> { - ManifestEntry entry = pair.getLeft(); - // check whether we should skip the data file - if (!skipper.test(entry)) { - // delete data files - actualDataFileToDelete.add(path); - actualDataFileToDelete.addAll(pair.getRight()); - - recordDeletionBuckets(entry); - } - }); - deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly); + cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled); } @VisibleForTesting @@ -146,50 +65,4 @@ void cleanUnusedDataFile(List dataFileLog) { getDataFileToDelete(dataFileToDelete, dataFileLog); doCleanUnusedDataFile(dataFileToDelete, f -> false); } - - /** - * Delete added file in the manifest list files. Added files marked as "ADD" in manifests. - * - * @param manifestListName name of manifest list - */ - public void deleteAddedDataFiles(String manifestListName) { - List manifestFileNames = - readManifestFileNames(tryReadManifestList(manifestListName)); - for (String file : manifestFileNames) { - try { - List manifestEntries = manifestFile.read(file); - deleteAddedDataFiles(manifestEntries); - } catch (Exception e) { - // We want to delete the data file, so just ignore the unavailable files - LOG.info("Failed to read manifest " + file + ". Ignore it.", e); - } - } - } - - private void deleteAddedDataFiles(List manifestEntries) { - List dataFileToDelete = new ArrayList<>(); - for (ManifestEntry entry : manifestEntries) { - if (entry.kind() == FileKind.ADD) { - dataFileToDelete.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); - recordDeletionBuckets(entry); - } - } - deleteFiles(dataFileToDelete, fileIO::deleteQuietly); - } - - public Predicate dataFileSkipper( - List taggedSnapshots, long expiringSnapshotId) throws Exception { - int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId); - // refresh tag data files - if (index >= 0 && cachedTagIndex != index) { - cachedTagIndex = index; - cachedTagDataFiles.clear(); - addMergedDataFiles(cachedTagDataFiles, taggedSnapshots.get(index)); - } - - return entry -> index >= 0 && containsDataFile(cachedTagDataFiles, entry); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index 7ef258046d82..01db29ec94c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -43,7 +43,7 @@ import java.util.function.Predicate; /** Delete tag files. */ -public class TagDeletion extends FileDeletionBase { +public class TagDeletion extends FileDeletionBase { private static final Logger LOG = LoggerFactory.getLogger(TagDeletion.class); diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index e71a0cde1cb8..ca2ad04a232d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.FileStoreWrite; @@ -144,6 +145,12 @@ public SnapshotDeletion newSnapshotDeletion() { return wrapped.newSnapshotDeletion(); } + @Override + public ChangelogDeletion newChangelogDeletion() { + privilegeChecker.assertCanInsert(identifier); + return wrapped.newChangelogDeletion(); + } + @Override public TagManager newTagManager() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index baaac994494c..655e81431690 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -29,6 +29,7 @@ import org.apache.paimon.metastore.TagPreviewCommitCallback; import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.operation.FileStoreScan; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; @@ -289,8 +290,7 @@ public ExpireSnapshots newExpireSnapshots() { snapshotManager(), store().newSnapshotDeletion(), store().newTagManager(), - coreOptions().snapshotExpireCleanEmptyDirectories(), - coreOptions().changelogLifecycleDecoupled()); + coreOptions().snapshotExpireCleanEmptyDirectories()); } @Override @@ -298,7 +298,7 @@ public ExpireSnapshots newExpireChangelog() { return new ExpireChangelogImpl( snapshotManager(), tagManager(), - store().newSnapshotDeletion(), + store().newChangelogDeletion(), coreOptions().snapshotExpireCleanEmptyDirectories()); } @@ -308,24 +308,14 @@ public TableCommitImpl newCommit(String commitUser) { Runnable snapshotExpire = null; if (!options.writeOnly()) { boolean changelogDecoupled = options.changelogLifecycleDecoupled(); - ExpireSnapshots expireChangelog = - newExpireChangelog() - .maxDeletes(options.snapshotExpireLimit()) - .retainMin(options.changelogNumRetainMin()) - .retainMax(options.changelogNumRetainMax()); - ExpireSnapshots expireSnapshots = - newExpireSnapshots() - .retainMax(options.snapshotNumRetainMax()) - .retainMin(options.snapshotNumRetainMin()) - .maxDeletes(options.snapshotExpireLimit()); - long snapshotTimeRetain = options.snapshotTimeRetain().toMillis(); - long changelogTimeRetain = options.changelogTimeRetain().toMillis(); + ExpireConfig expireConfig = options.expireConfig(); + ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig); + ExpireSnapshots expireSnapshots = newExpireSnapshots().config(expireConfig); snapshotExpire = () -> { - long current = System.currentTimeMillis(); - expireSnapshots.olderThanMills(current - snapshotTimeRetain).expire(); + expireSnapshots.expire(); if (changelogDecoupled) { - expireChangelog.olderThanMills(current - changelogTimeRetain).expire(); + expireChangelog.expire(); } }; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index b237fe63069e..33891997cc92 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -19,8 +19,11 @@ package org.apache.paimon.table; import org.apache.paimon.Changelog; +import org.apache.paimon.Snapshot; import org.apache.paimon.consumer.ConsumerManager; -import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.ChangelogDeletion; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -30,7 +33,9 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; /** Cleanup the changelog in changelog directory. */ public class ExpireChangelogImpl implements ExpireSnapshots { @@ -39,54 +44,39 @@ public class ExpireChangelogImpl implements ExpireSnapshots { private final SnapshotManager snapshotManager; private final ConsumerManager consumerManager; - private final SnapshotDeletion snapshotDeletion; + private final ChangelogDeletion changelogDeletion; private final boolean cleanEmptyDirectories; private final TagManager tagManager; - private long olderThanMills = 0; - public int retainMin = 1; - private int retainMax = Integer.MAX_VALUE; - private int maxDeletes = Integer.MAX_VALUE; + private ExpireConfig expireConfig; public ExpireChangelogImpl( SnapshotManager snapshotManager, TagManager tagManager, - SnapshotDeletion snapshotDeletion, + ChangelogDeletion changelogDeletion, boolean cleanEmptyDirectories) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - this.snapshotDeletion = snapshotDeletion; this.cleanEmptyDirectories = cleanEmptyDirectories; + this.changelogDeletion = changelogDeletion; + this.expireConfig = ExpireConfig.builder().build(); } @Override - public ExpireChangelogImpl retainMax(int retainMax) { - this.retainMax = retainMax; - return this; - } - - @Override - public ExpireChangelogImpl retainMin(int retainMin) { - this.retainMin = retainMin; - return this; - } - - @Override - public ExpireChangelogImpl olderThanMills(long olderThanMills) { - this.olderThanMills = olderThanMills; - return this; - } - - @Override - public ExpireChangelogImpl maxDeletes(int maxDeletes) { - this.maxDeletes = maxDeletes; + public ExpireSnapshots config(ExpireConfig expireConfig) { + this.expireConfig = expireConfig; return this; } @Override public int expire() { + int retainMax = expireConfig.getChangelogRetainMax(); + int retainMin = expireConfig.getChangelogRetainMin(); + int maxDeletes = expireConfig.getChangelogMaxDeletes(); + long olderThanMills = + System.currentTimeMillis() - expireConfig.getChangelogTimeRetain().toMillis(); Long latestSnapshotId = snapshotManager.latestSnapshotId(); if (latestSnapshotId == null) { // no snapshot, nothing to expire @@ -144,23 +134,36 @@ public int expireUntil(long earliestId, long endExclusiveId) { LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); } + List taggedSnapshots = tagManager.taggedSnapshots(); + + List skippingSnapshots = + TagManager.findOverlappedSnapshots(taggedSnapshots, earliestId, endExclusiveId); + skippingSnapshots.add(snapshotManager.changelog(endExclusiveId)); + Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete changelog files from snapshot #" + id); + LOG.debug("Ready to delete changelog files from changelog #" + id); } Changelog changelog = snapshotManager.longLivedChangelog(id); - // delete changelog files - if (changelog.changelogManifestList() != null) { - snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList()); - snapshotDeletion.cleanUnusedManifestList( - changelog.changelogManifestList(), new HashSet<>()); + Predicate skipper; + try { + skipper = changelogDeletion.dataFileSkipper(taggedSnapshots, id); + } catch (Exception e) { + LOG.info( + String.format( + "Skip cleaning data files of changelog '%s' due to failed to build skipping set.", + id), + e); + continue; } + changelogDeletion.cleanUnusedDataFiles(changelog, skipper); + changelogDeletion.cleanUnusedManifests(changelog, manifestSkippSet); snapshotManager.fileIO().deleteQuietly(snapshotManager.longLivedChangelogPath(id)); } if (cleanEmptyDirectories) { - snapshotDeletion.cleanDataDirectories(); + changelogDeletion.cleanDataDirectories(); } writeEarliestHintFile(endExclusiveId); return (int) (endExclusiveId - earliestId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java index a4459d75fd30..2350f806683e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java @@ -18,16 +18,12 @@ package org.apache.paimon.table; +import org.apache.paimon.options.ExpireConfig; + /** Expire snapshots. */ public interface ExpireSnapshots { - ExpireSnapshots retainMax(int retainMax); - - ExpireSnapshots retainMin(int retainMin); - - ExpireSnapshots olderThanMills(long olderThanMills); - - ExpireSnapshots maxDeletes(int maxDeletes); + ExpireSnapshots config(ExpireConfig expireConfig); /** @return How many snapshots have been expired. */ int expire(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 50cf39eb8971..f4361481c0c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -24,6 +24,7 @@ import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -47,55 +48,38 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { private final SnapshotDeletion snapshotDeletion; private final TagManager tagManager; private final boolean cleanEmptyDirectories; - /** Whether to clean the changelog or delta files. */ - private final boolean changelogDecoupled; - private int retainMax = Integer.MAX_VALUE; - private int retainMin = 1; - private long olderThanMills = 0; - private int maxDeletes = Integer.MAX_VALUE; + private ExpireConfig expireConfig; public ExpireSnapshotsImpl( SnapshotManager snapshotManager, SnapshotDeletion snapshotDeletion, TagManager tagManager, - boolean cleanEmptyDirectories, - boolean changelogDecoupled) { + boolean cleanEmptyDirectories) { this.snapshotManager = snapshotManager; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); this.snapshotDeletion = snapshotDeletion; this.tagManager = tagManager; this.cleanEmptyDirectories = cleanEmptyDirectories; - this.changelogDecoupled = changelogDecoupled; + this.expireConfig = ExpireConfig.builder().build(); } @Override - public ExpireSnapshotsImpl retainMax(int retainMax) { - this.retainMax = retainMax; - return this; - } - - @Override - public ExpireSnapshotsImpl retainMin(int retainMin) { - this.retainMin = retainMin; - return this; - } - - @Override - public ExpireSnapshotsImpl olderThanMills(long olderThanMills) { - this.olderThanMills = olderThanMills; - return this; - } - - @Override - public ExpireSnapshotsImpl maxDeletes(int maxDeletes) { - this.maxDeletes = maxDeletes; + public ExpireSnapshots config(ExpireConfig expireConfig) { + this.expireConfig = expireConfig; return this; } @Override public int expire() { + snapshotDeletion.setChangelogDecoupled(expireConfig.isChangelogDecoupled()); + int retainMax = expireConfig.getSnapshotRetainMax(); + int retainMin = expireConfig.getSnapshotRetainMin(); + int maxDeletes = expireConfig.getSnapshotMaxDeletes(); + long olderThanMills = + System.currentTimeMillis() - expireConfig.getSnapshotTimeRetain().toMillis(); + Long latestSnapshotId = snapshotManager.latestSnapshotId(); if (latestSnapshotId == null) { // no snapshot, nothing to expire @@ -196,7 +180,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { } // delete changelog files - if (!changelogDecoupled) { + if (!expireConfig.isChangelogDecoupled()) { for (long id = beginInclusiveId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { LOG.debug("Ready to delete changelog files from snapshot #" + id); @@ -226,8 +210,8 @@ public int expireUntil(long earliestId, long endExclusiveId) { } Snapshot snapshot = snapshotManager.snapshot(id); - snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled); - if (changelogDecoupled) { + snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet); + if (expireConfig.isChangelogDecoupled()) { commitChangelog(new Changelog(snapshot)); } snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id)); diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index e2bb1593615f..91b08635fc96 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -39,6 +39,7 @@ import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReaderIterator; @@ -65,6 +66,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -151,53 +153,48 @@ public FileStoreCommitImpl newCommit() { return super.newCommit(commitUser); } + public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long millisRetained) { + return newExpire(numRetainedMin, numRetainedMax, millisRetained, true); + } + public ExpireSnapshots newExpire( int numRetainedMin, int numRetainedMax, long millisRetained, boolean snapshotExpireCleanEmptyDirectories) { - return newExpire( - numRetainedMin, - numRetainedMax, - millisRetained, - snapshotExpireCleanEmptyDirectories, - false); - } - - public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long millisRetained) { - return newExpire(numRetainedMin, numRetainedMax, millisRetained, true, false); + return new ExpireSnapshotsImpl( + snapshotManager(), + newSnapshotDeletion(), + new TagManager(fileIO, options.path()), + snapshotExpireCleanEmptyDirectories) + .config( + ExpireConfig.builder() + .snapshotRetainMax(numRetainedMax) + .snapshotRetainMin(numRetainedMin) + .snapshotTimeRetain(Duration.ofMillis(millisRetained)) + .build()); } public ExpireSnapshots newExpire( - int numRetainedMin, - int numRetainedMax, - long millisRetained, - boolean snapshotExpireCleanEmptyDirectories, - boolean changelogDecoupled) { + ExpireConfig expireConfig, boolean snapshotExpireCleanEmptyDirectories) { return new ExpireSnapshotsImpl( snapshotManager(), newSnapshotDeletion(), new TagManager(fileIO, options.path()), - snapshotExpireCleanEmptyDirectories, - changelogDecoupled) - .retainMax(numRetainedMax) - .retainMin(numRetainedMin) - .olderThanMills(System.currentTimeMillis() - millisRetained); + snapshotExpireCleanEmptyDirectories) + .config(expireConfig); } public ExpireSnapshots newChangelogExpire( - int numRetainedMin, - int numRetainedMax, - long millisRetained, - boolean snapshotExpireCleanEmptyDirectories) { - return new ExpireChangelogImpl( + ExpireConfig config, boolean snapshotExpireCleanEmptyDirectories) { + ExpireChangelogImpl impl = + new ExpireChangelogImpl( snapshotManager(), new TagManager(fileIO, options.path()), - newSnapshotDeletion(), - snapshotExpireCleanEmptyDirectories) - .retainMin(numRetainedMin) - .retainMax(numRetainedMax) - .olderThanMills(System.currentTimeMillis() - millisRetained); + newChangelogDeletion(), + snapshotExpireCleanEmptyDirectories); + impl.config(config); + return impl; } public List commitData( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 8d56a516904f..7c725164ef68 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.ExpireSnapshots; @@ -48,6 +49,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -330,7 +332,11 @@ public void testExpireWithNumber() throws Exception { @Test public void testExpireWithTime() throws Exception { - ExpireSnapshots expire = store.newExpire(1, Integer.MAX_VALUE, 1000); + ExpireConfig.Builder builder = ExpireConfig.builder(); + builder.snapshotRetainMin(1) + .snapshotRetainMax(Integer.MAX_VALUE) + .snapshotTimeRetain(Duration.ofMillis(1000)); + ExpireSnapshots expire = store.newExpire(builder.build(), true); List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); @@ -339,8 +345,9 @@ public void testExpireWithTime() throws Exception { commit(5, allData, snapshotPositions); long expireMillis = System.currentTimeMillis(); // expire twice to check for idempotence - expire.olderThanMills(expireMillis - 1000).expire(); - expire.olderThanMills(expireMillis - 1000).expire(); + + expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire(); + expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire(); int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); for (int i = 1; i <= latestSnapshotId; i++) { @@ -402,8 +409,16 @@ public void testChangelogOutLivedSnapshot() throws Exception { List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); commit(10, allData, snapshotPositions); - ExpireSnapshots snapshot = store.newExpire(1, 2, Long.MAX_VALUE, true, true); - ExpireSnapshots changelog = store.newChangelogExpire(1, 3, Long.MAX_VALUE, true); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(2) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .changelogRetainMax(3) + .build(); + + ExpireSnapshots snapshot = store.newExpire(config, true); + ExpireSnapshots changelog = store.newChangelogExpire(config, true); // expire twice to check for idempotence snapshot.expire(); snapshot.expire(); diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java index 32a0fcc5b9a2..9e0fb79116f1 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.options.ExpireConfig; import org.apache.flink.table.procedure.ProcedureContext; @@ -32,8 +33,13 @@ public String identifier() { public String[] call(ProcedureContext procedureContext, String tableId, int retainMax) throws Catalog.TableNotExistException { + ExpireConfig.Builder builder = ExpireConfig.builder(); return new String[] { - table(tableId).newExpireSnapshots().retainMax(retainMax).expire() + "" + table(tableId) + .newExpireSnapshots() + .config(builder.snapshotRetainMax(retainMax).build()) + .expire() + + "" }; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java index 81ea223e4ec1..986511017181 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.utils.DateTimeUtils; @@ -27,6 +28,8 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import java.time.Duration; + /** A procedure to expire snapshots. */ public class ExpireSnapshotsProcedure extends ProcedureBase { @@ -64,20 +67,24 @@ public String[] call( Integer maxDeletes) throws Catalog.TableNotExistException { ExpireSnapshots expireSnapshots = table(tableId).newExpireSnapshots(); + ExpireConfig.Builder builder = ExpireConfig.builder(); if (retainMax != null) { - expireSnapshots.retainMax(retainMax); + builder.snapshotRetainMax(retainMax); } if (retainMin != null) { - expireSnapshots.retainMin(retainMin); + builder.snapshotRetainMin(retainMin); } if (olderThanStr != null) { - expireSnapshots.olderThanMills( - DateTimeUtils.parseTimestampData(olderThanStr, 3, DateTimeUtils.LOCAL_TZ) - .getMillisecond()); + builder.snapshotTimeRetain( + Duration.ofMillis( + System.currentTimeMillis() + - DateTimeUtils.parseTimestampData( + olderThanStr, 3, DateTimeUtils.LOCAL_TZ) + .getMillisecond())); } if (maxDeletes != null) { - expireSnapshots.maxDeletes(maxDeletes); + builder.snapshotMaxDeletes(maxDeletes); } - return new String[] {expireSnapshots.expire() + ""}; + return new String[] {expireSnapshots.config(builder.build()).expire() + ""}; } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java index 98fc6548a723..4716f6add5c5 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.table.ExpireSnapshots; import org.apache.spark.sql.catalyst.InternalRow; @@ -27,6 +28,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.time.Duration; + import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.StringType; import static org.apache.spark.sql.types.DataTypes.TimestampType; @@ -75,19 +78,21 @@ public InternalRow[] call(InternalRow args) { tableIdent, table -> { ExpireSnapshots expireSnapshots = table.newExpireSnapshots(); + ExpireConfig.Builder builder = ExpireConfig.builder(); if (retainMax != null) { - expireSnapshots.retainMax(retainMax); + builder.snapshotRetainMax(retainMax); } if (retainMin != null) { - expireSnapshots.retainMin(retainMin); + builder.snapshotRetainMin(retainMin); } if (olderThanMills != null) { - expireSnapshots.olderThanMills(olderThanMills); + builder.snapshotTimeRetain( + Duration.ofMillis(System.currentTimeMillis() - olderThanMills)); } if (maxDeletes != null) { - expireSnapshots.maxDeletes(maxDeletes); + builder.snapshotMaxDeletes(maxDeletes); } - int deleted = expireSnapshots.expire(); + int deleted = expireSnapshots.config(builder.build()).expire(); return new InternalRow[] {newInternalRow(deleted)}; }); }