Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Apr 18, 2024
1 parent 2ae1028 commit e710b5c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.operation;

import org.apache.paimon.Changelog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
Expand Down Expand Up @@ -96,13 +95,9 @@ public class OrphanFilesClean {
private int deletedFilesNum = 0;
private final List<Path> deleteFiles;
private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
private final boolean produceChangelog;

public OrphanFilesClean(FileStoreTable table) {
this.snapshotManager = table.snapshotManager();
this.produceChangelog =
new CoreOptions(table.options()).changelogProducer()
!= CoreOptions.ChangelogProducer.NONE;
this.tagManager = table.tagManager();
this.fileIO = table.fileIO();
this.location = table.location();
Expand Down Expand Up @@ -220,6 +215,7 @@ private List<String> getUsedFilesForChangelog(Changelog changelog) {
List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
try {
// try to read manifests
// changelog manifest
List<ManifestFileMeta> changelogManifest = new ArrayList<>();
if (changelog.changelogManifestList() != null) {
files.add(changelog.changelogManifestList());
Expand All @@ -233,10 +229,20 @@ private List<String> getUsedFilesForChangelog(Changelog changelog) {
}
}

// base manifest
if (manifestList.exists(changelog.baseManifestList())) {
files.add(changelog.baseManifestList());
List<ManifestFileMeta> baseManifest =
retryReadingFiles(
() ->
manifestList.readWithIOException(
changelog.baseManifestList()));
if (baseManifest != null) {
manifestFileMetas.addAll(baseManifest);
}
}

// delta manifest
List<ManifestFileMeta> deltaManifest = null;
if (manifestList.exists(changelog.deltaManifestList())) {
files.add(changelog.deltaManifestList());
Expand All @@ -245,26 +251,11 @@ private List<String> getUsedFilesForChangelog(Changelog changelog) {
() ->
manifestList.readWithIOException(
changelog.deltaManifestList()));
// delta
if (deltaManifest != null) {
manifestFileMetas.addAll(deltaManifest);
}
}

List<ManifestFileMeta> baseManifest;
if (manifestList.exists(changelog.changelogManifestList())) {
files.add(changelog.changelogManifestList());
baseManifest =
retryReadingFiles(
() ->
manifestList.readWithIOException(
changelog.changelogManifestList()));
// delta
if (baseManifest != null) {
manifestFileMetas.addAll(baseManifest);
}
}

files.addAll(
manifestFileMetas.stream()
.map(ManifestFileMeta::fileName)
Expand All @@ -279,7 +270,7 @@ private List<String> getUsedFilesForChangelog(Changelog changelog) {
: changelogManifest.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toList()));
} else if (changelog.commitKind() == Snapshot.CommitKind.APPEND) {
} else {
manifestFileName.addAll(
deltaManifest == null
? new ArrayList<>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,9 @@ private static Set<Path> getSnapshotFileInUse(
entry.file().fileName()));
}

// Add 'DELETE' 'APPEND' file
// These 'delete' files can be merged by the plan#splits, so it's not showed in the entries
// above.
// Add 'DELETE' 'APPEND' file in snapshot
// These 'delete' files can be merged by the plan#splits,
// so it's not shown in the entries above.
// In other words, these files are not used (by snapshot or changelog) now,
// but it can only be cleaned after this snapshot expired, so we should add it to the file
// use list.
Expand Down

0 comments on commit e710b5c

Please sign in to comment.