diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 1b7b2c8bb96d..46f36be7f1e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -23,6 +23,7 @@
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -30,7 +31,8 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
 
 /** Entry representing a file. */
 public interface FileEntry {
@@ -117,23 +119,10 @@ static void mergeEntries(
             ManifestFile manifestFile,
             List<ManifestFileMeta> manifestFiles,
             Map<Identifier, ManifestEntry> map) {
-        List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
-                manifestFiles.stream()
-                        .map(
-                                manifestFileMeta ->
-                                        CompletableFuture.supplyAsync(
-                                                () ->
-                                                        manifestFile.read(
-                                                                manifestFileMeta.fileName()),
-                                                FileUtils.COMMON_IO_FORK_JOIN_POOL))
-                        .collect(Collectors.toList());
-
-        try {
-            for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
-                mergeEntries(taskResult.get(), map);
-            }
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException("Failed to read manifest file.", e);
+        List<Supplier<List<ManifestEntry>>> manifestReadFutures =
+                readManifestEntries(manifestFile, manifestFiles);
+        for (Supplier<List<ManifestEntry>> taskResult : manifestReadFutures) {
+            mergeEntries(taskResult.get(), map);
         }
     }
 
@@ -167,6 +156,35 @@ static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifi
         }
     }
 
+    /**
+     * Submits one asynchronous read per manifest file and returns the pending results in input
+     * order. Each supplier blocks until its read has finished and rethrows any failure as a
+     * {@link RuntimeException}.
+     */
+    static List<Supplier<List<ManifestEntry>>> readManifestEntries(
+            ManifestFile manifestFile, List<ManifestFileMeta> manifestFiles) {
+        List<Supplier<List<ManifestEntry>>> result = new ArrayList<>();
+        for (ManifestFileMeta file : manifestFiles) {
+            Future<List<ManifestEntry>> future =
+                    CompletableFuture.supplyAsync(
+                            () -> manifestFile.read(file.fileName()),
+                            FileUtils.COMMON_IO_FORK_JOIN_POOL);
+            result.add(
+                    () -> {
+                        try {
+                            return future.get();
+                        } catch (InterruptedException e) {
+                            // Future.get() cleared the flag; restore it for callers
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException("Failed to read manifest file.", e);
+                        } catch (ExecutionException e) {
+                            throw new RuntimeException("Failed to read manifest file.", e);
+                        }
+                    });
+        }
+        return result;
+    }
+
     static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
         for (T entry : entries) {
             Preconditions.checkState(
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index a434e2acd28c..4aa6c2931ebd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -81,14 +81,7 @@ public long suggestedFileSize() {
      * <p>NOTE: This method is atomic.
      */
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
-        RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
-                new RollingFileWriter<>(
-                        () ->
-                                new ManifestEntryWriter(
-                                        writerFactory,
-                                        pathFactory.newPath(),
-                                        CoreOptions.FILE_COMPRESSION.defaultValue()),
-                        suggestedFileSize);
+        RollingFileWriter<ManifestEntry, ManifestFileMeta> writer = createRollingWriter();
         try {
             writer.write(entries);
             writer.close();
@@ -98,6 +91,17 @@ public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
         return writer.result();
     }
 
+    /** Creates the rolling writer that {@link #write(List)} uses to produce manifest files. */
+    public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter() {
+        return new RollingFileWriter<>(
+                () ->
+                        new ManifestEntryWriter(
+                                writerFactory,
+                                pathFactory.newPath(),
+                                CoreOptions.FILE_COMPRESSION.defaultValue()),
+                suggestedFileSize);
+    }
+
     private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {
 
         private final TableStatsCollector partitionStatsCollector;
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index beecb4482d25..c0bcdd061a91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.manifest.FileEntry.Identifier;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -28,6 +29,7 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import org.slf4j.Logger;
@@ -41,8 +43,11 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Metadata of a manifest file. */
 public class ManifestFileMeta {
 
@@ -170,7 +175,7 @@ public static List<ManifestFileMeta> merge(
             for (ManifestFileMeta manifest : newMetas) {
                 manifestFile.delete(manifest.fileName);
             }
-            throw e;
+            throw new RuntimeException(e);
         }
     }
 
@@ -229,7 +234,8 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
             ManifestFile manifestFile,
             long suggestedMetaSize,
             long sizeTrigger,
-            RowType partitionType) {
+            RowType partitionType)
+            throws Exception {
         // 1. should trigger full compaction
 
         List<ManifestFileMeta> base = new ArrayList<>();
@@ -311,15 +317,16 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
                     }
                 });
 
-        Map<Identifier, ManifestEntry> fullMerged = new LinkedHashMap<>();
+        List<ManifestEntry> mergedEntries = new ArrayList<>();
         for (; j < base.size(); j++) {
             ManifestFileMeta file = base.get(j);
-            FileEntry.mergeEntries(manifestFile.read(file.fileName), fullMerged);
             boolean contains = false;
-            for (Identifier identifier : deleteEntries) {
-                if (fullMerged.containsKey(identifier)) {
+            for (ManifestEntry entry : manifestFile.read(file.fileName)) {
+                checkArgument(entry.kind() == FileKind.ADD);
+                if (deleteEntries.contains(entry.identifier())) {
                     contains = true;
-                    break;
+                } else {
+                    mergedEntries.add(entry);
                 }
             }
             if (contains) {
@@ -327,25 +334,49 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
                 j++;
                 break;
             } else {
-                fullMerged.clear();
+                mergedEntries.clear();
                 result.add(file);
             }
         }
 
-        // 2.3. merge base files
+        // 2.3. merge
 
-        FileEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
-        FileEntry.mergeEntries(deltaMerged.values(), fullMerged);
+        RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
+                manifestFile.createRollingWriter();
+        try {
 
-        // 2.4. write new manifest files
+            // 2.3.1 merge mergedEntries
+            for (ManifestEntry entry : mergedEntries) {
+                writer.write(entry);
+            }
 
-        if (!fullMerged.isEmpty()) {
-            List<ManifestFileMeta> merged =
-                    manifestFile.write(new ArrayList<>(fullMerged.values()));
-            result.addAll(merged);
-            newMetas.addAll(merged);
+            // 2.3.2 merge base files
+            for (Supplier<List<ManifestEntry>> reader :
+                    FileEntry.readManifestEntries(manifestFile, base.subList(j, base.size()))) {
+                for (ManifestEntry entry : reader.get()) {
+                    checkArgument(entry.kind() == FileKind.ADD);
+                    if (!deleteEntries.contains(entry.identifier())) {
+                        writer.write(entry);
+                    }
+                }
+            }
+
+            // 2.3.3 merge deltaMerged
+            for (ManifestEntry entry : deltaMerged.values()) {
+                if (entry.kind() == FileKind.ADD) {
+                    writer.write(entry);
+                }
+            }
+        } catch (Exception e) {
+            // a failed merge must not hide its cause behind a secondary close failure
+            IOUtils.closeQuietly(writer);
+            throw e;
         }
 
+        writer.close();
+        List<ManifestFileMeta> merged = writer.result();
+        result.addAll(merged);
+        newMetas.addAll(merged);
         return Optional.of(result);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 7ff95be36bed..c08543784951 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -210,7 +210,7 @@ public void testMergeWithoutDeleteFile() {
     }
 
     @Test
-    public void testTriggerFullCompaction() {
+    public void testTriggerFullCompaction() throws Exception {
         List<ManifestEntry> entries = new ArrayList<>();
         for (int i = 0; i < 16; i++) {
             entries.add(makeEntry(true, String.valueOf(i)));
@@ -260,7 +260,7 @@ public void testTriggerFullCompaction() {
     }
 
     @Test
-    public void testMultiPartitionsFullCompaction() {
+    public void testMultiPartitionsFullCompaction() throws Exception {
 
         List<ManifestFileMeta> input = createBaseManifestFileMetas(true);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 611876867c81..3a9754decb84 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -107,6 +107,7 @@ protected void assertEquivalentEntries(
                         .collect(Collectors.toList());
         List<String> entryBeforeMerge =
                 FileEntry.mergeEntries(inputEntry).stream()
+                        .filter(entry -> entry.kind() == FileKind.ADD)
                         .map(entry -> entry.kind() + "-" + entry.file().fileName())
                         .collect(Collectors.toList());
 