
Core: Move deleteRemovedMetadataFiles(..) to CatalogUtil (apache#11352)
leesf authored Oct 21, 2024
1 parent ce75f52 commit c16cefa
Showing 3 changed files with 48 additions and 86 deletions.
@@ -19,7 +19,6 @@
package org.apache.iceberg;

import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -32,11 +31,8 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -127,7 +123,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {

long start = System.currentTimeMillis();
doCommit(base, metadata);
deleteRemovedMetadataFiles(base, metadata);
CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);
requestRefresh();

LOG.info(
@@ -354,47 +350,4 @@ private static int parseVersion(String metadataLocation) {
return -1;
}
}

/**
* Deletes the oldest metadata files if {@link
* TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
*
* @param base table metadata on which previous versions were based
* @param metadata new table metadata with updated previous versions
*/
private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
if (base == null) {
return;
}

boolean deleteAfterCommit =
metadata.propertyAsBoolean(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);

if (deleteAfterCommit) {
Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
Sets.newHashSet(base.previousFiles());
// TableMetadata#addPreviousFile builds up the metadata log and uses
// TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in
// the log, thus we don't include metadata.previousFiles() for deletion - everything else can
// be removed
removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
if (io() instanceof SupportsBulkOperations) {
((SupportsBulkOperations) io())
.deleteFiles(
Iterables.transform(
removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file));
} else {
Tasks.foreach(removedPreviousMetadataFiles)
.noRetry()
.suppressFailureWhenFinished()
.onFailure(
(previousMetadataFile, exc) ->
LOG.warn(
"Delete failed for previous metadata file: {}", previousMetadataFile, exc))
.run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
}
}
}
}
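
The cleanup shown above is driven entirely by table properties. A minimal sketch, not part of this commit, of enabling it: the table variable is assumed to be any loaded org.apache.iceberg.Table, and the retention count of 5 is an arbitrary example.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public class EnableMetadataCleanup {
  // Hypothetical helper, for illustration only.
  public static void enable(Table table) {
    table
        .updateProperties()
        // turn on deletion of old metadata files after each commit
        .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
        // keep at most 5 previous metadata files in the metadata log
        .set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "5")
        .commit();
  }
}

METADATA_PREVIOUS_VERSIONS_MAX is the limit TableMetadata#addPreviousFile applies when trimming the metadata log, which is why the helper deletes only the entries that have dropped out of metadata.previousFiles().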
45 changes: 45 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -515,4 +515,49 @@ public static String fullTableName(String catalogName, TableIdentifier identifier) {

return sb.toString();
}

/**
* Deletes the oldest metadata files if {@link
* TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
*
* @param io FileIO instance to use for deletes
* @param base table metadata on which previous versions were based
* @param metadata new table metadata with updated previous versions
*/
public static void deleteRemovedMetadataFiles(
FileIO io, TableMetadata base, TableMetadata metadata) {
if (base == null) {
return;
}

boolean deleteAfterCommit =
metadata.propertyAsBoolean(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);

if (deleteAfterCommit) {
Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
Sets.newHashSet(base.previousFiles());
// TableMetadata#addPreviousFile builds up the metadata log and uses
// TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in
// the log, thus we don't include metadata.previousFiles() for deletion - everything else can
// be removed
removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
if (io instanceof SupportsBulkOperations) {
((SupportsBulkOperations) io)
.deleteFiles(
Iterables.transform(
removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file));
} else {
Tasks.foreach(removedPreviousMetadataFiles)
.noRetry()
.suppressFailureWhenFinished()
.onFailure(
(previousMetadataFile, exc) ->
LOG.warn(
"Delete failed for previous metadata file: {}", previousMetadataFile, exc))
.run(previousMetadataFile -> io.deleteFile(previousMetadataFile.file()));
}
}
}
}
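
Because deleteRemovedMetadataFiles is now public and static, it can also be reused outside the two TableOperations classes touched by this commit. A hedged sketch, assuming the caller already holds a FileIO plus the base and updated TableMetadata; the class and method names below are illustrative, not part of the Iceberg API.

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.io.FileIO;

public class CleanupAfterCommit {
  // Hypothetical wrapper; the single call is the same one the operations
  // classes in this commit make after a successful commit.
  public static void cleanUp(FileIO io, TableMetadata base, TableMetadata updated) {
    // No-op unless write.metadata.delete-after-commit.enabled is true on the
    // table; deletes in bulk when io implements SupportsBulkOperations,
    // one file at a time otherwise.
    CatalogUtil.deleteRemovedMetadataFiles(io, base, updated);
  }
}

Passing the FileIO explicitly, instead of calling io() on the enclosing operations class, is what allows the same static helper to serve both call sites.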
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -31,6 +30,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.LockManager;
import org.apache.iceberg.TableMetadata;
@@ -45,10 +45,7 @@
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -168,7 +165,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
// update the best-effort version pointer
writeVersionHint(nextVersion);

deleteRemovedMetadataFiles(base, metadata);
CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);

this.shouldRefresh = true;
}
@@ -414,39 +411,6 @@ protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
return Util.getFs(path, hadoopConf);
}

/**
* Deletes the oldest metadata files if {@link
* TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
*
* @param base table metadata on which previous versions were based
* @param metadata new table metadata with updated previous versions
*/
private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
if (base == null) {
return;
}

boolean deleteAfterCommit =
metadata.propertyAsBoolean(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);

if (deleteAfterCommit) {
Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
Sets.newHashSet(base.previousFiles());
removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
Tasks.foreach(removedPreviousMetadataFiles)
.executeWith(ThreadPools.getWorkerPool())
.noRetry()
.suppressFailureWhenFinished()
.onFailure(
(previousMetadataFile, exc) ->
LOG.warn(
"Delete failed for previous metadata file: {}", previousMetadataFile, exc))
.run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
}
}

private static TableMetadata checkUUID(TableMetadata currentMetadata, TableMetadata newMetadata) {
String newUUID = newMetadata.uuid();
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
