diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index f0d49964cd90..dd57f269c22c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -19,7 +19,6 @@ package org.apache.paimon.operation; import org.apache.paimon.Changelog; -import org.apache.paimon.FileStore; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; @@ -33,8 +32,10 @@ import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -102,30 +103,21 @@ public class OrphanFilesClean { private static final int READ_FILE_RETRY_INTERVAL = 5; private static final int SHOW_LIMIT = 200; - private final SnapshotManager snapshotManager; - private final TagManager tagManager; + private final FileStoreTable table; private final FileIO fileIO; private final Path location; private final int partitionKeysNum; - private final ManifestList manifestList; - private final ManifestFile manifestFile; - private final IndexFileHandler indexFileHandler; private final List deleteFiles; private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); private Consumer fileCleaner; public OrphanFilesClean(FileStoreTable table) { - this.snapshotManager = table.snapshotManager(); - this.tagManager = table.tagManager(); + this.table = table; this.fileIO = table.fileIO(); this.location = table.location(); this.partitionKeysNum = table.partitionKeys().size(); - FileStore store = table.store(); - this.manifestList = store.manifestListFactory().create(); - this.manifestFile = store.manifestFileFactory().create(); - this.indexFileHandler = store.newIndexFileHandler(); this.deleteFiles = new ArrayList<>(); this.fileCleaner = path -> { @@ -154,23 +146,42 @@ public OrphanFilesClean fileCleaner(Consumer fileCleaner) { } public List clean() throws IOException, ExecutionException, InterruptedException { - if (snapshotManager.earliestSnapshotId() == null) { - LOG.info("No snapshot found, skip removing."); + List branches = table.branchManager().branches(); + branches.add(BranchManager.DEFAULT_MAIN_BRANCH); + + List abnormalBranches = new ArrayList<>(); + for (String branch : branches) { + if (!new SchemaManager(table.fileIO(), table.location(), branch).latest().isPresent()) { + abnormalBranches.add(branch); + } + } + if (!abnormalBranches.isEmpty()) { + LOG.warn( + "Branches {} have no schemas. Orphan files cleaning aborted. " + + "Please check these branches manually.", + abnormalBranches); return Collections.emptyList(); } - // specially handle the snapshot directory - List nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough); - nonSnapshotFiles.forEach(fileCleaner); - deleteFiles.addAll(nonSnapshotFiles); + Map candidates = getCandidateDeletingFiles(); + Set usedFiles = new HashSet<>(); - // specially handle the changelog directory - List nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough); - nonChangelogFiles.forEach(fileCleaner); - deleteFiles.addAll(nonChangelogFiles); + for (String branch : branches) { + FileStoreTable branchTable = table.switchToBranch(branch); + SnapshotManager snapshotManager = branchTable.snapshotManager(); - Map candidates = getCandidateDeletingFiles(); - Set usedFiles = getUsedFiles(); + // specially handle the snapshot directory + List nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough); + nonSnapshotFiles.forEach(fileCleaner); + deleteFiles.addAll(nonSnapshotFiles); + + // specially handle the changelog directory + List nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough); + nonChangelogFiles.forEach(fileCleaner); + deleteFiles.addAll(nonChangelogFiles); + + usedFiles.addAll(getUsedFiles(branchTable)); + } Set deleted = new HashSet<>(candidates.keySet()); deleted.removeAll(usedFiles); @@ -181,21 +192,33 @@ public List clean() throws IOException, ExecutionException, InterruptedExc } /** Get all the files used by snapshots and tags. */ - private Set getUsedFiles() - throws IOException, ExecutionException, InterruptedException { + private Set getUsedFiles(FileStoreTable branchTable) throws IOException { + SnapshotManager snapshotManager = branchTable.snapshotManager(); + TagManager tagManager = branchTable.tagManager(); + // safely get all snapshots to be read Set readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots()); List taggedSnapshots = tagManager.taggedSnapshots(); readSnapshots.addAll(taggedSnapshots); readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs()); - return Sets.newHashSet(randomlyExecute(EXECUTOR, this::getUsedFiles, readSnapshots)); + + return Sets.newHashSet( + randomlyExecute( + EXECUTOR, snapshot -> getUsedFiles(branchTable, snapshot), readSnapshots)); } - private List getUsedFiles(Snapshot snapshot) { + private List getUsedFiles(FileStoreTable branchTable, Snapshot snapshot) { + ManifestList manifestList = branchTable.store().manifestListFactory().create(); + ManifestFile manifestFile = branchTable.store().manifestFileFactory().create(); + if (snapshot instanceof Changelog) { - return getUsedFilesForChangelog((Changelog) snapshot); + return getUsedFilesForChangelog(manifestList, manifestFile, (Changelog) snapshot); } else { - return getUsedFilesForSnapshot(snapshot); + return getUsedFilesForSnapshot( + manifestList, + manifestFile, + branchTable.store().newIndexFileHandler(), + snapshot); } } @@ -220,7 +243,8 @@ private Map getCandidateDeletingFiles() { return result; } - private List getUsedFilesForChangelog(Changelog changelog) { + private List getUsedFilesForChangelog( + ManifestList manifestList, ManifestFile manifestFile, Changelog changelog) { List files = new ArrayList<>(); List manifestFileMetas = new ArrayList<>(); try { @@ -290,7 +314,7 @@ private List getUsedFilesForChangelog(Changelog changelog) { } // try to read data files - List dataFiles = retryReadingDataFiles(manifestFileName); + List dataFiles = retryReadingDataFiles(manifestFile, manifestFileName); if (dataFiles == null) { return Collections.emptyList(); } @@ -306,14 +330,19 @@ private List getUsedFilesForChangelog(Changelog changelog) { * If getting null when reading some files, the snapshot/tag is being deleted, so just return an * empty result. */ - private List getUsedFilesForSnapshot(Snapshot snapshot) { + private List getUsedFilesForSnapshot( + ManifestList manifestList, + ManifestFile manifestFile, + IndexFileHandler indexFileHandler, + Snapshot snapshot) { List files = new ArrayList<>(); addManifestList(files, snapshot); try { // try to read manifests List manifestFileMetas = - retryReadingFiles(() -> readAllManifestsWithIOException(snapshot)); + retryReadingFiles( + () -> readAllManifestsWithIOException(manifestList, snapshot)); if (manifestFileMetas == null) { return Collections.emptyList(); } @@ -324,7 +353,7 @@ private List getUsedFilesForSnapshot(Snapshot snapshot) { files.addAll(manifestFileName); // try to read data files - List dataFiles = retryReadingDataFiles(manifestFileName); + List dataFiles = retryReadingDataFiles(manifestFile, manifestFileName); if (dataFiles == null) { return Collections.emptyList(); } @@ -396,8 +425,8 @@ private T retryReadingFiles(ReaderWithIOException reader) throws IOExcept throw caught; } - private List readAllManifestsWithIOException(Snapshot snapshot) - throws IOException { + private List readAllManifestsWithIOException( + ManifestList manifestList, Snapshot snapshot) throws IOException { List result = new ArrayList<>(); result.addAll(manifestList.readWithIOException(snapshot.baseManifestList())); @@ -412,7 +441,8 @@ private List readAllManifestsWithIOException(Snapshot snapshot } @Nullable - private List retryReadingDataFiles(List manifestNames) throws IOException { + private List retryReadingDataFiles( + ManifestFile manifestFile, List manifestNames) throws IOException { List dataFiles = new ArrayList<>(); for (String manifestName : manifestNames) { List manifestEntries = diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index f412d4e5e06f..6842f014245c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -235,6 +235,12 @@ public LocalTableQuery newLocalTableQuery() { return wrapped.newLocalTableQuery(); } + @Override + public FileStoreTable switchToBranch(String branchName) { + return new PrivilegedFileStoreTable( + wrapped.switchToBranch(branchName), privilegeChecker, identifier); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 95b9d8dcf03b..a3b1735dd3c3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -58,6 +58,7 @@ import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; import org.apache.paimon.tag.TagPreview; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.SnapshotNotExistException; @@ -270,15 +271,7 @@ private FileStoreTable copyInternal(Map dynamicOptions, boolean CoreOptions.setDefaultValues(newOptions); // copy a new table schema to contain dynamic options - TableSchema newTableSchema = tableSchema; - if (newOptions.contains(CoreOptions.BRANCH)) { - newTableSchema = - schemaManager() - .copyWithBranch(new CoreOptions(newOptions).branch()) - .latest() - .get(); - } - newTableSchema = newTableSchema.copy(newOptions.toMap()); + TableSchema newTableSchema = tableSchema.copy(newOptions.toMap()); if (tryTimeTravel) { // see if merged options contain time travel option @@ -624,6 +617,21 @@ public BranchManager branchManager() { return new BranchManager(fileIO, path, snapshotManager(), tagManager(), schemaManager()); } + @Override + public FileStoreTable switchToBranch(String branchName) { + Optional optionalSchema = + new SchemaManager(fileIO(), location(), branchName).latest(); + Preconditions.checkArgument( + optionalSchema.isPresent(), "Branch " + branchName + " does not exist"); + + TableSchema branchSchema = optionalSchema.get(); + Options branchOptions = new Options(branchSchema.options()); + branchOptions.set(CoreOptions.BRANCH, branchName); + branchSchema = branchSchema.copy(branchOptions.toMap()); + return FileStoreTableFactory.create( + fileIO(), location(), branchSchema, new Options(), catalogEnvironment()); + } + private RollbackHelper rollbackHelper() { return new RollbackHelper( snapshotManager(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java index 3c56b4b3b3cb..e330db0e04a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java @@ -43,6 +43,12 @@ public interface DataTable extends InnerTable { BranchManager branchManager(); + /** + * Get {@link DataTable} with branch identified by {@code branchName}. Note that this method + * does not keep dynamic options in current table. + */ + DataTable switchToBranch(String branchName); + Path location(); FileIO fileIO(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 0cd991b7f286..f7238f033a50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -147,9 +147,20 @@ public FileStoreTable copyWithLatestSchema() { wrapped.copyWithLatestSchema(), fallback.copyWithLatestSchema()); } + @Override + public FileStoreTable switchToBranch(String branchName) { + return new FallbackReadFileStoreTable(wrapped.switchToBranch(branchName), fallback); + } + private Map rewriteFallbackOptions(Map options) { Map result = new HashMap<>(options); + // branch of fallback table should never change + String branchKey = CoreOptions.BRANCH.key(); + if (options.containsKey(branchKey)) { + result.put(branchKey, fallback.options().get(branchKey)); + } + // snapshot ids may be different between the main branch and the fallback branch, // so we need to convert main branch snapshot id to millisecond, // then convert millisecond to fallback branch snapshot id diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index ed1ba1da5819..3bd337294b4f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -111,4 +111,11 @@ default SimpleStats getSchemaFieldStats(DataFileMeta dataFileMeta) { boolean supportStreamingReadOverwrite(); RowKeyExtractor createRowKeyExtractor(); + + /** + * Get {@link DataTable} with branch identified by {@code branchName}. Note that this method + * does not keep dynamic options in current table. + */ + @Override + FileStoreTable switchToBranch(String branchName); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 58449c9d7656..d0753259f412 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -91,7 +91,7 @@ public static FileStoreTable create( Options options = new Options(table.options()); String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH); if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) { - Options branchOptions = new Options(); + Options branchOptions = new Options(dynamicOptions.toMap()); branchOptions.set(CoreOptions.BRANCH, fallbackBranch); FileStoreTable fallbackTable = createWithoutFallbackBranch( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 7192c36303db..5596493318da 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -174,6 +174,11 @@ public BranchManager branchManager() { return wrapped.branchManager(); } + @Override + public DataTable switchToBranch(String branchName) { + return new AuditLogTable(wrapped.switchToBranch(branchName)); + } + @Override public InnerTableRead newRead() { return new AuditLogRead(wrapped.newRead()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index 7f1e8fb0cb92..7b67f00bf21a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -129,6 +129,11 @@ public BranchManager branchManager() { return wrapped.branchManager(); } + @Override + public DataTable switchToBranch(String branchName) { + return new BucketsTable(wrapped.switchToBranch(branchName), isContinuous, databaseName); + } + @Override public String name() { return "__internal_buckets_" + wrapped.location().getName(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index 8eb97130e8b8..bd4efc80e42a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -116,6 +116,11 @@ public BranchManager branchManager() { return wrapped.branchManager(); } + @Override + public DataTable switchToBranch(String branchName) { + return new FileMonitorTable(wrapped.switchToBranch(branchName)); + } + @Override public String name() { return "__internal_file_monitor_" + wrapped.location().getName(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 6be556f0b0da..e7b3bd17195f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -145,6 +145,11 @@ public BranchManager branchManager() { return wrapped.branchManager(); } + @Override + public DataTable switchToBranch(String branchName) { + return new ReadOptimizedTable(wrapped.switchToBranch(branchName)); + } + @Override public InnerTableRead newRead() { return wrapped.newRead(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 0e905cd68fa3..5a439a8ec715 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -33,7 +33,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -70,16 +69,6 @@ public Path branchDirectory() { return new Path(tablePath + "/branch"); } - /** Return the root Directory of branch by given tablePath. */ - public static Path branchDirectory(Path tablePath) { - return new Path(tablePath + "/branch"); - } - - public static List branchNames(FileIO fileIO, Path tablePath) throws IOException { - return listOriginalVersionedFiles(fileIO, branchDirectory(tablePath), BRANCH_PREFIX) - .collect(Collectors.toList()); - } - public static boolean isMainBranch(String branch) { return branch.equals(DEFAULT_MAIN_BRANCH); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index c5fdb042e37f..9cce9233f09e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -50,7 +50,6 @@ import java.util.stream.LongStream; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; -import static org.apache.paimon.utils.BranchManager.branchNames; import static org.apache.paimon.utils.BranchManager.branchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; @@ -117,10 +116,6 @@ public Path snapshotDirectory() { return new Path(branchPath(tablePath, branch) + "/snapshot"); } - public static Path snapshotDirectory(Path tablePath, String branch) { - return new Path(branchPath(tablePath, branch) + "/snapshot"); - } - public Snapshot snapshot(long snapshotId) { Path snapshotPath = snapshotPath(snapshotId); return Snapshot.fromPath(fileIO, snapshotPath); @@ -405,25 +400,11 @@ public Iterator changelogs() throws IOException { * be deleted by other processes, so just skip this snapshot. */ public List safelyGetAllSnapshots() throws IOException { - // For main branch List paths = listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) .map(id -> snapshotPath(id)) .collect(Collectors.toList()); - // For other branch - List allBranchNames = branchNames(fileIO, tablePath); - for (String branchName : allBranchNames) { - List branchPaths = - listVersionedFiles( - fileIO, - snapshotDirectory(tablePath, branchName), - SNAPSHOT_PREFIX) - .map(this::snapshotPath) - .collect(Collectors.toList()); - paths.addAll(branchPaths); - } - List snapshots = new ArrayList<>(); for (Path path : paths) { Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path); @@ -457,23 +438,8 @@ public List safelyGetAllChangelogs() throws IOException { * Try to get non snapshot files. If any error occurred, just ignore it and return an empty * result. */ - public List tryGetNonSnapshotFiles(Predicate fileStatusFilter) - throws IOException { - // For main branch - List nonSnapshotFiles = - listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter()); - - // For other branch - List allBranchNames = branchNames(fileIO, tablePath); - allBranchNames.stream() - .map( - branchName -> - listPathWithFilter( - snapshotDirectory(tablePath, branchName), - fileStatusFilter, - nonSnapshotFileFilter())) - .forEach(nonSnapshotFiles::addAll); - return nonSnapshotFiles; + public List tryGetNonSnapshotFiles(Predicate fileStatusFilter) { + return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter()); } public List tryGetNonChangelogFiles(Predicate fileStatusFilter) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java index ff67dbc27d04..7e912a839940 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -361,6 +361,9 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) SnapshotManager snapshotManager = table.snapshotManager(); writeData(snapshotManager, committedData, snapshotData, changelogData, commitTimes); + // create empty branch with same schema + table.createBranch("branch1"); + // generate non used files int shouldBeDeleted = generateUnUsedFile(); assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java index 9e7b6a7c3c0a..653e97cc6f50 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java @@ -18,10 +18,19 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -37,6 +46,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -191,4 +201,54 @@ public void testRemoveDatabaseOrphanFilesITCase() throws Exception { Row.of(orphanFile21.toUri().getPath()), Row.of(orphanFile22.toUri().getPath())); } + + @Test + public void testCleanWithBranch() throws Exception { + // create main branch + FileStoreTable table = createTableAndWriteData(tableName); + Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1); + Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2); + + // create first branch and write some data + table.createBranch("br"); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "br"); + TableSchema branchSchema = + schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.INT())); + Options branchOptions = new Options(branchSchema.options()); + branchOptions.set(CoreOptions.BRANCH, "br"); + branchSchema = branchSchema.copy(branchOptions.toMap()); + FileStoreTable branchTable = + FileStoreTableFactory.create(table.fileIO(), table.location(), branchSchema); + + String commitUser = UUID.randomUUID().toString(); + StreamTableWrite write = branchTable.newWrite(commitUser); + StreamTableCommit commit = branchTable.newCommit(commitUser); + write.write(GenericRow.of(2L, BinaryString.fromString("Hello"), 20)); + commit.commit(1, write.prepareCommit(false, 1)); + write.close(); + commit.close(); + + // create orphan file in snapshot directory of first branch + Path orphanFile3 = new Path(table.location(), "branch/branch-br/snapshot/orphan_file3"); + branchTable.fileIO().writeFile(orphanFile3, "x", true); + + // create second branch, which is empty + table.createBranch("br2"); + + // create orphan file in snapshot directory of second branch + Path orphanFile4 = new Path(table.location(), "branch/branch-br2/snapshot/orphan_file4"); + branchTable.fileIO().writeFile(orphanFile4, "y", true); + + String procedure = + String.format( + "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", + database, "*"); + ImmutableList actualDeleteFile = ImmutableList.copyOf(callProcedure(procedure)); + assertThat(actualDeleteFile) + .containsExactlyInAnyOrder( + Row.of(orphanFile1.toUri().getPath()), + Row.of(orphanFile2.toUri().getPath()), + Row.of(orphanFile3.toUri().getPath()), + Row.of(orphanFile4.toUri().getPath())); + } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 36a11cdc7258..58289b179a10 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -41,7 +41,6 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -346,9 +345,7 @@ private boolean partitionExistsInOtherBranches( continue; } - FileStoreTable table = - FileStoreTableFactory.create( - mainTable.fileIO(), mainTable.location(), branchSchema.get()); + FileStoreTable table = mainTable.switchToBranch(branchName); if (!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty()) { return true; }