From 9ed7a4e57d02a6ef47cd79d54b58a70eae7e8c61 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Fri, 23 Aug 2024 18:24:06 +0800 Subject: [PATCH] [core] Fix thread safety issue about IcebergManifestFile in Iceberg commit callback (#4048) --- .../AbstractIcebergCommitCallback.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index c1ab2933d329..f0b4dc2c8145 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -70,6 +70,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -95,6 +96,7 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback { private final IcebergPathFactory pathFactory; private final FileStorePathFactory fileStorePathFactory; + private final Supplier manifestFileFactory; private final IcebergManifestFile manifestFile; private final IcebergManifestList manifestList; @@ -114,15 +116,17 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { + "manifest_entry_data_file:r2," + "r2_partition:r102"); FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro"); - this.manifestFile = - new IcebergManifestFile( - table.fileIO(), - partitionType, - manifestFileAvro.createReaderFactory(entryType), - manifestFileAvro.createWriterFactory(entryType), - table.coreOptions().manifestCompression(), - pathFactory.manifestFileFactory(), - table.coreOptions().manifestTargetSize()); + this.manifestFileFactory = + () -> + new IcebergManifestFile( + table.fileIO(), + partitionType, + manifestFileAvro.createReaderFactory(entryType), + manifestFileAvro.createWriterFactory(entryType), + table.coreOptions().manifestCompression(), + pathFactory.manifestFileFactory(), + table.coreOptions().manifestTargetSize()); + this.manifestFile = manifestFileFactory.get(); Options manifestListAvroOptions = Options.fromMap(table.options()); // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -557,7 +561,9 @@ private List compactMetadataIfNeeded( meta -> { List entries = new ArrayList<>(); for (IcebergManifestEntry entry : - manifestFile.read(new Path(meta.manifestPath()).getName())) { + manifestFileFactory + .get() + .read(new Path(meta.manifestPath()).getName())) { if (entry.fileSequenceNumber() == currentSnapshotId || entry.status() == IcebergManifestEntry.Status.EXISTING) { entries.add(entry);