Skip to content

Commit

Permalink
[core] Fix thread safety issue about IcebergManifestFile in Iceberg c…
Browse files Browse the repository at this point in the history
…ommit callback (apache#4048)
  • Loading branch information
tsreaper authored Aug 23, 2024
1 parent ee49c92 commit 9ed7a4e
Showing 1 changed file with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand All @@ -95,6 +96,7 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
private final IcebergPathFactory pathFactory;
private final FileStorePathFactory fileStorePathFactory;

private final Supplier<IcebergManifestFile> manifestFileFactory;
private final IcebergManifestFile manifestFile;
private final IcebergManifestList manifestList;

Expand All @@ -114,15 +116,17 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
+ "manifest_entry_data_file:r2,"
+ "r2_partition:r102");
FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro");
this.manifestFile =
new IcebergManifestFile(
table.fileIO(),
partitionType,
manifestFileAvro.createReaderFactory(entryType),
manifestFileAvro.createWriterFactory(entryType),
table.coreOptions().manifestCompression(),
pathFactory.manifestFileFactory(),
table.coreOptions().manifestTargetSize());
this.manifestFileFactory =
() ->
new IcebergManifestFile(
table.fileIO(),
partitionType,
manifestFileAvro.createReaderFactory(entryType),
manifestFileAvro.createWriterFactory(entryType),
table.coreOptions().manifestCompression(),
pathFactory.manifestFileFactory(),
table.coreOptions().manifestTargetSize());
this.manifestFile = manifestFileFactory.get();

Options manifestListAvroOptions = Options.fromMap(table.options());
// https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java
Expand Down Expand Up @@ -557,7 +561,9 @@ private List<IcebergManifestFileMeta> compactMetadataIfNeeded(
meta -> {
List<IcebergManifestEntry> entries = new ArrayList<>();
for (IcebergManifestEntry entry :
manifestFile.read(new Path(meta.manifestPath()).getName())) {
manifestFileFactory
.get()
.read(new Path(meta.manifestPath()).getName())) {
if (entry.fileSequenceNumber() == currentSnapshotId
|| entry.status() == IcebergManifestEntry.Status.EXISTING) {
entries.add(entry);
Expand Down

0 comments on commit 9ed7a4e

Please sign in to comment.