Skip to content

Commit

Permalink
[core] Introduce ChangelogDeletion to Expiration (apache#3383)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored May 27, 2024
1 parent cc7d308 commit f1154f5
Show file tree
Hide file tree
Showing 18 changed files with 596 additions and 275 deletions.
14 changes: 14 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
Expand Down Expand Up @@ -1371,6 +1372,19 @@ public boolean snapshotExpireCleanEmptyDirectories() {
return options.get(SNAPSHOT_EXPIRE_CLEAN_EMPTY_DIRECTORIES);
}

public ExpireConfig expireConfig() {
return ExpireConfig.builder()
.snapshotRetainMax(snapshotNumRetainMax())
.snapshotRetainMin(snapshotNumRetainMin())
.snapshotTimeRetain(snapshotTimeRetain())
.snapshotMaxDeletes(snapshotExpireLimit())
.changelogRetainMax(options.getOptional(CHANGELOG_NUM_RETAINED_MAX).orElse(null))
.changelogRetainMin(options.getOptional(CHANGELOG_NUM_RETAINED_MIN).orElse(null))
.changelogTimeRetain(options.getOptional(CHANGELOG_TIME_RETAINED).orElse(null))
.changelogMaxDeletes(snapshotExpireLimit())
.build();
}

public int manifestMergeMinCount() {
return options.get(MANIFEST_MERGE_MIN_COUNT);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.options;

import java.time.Duration;

/** Config of snapshot and changelog expiration. */
public class ExpireConfig {
private final int snapshotRetainMax;
private final int snapshotRetainMin;
private final Duration snapshotTimeRetain;
private final int snapshotMaxDeletes;
private final int changelogRetainMax;
private final int changelogRetainMin;
private final Duration changelogTimeRetain;
private final int changelogMaxDeletes;
private final boolean changelogDecoupled;

public ExpireConfig(
int snapshotRetainMax,
int snapshotRetainMin,
Duration snapshotTimeRetain,
int snapshotMaxDeletes,
int changelogRetainMax,
int changelogRetainMin,
Duration changelogTimeRetain,
int changelogMaxDeletes) {
this.snapshotRetainMax = snapshotRetainMax;
this.snapshotRetainMin = snapshotRetainMin;
this.snapshotTimeRetain = snapshotTimeRetain;
this.snapshotMaxDeletes = snapshotMaxDeletes;
this.changelogRetainMax = changelogRetainMax;
this.changelogRetainMin = changelogRetainMin;
this.changelogTimeRetain = changelogTimeRetain;
this.changelogMaxDeletes = changelogMaxDeletes;
this.changelogDecoupled =
changelogRetainMax > snapshotRetainMax
|| changelogRetainMin > snapshotRetainMin
|| changelogTimeRetain.compareTo(snapshotTimeRetain) > 0;
}

public int getSnapshotRetainMax() {
return snapshotRetainMax;
}

public int getSnapshotRetainMin() {
return snapshotRetainMin;
}

public Duration getSnapshotTimeRetain() {
return snapshotTimeRetain;
}

public int getChangelogRetainMax() {
return changelogRetainMax;
}

public int getChangelogRetainMin() {
return changelogRetainMin;
}

public Duration getChangelogTimeRetain() {
return changelogTimeRetain;
}

public int getSnapshotMaxDeletes() {
return snapshotMaxDeletes;
}

public int getChangelogMaxDeletes() {
return changelogMaxDeletes;
}

public boolean isChangelogDecoupled() {
return changelogDecoupled;
}

public static Builder builder() {
return new Builder();
}

/** The builder for {@link ExpireConfig}. */
public static final class Builder {
private int snapshotRetainMax = Integer.MAX_VALUE;
private int snapshotRetainMin = 1;
private Duration snapshotTimeRetain = Duration.ofMillis(Long.MAX_VALUE);
private int snapshotMaxDeletes = Integer.MAX_VALUE;
// If changelog config is not set, use the snapshot's by default
private Integer changelogRetainMax = null;
private Integer changelogRetainMin = null;
private Duration changelogTimeRetain = null;
private Integer changelogMaxDeletes = null;

public static Builder builder() {
return new Builder();
}

public Builder snapshotRetainMax(int snapshotRetainMax) {
this.snapshotRetainMax = snapshotRetainMax;
return this;
}

public Builder snapshotRetainMin(int snapshotRetainMin) {
this.snapshotRetainMin = snapshotRetainMin;
return this;
}

public Builder snapshotTimeRetain(Duration snapshotTimeRetain) {
this.snapshotTimeRetain = snapshotTimeRetain;
return this;
}

public Builder snapshotMaxDeletes(int snapshotMaxDeletes) {
this.snapshotMaxDeletes = snapshotMaxDeletes;
return this;
}

public Builder changelogRetainMax(Integer changelogRetainMax) {
this.changelogRetainMax = changelogRetainMax;
return this;
}

public Builder changelogRetainMin(Integer changelogRetainMin) {
this.changelogRetainMin = changelogRetainMin;
return this;
}

public Builder changelogTimeRetain(Duration changelogTimeRetain) {
this.changelogTimeRetain = changelogTimeRetain;
return this;
}

public Builder changelogMaxDeletes(Integer changelogMaxDeletes) {
this.changelogMaxDeletes = changelogMaxDeletes;
return this;
}

public ExpireConfig build() {
return new ExpireConfig(
snapshotRetainMax,
snapshotRetainMin,
snapshotTimeRetain,
snapshotMaxDeletes,
changelogRetainMax == null ? snapshotRetainMax : changelogRetainMax,
changelogRetainMin == null ? snapshotRetainMin : changelogRetainMin,
changelogTimeRetain == null ? snapshotTimeRetain : changelogTimeRetain,
changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes);
}
}
}
12 changes: 12 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
Expand Down Expand Up @@ -206,6 +207,17 @@ public SnapshotDeletion newSnapshotDeletion() {
newStatsFileHandler());
}

@Override
public ChangelogDeletion newChangelogDeletion() {
return new ChangelogDeletion(
fileIO,
pathFactory(),
manifestFileFactory().create(),
manifestListFactory().create(),
newIndexFileHandler(),
newStatsFileHandler());
}

@Override
public TagManager newTagManager() {
return new TagManager(fileIO, options.path());
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
Expand Down Expand Up @@ -81,6 +82,8 @@ public interface FileStore<T> extends Serializable {

SnapshotDeletion newSnapshotDeletion();

ChangelogDeletion newChangelogDeletion();

TagManager newTagManager();

TagDeletion newTagDeletion();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.FileStorePathFactory;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Delete changelog files. */
public class ChangelogDeletion extends FileDeletionBase<Changelog> {
public ChangelogDeletion(
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestFile manifestFile,
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler) {
super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler);
}

@Override
public void cleanUnusedDataFiles(Changelog changelog, Predicate<ManifestEntry> skipper) {
if (changelog.changelogManifestList() != null) {
deleteAddedDataFiles(changelog.changelogManifestList());
}

if (manifestList.exists(changelog.deltaManifestList())) {
cleanUnusedDataFiles(changelog.deltaManifestList(), skipper);
}
}

@Override
public void cleanUnusedManifests(Changelog changelog, Set<String> skippingSet) {
if (changelog.changelogManifestList() != null) {
cleanUnusedManifestList(changelog.changelogManifestList(), skippingSet);
}

if (manifestList.exists(changelog.deltaManifestList())) {
cleanUnusedManifestList(changelog.deltaManifestList(), skippingSet);
}

if (manifestList.exists(changelog.baseManifestList())) {
cleanUnusedManifestList(changelog.baseManifestList(), skippingSet);
}

// the index and statics manifest list should handle by snapshot deletion.
}

public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
Set<String> skippingSet = new HashSet<>();

for (Snapshot skippingSnapshot : skippingSnapshots) {
// base manifests
if (manifestList.exists(skippingSnapshot.baseManifestList())) {
skippingSet.add(skippingSnapshot.baseManifestList());
manifestList.read(skippingSnapshot.baseManifestList()).stream()
.map(ManifestFileMeta::fileName)
.forEach(skippingSet::add);
}

// delta manifests
if (manifestList.exists(skippingSnapshot.deltaManifestList())) {
skippingSet.add(skippingSnapshot.deltaManifestList());
manifestList.read(skippingSnapshot.deltaManifestList()).stream()
.map(ManifestFileMeta::fileName)
.forEach(skippingSet::add);
}

// index manifests
String indexManifest = skippingSnapshot.indexManifest();
if (indexManifest != null) {
skippingSet.add(indexManifest);
indexFileHandler.readManifest(indexManifest).stream()
.map(IndexManifestEntry::indexFile)
.map(IndexFileMeta::fileName)
.forEach(skippingSet::add);
}

// statistics
if (skippingSnapshot.statistics() != null) {
skippingSet.add(skippingSnapshot.statistics());
}
}

return skippingSet;
}
}
Loading

0 comments on commit f1154f5

Please sign in to comment.