From c16cefa5015dda417f80e0d59124cd92448787ab Mon Sep 17 00:00:00 2001 From: leesf <490081539@qq.com> Date: Mon, 21 Oct 2024 20:35:03 +0800 Subject: [PATCH] Core: Move deleteRemovedMetadataFiles(..) to CatalogUtil (#11352) --- .../iceberg/BaseMetastoreTableOperations.java | 49 +------------------ .../java/org/apache/iceberg/CatalogUtil.java | 45 +++++++++++++++++ .../iceberg/hadoop/HadoopTableOperations.java | 40 +-------------- 3 files changed, 48 insertions(+), 86 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 53f3250dc95c..dbab9e813966 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -19,7 +19,6 @@ package org.apache.iceberg; import java.util.Locale; -import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -32,11 +31,8 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -127,7 +123,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { long start = System.currentTimeMillis(); doCommit(base, metadata); - deleteRemovedMetadataFiles(base, metadata); + CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata); requestRefresh(); LOG.info( @@ -354,47 +350,4 @@ private static int parseVersion(String metadataLocation) { return -1; } } - - /** - * Deletes the oldest metadata files if {@link - * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true. - * - * @param base table metadata on which previous versions were based - * @param metadata new table metadata with updated previous versions - */ - private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) { - if (base == null) { - return; - } - - boolean deleteAfterCommit = - metadata.propertyAsBoolean( - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT); - - if (deleteAfterCommit) { - Set removedPreviousMetadataFiles = - Sets.newHashSet(base.previousFiles()); - // TableMetadata#addPreviousFile builds up the metadata log and uses - // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in - // the log, thus we don't include metadata.previousFiles() for deletion - everything else can - // be removed - removedPreviousMetadataFiles.removeAll(metadata.previousFiles()); - if (io() instanceof SupportsBulkOperations) { - ((SupportsBulkOperations) io()) - .deleteFiles( - Iterables.transform( - removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file)); - } else { - Tasks.foreach(removedPreviousMetadataFiles) - .noRetry() - .suppressFailureWhenFinished() - .onFailure( - (previousMetadataFile, exc) -> - LOG.warn( - "Delete failed for previous metadata file: {}", previousMetadataFile, exc)) - .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file())); - } - } - } } diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index 70b10cbaeb62..609e94b7b150 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -515,4 +515,49 @@ public static String fullTableName(String catalogName, TableIdentifier identifie return sb.toString(); } + + /** + * Deletes the oldest metadata files if {@link + * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true. + * + * @param io FileIO instance to use for deletes + * @param base table metadata on which previous versions were based + * @param metadata new table metadata with updated previous versions + */ + public static void deleteRemovedMetadataFiles( + FileIO io, TableMetadata base, TableMetadata metadata) { + if (base == null) { + return; + } + + boolean deleteAfterCommit = + metadata.propertyAsBoolean( + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT); + + if (deleteAfterCommit) { + Set removedPreviousMetadataFiles = + Sets.newHashSet(base.previousFiles()); + // TableMetadata#addPreviousFile builds up the metadata log and uses + // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in + // the log, thus we don't include metadata.previousFiles() for deletion - everything else can + // be removed + removedPreviousMetadataFiles.removeAll(metadata.previousFiles()); + if (io instanceof SupportsBulkOperations) { + ((SupportsBulkOperations) io) + .deleteFiles( + Iterables.transform( + removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file)); + } else { + Tasks.foreach(removedPreviousMetadataFiles) + .noRetry() + .suppressFailureWhenFinished() + .onFailure( + (previousMetadataFile, exc) -> + LOG.warn( + "Delete failed for previous metadata file: {}", previousMetadataFile, exc)) + .run(previousMetadataFile -> io.deleteFile(previousMetadataFile.file())); + } + } + } } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index 1e0cf4422120..24299371401c 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.Set; import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -31,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.LocationProviders; import org.apache.iceberg.LockManager; import org.apache.iceberg.TableMetadata; @@ -45,10 +45,7 @@ import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Pair; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,7 +165,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { // update the best-effort version pointer writeVersionHint(nextVersion); - deleteRemovedMetadataFiles(base, metadata); + CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata); this.shouldRefresh = true; } @@ -414,39 +411,6 @@ protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { return Util.getFs(path, hadoopConf); } - /** - * Deletes the oldest metadata files if {@link - * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true. - * - * @param base table metadata on which previous versions were based - * @param metadata new table metadata with updated previous versions - */ - private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) { - if (base == null) { - return; - } - - boolean deleteAfterCommit = - metadata.propertyAsBoolean( - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT); - - if (deleteAfterCommit) { - Set removedPreviousMetadataFiles = - Sets.newHashSet(base.previousFiles()); - removedPreviousMetadataFiles.removeAll(metadata.previousFiles()); - Tasks.foreach(removedPreviousMetadataFiles) - .executeWith(ThreadPools.getWorkerPool()) - .noRetry() - .suppressFailureWhenFinished() - .onFailure( - (previousMetadataFile, exc) -> - LOG.warn( - "Delete failed for previous metadata file: {}", previousMetadataFile, exc)) - .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file())); - } - } - private static TableMetadata checkUUID(TableMetadata currentMetadata, TableMetadata newMetadata) { String newUUID = newMetadata.uuid(); if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {