From 588d7f2c50ba07ab1b7887d1d978391ef6985754 Mon Sep 17 00:00:00 2001
From: Jingsong Lee
Date: Sat, 14 Sep 2024 07:49:28 +0800
Subject: [PATCH] [flink] Introduce dedicated job for orphan files clean (#4181)

---
 .../org/apache/paimon/options/Options.java    |   4 +
 .../paimon/utils/SerializableConsumer.java    |  31 +
 .../paimon/utils/SerializablePredicate.java   |  29 +
 .../operation/LocalOrphanFilesClean.java      | 239 +++++++
 .../paimon/operation/OrphanFilesClean.java    | 636 +++++-------------
 .../table/FallbackReadFileStoreTable.java     |  29 +-
 .../paimon/table/FileStoreTableFactory.java   |   2 +-
 ...st.java => LocalOrphanFilesCleanTest.java} |  54 +-
 .../procedure/RemoveOrphanFilesProcedure.java |  48 +-
 .../flink/action/RemoveOrphanFilesAction.java |  39 +-
 .../RemoveOrphanFilesActionFactory.java       |  19 +-
 .../flink/orphan/FlinkOrphanFilesClean.java   | 352 ++++++++++
 .../procedure/RemoveOrphanFilesProcedure.java |  58 +-
 .../flink/utils/BoundedOneInputOperator.java  |  27 +
 .../flink/utils/BoundedTwoInputOperator.java  |  28 +
 .../paimon/flink/action/ActionITCaseBase.java |   6 +-
 .../flink/action/BranchActionITCase.java      |  30 +-
 .../flink/action/CloneActionITCase.java       |  16 +-
 .../action/CompactDatabaseActionITCase.java   |  34 +-
 .../flink/action/ConsumerActionITCase.java    |  16 +-
 .../action/DropPartitionActionITCase.java     |   4 +-
 .../action/MarkPartitionDoneActionITCase.java |   8 +-
 .../flink/action/MergeIntoActionITCase.java   |   2 +-
 .../action/RemoveOrphanFilesActionITCase.java |  81 +--
 .../action/RewriteFileIndexActionITCase.java  |   2 +-
 .../flink/action/RollbackToActionITCase.java  |   8 +-
 .../paimon/flink/action/TagActionITCase.java  |  24 +-
 .../procedure/RemoveOrphanFilesProcedure.java |  47 +-
 28 files changed, 1109 insertions(+), 764 deletions(-)
 create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/SerializableConsumer.java
 create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/SerializablePredicate.java
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
 rename paimon-core/src/test/java/org/apache/paimon/operation/{OrphanFilesCleanTest.java => LocalOrphanFilesCleanTest.java} (94%)
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedOneInputOperator.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedTwoInputOperator.java

diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index b526764204f2..d292fef3bf14 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -153,6 +153,10 @@ public synchronized void remove(String key) {
         data.remove(key);
     }
 
+    public synchronized void remove(ConfigOption<?> option) {
+        data.remove(option.key());
+    }
+
     public synchronized boolean containsKey(String key) {
         return data.containsKey(key);
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/SerializableConsumer.java b/paimon-common/src/main/java/org/apache/paimon/utils/SerializableConsumer.java
new file mode 100644
index 000000000000..bcca006e4360
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/SerializableConsumer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/**
+ * This interface is basically Java's {@link Consumer} interface enhanced with the {@link
+ * Serializable}.
+ *
+ * @param <T> type of the consumed elements.
+ */
+@FunctionalInterface
+public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/SerializablePredicate.java b/paimon-common/src/main/java/org/apache/paimon/utils/SerializablePredicate.java
new file mode 100644
index 000000000000..7f2dee5fd8c4
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/SerializablePredicate.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Serializable;
+import java.util.function.Predicate;
+
+/**
+ * This interface is basically Java's {@link Predicate} interface enhanced with the {@link
+ * Serializable}.
+ */
+@FunctionalInterface
+public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
new file mode 100644
index 000000000000..9cce1061fdd4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.SerializableConsumer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+
+/** Local {@link OrphanFilesClean} that uses a thread pool to execute deletions. */
+public class LocalOrphanFilesClean extends OrphanFilesClean {
+
+    private final ThreadPoolExecutor executor;
+
+    private static final int SHOW_LIMIT = 200;
+
+    private final List<Path> deleteFiles;
+
+    public LocalOrphanFilesClean(FileStoreTable table) {
+        this(table, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1));
+    }
+
+    public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
+        this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path));
+    }
+
+    public LocalOrphanFilesClean(
+            FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
+        super(table, olderThanMillis, fileCleaner);
+        this.deleteFiles = new ArrayList<>();
+        this.executor =
+                createCachedThreadPool(
+                        table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
+    }
+
+    public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
+        List<String> branches = validBranches();
+
+        // specially handle the snapshot directory
+        cleanSnapshotDir(branches, deleteFiles::add);
+
+        // delete candidate files
+        Map<String, Path> candidates = getCandidateDeletingFiles();
+
+        // find used files
+        Set<String> usedFiles =
+                branches.stream()
+                        .flatMap(branch -> getUsedFiles(branch).stream())
+                        .collect(Collectors.toSet());
+
+        // delete unused files
+        Set<String> deleted = new HashSet<>(candidates.keySet());
+        deleted.removeAll(usedFiles);
+        deleted.stream().map(candidates::get).forEach(fileCleaner);
+        deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList()));
+
+        return deleteFiles;
+    }
+
+    private List<String> getUsedFiles(String branch) {
+        List<String> usedFiles = new ArrayList<>();
+        ManifestFile manifestFile =
+                table.switchToBranch(branch).store().manifestFileFactory().create();
+        try {
+            List<String> manifests = new ArrayList<>();
+            collectWithoutDataFile(
+                    branch, usedFiles::add, manifest -> manifests.add(manifest.fileName()));
+            usedFiles.addAll(retryReadingDataFiles(manifestFile, manifests));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return usedFiles;
+    }
+
+    /**
+     * Get all candidate files for deletion in the specified directories, filtered by
+     * olderThanMillis.
+     */
+ */ + private Map getCandidateDeletingFiles() { + List fileDirs = listPaimonFileDirs(); + Function> processor = + path -> + tryBestListingDirs(path).stream() + .filter(this::oldEnough) + .map(FileStatus::getPath) + .collect(Collectors.toList()); + Iterator allPaths = randomlyExecute(executor, processor, fileDirs); + Map result = new HashMap<>(); + while (allPaths.hasNext()) { + Path next = allPaths.next(); + result.put(next.getName(), next); + } + return result; + } + + private List retryReadingDataFiles( + ManifestFile manifestFile, List manifestNames) throws IOException { + List dataFiles = new ArrayList<>(); + for (String manifestName : manifestNames) { + retryReadingFiles( + () -> manifestFile.readWithIOException(manifestName), + Collections.emptyList()) + .stream() + .map(ManifestEntry::file) + .forEach( + f -> { + dataFiles.add(f.fileName()); + dataFiles.addAll(f.extraFiles()); + }); + } + return dataFiles; + } + + public static List showDeletedFiles(List deleteFiles, int showLimit) { + int showSize = Math.min(deleteFiles.size(), showLimit); + List result = new ArrayList<>(); + if (deleteFiles.size() > showSize) { + result.add( + String.format( + "Total %s files, only %s lines are displayed.", + deleteFiles.size(), showSize)); + } + for (int i = 0; i < showSize; i++) { + result.add(deleteFiles.get(i).toUri().getPath()); + } + return result; + } + + public static List createOrphanFilesCleans( + Catalog catalog, + String databaseName, + @Nullable String tableName, + long olderThanMillis, + SerializableConsumer fileCleaner, + @Nullable Integer parallelism) + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + List orphanFilesCleans = new ArrayList<>(); + List tableNames = Collections.singletonList(tableName); + if (tableName == null || "*".equals(tableName)) { + tableNames = catalog.listTables(databaseName); + } + + Map dynamicOptions = + parallelism == null + ? Collections.emptyMap() + : new HashMap() { + { + put( + CoreOptions.DELETE_FILE_THREAD_NUM.key(), + parallelism.toString()); + } + }; + + for (String t : tableNames) { + Identifier identifier = new Identifier(databaseName, t); + Table table = catalog.getTable(identifier).copy(dynamicOptions); + checkArgument( + table instanceof FileStoreTable, + "Only FileStoreTable supports remove-orphan-files action. 
The table type is '%s'.", + table.getClass().getName()); + + orphanFilesCleans.add( + new LocalOrphanFilesClean( + (FileStoreTable) table, olderThanMillis, fileCleaner)); + } + + return orphanFilesCleans; + } + + public static String[] executeOrphanFilesClean(List tableCleans) { + ExecutorService executorService = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + List>> tasks = new ArrayList<>(); + for (LocalOrphanFilesClean clean : tableCleans) { + tasks.add(executorService.submit(clean::clean)); + } + + List cleanOrphanFiles = new ArrayList<>(); + for (Future> task : tasks) { + try { + cleanOrphanFiles.addAll(task.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + executorService.shutdownNow(); + return showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT).toArray(new String[0]); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index f5a32583fbf1..8ce95337f27d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -18,31 +18,24 @@ package org.apache.paimon.operation; -import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.IndexManifestEntry; -import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.Table; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.SerializableConsumer; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; -import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; -import org.apache.paimon.shade.guava30.com.google.common.collect.Sets; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,31 +43,21 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TimeZone; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Predicate; -import java.util.stream.Collectors; +import static java.util.Collections.emptyList; import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX; -import static 
-import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * To remove the data files and metadata files that are not used by the table (so-called "orphan
@@ -82,8 +65,8 @@
  *
  * <p>It will ignore exceptions when listing all files, because it is OK to not delete unread files.
  *
- * <p>To avoid deleting newly written files, it only deletes orphan files older than 1 day by
- * default. The interval can be modified by {@link #olderThan}.
+ * <p>To avoid deleting newly written files, it only deletes orphan files older than {@code
+ * olderThanMillis} (1 day by default).
  *
  * <p>To avoid conflicting with snapshot expiration, tag deletion and rollback, it will skip the
  * snapshot/tag when catching a {@link FileNotFoundException} in the process of listing used files.
 *
 * <p>To avoid mistakenly deleting files that are used but not read, it will stop the removing
 * process when it fails to read used files.
 */
-public class OrphanFilesClean {
-
-    private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesClean.class);
-
-    private final ThreadPoolExecutor executor;
+public abstract class OrphanFilesClean implements Serializable {
 
-    private static final int READ_FILE_RETRY_NUM = 3;
-    private static final int READ_FILE_RETRY_INTERVAL = 5;
-    private static final int SHOW_LIMIT = 200;
+    protected static final Logger LOG = LoggerFactory.getLogger(OrphanFilesClean.class);
 
-    private final FileStoreTable table;
-    private final FileIO fileIO;
-    private final Path location;
-    private final int partitionKeysNum;
+    protected static final int READ_FILE_RETRY_NUM = 3;
+    protected static final int READ_FILE_RETRY_INTERVAL = 5;
 
-    private final List<Path> deleteFiles;
-    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
-    private Consumer<Path> fileCleaner;
+    protected final FileStoreTable table;
+    protected final FileIO fileIO;
+    protected final long olderThanMillis;
+    protected final SerializableConsumer<Path> fileCleaner;
+    protected final int partitionKeysNum;
+    protected final Path location;
 
-    public OrphanFilesClean(FileStoreTable table) {
+    public OrphanFilesClean(
+            FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
         this.table = table;
         this.fileIO = table.fileIO();
-        this.location = table.location();
         this.partitionKeysNum = table.partitionKeys().size();
-
-        this.deleteFiles = new ArrayList<>();
-        this.fileCleaner =
-                path -> {
-                    try {
-                        if (fileIO.isDir(path)) {
-                            fileIO.deleteDirectoryQuietly(path);
-                        } else {
-                            fileIO.deleteQuietly(path);
-                        }
-                    } catch (IOException ignored) {
-                    }
-                };
-        this.executor =
-                createCachedThreadPool(
-                        table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
-    }
-
-    public OrphanFilesClean olderThan(String timestamp) {
-        // The FileStatus#getModificationTime returns milliseconds
-        this.olderThanMillis =
-                DateTimeUtils.parseTimestampData(timestamp, 3, TimeZone.getDefault())
-                        .getMillisecond();
-        return this;
-    }
-
-    public OrphanFilesClean fileCleaner(Consumer<Path> fileCleaner) {
+        this.location = table.location();
+        this.olderThanMillis = olderThanMillis;
         this.fileCleaner = fileCleaner;
-        return this;
     }
 
-    public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
+    protected List<String> validBranches() {
         List<String> branches = table.branchManager().branches();
         branches.add(BranchManager.DEFAULT_MAIN_BRANCH);
 
@@ -157,16 +109,16 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc
             }
         }
         if (!abnormalBranches.isEmpty()) {
-            LOG.warn(
-                    "Branches {} have no schemas. Orphan files cleaning aborted. "
-                            + "Please check these branches manually.",
-                    abnormalBranches);
-            return Collections.emptyList();
+            throw new RuntimeException(
+                    String.format(
+                            "Branches %s have no schemas. Orphan files cleaning aborted. 
" + + "Please check these branches manually.", + abnormalBranches)); } + return branches; + } - Map candidates = getCandidateDeletingFiles(); - Set usedFiles = new HashSet<>(); - + protected void cleanSnapshotDir(List branches, Consumer deletedFileConsumer) { for (String branch : branches) { FileStoreTable branchTable = table.switchToBranch(branch); SnapshotManager snapshotManager = branchTable.snapshotManager(); @@ -174,244 +126,179 @@ public List clean() throws IOException, ExecutionException, InterruptedExc // specially handle the snapshot directory List nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough); nonSnapshotFiles.forEach(fileCleaner); - deleteFiles.addAll(nonSnapshotFiles); + nonSnapshotFiles.forEach(deletedFileConsumer); // specially handle the changelog directory List nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough); nonChangelogFiles.forEach(fileCleaner); - deleteFiles.addAll(nonChangelogFiles); - - usedFiles.addAll(getUsedFiles(branchTable)); + nonChangelogFiles.forEach(deletedFileConsumer); } + } - Set deleted = new HashSet<>(candidates.keySet()); - deleted.removeAll(usedFiles); - deleted.stream().map(candidates::get).forEach(fileCleaner); - - deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList())); - return deleteFiles; + protected void collectWithoutDataFile( + String branch, + Consumer usedFileConsumer, + Consumer manifestConsumer) + throws IOException { + for (Snapshot snapshot : safelyGetAllSnapshots(branch)) { + collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer); + } } - /** Get all the files used by snapshots and tags. */ - private Set getUsedFiles(FileStoreTable branchTable) throws IOException { + protected Set safelyGetAllSnapshots(String branch) throws IOException { + FileStoreTable branchTable = table.switchToBranch(branch); SnapshotManager snapshotManager = branchTable.snapshotManager(); TagManager tagManager = branchTable.tagManager(); - - // safely get all snapshots to be read Set readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots()); - List taggedSnapshots = tagManager.taggedSnapshots(); - readSnapshots.addAll(taggedSnapshots); + readSnapshots.addAll(tagManager.taggedSnapshots()); readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs()); - - return Sets.newHashSet( - randomlyExecute( - executor, snapshot -> getUsedFiles(branchTable, snapshot), readSnapshots)); + return readSnapshots; } - private List getUsedFiles(FileStoreTable branchTable, Snapshot snapshot) { + protected void collectWithoutDataFile( + String branch, + Snapshot snapshot, + Consumer usedFileConsumer, + Consumer manifestConsumer) + throws IOException { + FileStoreTable branchTable = table.switchToBranch(branch); ManifestList manifestList = branchTable.store().manifestListFactory().create(); - ManifestFile manifestFile = branchTable.store().manifestFileFactory().create(); + IndexFileHandler indexFileHandler = branchTable.store().newIndexFileHandler(); + List manifestFileMetas = new ArrayList<>(); + // changelog manifest + if (snapshot.changelogManifestList() != null) { + usedFileConsumer.accept(snapshot.changelogManifestList()); + manifestFileMetas.addAll( + retryReadingFiles( + () -> + manifestList.readWithIOException( + snapshot.changelogManifestList()), + emptyList())); + } - if (snapshot instanceof Changelog) { - return getUsedFilesForChangelog(manifestList, manifestFile, (Changelog) snapshot); - } else { - return getUsedFilesForSnapshot( - manifestList, - 
manifestFile, - branchTable.store().newIndexFileHandler(), - snapshot); + // delta manifest + if (snapshot.deltaManifestList() != null) { + usedFileConsumer.accept(snapshot.deltaManifestList()); + manifestFileMetas.addAll( + retryReadingFiles( + () -> manifestList.readWithIOException(snapshot.deltaManifestList()), + emptyList())); + } + + // base manifest + usedFileConsumer.accept(snapshot.baseManifestList()); + manifestFileMetas.addAll( + retryReadingFiles( + () -> manifestList.readWithIOException(snapshot.baseManifestList()), + emptyList())); + + // collect manifests + for (ManifestFileMeta manifest : manifestFileMetas) { + manifestConsumer.accept(manifest); + usedFileConsumer.accept(manifest.fileName()); + } + + // index files + String indexManifest = snapshot.indexManifest(); + if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) { + usedFileConsumer.accept(indexManifest); + retryReadingFiles( + () -> indexFileHandler.readManifestWithIOException(indexManifest), + Collections.emptyList()) + .stream() + .map(IndexManifestEntry::indexFile) + .map(IndexFileMeta::fileName) + .forEach(usedFileConsumer); } + + // statistic file + if (snapshot.statistics() != null) { + usedFileConsumer.accept(snapshot.statistics()); + } + } + + /** List directories that contains data files and manifest files. */ + protected List listPaimonFileDirs() { + List paimonFileDirs = new ArrayList<>(); + + paimonFileDirs.add(new Path(location, "manifest")); + paimonFileDirs.add(new Path(location, "index")); + paimonFileDirs.add(new Path(location, "statistics")); + paimonFileDirs.addAll(listFileDirs(location, partitionKeysNum)); + + return paimonFileDirs; } /** - * Get all the candidate deleting files in the specified directories and filter them by - * olderThanMillis. + * List directories that contains data files. The argument level is used to control recursive + * depth. 
*/ - private Map getCandidateDeletingFiles() { - List fileDirs = listPaimonFileDirs(); - Function> processor = - path -> - tryBestListingDirs(path).stream() - .filter(this::oldEnough) - .map(FileStatus::getPath) - .collect(Collectors.toList()); - Iterator allPaths = randomlyExecute(executor, processor, fileDirs); - Map result = new HashMap<>(); - while (allPaths.hasNext()) { - Path next = allPaths.next(); - result.put(next.getName(), next); - } - return result; - } + private List listFileDirs(Path dir, int level) { + List dirs = tryBestListingDirs(dir); - private List getUsedFilesForChangelog( - ManifestList manifestList, ManifestFile manifestFile, Changelog changelog) { - List files = new ArrayList<>(); - List manifestFileMetas = new ArrayList<>(); - try { - // try to read manifests - // changelog manifest - List changelogManifest = new ArrayList<>(); - if (changelog.changelogManifestList() != null) { - files.add(changelog.changelogManifestList()); - changelogManifest = - retryReadingFiles( - () -> - manifestList.readWithIOException( - changelog.changelogManifestList())); - if (changelogManifest != null) { - manifestFileMetas.addAll(changelogManifest); - } - } + if (level == 0) { + // return bucket paths + return filterDirs(dirs, p -> p.getName().startsWith(BUCKET_PATH_PREFIX)); + } - // base manifest - if (manifestList.exists(changelog.baseManifestList())) { - files.add(changelog.baseManifestList()); - List baseManifest = - retryReadingFiles( - () -> - manifestList.readWithIOException( - changelog.baseManifestList())); - if (baseManifest != null) { - manifestFileMetas.addAll(baseManifest); - } - } + List partitionPaths = filterDirs(dirs, p -> p.getName().contains("=")); - // delta manifest - List deltaManifest = null; - if (manifestList.exists(changelog.deltaManifestList())) { - files.add(changelog.deltaManifestList()); - deltaManifest = - retryReadingFiles( - () -> - manifestList.readWithIOException( - changelog.deltaManifestList())); - if (deltaManifest != null) { - manifestFileMetas.addAll(deltaManifest); - } - } + List result = new ArrayList<>(); + for (Path partitionPath : partitionPaths) { + result.addAll(listFileDirs(partitionPath, level - 1)); + } + return result; + } - files.addAll( - manifestFileMetas.stream() - .map(ManifestFileMeta::fileName) - .collect(Collectors.toList())); - - // data file - List manifestFileName = new ArrayList<>(); - if (changelog.changelogManifestList() != null) { - manifestFileName.addAll( - changelogManifest == null - ? new ArrayList<>() - : changelogManifest.stream() - .map(ManifestFileMeta::fileName) - .collect(Collectors.toList())); - } else { - manifestFileName.addAll( - deltaManifest == null - ? new ArrayList<>() - : deltaManifest.stream() - .map(ManifestFileMeta::fileName) - .collect(Collectors.toList())); - } + private List filterDirs(List statuses, Predicate filter) { + List filtered = new ArrayList<>(); - // try to read data files - List dataFiles = retryReadingDataFiles(manifestFile, manifestFileName); - if (dataFiles == null) { - return Collections.emptyList(); + for (FileStatus status : statuses) { + Path path = status.getPath(); + if (filter.test(path)) { + filtered.add(path); } - files.addAll(dataFiles); - } catch (IOException e) { - throw new RuntimeException(e); + // ignore unknown dirs } - return files; + return filtered; } /** - * If getting null when reading some files, the snapshot/tag is being deleted, so just return an - * empty result. 
+ * If failed to list directory, just return an empty result because it's OK to not delete them. */ - private List getUsedFilesForSnapshot( - ManifestList manifestList, - ManifestFile manifestFile, - IndexFileHandler indexFileHandler, - Snapshot snapshot) { - List files = new ArrayList<>(); - addManifestList(files, snapshot); - + protected List tryBestListingDirs(Path dir) { try { - // try to read manifests - List manifestFileMetas = - retryReadingFiles( - () -> readAllManifestsWithIOException(manifestList, snapshot)); - if (manifestFileMetas == null) { - return Collections.emptyList(); - } - List manifestFileName = - manifestFileMetas.stream() - .map(ManifestFileMeta::fileName) - .collect(Collectors.toList()); - files.addAll(manifestFileName); - - // try to read data files - List dataFiles = retryReadingDataFiles(manifestFile, manifestFileName); - if (dataFiles == null) { - return Collections.emptyList(); - } - files.addAll(dataFiles); - - // try to read index files - String indexManifest = snapshot.indexManifest(); - if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) { - files.add(indexManifest); - - List indexManifestEntries = - retryReadingFiles( - () -> indexFileHandler.readManifestWithIOException(indexManifest)); - if (indexManifestEntries == null) { - return Collections.emptyList(); - } - - indexManifestEntries.stream() - .map(IndexManifestEntry::indexFile) - .map(IndexFileMeta::fileName) - .forEach(files::add); + if (!fileIO.exists(dir)) { + return emptyList(); } - // try to read statistic - if (snapshot.statistics() != null) { - files.add(snapshot.statistics()); - } + return retryReadingFiles( + () -> { + FileStatus[] s = fileIO.listStatus(dir); + return s == null ? emptyList() : Arrays.asList(s); + }, + emptyList()); } catch (IOException e) { - throw new RuntimeException(e); - } - - return files; - } - - private void addManifestList(List used, Snapshot snapshot) { - used.add(snapshot.baseManifestList()); - used.add(snapshot.deltaManifestList()); - String changelogManifestList = snapshot.changelogManifestList(); - if (changelogManifestList != null) { - used.add(changelogManifestList); + LOG.debug("Failed to list directory {}, skip it.", dir, e); + return emptyList(); } } /** * Retry reading files when {@link IOException} was thrown by the reader. If the exception is - * {@link FileNotFoundException}, return null. Finally, if retry times reaches the limits, - * rethrow the IOException. + * {@link FileNotFoundException}, return default value. Finally, if retry times reaches the + * limits, rethrow the IOException. 
*/ - @Nullable - private T retryReadingFiles(ReaderWithIOException reader) throws IOException { + protected static T retryReadingFiles(ReaderWithIOException reader, T defaultValue) + throws IOException { int retryNumber = 0; IOException caught = null; while (retryNumber++ < READ_FILE_RETRY_NUM) { try { return reader.read(); } catch (FileNotFoundException e) { - return null; + return defaultValue; } catch (IOException e) { caught = e; } @@ -426,208 +313,43 @@ private T retryReadingFiles(ReaderWithIOException reader) throws IOExcept throw caught; } - private List readAllManifestsWithIOException( - ManifestList manifestList, Snapshot snapshot) throws IOException { - List result = new ArrayList<>(); - - result.addAll(manifestList.readWithIOException(snapshot.baseManifestList())); - result.addAll(manifestList.readWithIOException(snapshot.deltaManifestList())); - - String changelogManifestList = snapshot.changelogManifestList(); - if (changelogManifestList != null) { - result.addAll(manifestList.readWithIOException(changelogManifestList)); - } - - return result; - } - - @Nullable - private List retryReadingDataFiles( - ManifestFile manifestFile, List manifestNames) throws IOException { - List dataFiles = new ArrayList<>(); - for (String manifestName : manifestNames) { - List manifestEntries = - retryReadingFiles(() -> manifestFile.readWithIOException(manifestName)); - if (manifestEntries == null) { - return null; - } - - manifestEntries.stream() - .map(ManifestEntry::file) - .forEach( - f -> { - dataFiles.add(f.fileName()); - dataFiles.addAll(f.extraFiles()); - }); - } - return dataFiles; - } - - /** List directories that contains data files and manifest files. */ - private List listPaimonFileDirs() { - List paimonFileDirs = new ArrayList<>(); - - paimonFileDirs.add(new Path(location, "manifest")); - paimonFileDirs.add(new Path(location, "index")); - paimonFileDirs.add(new Path(location, "statistics")); - paimonFileDirs.addAll(listAndCleanDataDirs(location, partitionKeysNum)); - - return paimonFileDirs; - } - - /** - * If failed to list directory, just return an empty result because it's OK to not delete them. - */ - private List tryBestListingDirs(Path dir) { - try { - if (!fileIO.exists(dir)) { - return Collections.emptyList(); - } - - List status = - retryReadingFiles( - () -> { - FileStatus[] s = fileIO.listStatus(dir); - return s == null ? Collections.emptyList() : Arrays.asList(s); - }); - return status == null ? Collections.emptyList() : status; - } catch (IOException e) { - LOG.debug("Failed to list directory {}, skip it.", dir, e); - return Collections.emptyList(); - } - } - - private boolean oldEnough(FileStatus status) { + protected boolean oldEnough(FileStatus status) { return status.getModificationTime() < olderThanMillis; } - /** - * List directories that contains data files and may clean non Paimon data dirs/files. The - * argument level is used to control recursive depth. 
- */ - private List listAndCleanDataDirs(Path dir, int level) { - List dirs = tryBestListingDirs(dir); - - if (level == 0) { - // return bucket paths - return filterAndCleanDataDirs( - dirs, - p -> p.getName().startsWith(BUCKET_PATH_PREFIX), - // if buckets are under partition, we can do clean - partitionKeysNum -> partitionKeysNum != 0); - } - - List partitionPaths = - filterAndCleanDataDirs( - dirs, - p -> p.getName().contains("="), - // if partitions are under a parent partition, we can do clean - partitionKeysNum -> level != partitionKeysNum); - - // dive into the next partition level - return Lists.newArrayList( - randomlyExecute(executor, p -> listAndCleanDataDirs(p, level - 1), partitionPaths)); - } - - private List filterAndCleanDataDirs( - List statuses, Predicate filter, Predicate cleanCondition) { - List filtered = new ArrayList<>(); - List mayBeClean = new ArrayList<>(); - - for (FileStatus status : statuses) { - Path path = status.getPath(); - if (filter.test(path)) { - filtered.add(path); - } else { - mayBeClean.add(status); - } - } - - if (cleanCondition.test(partitionKeysNum)) { - mayBeClean.stream() - .filter(this::oldEnough) - .map(FileStatus::getPath) - .forEach( - p -> { - fileCleaner.accept(p); - synchronized (deleteFiles) { - deleteFiles.add(p); - } - }); - } - - return filtered; - } - /** A helper functional interface for method {@link #retryReadingFiles}. */ @FunctionalInterface - private interface ReaderWithIOException { + protected interface ReaderWithIOException { T read() throws IOException; } - public static List showDeletedFiles(List deleteFiles, int showLimit) { - int showSize = Math.min(deleteFiles.size(), showLimit); - List result = new ArrayList<>(); - if (deleteFiles.size() > showSize) { - result.add( - String.format( - "Total %s files, only %s lines are displayed.", - deleteFiles.size(), showSize)); - } - for (int i = 0; i < showSize; i++) { - result.add(deleteFiles.get(i).toUri().getPath()); - } - return result; - } - - public static List createOrphanFilesCleans( - Catalog catalog, - Map tableConfig, - String databaseName, - @Nullable String tableName) - throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { - List orphanFilesCleans = new ArrayList<>(); - List tableNames = Collections.singletonList(tableName); - if (tableName == null || "*".equals(tableName)) { - tableNames = catalog.listTables(databaseName); - } - - for (String t : tableNames) { - Identifier identifier = new Identifier(databaseName, t); - Table table = catalog.getTable(identifier).copy(tableConfig); - checkArgument( - table instanceof FileStoreTable, - "Only FileStoreTable supports remove-orphan-files action. 
The table type is '%s'.", - table.getClass().getName()); - - orphanFilesCleans.add(new OrphanFilesClean((FileStoreTable) table)); + public static SerializableConsumer createFileCleaner( + Catalog catalog, @Nullable Boolean dryRun) { + SerializableConsumer fileCleaner; + if (Boolean.TRUE.equals(dryRun)) { + fileCleaner = path -> {}; + } else { + FileIO fileIO = catalog.fileIO(); + fileCleaner = + path -> { + try { + if (fileIO.isDir(path)) { + fileIO.deleteDirectoryQuietly(path); + } else { + fileIO.deleteQuietly(path); + } + } catch (IOException ignored) { + } + }; } - - return orphanFilesCleans; + return fileCleaner; } - public static String[] executeOrphanFilesClean(List tableCleans) { - ExecutorService executorService = - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - List>> tasks = new ArrayList<>(); - for (OrphanFilesClean clean : tableCleans) { - tasks.add(executorService.submit(clean::clean)); - } - - List cleanOrphanFiles = new ArrayList<>(); - for (Future> task : tasks) { - try { - cleanOrphanFiles.addAll(task.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - executorService.shutdownNow(); - return showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT).toArray(new String[0]); + public static long olderThanMillis(@Nullable String olderThan) { + return isNullOrWhitespaceOnly(olderThan) + ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1) + : DateTimeUtils.parseTimestampData(olderThan, 3, TimeZone.getDefault()) + .getMillisecond(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 60d0db79177f..f8be79cb4efb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -25,8 +25,10 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataFilePlan; import org.apache.paimon.table.source.DataSplit; @@ -47,6 +49,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -58,11 +61,11 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { private final FileStoreTable fallback; - public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback) { - super(main); + public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable fallback) { + super(wrapped); this.fallback = fallback; - Preconditions.checkArgument(!(main instanceof FallbackReadFileStoreTable)); + Preconditions.checkArgument(!(wrapped instanceof FallbackReadFileStoreTable)); Preconditions.checkArgument(!(fallback instanceof FallbackReadFileStoreTable)); } @@ -96,7 +99,25 @@ public FileStoreTable copyWithLatestSchema() { @Override public FileStoreTable switchToBranch(String branchName) { - return new FallbackReadFileStoreTable(wrapped.switchToBranch(branchName), fallback); + return new 
FallbackReadFileStoreTable(switchWrappedToBranch(branchName), fallback); + } + + private FileStoreTable switchWrappedToBranch(String branchName) { + Optional optionalSchema = + new SchemaManager(wrapped.fileIO(), wrapped.location(), branchName).latest(); + Preconditions.checkArgument( + optionalSchema.isPresent(), "Branch " + branchName + " does not exist"); + + TableSchema branchSchema = optionalSchema.get(); + Options branchOptions = new Options(branchSchema.options()); + branchOptions.set(CoreOptions.BRANCH, branchName); + branchSchema = branchSchema.copy(branchOptions.toMap()); + return FileStoreTableFactory.createWithoutFallbackBranch( + wrapped.fileIO(), + wrapped.location(), + branchSchema, + new Options(), + wrapped.catalogEnvironment()); } private Map rewriteFallbackOptions(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index d0753259f412..19c87cc7467c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -106,7 +106,7 @@ public static FileStoreTable create( return table; } - private static FileStoreTable createWithoutFallbackBranch( + public static FileStoreTable createWithoutFallbackBranch( FileIO fileIO, Path tablePath, TableSchema tableSchema, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java similarity index 94% rename from paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java rename to paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 9135015d73d0..fdc68b34abb4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -49,7 +49,6 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; @@ -84,8 +83,8 @@ import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link OrphanFilesClean}. */ -public class OrphanFilesCleanTest { +/** Test for {@link LocalOrphanFilesClean}. 
*/ +public class LocalOrphanFilesCleanTest { private static final Random RANDOM = new Random(System.currentTimeMillis()); @@ -174,12 +173,13 @@ public void testNormallyRemoving() throws Throwable { } // first check, nothing will be deleted because the default olderThan interval is 1 day - OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table); + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); assertThat(orphanFilesClean.clean().size()).isEqualTo(0); // second check - orphanFilesClean = new OrphanFilesClean(table); - setOlderThan(orphanFilesClean); + orphanFilesClean = + new LocalOrphanFilesClean( + table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)); List deleted = orphanFilesClean.clean(); try { validate(deleted, snapshotData, new HashMap<>()); @@ -221,16 +221,11 @@ private void validate( Map> snapshotData, Map> changelogData) throws Exception { - assertThat( - deleteFiles.stream() - .map(p -> p.toUri().getPath()) - .sorted() - .collect(Collectors.joining("\n"))) - .isEqualTo( + assertThat(deleteFiles.stream().map(p -> p.toUri().getPath())) + .containsExactlyInAnyOrderElementsOf( manuallyAddedFiles.stream() .map(p -> p.toUri().getPath()) - .sorted() - .collect(Collectors.joining("\n"))); + .collect(Collectors.toList())); Set snapshots = new HashSet<>(); table.snapshotManager().snapshots().forEachRemaining(snapshots::add); @@ -367,12 +362,13 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); // first check, nothing will be deleted because the default olderThan interval is 1 day - OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table); + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); assertThat(orphanFilesClean.clean().size()).isEqualTo(0); // second check - orphanFilesClean = new OrphanFilesClean(table); - setOlderThan(orphanFilesClean); + orphanFilesClean = + new LocalOrphanFilesClean( + table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)); List deleted = orphanFilesClean.clean(); validate(deleted, snapshotData, changelogData); } @@ -400,8 +396,9 @@ public void testAbnormallyRemoving() throws Exception { Path manifest = manifests.get(RANDOM.nextInt(manifests.size())); fileIO.deleteQuietly(manifest); - OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table); - setOlderThan(orphanFilesClean); + LocalOrphanFilesClean orphanFilesClean = + new LocalOrphanFilesClean( + table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)); assertThat(orphanFilesClean.clean().size()).isGreaterThan(0); } @@ -482,15 +479,6 @@ private int generateUnUsedFile() throws Exception { return shouldBeDeleted; } - private void setOlderThan(OrphanFilesClean orphanFilesClean) { - String timestamp = - DateTimeUtils.formatLocalDateTime( - DateTimeUtils.toLocalDateTime( - System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)), - 3); - orphanFilesClean.olderThan(timestamp); - } - private List generateData() { int num = RANDOM.nextInt(6) + 5; List data = new ArrayList<>(num); @@ -561,19 +549,11 @@ private void recordChangelogData( private int randomlyAddNonUsedDataFiles() throws IOException { int addedFiles = 0; List part1 = listSubDirs(tablePath, p -> p.getName().contains("=")); - // add non used file at partition part1 - List corruptedPartitions = randomlyPick(part1); - for (Path path : corruptedPartitions) { - addNonUsedFiles(path, 1, Collections.singletonList("UNKNOWN")); - } - addedFiles += 
corruptedPartitions.size(); - List part2 = new ArrayList<>(); + List buckets = new ArrayList<>(); for (Path path : part1) { part2.addAll(listSubDirs(path, p -> p.getName().contains("="))); } - - List buckets = new ArrayList<>(); for (Path path : part2) { buckets.addAll(listSubDirs(path, p -> p.getName().startsWith(BUCKET_PATH_PREFIX))); } diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java index 6e160ebbc85f..c5fa7b7ba34a 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java @@ -18,19 +18,13 @@ package org.apache.paimon.flink.procedure; -import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.operation.OrphanFilesClean; -import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean; import org.apache.flink.table.procedure.ProcedureContext; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean; +import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner; +import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis; /** * Remove orphan files procedure. Usage: @@ -62,7 +56,7 @@ public String[] call(ProcedureContext procedureContext, String tableId, String o public String[] call( ProcedureContext procedureContext, String tableId, String olderThan, boolean dryRun) throws Exception { - return call(procedureContext, tableId, olderThan, dryRun, ""); + return call(procedureContext, tableId, olderThan, dryRun, null); } public String[] call( @@ -70,34 +64,22 @@ public String[] call( String tableId, String olderThan, boolean dryRun, - String parallelism) + Integer parallelism) throws Exception { Identifier identifier = Identifier.fromString(tableId); String databaseName = identifier.getDatabaseName(); String tableName = identifier.getObjectName(); - Map dynamicOptions = - StringUtils.isNullOrWhitespaceOnly(parallelism) - ? 
Collections.emptyMap() - : new HashMap() { - { - put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), parallelism); - } - }; - - List tableCleans = - OrphanFilesClean.createOrphanFilesCleans( - catalog, dynamicOptions, databaseName, tableName); - - if (!StringUtils.isNullOrWhitespaceOnly(olderThan)) { - tableCleans.forEach(clean -> clean.olderThan(olderThan)); - } - - if (dryRun) { - tableCleans.forEach(clean -> clean.fileCleaner(path -> {})); - } - - return executeOrphanFilesClean(tableCleans); + long deleted = + FlinkOrphanFilesClean.executeDatabaseOrphanFiles( + procedureContext.getExecutionEnvironment(), + catalog, + olderThanMillis(olderThan), + createFileCleaner(catalog, dryRun), + parallelism, + databaseName, + tableName); + return new String[] {String.valueOf(deleted)}; } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java index 9b369a0dcc71..5c7e6967c319 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java @@ -18,44 +18,53 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.operation.OrphanFilesClean; - import javax.annotation.Nullable; -import java.util.List; import java.util.Map; -import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean; +import static org.apache.paimon.flink.orphan.FlinkOrphanFilesClean.executeDatabaseOrphanFiles; +import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner; +import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis; /** Action to remove the orphan data files and metadata files. */ public class RemoveOrphanFilesAction extends ActionBase { - private final List tableCleans; + private final String databaseName; + @Nullable private final String tableName; + @Nullable private final String parallelism; + + private String olderThan = null; + private boolean dryRun = false; public RemoveOrphanFilesAction( String warehouse, String databaseName, @Nullable String tableName, - Map catalogConfig, - Map dynamicOptions) - throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException { + @Nullable String parallelism, + Map catalogConfig) { super(warehouse, catalogConfig); - this.tableCleans = - OrphanFilesClean.createOrphanFilesCleans( - catalog, dynamicOptions, databaseName, tableName); + this.databaseName = databaseName; + this.tableName = tableName; + this.parallelism = parallelism; } public void olderThan(String olderThan) { - tableCleans.forEach(clean -> clean.olderThan(olderThan)); + this.olderThan = olderThan; } public void dryRun() { - tableCleans.forEach(clean -> clean.fileCleaner(path -> {})); + this.dryRun = true; } @Override public void run() throws Exception { - executeOrphanFilesClean(tableCleans); + executeDatabaseOrphanFiles( + env, + catalog, + olderThanMillis(olderThan), + createFileCleaner(catalog, dryRun), + parallelism == null ? 
null : Integer.parseInt(parallelism), + databaseName, + tableName); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java index 6ac6e728021f..ed567510d143 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.action; -import org.apache.paimon.CoreOptions; - -import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -46,21 +43,11 @@ public Optional create(MultipleParameterToolAdapter params) { String database = params.get(DATABASE); checkNotNull(database); String table = params.get(TABLE); + String parallelism = params.get(PARALLELISM); Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - Map dynamicOptions = new HashMap<>(); - if (params.has(PARALLELISM)) { - dynamicOptions.put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), params.get(PARALLELISM)); - } - - RemoveOrphanFilesAction action; - try { - action = - new RemoveOrphanFilesAction( - warehouse, database, table, catalogConfig, dynamicOptions); - } catch (Exception e) { - throw new RuntimeException(e); - } + RemoveOrphanFilesAction action = + new RemoveOrphanFilesAction(warehouse, database, table, parallelism, catalogConfig); if (params.has(OLDER_THAN)) { action.olderThan(params.get(OLDER_THAN)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java new file mode 100644 index 000000000000..6740e8980fe2 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.orphan; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.flink.utils.BoundedTwoInputOperator; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.operation.OrphanFilesClean; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.SerializableConsumer; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.operators.InputSelection; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; +import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Flink {@link OrphanFilesClean}, it will submit a job for a table. */ +public class FlinkOrphanFilesClean extends OrphanFilesClean { + + @Nullable protected final Integer parallelism; + + public FlinkOrphanFilesClean( + FileStoreTable table, + long olderThanMillis, + SerializableConsumer fileCleaner, + @Nullable Integer parallelism) { + super(table, olderThanMillis, fileCleaner); + this.parallelism = parallelism; + } + + @Nullable + public DataStream doOrphanClean(StreamExecutionEnvironment env) { + Configuration flinkConf = new Configuration(); + flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); + flinkConf.set(ExecutionOptions.SORT_INPUTS, false); + flinkConf.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false); + if (parallelism != null) { + flinkConf.set(CoreOptions.DEFAULT_PARALLELISM, parallelism); + } + // Flink 1.17 introduced this config, use string to keep compatibility + flinkConf.setString("execution.batch.adaptive.auto-parallelism.enabled", "false"); + env.configure(flinkConf); + + List branches = validBranches(); + + // snapshot and changelog files are the root of everything, so they are handled specially + // here, and subsequently, we will not count their orphan files. 
+
+        // branch and manifest file
+        final OutputTag<Tuple2<String, String>> manifestOutputTag =
+                new OutputTag<Tuple2<String, String>>("manifest-output") {};
+
+        SingleOutputStreamOperator<String> usedManifestFiles =
+                env.fromCollection(branches)
+                        .process(
+                                new ProcessFunction<String, Tuple2<String, String>>() {
+                                    @Override
+                                    public void processElement(
+                                            String branch,
+                                            ProcessFunction<String, Tuple2<String, String>>.Context
+                                                    ctx,
+                                            Collector<Tuple2<String, String>> out)
+                                            throws Exception {
+                                        for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
+                                            out.collect(new Tuple2<>(branch, snapshot.toJson()));
+                                        }
+                                    }
+                                })
+                        .rebalance()
+                        .process(
+                                new ProcessFunction<Tuple2<String, String>, String>() {
+
+                                    @Override
+                                    public void processElement(
+                                            Tuple2<String, String> branchAndSnapshot,
+                                            ProcessFunction<Tuple2<String, String>, String>.Context
+                                                    ctx,
+                                            Collector<String> out)
+                                            throws Exception {
+                                        String branch = branchAndSnapshot.f0;
+                                        Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1);
+                                        Consumer<ManifestFileMeta> manifestConsumer =
+                                                manifest -> {
+                                                    Tuple2<String, String> tuple2 =
+                                                            new Tuple2<>(
+                                                                    branch, manifest.fileName());
+                                                    ctx.output(manifestOutputTag, tuple2);
+                                                };
+                                        collectWithoutDataFile(
+                                                branch, snapshot, out::collect, manifestConsumer);
+                                    }
+                                });
+
+        DataStream<String> usedFiles =
+                usedManifestFiles
+                        .getSideOutput(manifestOutputTag)
+                        .keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
+                        .transform(
+                                "datafile-reader",
+                                STRING_TYPE_INFO,
+                                new BoundedOneInputOperator<Tuple2<String, String>, String>() {
+
+                                    private final Set<Tuple2<String, String>> manifests =
+                                            new HashSet<>();
+
+                                    @Override
+                                    public void processElement(
+                                            StreamRecord<Tuple2<String, String>> element) {
+                                        manifests.add(element.getValue());
+                                    }
+
+                                    @Override
+                                    public void endInput() throws IOException {
+                                        Map<String, ManifestFile> branchManifests = new HashMap<>();
+                                        for (Tuple2<String, String> tuple2 : manifests) {
+                                            ManifestFile manifestFile =
+                                                    branchManifests.computeIfAbsent(
+                                                            tuple2.f0,
+                                                            key ->
+                                                                    table.switchToBranch(key)
+                                                                            .store()
+                                                                            .manifestFileFactory()
+                                                                            .create());
+                                            retryReadingFiles(
+                                                            () ->
+                                                                    manifestFile
+                                                                            .readWithIOException(
+                                                                                    tuple2.f1),
+                                                            Collections.emptyList())
+                                                    .forEach(
+                                                            f -> {
+                                                                List<String> files =
+                                                                        new ArrayList<>();
+                                                                files.add(f.fileName());
+                                                                files.addAll(f.file().extraFiles());
+                                                                files.forEach(
+                                                                        file ->
+                                                                                output.collect(
+                                                                                        new StreamRecord<>(
+                                                                                                file)));
+                                                            });
+                                        }
+                                    }
+                                });
+
+        usedFiles = usedFiles.union(usedManifestFiles);
+
+        List<String> fileDirs =
+                listPaimonFileDirs().stream()
+                        .map(Path::toUri)
+                        .map(Object::toString)
+                        .collect(Collectors.toList());
+        DataStream<String> candidates =
+                env.fromCollection(fileDirs)
+                        .process(
+                                new ProcessFunction<String, String>() {
+                                    @Override
+                                    public void processElement(
+                                            String dir,
+                                            ProcessFunction<String, String>.Context ctx,
+                                            Collector<String> out) {
+                                        for (FileStatus fileStatus :
+                                                tryBestListingDirs(new Path(dir))) {
+                                            if (oldEnough(fileStatus)) {
+                                                out.collect(
+                                                        fileStatus.getPath().toUri().toString());
+                                            }
+                                        }
+                                    }
+                                });
+
+        DataStream<Long> deleted =
+                usedFiles
+                        .keyBy(f -> f)
+                        .connect(candidates.keyBy(path -> new Path(path).getName()))
+                        .transform(
+                                "files_join",
+                                LONG_TYPE_INFO,
+                                new BoundedTwoInputOperator<String, String, Long>() {
+
+                                    private boolean buildEnd;
+                                    private long emitted;
+
+                                    private final Set<String> used = new HashSet<>();
+
+                                    @Override
+                                    public InputSelection nextSelection() {
+                                        return buildEnd
+                                                ? InputSelection.SECOND
+                                                : InputSelection.FIRST;
+                                    }
+
+                                    @Override
+                                    public void endInput(int inputId) {
+                                        switch (inputId) {
+                                            case 1:
+                                                checkState(
+                                                        !buildEnd,
+                                                        "Build phase should not have ended.");
+                                                LOG.info("Finish build phase.");
+                                                buildEnd = true;
+                                                break;
+                                            case 2:
+                                                checkState(
+                                                        buildEnd,
+                                                        "Build phase should have ended.");
+                                                LOG.info("Finish probe phase.");
+                                                LOG.info("Clean files: {}", emitted);
+                                                output.collect(new StreamRecord<>(emitted));
+                                                break;
+                                        }
+                                    }
+
+                                    @Override
+                                    public void processElement1(StreamRecord<String> element) {
+                                        used.add(element.getValue());
+                                    }
+
+                                    @Override
+                                    public void processElement2(StreamRecord<String> element) {
+                                        checkState(buildEnd, "Build phase should have ended.");
+                                        String value = element.getValue();
+                                        Path path = new Path(value);
+                                        if (!used.contains(path.getName())) {
+                                            // the cleaner is a no-op when dry run is enabled
+                                            fileCleaner.accept(path);
+                                            LOG.info("Clean file: {}", path);
+                                            emitted++;
+                                        }
+                                    }
+                                });
+
+        if (deletedInLocal.get() != 0) {
+            deleted = deleted.union(env.fromData(deletedInLocal.get()));
+        }
+        return deleted;
+    }
+
+    public static long executeDatabaseOrphanFiles(
+            StreamExecutionEnvironment env,
+            Catalog catalog,
+            long olderThanMillis,
+            SerializableConsumer<Path> fileCleaner,
+            @Nullable Integer parallelism,
+            String databaseName,
+            @Nullable String tableName)
+            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
+        List<DataStream<Long>> orphanFilesCleans = new ArrayList<>();
+        List<String> tableNames = Collections.singletonList(tableName);
+        if (tableName == null || "*".equals(tableName)) {
+            tableNames = catalog.listTables(databaseName);
+        }
+
+        for (String t : tableNames) {
+            Identifier identifier = new Identifier(databaseName, t);
+            Table table = catalog.getTable(identifier);
+            checkArgument(
+                    table instanceof FileStoreTable,
+                    "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                    table.getClass().getName());
+
+            DataStream<Long> clean =
+                    new FlinkOrphanFilesClean(
+                                    (FileStoreTable) table,
+                                    olderThanMillis,
+                                    fileCleaner,
+                                    parallelism)
+                            .doOrphanClean(env);
+            if (clean != null) {
+                orphanFilesCleans.add(clean);
+            }
+        }
+
+        DataStream<Long> result = null;
+        for (DataStream<Long> clean : orphanFilesCleans) {
+            if (result == null) {
+                result = clean;
+            } else {
+                result = result.union(clean);
+            }
+        }
+
+        return sum(result);
+    }
+
+    private static long sum(DataStream<Long> deleted) {
+        long deleteCount = 0;
+        if (deleted != null) {
+            try {
+                CloseableIterator<Long> iterator =
+                        deleted.global().executeAndCollect("OrphanFilesClean");
+                while (iterator.hasNext()) {
+                    deleteCount += iterator.next();
+                }
+                iterator.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return deleteCount;
+    }
+}
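To make the new entry point concrete, a short sketch of submitting the cleaning job directly; `catalog` stands for an already-initialized Paimon catalog, and `olderThanMillis`/`createFileCleaner` are the `OrphanFilesClean` helpers that the procedure below also uses:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    long deleted =
            FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
                    env,
                    catalog,
                    OrphanFilesClean.olderThanMillis("2023-10-31 12:00:00"),
                    OrphanFilesClean.createFileCleaner(catalog, false), // true = dry run
                    5,       // parallelism; null falls back to the environment default
                    "my_db",
                    "*");    // "*" (or null) cleans every table in the database
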
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 1d6cb0886d34..9bc20350d447 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -18,22 +18,16 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
+import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
+import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
 /**
  * Remove orphan files procedure. Usage:
@@ -61,49 +55,29 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                         type = @DataTypeHint("STRING"),
                         isOptional = true),
                 @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true),
-                @ArgumentHint(
-                        name = "parallelism",
-                        type = @DataTypeHint("STRING"),
-                        isOptional = true)
+                @ArgumentHint(name = "parallelism", type = @DataTypeHint("INT"), isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
             String tableId,
-            String nullableOlderThan,
+            String olderThan,
             Boolean dryRun,
-            String parallelism)
+            Integer parallelism)
             throws Exception {
-        final String olderThan = notnull(nullableOlderThan);
-        if (dryRun == null) {
-            dryRun = false;
-        }
-
         Identifier identifier = Identifier.fromString(tableId);
         String databaseName = identifier.getDatabaseName();
         String tableName = identifier.getObjectName();
 
-        Map<String, String> dynamicOptions =
-                StringUtils.isNullOrWhitespaceOnly(parallelism)
-                        ? Collections.emptyMap()
-                        : new HashMap<String, String>() {
-                            {
-                                put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), parallelism);
-                            }
-                        };
-
-        List<OrphanFilesClean> tableCleans =
-                OrphanFilesClean.createOrphanFilesCleans(
-                        catalog, dynamicOptions, databaseName, tableName);
-
-        if (!StringUtils.isNullOrWhitespaceOnly(olderThan)) {
-            tableCleans.forEach(clean -> clean.olderThan(olderThan));
-        }
-
-        if (dryRun) {
-            tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
-        }
-
-        return executeOrphanFilesClean(tableCleans);
+        long deleted =
+                FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
+                        procedureContext.getExecutionEnvironment(),
+                        catalog,
+                        olderThanMillis(olderThan),
+                        createFileCleaner(catalog, dryRun),
+                        parallelism,
+                        databaseName,
+                        tableName);
+        return new String[] {String.valueOf(deleted)};
     }
 
     @Override
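With `parallelism` now an INT argument, the procedure can be exercised the way the ITCases further down do; a sketch assuming a batch `TableEnvironment` whose current catalog is a registered Paimon catalog:

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // positional form: table, older_than, dry_run, parallelism
    tEnv.executeSql(
                    "CALL sys.remove_orphan_files('my_db.my_table', '2023-10-31 12:00:00', false, 5)")
            .print(); // prints the number of cleaned files
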
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedOneInputOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedOneInputOperator.java
new file mode 100644
index 000000000000..b220d401a4b7
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedOneInputOperator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+/** A {@link OneInputStreamOperator} with {@link BoundedOneInput}. */
+public abstract class BoundedOneInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedTwoInputOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedTwoInputOperator.java
new file mode 100644
index 000000000000..8334e399005b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/BoundedTwoInputOperator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+
+/** A {@link TwoInputStreamOperator} with {@link BoundedMultiInput}. */
+public abstract class BoundedTwoInputOperator<IN1, IN2, OUT> extends AbstractStreamOperator<OUT>
+        implements TwoInputStreamOperator<IN1, IN2, OUT>, BoundedMultiInput, InputSelectable {}
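Both base classes are deliberately thin; as an illustration, a minimal bounded one-input operator in the style the orphan-file job passes to `transform(...)` (the counting logic here is illustrative only):

    // Counts records and emits a single total once the bounded input ends.
    new BoundedOneInputOperator<String, Long>() {

        private long count;

        @Override
        public void processElement(StreamRecord<String> element) {
            count++;
        }

        @Override
        public void endInput() {
            // called exactly once, after the last element of the bounded input
            output.collect(new StreamRecord<>(count));
        }
    };
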
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 1b17ee17279c..f8ea18950035 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -191,12 +191,12 @@ private void confuseArgs(String[] args, String regex, String replacement) {
         }
     }
 
-    protected CloseableIterator<Row> callProcedure(String procedureStatement) {
+    protected CloseableIterator<Row> executeSQL(String procedureStatement) {
         // default execution mode
-        return callProcedure(procedureStatement, true, false);
+        return executeSQL(procedureStatement, true, false);
     }
 
-    protected CloseableIterator<Row> callProcedure(
+    protected CloseableIterator<Row> executeSQL(
             String procedureStatement, boolean isStreaming, boolean dmlSync) {
         TableEnvironment tEnv;
         if (isStreaming) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 261515e8719f..a8286964832a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -71,30 +71,30 @@ void testCreateAndDeleteBranch() throws Exception {
         writeData(rowData(3L, BinaryString.fromString("Paimon")));
 
         TagManager tagManager = new TagManager(table.fileIO(), table.location());
-        callProcedure(
+        executeSQL(
                 String.format(
                         "CALL sys.create_tag('%s.%s', 'tag2', 2, '5 d')", database, tableName));
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
         BranchManager branchManager = table.branchManager();
-        callProcedure(
+        executeSQL(
                 String.format(
                         "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
                         database, tableName));
         assertThat(branchManager.branchExists("branch_name")).isTrue();
 
-        callProcedure(
+        executeSQL(
                 String.format(
                         "CALL sys.create_branch(`table` => '%s.%s', branch => 'branch_name_named_argument', tag => 'tag2')",
                         database, tableName));
         assertThat(branchManager.branchExists("branch_name_named_argument")).isTrue();
 
-        callProcedure(
+        executeSQL(
                 String.format(
                         "CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName));
         assertThat(branchManager.branchExists("branch_name")).isFalse();
 
-        callProcedure(
+        executeSQL(
                 String.format(
                         "CALL sys.delete_branch(`table` => '%s.%s', branch => 'branch_name_named_argument')",
                         database, tableName));
@@ -158,25 +158,25 @@ void testCreateAndDeleteEmptyBranch() throws Exception {
         writeData(rowData(3L, BinaryString.fromString("Paimon")));
 
         BranchManager branchManager = table.branchManager();
-        callProcedure(
+        executeSQL(
                 String.format(
                         "CALL sys.create_branch('%s.%s', 'empty_branch_name')",
                         database, tableName));
         assertThat(branchManager.branchExists("empty_branch_name")).isTrue();
 
-        callProcedure(
+        executeSQL(
                 String.format(
                         "CALL sys.create_branch(`table` => '%s.%s', branch => 'empty_branch_named_argument')",
                         database, tableName));
         assertThat(branchManager.branchExists("empty_branch_named_argument")).isTrue();
 
-        callProcedure(
+        executeSQL(
                 String.format(
"CALL sys.delete_branch('%s.%s', 'empty_branch_name')", database, tableName)); assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); - callProcedure( + executeSQL( String.format( "CALL sys.delete_branch(`table` => '%s.%s', branch => 'empty_branch_named_argument')", database, tableName)); @@ -237,17 +237,15 @@ void testFastForward() throws Exception { // Create tag2 TagManager tagManager = new TagManager(table.fileIO(), table.location()); - callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + executeSQL(String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); assertThat(tagManager.tagExists("tag2")).isTrue(); // Create tag3 - callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); + executeSQL(String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); assertThat(tagManager.tagExists("tag3")).isTrue(); // Create branch_name branch BranchManager branchManager = table.branchManager(); - callProcedure( + executeSQL( String.format( "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')", database, tableName)); @@ -270,7 +268,7 @@ void testFastForward() throws Exception { assertThat(branchManager.branchExists("branch_name_action")).isTrue(); // Fast-forward branch branch_name - callProcedure( + executeSQL( String.format( "CALL sys.fast_forward('%s.%s', 'branch_name')", database, tableName)); @@ -325,7 +323,7 @@ void testFastForward() throws Exception { Assert.assertEquals(expected, sortedActual); // Fast-forward branch branch_name again - callProcedure( + executeSQL( String.format( "CALL sys.fast_forward(`table` => '%s.%s', branch => 'branch_name')", database, tableName)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java index 82ac7e58eb66..71672551abcb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java @@ -82,7 +82,7 @@ public void testCloneTable(String invoker) throws Exception { ActionFactory.createAction(args).get().run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.clone('%s', 'db1', 't1', '', '%s', 'mydb', 'myt')", sourceWarehouse, targetWarehouse), @@ -90,7 +90,7 @@ public void testCloneTable(String invoker) throws Exception { true); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.clone(warehouse => '%s', database => 'db1', `table` => 't1', target_warehouse => '%s', target_database => 'mydb', target_table => 'myt')", sourceWarehouse, targetWarehouse), @@ -141,7 +141,7 @@ public void testCloneDatabase(String invoker) throws Exception { ActionFactory.createAction(args).get().run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.clone('%s', 'db1', '', '', '%s', 'mydb')", sourceWarehouse, targetWarehouse), @@ -149,7 +149,7 @@ public void testCloneDatabase(String invoker) throws Exception { true); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.clone(warehouse => '%s', database => 'db1', target_warehouse => '%s', target_database => 'mydb')", sourceWarehouse, targetWarehouse), @@ -201,7 +201,7 @@ public void testCloneWarehouse(String invoker) throws 
Exception { ActionFactory.createAction(args).get().run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.clone('%s', '', '', '', '%s')", sourceWarehouse, targetWarehouse), @@ -209,7 +209,7 @@ public void testCloneWarehouse(String invoker) throws Exception { true); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.clone(warehouse => '%s', target_warehouse => '%s')", sourceWarehouse, targetWarehouse), @@ -408,7 +408,7 @@ public void testCloneWithSchemaEvolution(String invoker) throws Exception { ActionFactory.createAction(args).get().run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.clone('%s', '', '', '', '%s')", sourceWarehouse, targetWarehouse), @@ -416,7 +416,7 @@ public void testCloneWithSchemaEvolution(String invoker) throws Exception { true); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.clone(warehouse => '%s', target_warehouse => '%s')", sourceWarehouse, targetWarehouse), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index 5e346499ef75..1edaa87427d1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -143,11 +143,10 @@ public void testStreamCompactForUnawareTable(String mode, String invoker) throws env.executeAsync(); break; case "procedure_indexed": - callProcedure( - String.format("CALL sys.compact_database('', '%s')", mode), true, false); + executeSQL(String.format("CALL sys.compact_database('', '%s')", mode), true, false); break; case "procedure_named": - callProcedure( + executeSQL( String.format("CALL sys.compact_database(mode => '%s')", mode), true, false); @@ -272,11 +271,10 @@ public void testBatchCompact(String mode, String invoker) throws Exception { env.execute(); break; case "procedure_indexed": - callProcedure( - String.format("CALL sys.compact_database('', '%s')", mode), false, true); + executeSQL(String.format("CALL sys.compact_database('', '%s')", mode), false, true); break; case "procedure_named": - callProcedure( + executeSQL( String.format("CALL sys.compact_database(mode => '%s')", mode), false, true); @@ -383,9 +381,9 @@ public void testStreamingCompact(String mode, String invoker) throws Exception { break; case "procedure_indexed": if (mode.equals("divided")) { - callProcedure("CALL sys.compact_database()", true, false); + executeSQL("CALL sys.compact_database()", true, false); } else { - callProcedure( + executeSQL( "CALL sys.compact_database('', 'combined', '', '', 'continuous.discovery-interval=1s')", true, false); @@ -393,9 +391,9 @@ public void testStreamingCompact(String mode, String invoker) throws Exception { break; case "procedure_named": if (mode.equals("divided")) { - callProcedure("CALL sys.compact_database()", true, false); + executeSQL("CALL sys.compact_database()", true, false); } else { - callProcedure( + executeSQL( "CALL sys.compact_database(mode => 'combined', table_options => 'continuous.discovery-interval=1s')", true, false); @@ -628,7 +626,7 @@ public void testHistoryPartitionCompact(String mode, String invoker) throws Exce env.execute(); break; case "procedure_indexed": - callProcedure( + executeSQL( 
String.format( "CALL sys.compact_database('', '%s','','','','%s')", mode, partitionIdleTime), @@ -636,7 +634,7 @@ public void testHistoryPartitionCompact(String mode, String invoker) throws Exce true); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.compact_database(mode => '%s', partition_idle_time => '%s')", mode, partitionIdleTime), @@ -795,14 +793,14 @@ private void includingAndExcludingTablesImpl( break; case "procedure_indexed": if (mode.equals("divided")) { - callProcedure( + executeSQL( String.format( "CALL sys.compact_database('', 'divided', '%s', '%s')", nonNull(includingPattern), nonNull(excludesPattern)), false, true); } else { - callProcedure( + executeSQL( String.format( "CALL sys.compact_database('', 'combined', '%s', '%s', 'continuous.discovery-interval=1s')", nonNull(includingPattern), nonNull(excludesPattern)), @@ -812,14 +810,14 @@ private void includingAndExcludingTablesImpl( break; case "procedure_named": if (mode.equals("divided")) { - callProcedure( + executeSQL( String.format( "CALL sys.compact_database(mode => 'divided', including_tables => '%s', excluding_tables => '%s')", nonNull(includingPattern), nonNull(excludesPattern)), false, true); } else { - callProcedure( + executeSQL( String.format( "CALL sys.compact_database(mode => 'combined', including_tables => '%s', excluding_tables => '%s', table_options => 'continuous.discovery-interval=1s')", nonNull(includingPattern), nonNull(excludesPattern)), @@ -917,7 +915,7 @@ public void testUnawareBucketStreamingCompact() throws Exception { .build(); env.executeAsync(); } else { - callProcedure("CALL sys.compact_database()"); + executeSQL("CALL sys.compact_database()"); } for (FileStoreTable table : tables) { @@ -991,7 +989,7 @@ public void testUnawareBucketBatchCompact() throws Exception { .build(); env.execute(); } else { - callProcedure("CALL sys.compact_database()", false, true); + executeSQL("CALL sys.compact_database()", false, true); } for (FileStoreTable table : tables) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index c2cb190a80b6..ef921ad666ca 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -105,13 +105,13 @@ public void testResetConsumer(String invoker) throws Exception { createAction(ResetConsumerAction.class, args).run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer('%s.%s', 'myid', 1)", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid', next_snapshot_id => cast(1 as bigint))", database, tableName)); @@ -129,12 +129,12 @@ public void testResetConsumer(String invoker) throws Exception { createAction(ResetConsumerAction.class, args.subList(0, 9)).run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer('%s.%s', 'myid')", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid')", database, tableName)); @@ -213,13 +213,13 @@ public void testResetBranchConsumer(String invoker) throws 
Exception { createAction(ResetConsumerAction.class, args).run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer('%s.%s', 'myid', 1)", database, branchTableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid', next_snapshot_id => cast(1 as bigint))", database, branchTableName)); @@ -237,13 +237,13 @@ public void testResetBranchConsumer(String invoker) throws Exception { createAction(ResetConsumerAction.class, args.subList(0, 9)).run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.reset_consumer(`table` => '%s.%s', consumer_id => 'myid')", database, branchTableName)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java index 9e919c157304..bb9a02fa1988 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java @@ -70,7 +70,7 @@ public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Except "partKey0=0") .run(); } else { - callProcedure( + executeSQL( String.format( "CALL sys.drop_partition('%s.%s', 'partKey0 = 0')", database, tableName)); @@ -136,7 +136,7 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exce "partKey0=1,partKey1=0") .run(); } else { - callProcedure( + executeSQL( String.format( "CALL sys.drop_partition('%s.%s', 'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')", database, tableName)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index 1c0ec8587816..3c1b73df8cb3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -81,13 +81,13 @@ public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk, String in .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.mark_partition_done('%s.%s', 'partKey0 = 0')", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0 = 0')", database, tableName)); @@ -125,13 +125,13 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, String invo .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", database, tableName)); diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 78f83fcfa3e0..41d607fac5f6 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -589,7 +589,7 @@ private void validateProcedureResult( throws Exception { BlockingIterator iterator = testStreamingRead(buildSimpleQuery("T"), initialRecords); - callProcedure(procedureStatement, true, true); + executeSQL(procedureStatement, true, true); // test batch read first to ensure TABLE_DML_SYNC works testBatchRead(buildSimpleQuery("T"), batchExpected); // test streaming read diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java index b0bd7d0d6236..82c09c6674ed 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java @@ -48,7 +48,9 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -97,9 +99,7 @@ private Path getOrphanFilePath(FileStoreTable table, String orphanFile) { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRunWithoutException(boolean isNamedArgument) throws Exception { - FileStoreTable table = createTableAndWriteData(tableName); - Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1); - Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2); + createTableAndWriteData(tableName); List args = new ArrayList<>( @@ -126,8 +126,8 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception { : "CALL sys.remove_orphan_files('%s.%s')", database, tableName); - CloseableIterator withoutOlderThanCollect = callProcedure(withoutOlderThan); - assertThat(ImmutableList.copyOf(withoutOlderThanCollect).size()).isEqualTo(0); + CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); + assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); String withDryRun = String.format( @@ -136,11 +136,8 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception { : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)", database, tableName); - ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(callProcedure(withDryRun)); - assertThat(actualDryRunDeleteFile) - .containsExactlyInAnyOrder( - Row.of(orphanFile1.toUri().getPath()), - Row.of(orphanFile2.toUri().getPath())); + ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); + assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2")); String withOlderThan = String.format( @@ -149,23 +146,16 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception { : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", database, tableName); - ImmutableList 
actualDeleteFile = ImmutableList.copyOf(callProcedure(withOlderThan)); + ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); - assertThat(actualDeleteFile) - .containsExactlyInAnyOrder( - Row.of(orphanFile1.toUri().getPath()), - Row.of(orphanFile2.toUri().getPath())); + assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2")); } @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws Exception { - FileStoreTable table1 = createTableAndWriteData("tableName1"); - Path orphanFile11 = getOrphanFilePath(table1, ORPHAN_FILE_1); - Path orphanFile12 = getOrphanFilePath(table1, ORPHAN_FILE_2); - FileStoreTable table2 = createTableAndWriteData("tableName2"); - Path orphanFile21 = getOrphanFilePath(table2, ORPHAN_FILE_1); - Path orphanFile22 = getOrphanFilePath(table2, ORPHAN_FILE_2); + createTableAndWriteData("tableName1"); + createTableAndWriteData("tableName2"); List args = new ArrayList<>( @@ -197,13 +187,13 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws : "CALL sys.remove_orphan_files('%s.%s')", database, "*"); - CloseableIterator withoutOlderThanCollect = callProcedure(withoutOlderThan); - assertThat(ImmutableList.copyOf(withoutOlderThanCollect).size()).isEqualTo(0); + CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); + assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); String withParallelism = - String.format("CALL sys.remove_orphan_files('%s.%s','',true,'5')", database, "*"); - CloseableIterator withParallelismCollect = callProcedure(withParallelism); - assertThat(ImmutableList.copyOf(withParallelismCollect).size()).isEqualTo(0); + String.format("CALL sys.remove_orphan_files('%s.%s','',true,5)", database, "*"); + CloseableIterator withParallelismCollect = executeSQL(withParallelism); + assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0")); String withDryRun = String.format( @@ -212,13 +202,8 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)", database, "*"); - ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(callProcedure(withDryRun)); - assertThat(actualDryRunDeleteFile) - .containsExactlyInAnyOrder( - Row.of(orphanFile11.toUri().getPath()), - Row.of(orphanFile12.toUri().getPath()), - Row.of(orphanFile21.toUri().getPath()), - Row.of(orphanFile22.toUri().getPath())); + ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); + assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4")); String withOlderThan = String.format( @@ -227,14 +212,9 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", database, "*"); - ImmutableList actualDeleteFile = ImmutableList.copyOf(callProcedure(withOlderThan)); - - assertThat(actualDeleteFile) - .containsExactlyInAnyOrder( - Row.of(orphanFile11.toUri().getPath()), - Row.of(orphanFile12.toUri().getPath()), - Row.of(orphanFile21.toUri().getPath()), - Row.of(orphanFile22.toUri().getPath())); + ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); + + assertThat(actualDeleteFile).containsOnly(Row.of("4")); } @ParameterizedTest @@ -242,8 +222,6 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws public void 
testCleanWithBranch(boolean isNamedArgument) throws Exception { // create main branch FileStoreTable table = createTableAndWriteData(tableName); - Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1); - Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2); // create first branch and write some data table.createBranch("br"); @@ -275,6 +253,14 @@ public void testCleanWithBranch(boolean isNamedArgument) throws Exception { Path orphanFile4 = new Path(table.location(), "branch/branch-br2/snapshot/orphan_file4"); branchTable.fileIO().writeFile(orphanFile4, "y", true); + if (ThreadLocalRandom.current().nextBoolean()) { + executeSQL( + String.format( + "ALTER TABLE `%s`.`%s` SET ('%s' = 'br')", + database, tableName, SCAN_FALLBACK_BRANCH.key()), + false, + true); + } String procedure = String.format( isNamedArgument @@ -282,12 +268,7 @@ public void testCleanWithBranch(boolean isNamedArgument) throws Exception { : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", database, "*"); - ImmutableList actualDeleteFile = ImmutableList.copyOf(callProcedure(procedure)); - assertThat(actualDeleteFile) - .containsExactlyInAnyOrder( - Row.of(orphanFile1.toUri().getPath()), - Row.of(orphanFile2.toUri().getPath()), - Row.of(orphanFile3.toUri().getPath()), - Row.of(orphanFile4.toUri().getPath())); + ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(procedure)); + assertThat(actualDeleteFile).containsOnly(Row.of("4")); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java index d9513c17ef1b..242a7514168e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java @@ -83,7 +83,7 @@ public void testFileIndexAddIndex() throws Exception { .withStreamExecutionEnvironment(env) .run(); } else { - callProcedure("CALL sys.rewrite_file_index('test_db.T')"); + executeSQL("CALL sys.rewrite_file_index('test_db.T')"); } FileStoreTable table = (FileStoreTable) catalog.getTable(new Identifier("test_db", "T")); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java index a9e0726e7d15..859f8deda4d1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java @@ -84,13 +84,13 @@ public void rollbackToSnapshotTest(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.rollback_to('%s.%s', '', cast(2 as bigint))", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.rollback_to(`table` => '%s.%s', snapshot_id => cast(2 as bigint))", database, tableName)); @@ -142,12 +142,12 @@ public void rollbackToTagTest(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.rollback_to('%s.%s', 'tag2')", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( 
"CALL sys.rollback_to(`table` => '%s.%s', tag => 'tag2')", database, tableName)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java index 9603d9085dfb..aa98de38715b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java @@ -91,12 +91,12 @@ public void testCreateAndDeleteTag(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag(`table` => '%s.%s', tag => 'tag2', snapshot_id => cast(2 as bigint))", database, tableName)); @@ -127,11 +127,11 @@ public void testCreateAndDeleteTag(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format("CALL sys.delete_tag('%s.%s', 'tag2')", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.delete_tag(`table` => '%s.%s', tag => 'tag2')", database, tableName)); @@ -160,12 +160,12 @@ public void testCreateAndDeleteTag(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag('%s.%s', 'tag1', 1)", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag(`table` => '%s.%s', tag => 'tag1', snapshot_id => cast(1 as bigint))", database, tableName)); @@ -193,12 +193,12 @@ public void testCreateAndDeleteTag(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag(`table` => '%s.%s', tag => 'tag3', snapshot_id => cast(3 as bigint))", database, tableName)); @@ -223,12 +223,12 @@ public void testCreateAndDeleteTag(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.delete_tag('%s.%s', 'tag1,tag3')", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.delete_tag(`table` => '%s.%s', tag => 'tag1,tag3')", database, tableName)); @@ -284,12 +284,12 @@ public void testCreateLatestTag(String invoker) throws Exception { .run(); break; case "procedure_indexed": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); break; case "procedure_named": - callProcedure( + executeSQL( String.format( "CALL sys.create_tag(`table` => '%s.%s', tag => 'tag2', snapshot_id => cast(2 as bigint))", database, tableName)); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java index 0735fa8cecd5..7a3c8df4d829 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java +++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java @@ -18,11 +18,11 @@ package org.apache.paimon.spark.procedure; -import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.operation.LocalOrphanFilesClean; import org.apache.paimon.operation.OrphanFilesClean; import org.apache.paimon.spark.catalog.WithPaimonCatalog; import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.StringUtils; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -35,13 +35,11 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean; +import static org.apache.paimon.operation.LocalOrphanFilesClean.executeOrphanFilesClean; import static org.apache.spark.sql.types.DataTypes.BooleanType; +import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.StringType; /** @@ -63,7 +61,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { ProcedureParameter.required("table", StringType), ProcedureParameter.optional("older_than", StringType), ProcedureParameter.optional("dry_run", BooleanType), - ProcedureParameter.optional("parallelism", StringType) + ProcedureParameter.optional("parallelism", IntegerType) }; private static final StructType OUTPUT_TYPE = @@ -90,16 +88,6 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { org.apache.paimon.catalog.Identifier identifier; String tableId = args.getString(0); - String parallelism = args.isNullAt(3) ? null : args.getString(3); - Map dynamicOptions = - StringUtils.isNullOrWhitespaceOnly(parallelism) - ? Collections.emptyMap() - : new HashMap() { - { - put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), parallelism); - } - }; - Preconditions.checkArgument( tableId != null && !tableId.isEmpty(), "Cannot handle an empty tableId for argument %s", @@ -114,28 +102,23 @@ public InternalRow[] call(InternalRow args) { } LOG.info("identifier is {}.", identifier); - List tableCleans; + List tableCleans; try { + Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); tableCleans = - OrphanFilesClean.createOrphanFilesCleans( - ((WithPaimonCatalog) tableCatalog()).paimonCatalog(), - dynamicOptions, + LocalOrphanFilesClean.createOrphanFilesCleans( + catalog, identifier.getDatabaseName(), - identifier.getObjectName()); + identifier.getObjectName(), + OrphanFilesClean.olderThanMillis( + args.isNullAt(1) ? null : args.getString(1)), + OrphanFilesClean.createFileCleaner( + catalog, !args.isNullAt(2) && args.getBoolean(2)), + args.isNullAt(3) ? null : args.getInt(3)); } catch (Exception e) { throw new RuntimeException(e); } - String olderThan = args.isNullAt(1) ? null : args.getString(1); - if (!StringUtils.isNullOrWhitespaceOnly(olderThan)) { - tableCleans.forEach(clean -> clean.olderThan(olderThan)); - } - - boolean dryRun = !args.isNullAt(2) && args.getBoolean(2); - if (dryRun) { - tableCleans.forEach(clean -> clean.fileCleaner(path -> {})); - } - String[] result = executeOrphanFilesClean(tableCleans); List rows = new ArrayList<>(); Arrays.stream(result)