From 96c23e573953a4452d57c1e35bb7ce36072fced5 Mon Sep 17 00:00:00 2001 From: Yann Date: Sat, 6 Jul 2024 13:30:30 +0800 Subject: [PATCH] [spark][core] spark compact with deletion vector --- .../apache/paimon/AppendOnlyFileStore.java | 8 + .../append/AppendOnlyCompactManager.java | 144 ++++++++++++---- .../append/AppendOnlyCompactionTask.java | 58 ++++++- .../AppendOnlyTableCompactionCoordinator.java | 67 +++++++- .../paimon/append/AppendOnlyWriter.java | 26 ++- .../apache/paimon/compact/CompactResult.java | 18 ++ .../DeletionVectorIndexFileMaintainer.java | 46 +++-- .../DeletionVectorIndexFileWriter.java | 4 + .../paimon/io/DeletionFileSerializer.java | 56 ++++++ .../manifest/IndexManifestFileHandler.java | 20 ++- .../paimon/mergetree/MergeTreeWriter.java | 2 +- .../operation/AbstractFileStoreWrite.java | 11 +- .../operation/AppendOnlyFileStoreWrite.java | 159 ++++++++++++------ .../operation/KeyValueFileStoreWrite.java | 1 + .../table/AppendOnlyFileStoreTable.java | 4 +- .../paimon/table/source/DeletionFile.java | 13 ++ .../apache/paimon/utils/CommitIncrement.java | 10 ++ .../append/AppendOnlyCompactManagerTest.java | 1 + .../AppendOnlyTableCompactionITTest.java | 2 +- .../paimon/append/AppendOnlyWriterTest.java | 1 + .../paimon/append/FullCompactTaskTest.java | 2 +- ...DeletionVectorIndexFileMaintainerTest.java | 6 +- .../paimon/format/FileFormatSuffixTest.java | 2 +- .../flink/compact/UnawareBucketCompactor.java | 2 +- .../spark/procedure/CompactProcedure.java | 4 +- .../spark/commands/PaimonSparkWriter.scala | 2 +- .../paimon/spark/sql/DeletionVectorTest.scala | 90 +++++++--- 27 files changed, 599 insertions(+), 160 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/DeletionFileSerializer.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 3cd7bb3b6959b..1e50da660b54f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -19,6 +19,7 @@ package org.apache.paimon; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestCacheFilter; @@ -93,6 +94,11 @@ public AppendOnlyFileStoreWrite newWrite(String commitUser) { @Override public AppendOnlyFileStoreWrite newWrite( String commitUser, ManifestCacheFilter manifestFilter) { + DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null; + if (options.deletionVectorsEnabled()) { + deletionVectorsMaintainerFactory = + new DeletionVectorsMaintainer.Factory(newIndexFileHandler()); + } return new AppendOnlyFileStoreWrite( fileIO, newRead(), @@ -103,6 +109,8 @@ public AppendOnlyFileStoreWrite newWrite( snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), options, + bucketMode(), + deletionVectorsMaintainerFactory, tableName); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java index 7d4346bdb7821..a39c5bc9cbd0e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java @@ -23,9 +23,15 @@ import org.apache.paimon.compact.CompactFutureManager; import 
org.apache.paimon.compact.CompactResult; import org.apache.paimon.compact.CompactTask; +import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.operation.metrics.CompactionMetrics; import org.apache.paimon.operation.metrics.MetricUtils; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.utils.Preconditions; import org.slf4j.Logger; @@ -36,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.LinkedList; import java.util.List; @@ -52,6 +59,7 @@ public class AppendOnlyCompactManager extends CompactFutureManager { private static final int FULL_COMPACT_MIN_FILE = 3; private final ExecutorService executor; + private final DeletionVectorIndexFileMaintainer dvIndexFileMaintainer; private final TreeSet toCompact; private final int minFileNum; private final int maxFileNum; @@ -65,12 +73,14 @@ public class AppendOnlyCompactManager extends CompactFutureManager { public AppendOnlyCompactManager( ExecutorService executor, List restored, + @Nullable DeletionVectorIndexFileMaintainer dvIndexFileMaintainer, int minFileNum, int maxFileNum, long targetFileSize, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { this.executor = executor; + this.dvIndexFileMaintainer = dvIndexFileMaintainer; this.toCompact = new TreeSet<>(fileComparator(false)); this.toCompact.addAll(restored); this.minFileNum = minFileNum; @@ -94,13 +104,20 @@ private void triggerFullCompaction() { taskFuture == null, "A compaction task is still running while the user " + "forces a new compaction. This is unexpected."); - if (toCompact.size() < FULL_COMPACT_MIN_FILE) { + // if deletion vector enables, always trigger compaction. + if (toCompact.isEmpty() + || (dvIndexFileMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE)) { return; } taskFuture = executor.submit( - new FullCompactTask(toCompact, targetFileSize, rewriter, metricsReporter)); + new FullCompactTask( + dvIndexFileMaintainer, + toCompact, + targetFileSize, + rewriter, + metricsReporter)); compacting = new ArrayList<>(toCompact); toCompact.clear(); } @@ -113,7 +130,9 @@ private void triggerCompactionWithBestEffort() { if (picked.isPresent()) { compacting = picked.get(); taskFuture = - executor.submit(new AutoCompactTask(compacting, rewriter, metricsReporter)); + executor.submit( + new AutoCompactTask( + dvIndexFileMaintainer, compacting, rewriter, metricsReporter)); } } @@ -207,17 +226,20 @@ public void close() throws IOException { /** A {@link CompactTask} impl for full compaction of append-only table. 
 */
    public static class FullCompactTask extends CompactTask {

-        private final LinkedList<DataFileMeta> inputs;
+        private final DeletionVectorIndexFileMaintainer dvIndexFileMaintainer;
+        private final LinkedList<DataFileMeta> toCompact;
         private final long targetFileSize;
         private final CompactRewriter rewriter;

         public FullCompactTask(
+                DeletionVectorIndexFileMaintainer dvIndexFileMaintainer,
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
                 CompactRewriter rewriter,
                 @Nullable CompactionMetrics.Reporter metricsReporter) {
             super(metricsReporter);
-            this.inputs = new LinkedList<>(inputs);
+            this.dvIndexFileMaintainer = dvIndexFileMaintainer;
+            this.toCompact = new LinkedList<>(inputs);
             this.targetFileSize = targetFileSize;
             this.rewriter = rewriter;
         }
@@ -225,34 +247,42 @@ public FullCompactTask(
         @Override
         protected CompactResult doCompact() throws Exception {
             // remove large files
-            while (!inputs.isEmpty()) {
-                DataFileMeta file = inputs.peekFirst();
-                if (file.fileSize() >= targetFileSize) {
-                    inputs.poll();
+            while (!toCompact.isEmpty()) {
+                DataFileMeta file = toCompact.peekFirst();
+                // a data file with a deletion file always needs to be compacted
+                if (file.fileSize() >= targetFileSize && !hasDeletionFile(file)) {
+                    toCompact.poll();
                     continue;
                 }
                 break;
             }

-            // compute small files
-            int big = 0;
-            int small = 0;
-            for (DataFileMeta file : inputs) {
-                if (file.fileSize() >= targetFileSize) {
-                    big++;
+            // do compaction
+            if (dvIndexFileMaintainer != null) {
+                // if deletion vectors are enabled, always trigger compaction
+                return compact(dvIndexFileMaintainer, toCompact, rewriter);
+            } else {
+                // compute small files
+                int big = 0;
+                int small = 0;
+                for (DataFileMeta file : toCompact) {
+                    if (file.fileSize() >= targetFileSize) {
+                        big++;
+                    } else {
+                        small++;
+                    }
+                }
+                if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) {
+                    return compact(dvIndexFileMaintainer, toCompact, rewriter);
                 } else {
-                    small++;
+                    return result(Collections.emptyList(), Collections.emptyList());
                 }
             }
+        }

-            // do compaction
-            List<DataFileMeta> compactBefore = new ArrayList<>();
-            List<DataFileMeta> compactAfter = new ArrayList<>();
-            if (small > big && inputs.size() >= FULL_COMPACT_MIN_FILE) {
-                compactBefore = new ArrayList<>(inputs);
-                compactAfter = rewriter.rewrite(inputs);
-            }
-            return result(new ArrayList<>(compactBefore), compactAfter);
+        private boolean hasDeletionFile(DataFileMeta file) {
+            return dvIndexFileMaintainer != null
+                    && dvIndexFileMaintainer.getDeletionFile(file.fileName()) != null;
         }
     }

@@ -265,41 +295,87 @@ protected CompactResult doCompact() throws Exception {
      */
     public static class AutoCompactTask extends CompactTask {

+        private final DeletionVectorIndexFileMaintainer dvIndexFileMaintainer;
         private final List<DataFileMeta> toCompact;
         private final CompactRewriter rewriter;

         public AutoCompactTask(
+                DeletionVectorIndexFileMaintainer dvIndexFileMaintainer,
                 List<DataFileMeta> toCompact,
                 CompactRewriter rewriter,
                 @Nullable CompactionMetrics.Reporter metricsReporter) {
             super(metricsReporter);
+            this.dvIndexFileMaintainer = dvIndexFileMaintainer;
             this.toCompact = toCompact;
             this.rewriter = rewriter;
         }

         @Override
         protected CompactResult doCompact() throws Exception {
+            return compact(dvIndexFileMaintainer, toCompact, rewriter);
+        }
+    }
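// Editor's sketch (illustrative only, not part of this patch): the non-DV branch of
// FullCompactTask#doCompact above boils down to this predicate -- full compaction only
// runs when small files outnumber large ones and there are at least
// FULL_COMPACT_MIN_FILE candidates. With a DeletionVectorIndexFileMaintainer present,
// the predicate is bypassed and compaction always runs, so deletion vectors get applied
// and dropped.
//
//     static boolean shouldFullCompact(List<DataFileMeta> candidates, long targetFileSize) {
//         long small = candidates.stream().filter(f -> f.fileSize() < targetFileSize).count();
//         long big = candidates.size() - small;
//         return small > big && candidates.size() >= FULL_COMPACT_MIN_FILE;
//     }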
+
+    private static CompactResult compact(
+            DeletionVectorIndexFileMaintainer dvIndexFileMaintainer,
+            List<DataFileMeta> toCompact,
+            CompactRewriter rewriter)
+            throws Exception {
+        if (dvIndexFileMaintainer == null) {
             return result(toCompact, rewriter.rewrite(toCompact));
+        } else {
+            // the maintainer indexes deletion files by data file name
+            List<DeletionFile> deletionFiles = new ArrayList<>();
+            for (DataFileMeta dataFile : toCompact) {
+                deletionFiles.add(dvIndexFileMaintainer.getDeletionFile(dataFile.fileName()));
+            }
+            List<DataFileMeta> compactAfter = rewriter.rewrite(toCompact, deletionFiles);
+            toCompact.forEach(f -> dvIndexFileMaintainer.notify(f.fileName()));
+
+            List<IndexManifestEntry> indexManifestEntries = dvIndexFileMaintainer.persist();
+            if (indexManifestEntries.isEmpty()) {
+                return result(toCompact, compactAfter);
+            } else {
+                List<IndexFileMeta> indexFilesBefore = new ArrayList<>();
+                List<IndexFileMeta> indexFilesAfter = new ArrayList<>();
+                // reuse the entries from the persist() call above; persisting a second
+                // time would rewrite the unchanged deletion vectors again
+                for (IndexManifestEntry entry : indexManifestEntries) {
+                    if (entry.kind() == FileKind.ADD) {
+                        indexFilesAfter.add(entry.indexFile());
+                    } else {
+                        indexFilesBefore.add(entry.indexFile());
+                    }
+                }
+                return result(toCompact, indexFilesBefore, compactAfter, indexFilesAfter);
+            }
+        }
+    }

     private static CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
-        return new CompactResult() {
-            @Override
-            public List<DataFileMeta> before() {
-                return before;
-            }
+        return new CompactResult(before, after);
+    }

-            @Override
-            public List<DataFileMeta> after() {
-                return after;
-            }
-        };
+    private static CompactResult result(
+            List<DataFileMeta> before,
+            @Nullable List<IndexFileMeta> indexFilesBefore,
+            List<DataFileMeta> after,
+            @Nullable List<IndexFileMeta> indexFilesAfter) {
+        CompactResult result = new CompactResult(before, after);
+        if (indexFilesBefore != null || indexFilesAfter != null) {
+            IndexIncrement indexIncrement = new IndexIncrement(indexFilesAfter, indexFilesBefore);
+            result.setIndexIncrement(indexIncrement);
+        }
+        return result;
     }

     /** Compact rewriter for append-only table. */
     public interface CompactRewriter {
         List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
+
+        default List<DataFileMeta> rewrite(
+                List<DataFileMeta> compactBefore, List<DeletionFile> deletionFiles)
+                throws Exception {
+            throw new UnsupportedOperationException(
+                    "This rewriter does not support rewriting with deletion files");
+        }
     }
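// Editor's sketch (illustrative only, not part of this patch): how the DV-aware
// result(...) overload above reports index-file churn. Note the argument order of
// IndexIncrement used throughout this patch: new (ADD) index files first, deleted
// (DELETE) index files second. The list names here are hypothetical.
//
//     CompactResult compactResult = new CompactResult(compactBefore, compactAfter);
//     compactResult.setIndexIncrement(
//             new IndexIncrement(
//                     newIndexFiles /* FileKind.ADD */,
//                     deletedIndexFiles /* FileKind.DELETE */));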
     /**
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
index 721d93eb3fd9e..3e6f122e233e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
@@ -19,18 +19,26 @@ package org.apache.paimon.append;

 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.utils.Preconditions;

 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;

 /** Compaction task generated by {@link AppendOnlyTableCompactionCoordinator}. */
 public class AppendOnlyCompactionTask {
@@ -40,9 +48,7 @@ public class AppendOnlyCompactionTask {
     private final List<DataFileMeta> compactAfter;

     public AppendOnlyCompactionTask(BinaryRow partition, List<DataFileMeta> files) {
-        Preconditions.checkArgument(
-                files != null && files.size() > 1,
-                "AppendOnlyCompactionTask need more than one file input.");
+        Preconditions.checkArgument(files != null);
         this.partition = partition;
         compactBefore = new ArrayList<>(files);
         compactAfter = new ArrayList<>();
@@ -60,8 +66,47 @@ public List<DataFileMeta> compactAfter() {
         return compactAfter;
     }

-    public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws Exception {
-        compactAfter.addAll(write.compactRewriter(partition, 0).rewrite(compactBefore));
+    public CommitMessage doCompact(FileStoreTable table, AppendOnlyFileStoreWrite write)
+            throws Exception {
+        boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
+        Preconditions.checkArgument(
+                dvEnabled || compactBefore.size() > 1,
+                "AppendOnlyCompactionTask needs more than one file input.");
+        IndexIncrement indexIncrement;
+        if (dvEnabled) {
+            DeletionVectorIndexFileMaintainer dvIndexFileMaintainer =
+                    table.store()
+                            .newIndexFileHandler()
+                            .createDVIndexFileMaintainer(
+                                    table.snapshotManager().latestSnapshotId(),
+                                    partition,
+                                    0,
+                                    false);
+            List<DeletionFile> deletionFilesBefore =
+                    compactBefore.stream()
+                            .map(f -> dvIndexFileMaintainer.getDeletionFile(f.fileName()))
+                            .collect(Collectors.toList());
+            compactAfter.addAll(
+                    write.compactRewriter(partition, 0)
+                            .rewrite(compactBefore, deletionFilesBefore));
+
+            compactBefore.forEach(
+                    f -> {
+                        dvIndexFileMaintainer.notify(f.fileName());
+                    });
+            List<IndexManifestEntry> indexEntries = dvIndexFileMaintainer.persist();
+            Preconditions.checkArgument(
+                    indexEntries.stream().noneMatch(i -> i.kind() == FileKind.ADD));
+            List<IndexFileMeta> removed =
+                    indexEntries.stream()
+                            .map(IndexManifestEntry::indexFile)
+                            .collect(Collectors.toList());
+            indexIncrement = new IndexIncrement(Collections.emptyList(), removed);
+        } else {
+            compactAfter.addAll(write.compactRewriter(partition, 0).rewrite(compactBefore));
+            indexIncrement = new IndexIncrement(Collections.emptyList());
+        }
+
         CompactIncrement compactIncrement =
                 new CompactIncrement(compactBefore, compactAfter, Collections.emptyList());

         return new CommitMessageImpl(
@@ -69,7 +114,8 @@ public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws Exception
                 0, // bucket 0 is bucket for unaware-bucket table for compatibility with the old
                 // design
                 DataIncrement.emptyIncrement(),
-                compactIncrement);
+                compactIncrement,
+                indexIncrement);
     }

     public int hashCode() {
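// Editor's sketch (illustrative only, not part of this patch): how a caller is expected
// to drive the new doCompact(table, write) signature. `table` and `write` stand for an
// unaware-bucket FileStoreTable and its AppendOnlyFileStoreWrite; a single-argument
// coordinator constructor is assumed here, and the commit wiring is omitted.
//
//     AppendOnlyTableCompactionCoordinator coordinator =
//             new AppendOnlyTableCompactionCoordinator(table);
//     List<CommitMessage> messages = new ArrayList<>();
//     for (AppendOnlyCompactionTask task : coordinator.run()) {
//         // the table now travels with the task so DV index state can be looked up
//         messages.add(task.doCompact(table, write));
//     }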
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
index d54221403d662..6c560c1d39844 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
@@ -21,6 +21,9 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
@@ -60,12 +63,15 @@ public class AppendOnlyTableCompactionCoordinator {

     protected static final int REMOVE_AGE = 10;
     protected static final int COMPACT_AGE = 5;

+    @Nullable private final Long snapshotId;
     private final InnerTableScan scan;
     private final long targetFileSize;
     private final long compactionFileSize;
     private final int minFileNum;
     private final int maxFileNum;
     private final boolean streamingMode;
+    private final IndexFileHandler indexFileHandler;
+    private final boolean deletionVectorEnabled;

     final Map<BinaryRow, PartitionCompactCoordinator> partitionCompactCoordinators =
             new HashMap<>();
@@ -90,6 +96,7 @@ public AppendOnlyTableCompactionCoordinator(
         if (filter != null) {
             scan.withFilter(filter);
         }
+        this.snapshotId = table.snapshotManager().latestSnapshotId();
         this.streamingMode = isStreaming;
         CoreOptions coreOptions = table.coreOptions();
         this.targetFileSize = coreOptions.targetFileSize(false);
@@ -97,6 +104,8 @@ public AppendOnlyTableCompactionCoordinator(
         this.minFileNum = coreOptions.compactionMinFileNum();
         // this is global compaction, avoid too many compaction tasks
         this.maxFileNum = coreOptions.compactionMaxFileNum().orElse(50);
+        this.indexFileHandler = table.store().newIndexFileHandler();
+        this.deletionVectorEnabled = coreOptions.deletionVectorsEnabled();
     }

     public List<AppendOnlyCompactionTask> run() {
@@ -130,12 +139,23 @@ boolean scan() {

     @VisibleForTesting
     void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
+        DeletionVectorIndexFileMaintainer dvIndexFileMaintainer =
+                indexFileHandler.createDVIndexFileMaintainer(snapshotId, partition, 0, false);
+        java.util.function.Predicate<DataFileMeta> filter =
+                file -> {
+                    if (dvIndexFileMaintainer == null
+                            || dvIndexFileMaintainer.getDeletionFile(file.fileName()) == null) {
+                        return file.fileSize() < compactionFileSize;
+                    }
+                    // a data file that has a deletion file always needs to be compacted
+                    return true;
+                };
+        List<DataFileMeta> toCompact = files.stream().filter(filter).collect(Collectors.toList());
         partitionCompactCoordinators
-                .computeIfAbsent(partition, PartitionCompactCoordinator::new)
-                .addFiles(
-                        files.stream()
-                                .filter(file -> file.fileSize() < compactionFileSize)
-                                .collect(Collectors.toList()));
+                .computeIfAbsent(
+                        partition,
+                        pp -> new PartitionCompactCoordinator(dvIndexFileMaintainer, partition))
+                .addFiles(toCompact);
     }

     @VisibleForTesting
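// Editor's sketch (illustrative only, not part of this patch): the eligibility rule that
// notifyNewFiles applies above, written as a single predicate. A data file that carries a
// deletion file is always scheduled for compaction (to purge its deleted rows); any other
// file qualifies only while it is smaller than compactionFileSize.
//
//     boolean eligible =
//             (dvIndexFileMaintainer != null
//                             && dvIndexFileMaintainer.getDeletionFile(file.fileName()) != null)
//                     || file.fileSize() < compactionFileSize;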
@@ -181,11 +201,14 @@ private Map compactScanType() {

     /** Coordinator for a single partition. */
     class PartitionCompactCoordinator {

+        private final DeletionVectorIndexFileMaintainer dvIndexFileMaintainer;
         private final BinaryRow partition;
-        private final HashSet<DataFileMeta> toCompact = new HashSet<>();
+        private final List<DataFileMeta> toCompact = new ArrayList<>();
         int age = 0;

-        public PartitionCompactCoordinator(BinaryRow partition) {
+        public PartitionCompactCoordinator(
+                DeletionVectorIndexFileMaintainer dvIndexFileMaintainer, BinaryRow partition) {
+            this.dvIndexFileMaintainer = dvIndexFileMaintainer;
             this.partition = partition;
         }
@@ -216,7 +239,12 @@ public boolean readyToRemove() {
         }

         private List<List<DataFileMeta>> agePack() {
-            List<List<DataFileMeta>> packed = pack();
+            List<List<DataFileMeta>> packed;
+            if (dvIndexFileMaintainer == null) {
+                packed = pack(toCompact);
+            } else {
+                packed = packInDeletionVectorMode(toCompact);
+            }
             if (packed.isEmpty()) {
                 // non-packed, we need to grow up age, and check whether to compact once
                 if (++age > COMPACT_AGE && toCompact.size() > 1) {
@@ -230,7 +258,7 @@ private List<List<DataFileMeta>> agePack() {
             return packed;
         }

-        private List<List<DataFileMeta>> pack() {
+        private List<List<DataFileMeta>> pack(List<DataFileMeta> toCompact) {
             // we compact smaller files first
             // step 1, sort files by file size, pick the smaller first
             ArrayList<DataFileMeta> files = new ArrayList<>(toCompact);
@@ -252,6 +280,27 @@ private List<List<DataFileMeta>> pack() {
             return result;
         }

+        private List<List<DataFileMeta>> packInDeletionVectorMode(List<DataFileMeta> toCompact) {
+            // we group the data files by their related index files.
+            Map<IndexFileMeta, List<DataFileMeta>> filesWithDV = new HashMap<>();
+            List<DataFileMeta> rest = new ArrayList<>();
+            for (DataFileMeta dataFile : toCompact) {
+                IndexFileMeta indexFile = dvIndexFileMaintainer.getIndexFile(dataFile.fileName());
+                if (indexFile == null) {
+                    rest.add(dataFile);
+                } else {
+                    filesWithDV.computeIfAbsent(indexFile, f -> new ArrayList<>()).add(dataFile);
+                }
+            }
+
+            List<List<DataFileMeta>> result = new ArrayList<>();
+            result.addAll(filesWithDV.values());
+            if (rest.size() > 1) {
+                result.addAll(pack(rest));
+            }
+            return result;
+        }
+
        /**
         * A file bin for {@link PartitionCompactCoordinator} determine whether ready to compact.
*/ diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 17ebe215ee1ae..a52b2875371de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -27,10 +27,12 @@ import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.io.RowDataRollingFileWriter; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.memory.MemoryOwner; @@ -75,6 +77,8 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final List deletedFiles; private final List compactBefore; private final List compactAfter; + private final List indexFilesBefore; + private final List indexFilesAfter; private final LongCounter seqNumCounter; private final String fileCompression; private final String spillCompression; @@ -121,6 +125,8 @@ public AppendOnlyWriter( this.deletedFiles = new ArrayList<>(); this.compactBefore = new ArrayList<>(); this.compactAfter = new ArrayList<>(); + this.indexFilesBefore = new ArrayList<>(); + this.indexFilesAfter = new ArrayList<>(); this.seqNumCounter = new LongCounter(maxSequenceNumber + 1); this.fileCompression = fileCompression; this.spillCompression = spillCompression; @@ -139,6 +145,10 @@ public AppendOnlyWriter( deletedFiles.addAll(increment.newFilesIncrement().deletedFiles()); compactBefore.addAll(increment.compactIncrement().compactBefore()); compactAfter.addAll(increment.compactIncrement().compactAfter()); + if (increment.indexIncrement() != null) { + indexFilesBefore.addAll(increment.indexIncrement().deletedIndexFiles()); + indexFilesAfter.addAll(increment.indexIncrement().newIndexFiles()); + } } } @@ -276,6 +286,11 @@ private void trySyncLatestCompaction(boolean blocking) result -> { compactBefore.addAll(result.before()); compactAfter.addAll(result.after()); + if (result.indexIncrement() != null) { + indexFilesBefore.addAll( + result.indexIncrement().deletedIndexFiles()); + indexFilesAfter.addAll(result.indexIncrement().newIndexFiles()); + } }); } @@ -291,12 +306,21 @@ private CommitIncrement drainIncrement() { new ArrayList<>(compactAfter), Collections.emptyList()); + IndexIncrement indexIncrement = null; + if (!indexFilesBefore.isEmpty() || !indexFilesAfter.isEmpty()) { + indexIncrement = + new IndexIncrement( + new ArrayList<>(indexFilesAfter), new ArrayList<>(indexFilesBefore)); + } + newFiles.clear(); deletedFiles.clear(); compactBefore.clear(); compactAfter.clear(); + indexFilesBefore.clear(); + indexFilesAfter.clear(); - return new CommitIncrement(dataIncrement, compactIncrement, null); + return new CommitIncrement(dataIncrement, compactIncrement, indexIncrement, null); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java index 08d7de5dab7fc..4b6cdf872f829 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java @@ -19,6 +19,8 @@ package 
org.apache.paimon.compact; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; @@ -32,6 +34,7 @@ public class CompactResult { private final List before; private final List after; private final List changelog; + @Nullable private IndexIncrement indexIncrement; @Nullable private CompactDeletionFile deletionFile; @@ -66,10 +69,25 @@ public List changelog() { return changelog; } + public void setIndexIncrement(@Nullable IndexIncrement indexIncrement) { + Preconditions.checkArgument( + deletionFile == null, + "indexIncrement and deletionFile can't be set at the same time"); + this.indexIncrement = indexIncrement; + } + public void setDeletionFile(@Nullable CompactDeletionFile deletionFile) { + Preconditions.checkArgument( + indexIncrement == null, + "indexIncrement and deletionFile can't be set at the same time"); this.deletionFile = deletionFile; } + @Nullable + public IndexIncrement indexIncrement() { + return indexIncrement; + } + @Nullable public CompactDeletionFile deletionFile() { return deletionFile; diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java index 39a0c75921ffa..b037f3af6ffeb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java @@ -42,6 +42,7 @@ public class DeletionVectorIndexFileMaintainer { private final BinaryRow partition; private final int bucket; + private final Map dataFileToDeletionFile; private final Map indexNameToEntry = new HashMap<>(); private final Map> indexFileToDeletionFiles = new HashMap<>(); @@ -68,15 +69,14 @@ public DeletionVectorIndexFileMaintainer( } else { this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create(); } - Map dataFileToDeletionFiles = - indexFileHandler.scanDVIndex(snapshotId, partition, bucket); - init(dataFileToDeletionFiles); + this.dataFileToDeletionFile = indexFileHandler.scanDVIndex(snapshotId, partition, bucket); + init(this.dataFileToDeletionFile); } @VisibleForTesting - public void init(Map dataFileToDeletionFiles) { + public void init(Map dataFileToDeletionFile) { List touchedIndexFileNames = - dataFileToDeletionFiles.values().stream() + dataFileToDeletionFile.values().stream() .map(deletionFile -> new Path(deletionFile.path()).getName()) .distinct() .collect(Collectors.toList()); @@ -87,8 +87,8 @@ public void init(Map dataFileToDeletionFiles) { indexManifestEntry.indexFile().fileName())) .forEach(entry -> indexNameToEntry.put(entry.indexFile().fileName(), entry)); - for (String dataFile : dataFileToDeletionFiles.keySet()) { - DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile); + for (String dataFile : dataFileToDeletionFile.keySet()) { + DeletionFile deletionFile = dataFileToDeletionFile.get(dataFile); String indexFileName = new Path(deletionFile.path()).getName(); if (!indexFileToDeletionFiles.containsKey(indexFileName)) { indexFileToDeletionFiles.put(indexFileName, new HashMap<>()); @@ -106,17 +106,37 @@ public int getBucket() { return this.bucket; } - public void notifyDeletionFiles(String dataFile, DeletionVector deletionVector) { - DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); - DeletionFile previous = 
null; + public DeletionFile getDeletionFile(String dataFile) { + return this.dataFileToDeletionFile.get(dataFile); + } + + public IndexFileMeta getIndexFile(String dataFile) { + DeletionFile deletionFile = getDeletionFile(dataFile); + if (deletionFile == null) { + return null; + } else { + IndexManifestEntry entry = + this.indexNameToEntry.get(new Path(deletionFile.path()).getName()); + return entry == null ? null : entry.indexFile(); + } + } + + public DeletionFile notify(String dataFile) { if (dataFileToIndexFile.containsKey(dataFile)) { String indexFileName = dataFileToIndexFile.get(dataFile); touchedIndexFiles.add(indexFileName); if (indexFileToDeletionFiles.containsKey(indexFileName)) { - previous = indexFileToDeletionFiles.get(indexFileName).remove(dataFile); + return indexFileToDeletionFiles.get(indexFileName).remove(dataFile); } } + return null; + } + + public void notifyNewDeletionVector(String dataFile, DeletionVector deletionVector) { + DeletionFile previous = notify(dataFile); if (previous != null) { + DeletionVectorsIndexFile deletionVectorsIndexFile = + indexFileHandler.deletionVectorsIndex(); deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile, previous)); } maintainer.notifyNewDeletion(dataFile, deletionVector); @@ -146,7 +166,7 @@ public List persist() { return result; } - public List writeUnchangedDeletionVector() { + private List writeUnchangedDeletionVector() { DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); List newIndexEntries = new ArrayList<>(); for (String indexFile : indexFileToDeletionFiles.keySet()) { @@ -158,7 +178,7 @@ public List writeUnchangedDeletionVector() { indexFileToDeletionFiles.get(indexFile); if (!dataFileToDeletionFiles.isEmpty()) { List newIndexFiles = - indexFileHandler.writeDeletionVectorsIndex( + deletionVectorsIndexFile.write( deletionVectorsIndexFile.readDeletionVector( dataFileToDeletionFiles)); newIndexFiles.forEach( diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java index 4dddd6ba162e4..fd4f6f4ccf167 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java @@ -54,6 +54,10 @@ public DeletionVectorIndexFileWriter( this.targetSizeInBytes = targetSizePerIndexFile.getBytes(); } + /** + * For unaware-bucket mode, this method will write out multiple index files, else, it will write + * out only one index file. + */ public List write(Map input) throws IOException { if (input.isEmpty()) { return emptyIndexFile(); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DeletionFileSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DeletionFileSerializer.java new file mode 100644 index 0000000000000..4b927b1af0f5c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DeletionFileSerializer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.utils.ObjectSerializer; + +/** DeletionFileSerializer. */ +public class DeletionFileSerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + private static final GenericRow NULL_DELETION_FILE = + GenericRow.of(BinaryString.fromString(""), -1L, -1L); + + public DeletionFileSerializer() { + super(DeletionFile.SCHEMA); + } + + @Override + public InternalRow toRow(DeletionFile record) { + if (record == null) { + return NULL_DELETION_FILE; + } + return GenericRow.of( + BinaryString.fromString(record.path()), record.offset(), record.length()); + } + + @Override + public DeletionFile fromRow(InternalRow rowData) { + if (rowData.getString(0).toString().equals("")) { + return null; + } + return new DeletionFile( + rowData.getString(0).toString(), rowData.getLong(1), rowData.getLong(2)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java index 89fbb608092c6..1651900125d59 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; @@ -140,12 +141,19 @@ public List combine( indexEntries.put(identifier(entry), entry); } - for (IndexManifestEntry entry : newIndexFiles) { - if (entry.kind() == FileKind.ADD) { - indexEntries.put(identifier(entry), entry); - } else { - indexEntries.remove(identifier(entry)); - } + List removed = + newIndexFiles.stream() + .filter(f -> f.kind() == FileKind.DELETE) + .collect(Collectors.toList()); + List added = + newIndexFiles.stream() + .filter(f -> f.kind() == FileKind.ADD) + .collect(Collectors.toList()); + for (IndexManifestEntry entry : removed) { + indexEntries.remove(identifier(entry)); + } + for (IndexManifestEntry entry : added) { + indexEntries.put(identifier(entry), entry); } return new ArrayList<>(indexEntries.values()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index aa60c884660fa..66ba6743f186b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -317,7 +317,7 @@ private CommitIncrement drainIncrement() { compactChangelog.clear(); this.compactDeletionFile = null; - return new CommitIncrement(dataIncrement, compactIncrement, drainDeletionFile); + return new 
CommitIncrement(dataIncrement, compactIncrement, null, drainDeletionFile);
     }

     private void trySyncLatestCompaction(boolean blocking) throws Exception {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 459fec027f730..9ed1841393675 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -208,6 +208,7 @@ public List prepareCommit(boolean waitCompaction, long commitIden
                 WriterContainer writerContainer = entry.getValue();

                 CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
+                List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
                 List<IndexFileMeta> newIndexFiles = new ArrayList<>();
                 if (writerContainer.indexMaintainer != null) {
                     newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
@@ -216,13 +217,17 @@ public List prepareCommit(boolean waitCompaction, long commitIden
                 if (compactDeletionFile != null) {
                     compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
                 }
+                if (increment.indexIncrement() != null) {
+                    newIndexFiles.addAll(increment.indexIncrement().newIndexFiles());
+                    deletedIndexFiles.addAll(increment.indexIncrement().deletedIndexFiles());
+                }
                 CommitMessageImpl committable =
                         new CommitMessageImpl(
                                 partition,
                                 bucket,
                                 increment.newFilesIncrement(),
                                 increment.compactIncrement(),
-                                new IndexIncrement(newIndexFiles));
+                                new IndexIncrement(newIndexFiles, deletedIndexFiles));
                 result.add(committable);

                 if (committable.isEmpty()) {
@@ -333,6 +338,7 @@ public void restore(List> states) {
         for (State state : states) {
             RecordWriter writer =
                     createWriter(
+                            state.baseSnapshotId,
                             state.partition,
                             state.bucket,
                             state.dataFiles,
@@ -408,6 +414,7 @@ public WriterContainer createWriterContainer(
                         ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
         RecordWriter writer =
                 createWriter(
+                        latestSnapshotId,
                         partition.copy(),
                         bucket,
                         restoreFiles,
@@ -434,6 +441,7 @@ public FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry) {

     private List<DataFileMeta> scanExistingFileMetas(
             long snapshotId, BinaryRow partition, int bucket) {
+        // TODO: should the deletion files of these data files also be restored here?
         List<DataFileMeta> existingFileMetas = new ArrayList<>();
         // Concat all the DataFileMeta of existing files into existingFileMetas.
        scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files().stream()
@@ -460,6 +468,7 @@ public ExecutorService getCompactExecutor() {
     protected void notifyNewWriter(RecordWriter writer) {}

     protected abstract RecordWriter createWriter(
+            @Nullable Long snapshotId,
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 744a130caf244..e0d6dfc2af2a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -26,6 +26,7 @@ import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
@@ -35,10 +36,12 @@ import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -80,10 +83,9 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite
     private final MemorySize maxDiskSize;
     private final SimpleColStatsCollector.Factory[] statsCollectors;
     private final FileIndexOptions fileIndexOptions;
-
+    private final BucketMode bucketMode;
     private boolean forceBufferSpill = false;
     private boolean skipCompaction;
-    private BucketMode bucketMode = BucketMode.HASH_FIXED;

     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
@@ -95,14 +97,30 @@ public AppendOnlyFileStoreWrite(
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
+            BucketMode bucketMode,
+            @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory,
             String tableName) {
-        super(commitUser, snapshotManager, scan, options, null, null, tableName);
+        super(
+                commitUser,
+                snapshotManager,
+                scan,
+                options,
+                null,
+                deletionVectorsMaintainerFactory,
+                tableName);
         this.fileIO = fileIO;
         this.read = read;
         this.schemaId = schemaId;
         this.rowType = rowType;
         this.fileFormat = options.fileFormat();
         this.pathFactory = pathFactory;
+        this.bucketMode = bucketMode;
+        // AppendOnlyFileStoreWrite is sensitive to the bucket mode. It behaves differently in
+        // unaware-bucket mode (no compaction and a forced empty writer).
+ if (bucketMode == BucketMode.BUCKET_UNAWARE) { + super.withIgnorePreviousFiles(true); + skipCompaction = true; + } this.targetFileSize = options.targetFileSize(false); this.compactionMinFileNum = options.compactionMinFileNum(); this.compactionMaxFileNum = options.compactionMaxFileNum().orElse(5); @@ -120,13 +138,25 @@ public AppendOnlyFileStoreWrite( @Override protected RecordWriter createWriter( + @Nullable Long snapshotId, BinaryRow partition, int bucket, List restoredFiles, long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer ignore) { + @Nullable DeletionVectorsMaintainer dvMaintainer) { + DeletionVectorIndexFileMaintainer dvIndexFileMaintainer = null; + if (dvMaintainer != null) { + dvIndexFileMaintainer = + dvMaintainer + .indexFileHandler() + .createDVIndexFileMaintainer( + snapshotId, + partition, + bucket, + bucketMode != BucketMode.BUCKET_UNAWARE); + } // let writer and compact manager hold the same reference // and make restore files mutable to update DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket); @@ -136,6 +166,7 @@ protected RecordWriter createWriter( : new AppendOnlyCompactManager( compactExecutor, restoredFiles, + dvIndexFileMaintainer, compactionMinFileNum, compactionMaxFileNum, targetFileSize, @@ -169,58 +200,85 @@ protected RecordWriter createWriter( public AppendOnlyCompactManager.CompactRewriter compactRewriter( BinaryRow partition, int bucket) { - return toCompact -> { - if (toCompact.isEmpty()) { - return Collections.emptyList(); + return new AppendOnlyCompactManager.CompactRewriter() { + @Override + public List rewrite(List toCompact) throws Exception { + if (toCompact.isEmpty()) { + return Collections.emptyList(); + } + RowDataRollingFileWriter rewriter = createRewriter(toCompact); + try { + rewriter.write(bucketReader(partition, bucket).read(toCompact)); + } finally { + rewriter.close(); + } + return rewriter.result(); + } + + @Override + public List rewrite( + List toCompact, List deletionFiles) + throws Exception { + if (toCompact.isEmpty()) { + return Collections.emptyList(); + } + RowDataRollingFileWriter rewriter = createRewriter(toCompact); + try { + rewriter.write(bucketReader(partition, bucket).read(toCompact, deletionFiles)); + } finally { + rewriter.close(); + } + return rewriter.result(); } - RowDataRollingFileWriter rewriter = - new RowDataRollingFileWriter( - fileIO, - schemaId, - fileFormat, - targetFileSize, - rowType, - pathFactory.createDataFilePathFactory(partition, bucket), - new LongCounter(toCompact.get(0).minSequenceNumber()), - fileCompression, - statsCollectors, - fileIndexOptions, - FileSource.COMPACT, - options.asyncFileWrite()); - try { - rewriter.write(bucketReader(partition, bucket).read(toCompact)); - } finally { - rewriter.close(); + + private RowDataRollingFileWriter createRewriter(List toCompact) { + return new RowDataRollingFileWriter( + fileIO, + schemaId, + fileFormat, + targetFileSize, + rowType, + pathFactory.createDataFilePathFactory(partition, bucket), + new LongCounter(toCompact.get(0).minSequenceNumber()), + fileCompression, + statsCollectors, + fileIndexOptions, + FileSource.COMPACT, + options.asyncFileWrite()); } - return rewriter.result(); }; } public BucketFileRead bucketReader(BinaryRow partition, int bucket) { - return files -> - new RecordReaderIterator<>( - read.createReader( - DataSplit.builder() - .withPartition(partition) - .withBucket(bucket) - .withDataFiles(files) - 
.rawConvertible(true) - .withBucketPath( - pathFactory - .bucketPath(partition, bucket) - .toString()) - .build())); - } + return new BucketFileRead() { + @Override + public RecordReaderIterator read(List files) + throws IOException { + return new RecordReaderIterator<>(reader(files, null)); + } - public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) { - // AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act difference in - // unaware-bucket mode (no compaction and force empty-writer). - this.bucketMode = bucketMode; - if (bucketMode == BucketMode.BUCKET_UNAWARE) { - super.withIgnorePreviousFiles(true); - skipCompaction = true; - } - return this; + @Override + public RecordReaderIterator read( + List files, List deletionFiles) throws IOException { + return new RecordReaderIterator<>(reader(files, deletionFiles)); + } + + private RecordReader reader( + List files, List deletionFiles) throws IOException { + DataSplit.Builder builder = + DataSplit.builder() + .withPartition(partition) + .withBucket(bucket) + .withDataFiles(files) + .rawConvertible(true) + .withBucketPath( + pathFactory.bucketPath(partition, bucket).toString()); + if (deletionFiles != null) { + builder.withDataDeletionFiles(deletionFiles); + } + return read.createReader(builder.build()); + } + }; } @Override @@ -248,5 +306,8 @@ protected void forceBufferSpill() throws Exception { /** Read for one bucket. */ public interface BucketFileRead { RecordReaderIterator read(List files) throws IOException; + + RecordReaderIterator read( + List files, List deletionFiles) throws IOException; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 5f21d3c1aa956..9f4570cf2a43d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -172,6 +172,7 @@ public KeyValueFileStoreWrite( @Override protected MergeTreeWriter createWriter( + @Nullable Long snapshotId, BinaryRow partition, int bucket, List restoreFiles, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 0af78a5dac8b8..3d273e5225355 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -135,9 +135,7 @@ public TableWriteImpl newWrite(String commitUser) { @Override public TableWriteImpl newWrite( String commitUser, ManifestCacheFilter manifestFilter) { - // if this table is unaware-bucket table, we skip compaction and restored files searching - AppendOnlyFileStoreWrite writer = - store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode()); + AppendOnlyFileStoreWrite writer = store().newWrite(commitUser, manifestFilter); return new TableWriteImpl<>( rowType(), writer, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java index 94dfc615729c8..d84aae87632df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java @@ -22,18 +22,24 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataInputView; 
import org.apache.paimon.io.DataOutputView; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import static org.apache.paimon.utils.SerializationUtils.newStringType; + /** * Deletion file for data file, the first 4 bytes are length, should, the following is the bitmap * content. @@ -47,6 +53,13 @@ @Public public class DeletionFile implements Serializable { + public static final RowType SCHEMA = + new RowType( + Arrays.asList( + new DataField(0, "_PATH", newStringType(false)), + new DataField(1, "_OFFSET", new BigIntType(false)), + new DataField(2, "_LENGTH", new BigIntType(false)))); + private static final long serialVersionUID = 1L; private final String path; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java index 3c16378f8f571..d6ba2f376fd5f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java @@ -21,6 +21,7 @@ import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.IndexIncrement; import javax.annotation.Nullable; @@ -29,14 +30,18 @@ public class CommitIncrement { private final DataIncrement dataIncrement; private final CompactIncrement compactIncrement; + @Nullable private final IndexIncrement indexIncrement; @Nullable private final CompactDeletionFile compactDeletionFile; public CommitIncrement( DataIncrement dataIncrement, CompactIncrement compactIncrement, + @Nullable IndexIncrement indexIncrement, @Nullable CompactDeletionFile compactDeletionFile) { + Preconditions.checkArgument(indexIncrement == null || compactDeletionFile == null); this.dataIncrement = dataIncrement; this.compactIncrement = compactIncrement; + this.indexIncrement = indexIncrement; this.compactDeletionFile = compactDeletionFile; } @@ -48,6 +53,11 @@ public CompactIncrement compactIncrement() { return compactIncrement; } + @Nullable + public IndexIncrement indexIncrement() { + return indexIncrement; + } + @Nullable public CompactDeletionFile compactDeletionFile() { return compactDeletionFile; diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java index 68bc4db9cfffe..915ec8a4a3af4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java @@ -203,6 +203,7 @@ private void innerTest( new AppendOnlyCompactManager( null, // not used toCompactBeforePick, + null, minFileNum, maxFileNum, targetFileSize, diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java index a67a081a0a8ff..d6d93a438bff9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java @@ -165,7 +165,7 @@ private List writeCommit(int number) throws Exception { private List doCompact(List tasks) throws Exception { List result = new ArrayList<>(); for (AppendOnlyCompactionTask task : tasks) { - result.add(task.doCompact(write)); + result.add(task.doCompact(appendOnlyFileStoreTable, write)); } return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index b040cda1b2274..57136510fc620 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -595,6 +595,7 @@ private Pair> createWriter( Executors.newSingleThreadScheduledExecutor( new ExecutorThreadFactory("compaction-thread")), toCompact, + null, MIN_FILE_NUM, MAX_FILE_NUM, targetFileSize, diff --git a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java index 5e474ccd48f47..1256d779b9329 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java @@ -123,7 +123,7 @@ public MockFullCompactTask( Collection inputs, long targetFileSize, AppendOnlyCompactManager.CompactRewriter rewriter) { - super(inputs, targetFileSize, rewriter, null); + super(null, inputs, targetFileSize, rewriter, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java index f78e39dfbb670..4fe76d223b75c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java @@ -72,14 +72,14 @@ public void test() throws Exception { store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1, dataFileToDeletionFiles); // no dv should be rewritten, because nothing is changed. - List res = dvIFMaintainer.writeUnchangedDeletionVector(); + List res = dvIFMaintainer.persist(); assertThat(res.size()).isEqualTo(0); // the dv of f3 is updated, and the index file that contains the dv of f3 should be marked // as REMOVE. dvIFMaintainer.notifyDeletionFiles( Collections.singletonMap("f3", dataFileToDeletionFiles.get("f3"))); - res = dvIFMaintainer.writeUnchangedDeletionVector(); + res = dvIFMaintainer.persist(); assertThat(res.size()).isEqualTo(1); assertThat(res.get(0).kind()).isEqualTo(FileKind.DELETE); @@ -87,7 +87,7 @@ public void test() throws Exception { // the dv of f2 need to be rewritten, and this index file should be marked as REMOVE. 
dvIFMaintainer.notifyDeletionFiles( Collections.singletonMap("f1", dataFileToDeletionFiles.get("f1"))); - res = dvIFMaintainer.writeUnchangedDeletionVector(); + res = dvIFMaintainer.persist(); assertThat(res.size()).isEqualTo(3); IndexManifestEntry entry = res.stream().filter(file -> file.kind() == FileKind.ADD).findAny().get(); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index e2ccd063e8785..9bb1c86ee182c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -79,7 +79,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception SCHEMA, 0, new AppendOnlyCompactManager( - null, toCompact, 4, 10, 10, null, null), // not used + null, toCompact, null, 4, 10, 10, null, null), // not used null, false, dataFilePathFactory, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java index 1f05321f2fa72..6b801bb0d3cc6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java @@ -60,7 +60,7 @@ public UnawareBucketCompactor( } public void processElement(AppendOnlyCompactionTask task) throws Exception { - result.add(compactExecutorsupplier.get().submit(() -> task.doCompact(write))); + result.add(compactExecutorsupplier.get().submit(() -> task.doCompact(table, write))); } public void close() throws Exception { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index eaf7e3a00e1f7..018921e1c8edf 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -369,7 +369,7 @@ private void compactUnAwareBucketTable( taskIterator.next()); messages.add( messageSer.serialize( - task.doCompact(write))); + task.doCompact(table, write))); } return messages.iterator(); } finally { @@ -384,7 +384,7 @@ private void compactUnAwareBucketTable( for (byte[] serializedMessage : serializedMessages) { messages.add( messageSerializerser.deserialize( - serializer.getVersion(), serializedMessage)); + messageSerializerser.getVersion(), serializedMessage)); } commit.commit(messages); } catch (Exception e) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index a5681e54e979c..362403c57ca5c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -229,7 +229,7 @@ case class PaimonSparkWriter(table: FileStoreTable) { sdv.dataFileAndDeletionVector.foreach { case (dataFileName, dv) => - dvIndexFileMaintainer.notifyDeletionFiles( + 
+              dvIndexFileMaintainer.notifyNewDeletionVector(
                 dataFileName,
                 DeletionVector.deserializeFromBytes(dv))
           }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 26d07ce06dad7..719117bcc2dea 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -58,7 +58,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
           new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())

        spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-        val deletionVectors1 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertEquals(0, deletionVectors1.size)

        val cond1 = "id = 2"
@@ -67,7 +67,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
        checkAnswer(
          spark.sql(s"SELECT * from T ORDER BY id"),
          Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Nil)
-        val deletionVectors2 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors2 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertEquals(1, deletionVectors2.size)
        deletionVectors2
          .foreach {
@@ -79,7 +79,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
        checkAnswer(
          spark.sql(s"SELECT * from T ORDER BY id"),
          Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Row(4, "d") :: Row(5, "e") :: Nil)
-        val deletionVectors3 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors3 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertTrue(deletionVectors2 == deletionVectors3)

        val cond2 = "id % 2 = 1"
@@ -94,6 +94,16 @@ class DeletionVectorTest extends PaimonSparkTestBase {
          Row(1, "_all") :: Row(2, "_all") :: Row(3, "_all") :: Row(4, "_all") :: Row(
            5,
            "_all") :: Nil)
+
+        spark.sql("CALL sys.compact('T')")
+        val deletionVectors4 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
+        // After compaction, deletionVectors should be empty
+        Assertions.assertTrue(deletionVectors4.isEmpty)
+        checkAnswer(
+          spark.sql(s"SELECT * from T ORDER BY id"),
+          Row(1, "_all") :: Row(2, "_all") :: Row(3, "_all") :: Row(4, "_all") :: Row(
+            5,
+            "_all") :: Nil)
      }
    }
  }
@@ -120,7 +130,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {

        spark.sql(
          "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')")
-        val deletionVectors1 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertEquals(0, deletionVectors1.size)

        val cond1 = "id = 2"
@@ -169,6 +179,17 @@ class DeletionVectorTest extends PaimonSparkTestBase {
            case (filePath, dv) =>
              rowMetaInfo2(filePath).foreach(index => Assertions.assertTrue(dv.isDeleted(index)))
          }
+
+        spark.sql("CALL sys.compact('T')")
+        val deletionVectors5 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
+        // After compaction, deletionVectors should be empty
+        Assertions.assertTrue(deletionVectors5.isEmpty)
+        checkAnswer(
+          spark.sql(s"SELECT * from T ORDER BY id"),
+          Row(1, "a", "2024") :: Row(2, "b_2", "2024") :: Row(3, "c_2", "2025") :: Row(
+            4,
+            "d_2",
+            "2025") :: Nil)
      }
    }
  }
@@ -194,14 +215,14 @@ class DeletionVectorTest extends PaimonSparkTestBase {
          new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())

        spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
-        val deletionVectors1 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertEquals(0, deletionVectors1.size)

        val cond1 = "id = 2"
        val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
        spark.sql(s"DELETE FROM T WHERE $cond1")
        checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") :: Nil)
-        val deletionVectors2 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors2 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertEquals(1, deletionVectors2.size)
        deletionVectors2
          .foreach {
@@ -213,12 +234,18 @@ class DeletionVectorTest extends PaimonSparkTestBase {
        checkAnswer(
          spark.sql(s"SELECT * from T ORDER BY id"),
          Row(1, "a") :: Row(2, "bb") :: Row(3, "c") :: Row(4, "d") :: Nil)
-        val deletionVectors3 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors3 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertTrue(deletionVectors2 == deletionVectors3)

        val cond2 = "id % 2 = 1"
        spark.sql(s"DELETE FROM T WHERE $cond2")
        checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") :: Row(4, "d") :: Nil)
+
+        spark.sql("CALL sys.compact('T')")
+        val deletionVectors4 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
+        // After compaction, deletionVectors should be empty
+        Assertions.assertTrue(deletionVectors4.isEmpty)
+        checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") :: Row(4, "d") :: Nil)
      }
    }
  }
@@ -243,9 +270,16 @@ class DeletionVectorTest extends PaimonSparkTestBase {
        val dvMaintainerFactory =
          new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())

+        def getDeletionVectors(ptValues: Seq[String]): Map[String, DeletionVector] = {
+          getLatestDeletionVectors(
+            table,
+            dvMaintainerFactory,
+            ptValues.map(BinaryRow.singleColumn))
+        }
+
        spark.sql(
          "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')")
-        val deletionVectors1 = getLatestDeletionVectors(table, dvMaintainerFactory)
+        val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
        Assertions.assertEquals(0, deletionVectors1.size)

        val cond1 = "id = 2"
@@ -254,11 +288,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
        checkAnswer(
          spark.sql(s"SELECT * from T ORDER BY id"),
          Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025") :: Nil)
-        val deletionVectors2 =
-          getLatestDeletionVectors(
-            table,
-            dvMaintainerFactory,
-            Seq(BinaryRow.singleColumn("2024"), BinaryRow.singleColumn("2025")))
+        val deletionVectors2 = getDeletionVectors(Seq("2024", "2025"))
        Assertions.assertEquals(1, deletionVectors2.size)
        deletionVectors2
          .foreach {
@@ -272,22 +302,28 @@ class DeletionVectorTest extends PaimonSparkTestBase {
        checkAnswer(
          spark.sql(s"SELECT * from T ORDER BY id"),
          Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
-        val deletionVectors3 =
-          getLatestDeletionVectors(
-            table,
-            dvMaintainerFactory,
-            Seq(BinaryRow.singleColumn("2024")))
+        val deletionVectors3 = getDeletionVectors(Seq("2024"))
        Assertions.assertTrue(deletionVectors2 == deletionVectors3)
-        val deletionVectors4 =
-          getLatestDeletionVectors(
-            table,
-            dvMaintainerFactory,
-            Seq(BinaryRow.singleColumn("2024"), BinaryRow.singleColumn("2025")))
+        val deletionVectors4 = getDeletionVectors(Seq("2024", "2025"))
        deletionVectors4
          .foreach {
            case (filePath, dv) =>
              rowMetaInfo2(filePath).foreach(index => Assertions.assertTrue(dv.isDeleted(index)))
          }
+
+        spark.sql("""CALL sys.compact(table => 'T', partitions => "pt = '2024'")""")
+        Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+        Assertions.assertTrue(getDeletionVectors(Seq("2025")).nonEmpty)
+        checkAnswer(
+          spark.sql(s"SELECT * from T ORDER BY id"),
+          Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+
+        spark.sql("""CALL sys.compact(table => 'T', where => "pt = '2025'")""")
+        Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
+        Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+        checkAnswer(
+          spark.sql(s"SELECT * from T ORDER BY id"),
+          Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
      }
    }
  }
@@ -317,7 +353,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
      val dvMaintainerFactory =
        new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())

-      val deletionVectors1 = getLatestDeletionVectors(table, dvMaintainerFactory)
+      val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
      // 1, 3 deleted, their row positions are 0, 2
      Assertions.assertEquals(1, deletionVectors1.size)
      deletionVectors1
@@ -330,7 +366,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
      // Compact
      // f3 (1, 2, 3), no deletion
      spark.sql("CALL sys.compact('T')")
-      val deletionVectors2 = getLatestDeletionVectors(table, dvMaintainerFactory)
+      val deletionVectors2 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
      // After compaction, deletionVectors should be empty
      Assertions.assertTrue(deletionVectors2.isEmpty)

@@ -342,7 +378,7 @@
        spark.sql(s"SELECT * from T ORDER BY id"),
        Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)

-      val deletionVectors3 = getLatestDeletionVectors(table, dvMaintainerFactory)
+      val deletionVectors3 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
      // 2 deleted, its row position is 1
      Assertions.assertEquals(1, deletionVectors3.size)
      deletionVectors3
@@ -488,7 +524,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
    new Path(path).getName
  }

-  private def getLatestDeletionVectors(
+  private def getAllLatestDeletionVectors(
      table: FileStoreTable,
      dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String, DeletionVector] = {
    getLatestDeletionVectors(table, dvMaintainerFactory, Seq(BinaryRow.EMPTY_ROW))
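
A minimal end-to-end sketch of the behavior the new test cases assert, assuming a Spark
session with the Paimon catalog configured and a table T created with
'deletion-vectors.enabled' = 'true'. The SQL mirrors the tests above; this is an
illustrative sketch, not part of the patch itself:

    spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    // With deletion vectors enabled, the DELETE is recorded as a deletion
    // vector against the existing data file instead of rewriting the file.
    spark.sql("DELETE FROM T WHERE id = 2")
    // Compaction rewrites the surviving rows into new data files, so the old
    // deletion vectors become redundant and are dropped from the index; the
    // maintainer then reports no deletion vectors for the compacted partitions.
    spark.sql("CALL sys.compact('T')")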