From c32b06e5c6fc6d588c99389a394a62b1b599e6bf Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Tue, 10 Sep 2024 13:55:35 +0800 Subject: [PATCH] [core] Only keep deleted Identifiers in memory for Manifest full merge (#4150) --- .../org/apache/paimon/manifest/FileEntry.java | 94 +++++++++- .../apache/paimon/manifest/ManifestEntry.java | 8 +- .../manifest/ManifestEntrySerializer.java | 4 + .../paimon/manifest/SimpleFileEntry.java | 20 ++- .../manifest/SimpleFileEntrySerializer.java | 3 + .../paimon/operation/ManifestFileMerger.java | 169 +++++++----------- .../paimon/manifest/ManifestFileMetaTest.java | 4 +- 7 files changed, 185 insertions(+), 117 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 145eb93a7c51..7f5fc9de6068 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -19,16 +19,23 @@ package org.apache.paimon.manifest; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; @@ -60,40 +67,74 @@ class Identifier { public final int bucket; public final int level; public final String fileName; + public final List extraFiles; + @Nullable private final byte[] embeddedIndex; /* Cache the hash code for the string */ private Integer hash; - public Identifier(BinaryRow partition, int bucket, int level, String fileName) { + public Identifier( + BinaryRow partition, + int bucket, + int level, + String fileName, + List extraFiles, + @Nullable byte[] embeddedIndex) { this.partition = partition; this.bucket = bucket; this.level = level; this.fileName = fileName; + this.extraFiles = extraFiles; + this.embeddedIndex = embeddedIndex; } @Override public boolean equals(Object o) { - if (!(o instanceof Identifier)) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { return false; } Identifier that = (Identifier) o; - return Objects.equals(partition, that.partition) - && bucket == that.bucket + return bucket == that.bucket && level == that.level - && Objects.equals(fileName, that.fileName); + && Objects.equals(partition, that.partition) + && Objects.equals(fileName, that.fileName) + && Objects.equals(extraFiles, that.extraFiles) + && Objects.deepEquals(embeddedIndex, that.embeddedIndex); } @Override public int hashCode() { if (hash == null) { - hash = Objects.hash(partition, bucket, level, fileName); + hash = + Objects.hash( + partition, + bucket, + level, + fileName, + extraFiles, + Arrays.hashCode(embeddedIndex)); } return hash; } @Override public String toString() { - return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName); + return "{partition=" + + partition + + ", bucket=" + + bucket + + ", level=" + + level + + ", fileName=" + + fileName + + ", extraFiles=" + + extraFiles + + ", embeddedIndex=" + + Arrays.toString(embeddedIndex) + + '}'; } public String toString(FileStorePathFactory pathFactory) { @@ -103,7 +144,11 @@ public String toString(FileStorePathFactory pathFactory) { + ", level " + level + ", file " - + fileName; + + fileName + + ", extraFiles " + + extraFiles + + ", embeddedIndex " + + Arrays.toString(embeddedIndex); } } @@ -161,4 +206,37 @@ static Iterable readManifestEntries( manifestFiles, manifestReadParallelism); } + + static Set readDeletedEntries( + ManifestFile manifestFile, + List manifestFiles, + @Nullable Integer manifestReadParallelism) { + manifestFiles = + manifestFiles.stream() + .filter(file -> file.numDeletedFiles() > 0) + .collect(Collectors.toList()); + Function> processor = + file -> + manifestFile + .read( + file.fileName(), + file.fileSize(), + Filter.alwaysTrue(), + deletedFilter()) + .stream() + .map(ManifestEntry::identifier) + .collect(Collectors.toList()); + Iterable identifiers = + sequentialBatchedExecute(processor, manifestFiles, manifestReadParallelism); + Set result = new HashSet<>(); + for (Identifier identifier : identifiers) { + result.add(identifier); + } + return result; + } + + static Filter deletedFilter() { + Function getter = ManifestEntrySerializer.kindGetter(); + return row -> getter.apply(row) == FileKind.DELETE; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index b6c79295e0bc..49cfe47c13c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -111,7 +111,13 @@ public DataFileMeta file() { @Override public Identifier identifier() { - return new Identifier(partition, bucket, file.level(), file.fileName()); + return new Identifier( + partition, + bucket, + file.level(), + file.fileName(), + file.extraFiles(), + file.embeddedIndex()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index 877ef6673af4..2c3ba2aeaab3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -77,6 +77,10 @@ public ManifestEntry convertFrom(int version, InternalRow row) { dataFileMetaSerializer.fromRow(row.getRow(4, dataFileMetaSerializer.numFields()))); } + public static Function kindGetter() { + return row -> FileKind.fromByteValue(row.getByte(1)); + } + public static Function partitionGetter() { return row -> deserializeBinaryRow(row.getBinary(2)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index 3e8b88755a79..8d33ede0c4a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -20,6 +20,8 @@ import org.apache.paimon.data.BinaryRow; +import javax.annotation.Nullable; + import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -32,6 +34,8 @@ public class SimpleFileEntry implements FileEntry { private final int bucket; private final int level; private final String fileName; + private final List extraFiles; + @Nullable private final byte[] embeddedIndex; private final BinaryRow minKey; private final BinaryRow maxKey; @@ -41,6 +45,8 @@ public SimpleFileEntry( int bucket, int level, String fileName, + List extraFiles, + @Nullable byte[] embeddedIndex, BinaryRow minKey, BinaryRow maxKey) { this.kind = kind; @@ -48,6 +54,8 @@ public SimpleFileEntry( this.bucket = bucket; this.level = level; this.fileName = fileName; + this.extraFiles = extraFiles; + this.embeddedIndex = embeddedIndex; this.minKey = minKey; this.maxKey = maxKey; } @@ -59,6 +67,8 @@ public static SimpleFileEntry from(ManifestEntry entry) { entry.bucket(), entry.level(), entry.fileName(), + entry.file().extraFiles(), + entry.file().embeddedIndex(), entry.minKey(), entry.maxKey()); } @@ -94,7 +104,7 @@ public String fileName() { @Override public Identifier identifier() { - return new Identifier(partition, bucket, level, fileName); + return new Identifier(partition, bucket, level, fileName, extraFiles, embeddedIndex); } @Override @@ -121,13 +131,14 @@ public boolean equals(Object o) { && kind == that.kind && Objects.equals(partition, that.partition) && Objects.equals(fileName, that.fileName) + && Objects.equals(extraFiles, that.extraFiles) && Objects.equals(minKey, that.minKey) && Objects.equals(maxKey, that.maxKey); } @Override public int hashCode() { - return Objects.hash(kind, partition, bucket, level, fileName, minKey, maxKey); + return Objects.hash(kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey); } @Override @@ -141,9 +152,10 @@ public String toString() { + bucket + ", level=" + level - + ", fileName='" + + ", fileName=" + fileName - + '\'' + + ", extraFiles=" + + extraFiles + ", minKey=" + minKey + ", maxKey=" diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java index 9c830816aa32..bdc89b8d4c3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java @@ -22,6 +22,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.utils.VersionedObjectSerializer; +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; /** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only supports reading. */ @@ -59,6 +60,8 @@ public SimpleFileEntry convertFrom(int version, InternalRow row) { row.getInt(2), file.getInt(10), file.getString(0).toString(), + fromStringArrayData(file.getArray(11)), + file.isNullAt(14) ? null : file.getBinary(14), deserializeBinaryRow(file.getBinary(3)), deserializeBinaryRow(file.getBinary(4))); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index dfaa7af3ff51..8b43b6408ba3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -27,7 +27,7 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.Filter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +36,9 @@ import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -64,13 +66,13 @@ public static List merge( RowType partitionType, @Nullable Integer manifestReadParallelism) { // these are the newly created manifest files, clean them up if exception occurs - List newMetas = new ArrayList<>(); + List newFilesForAbort = new ArrayList<>(); try { Optional> fullCompacted = tryFullCompaction( input, - newMetas, + newFilesForAbort, manifestFile, suggestedMetaSize, manifestFullCompactionSize, @@ -80,14 +82,14 @@ public static List merge( () -> tryMinorCompaction( input, - newMetas, + newFilesForAbort, manifestFile, suggestedMetaSize, suggestedMinMetaCount, manifestReadParallelism)); } catch (Throwable e) { // exception occurs, clean up and rethrow - for (ManifestFileMeta manifest : newMetas) { + for (ManifestFileMeta manifest : newFilesForAbort) { manifestFile.delete(manifest.fileName()); } throw new RuntimeException(e); @@ -96,7 +98,7 @@ public static List merge( private static List tryMinorCompaction( List input, - List newMetas, + List newFilesForAbort, ManifestFile manifestFile, long suggestedMetaSize, int suggestedMinMetaCount, @@ -111,7 +113,11 @@ private static List tryMinorCompaction( if (totalSize >= suggestedMetaSize) { // reach suggested file size, perform merging and produce new file mergeCandidates( - candidates, manifestFile, result, newMetas, manifestReadParallelism); + candidates, + manifestFile, + result, + newFilesForAbort, + manifestReadParallelism); candidates.clear(); totalSize = 0; } @@ -119,7 +125,8 @@ private static List tryMinorCompaction( // merge the last bit of manifests if there are too many if (candidates.size() >= suggestedMinMetaCount) { - mergeCandidates(candidates, manifestFile, result, newMetas, manifestReadParallelism); + mergeCandidates( + candidates, manifestFile, result, newFilesForAbort, manifestReadParallelism); } else { result.addAll(candidates); } @@ -148,37 +155,28 @@ private static void mergeCandidates( public static Optional> tryFullCompaction( List inputs, - List newMetas, + List newFilesForAbort, ManifestFile manifestFile, long suggestedMetaSize, long sizeTrigger, RowType partitionType, @Nullable Integer manifestReadParallelism) throws Exception { + checkArgument(sizeTrigger > 0, "Manifest full compaction size trigger cannot be zero."); + // 1. should trigger full compaction - List base = new ArrayList<>(); + Filter mustChange = + file -> file.numDeletedFiles() > 0 || file.fileSize() < suggestedMetaSize; long totalManifestSize = 0; - int i = 0; - for (; i < inputs.size(); i++) { - ManifestFileMeta file = inputs.get(i); - if (file.numDeletedFiles() == 0 && file.fileSize() >= suggestedMetaSize) { - base.add(file); - totalManifestSize += file.fileSize(); - } else { - break; - } - } - - List delta = new ArrayList<>(); long deltaDeleteFileNum = 0; long totalDeltaFileSize = 0; - for (; i < inputs.size(); i++) { - ManifestFileMeta file = inputs.get(i); - delta.add(file); + for (ManifestFileMeta file : inputs) { totalManifestSize += file.fileSize(); - deltaDeleteFileNum += file.numDeletedFiles(); - totalDeltaFileSize += file.fileSize(); + if (mustChange.test(file)) { + totalDeltaFileSize += file.fileSize(); + deltaDeleteFileNum += file.numDeletedFiles(); + } } if (totalDeltaFileSize < sizeTrigger) { @@ -188,108 +186,79 @@ public static Optional> tryFullCompaction( // 2. do full compaction LOG.info( - "Start Manifest File Full Compaction, pick the number of delete file: {}, total manifest file size: {}", + "Start Manifest File Full Compaction: totalManifestSize: {}, deltaDeleteFileNum {}, totalDeltaFileSize {}", + totalManifestSize, deltaDeleteFileNum, - totalManifestSize); + totalDeltaFileSize); - // 2.1. try to skip base files by partition filter + // 2.1. read all delete entries - Map deltaMerged = new LinkedHashMap<>(); - FileEntry.mergeEntries(manifestFile, delta, deltaMerged, manifestReadParallelism); + Set deleteEntries = + FileEntry.readDeletedEntries(manifestFile, inputs, manifestReadParallelism); + + // 2.2. try to skip base files by partition filter List result = new ArrayList<>(); - int j = 0; + List toBeMerged = new LinkedList<>(inputs); if (partitionType.getFieldCount() > 0) { - Set deletePartitions = computeDeletePartitions(deltaMerged); + Set deletePartitions = computeDeletePartitions(deleteEntries); PartitionPredicate predicate = PartitionPredicate.fromMultiple(partitionType, deletePartitions); if (predicate != null) { - for (; j < base.size(); j++) { - // TODO: optimize this to binary search. - ManifestFileMeta file = base.get(j); - if (predicate.test( + Iterator iterator = toBeMerged.iterator(); + while (iterator.hasNext()) { + ManifestFileMeta file = iterator.next(); + if (mustChange.test(file)) { + continue; + } + if (!predicate.test( file.numAddedFiles() + file.numDeletedFiles(), file.partitionStats().minValues(), file.partitionStats().maxValues(), file.partitionStats().nullCounts())) { - break; - } else { + iterator.remove(); result.add(file); } } - } else { - // There is no DELETE Entry in Delta, Base don't need compaction - j = base.size(); - result.addAll(base); } } - // 2.2. try to skip base files by reading entries + // 2.2. merge - Set deleteEntries = new HashSet<>(); - deltaMerged.forEach( - (k, v) -> { - if (v.kind() == FileKind.DELETE) { - deleteEntries.add(k); - } - }); - - List mergedEntries = new ArrayList<>(); - for (; j < base.size(); j++) { - ManifestFileMeta file = base.get(j); - boolean contains = false; - for (ManifestEntry entry : manifestFile.read(file.fileName(), file.fileSize())) { - checkArgument(entry.kind() == FileKind.ADD); - if (deleteEntries.contains(entry.identifier())) { - contains = true; - } else { - mergedEntries.add(entry); - } - } - if (contains) { - // already read this file into fullMerged - j++; - break; - } else { - mergedEntries.clear(); - result.add(file); - } + if (toBeMerged.size() <= 1) { + return Optional.empty(); } - // 2.3. merge - RollingFileWriter writer = manifestFile.createRollingWriter(); Exception exception = null; try { + for (ManifestFileMeta file : toBeMerged) { + List entries = new ArrayList<>(); + boolean requireChange = mustChange.test(file); + for (ManifestEntry entry : manifestFile.read(file.fileName(), file.fileSize())) { + if (entry.kind() == FileKind.DELETE) { + continue; + } - // 2.3.1 merge mergedEntries - for (ManifestEntry entry : mergedEntries) { - writer.write(entry); - } - mergedEntries.clear(); - - // 2.3.2 merge base files - for (ManifestEntry entry : - FileEntry.readManifestEntries( - manifestFile, base.subList(j, base.size()), manifestReadParallelism)) { - checkArgument(entry.kind() == FileKind.ADD); - if (!deleteEntries.contains(entry.identifier())) { - writer.write(entry); + if (deleteEntries.contains(entry.identifier())) { + requireChange = true; + } else { + entries.add(entry); + } } - } - // 2.3.3 merge deltaMerged - for (ManifestEntry entry : deltaMerged.values()) { - if (entry.kind() == FileKind.ADD) { - writer.write(entry); + if (requireChange) { + writer.write(entries); + } else { + result.add(file); } } } catch (Exception e) { exception = e; } finally { if (exception != null) { - IOUtils.closeQuietly(writer); + writer.abort(); throw exception; } writer.close(); @@ -297,18 +266,14 @@ public static Optional> tryFullCompaction( List merged = writer.result(); result.addAll(merged); - newMetas.addAll(merged); + newFilesForAbort.addAll(merged); return Optional.of(result); } - private static Set computeDeletePartitions( - Map deltaMerged) { + private static Set computeDeletePartitions(Set deleteEntries) { Set partitions = new HashSet<>(); - for (ManifestEntry manifestEntry : deltaMerged.values()) { - if (manifestEntry.kind() == FileKind.DELETE) { - BinaryRow partition = manifestEntry.partition(); - partitions.add(partition); - } + for (FileEntry.Identifier identifier : deleteEntries) { + partitions.add(identifier.partition); } return partitions; } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index 89ed09fb5fda..86c03087a202 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -134,13 +134,13 @@ private void testCleanUp(List input, long fullCompactionThresh public void testCleanUpWithFullCompaction() throws IOException { List input = new ArrayList<>(); createData(ThreadLocalRandom.current().nextInt(5), input, null); - testCleanUp(input, 0L); + testCleanUp(input, 1L); } @Test public void testMerge() { List input = createBaseManifestFileMetas(true); - // delta with delete apply parititon 1,2 + // delta with delete apply partition 1,2 addDeltaManifests(input, true); // trigger full compaction List merged =