diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 4394e5ad26966..57315a3a5900c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -24,6 +24,8 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.HashIndexMaintainer; import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; +import org.apache.paimon.index.delete.DeleteMapIndexMaintainer; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; @@ -147,6 +149,10 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma if (bucketMode() == BucketMode.DYNAMIC) { indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler()); } + IndexMaintainer.Factory deleteMapFactory = null; + if (options.deleteMapEnabled()) { + deleteMapFactory = new DeleteMapIndexMaintainer.Factory(newIndexFileHandler()); + } return new KeyValueFileStoreWrite( fileIO, schemaManager, @@ -162,6 +168,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), indexFactory, + deleteMapFactory, options, keyValueFieldsExtractor, tableName); diff --git a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java index d68c914e4f123..909806f6d8848 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/delete/DeleteMapIndexMaintainer.java @@ -19,6 +19,7 @@ package org.apache.paimon.index.delete; import org.apache.paimon.KeyValue; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; @@ -115,6 +116,12 @@ public Optional indexOf(String fileName) { return Optional.ofNullable(deleteMap.get(fileName)); } + @VisibleForTesting + public Map deleteMap() { + restoreDeleteMap(); + return deleteMap; + } + /** Factory to restore {@link DeleteMapIndexMaintainer}. 
*/ public static class Factory implements IndexMaintainer.Factory { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index 9f8b7a196547d..4f691dd16602a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -68,6 +68,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable { private final LookupStoreFactory lookupStoreFactory; private final Cache lookupFiles; private final Function bfGenerator; + private final boolean deleteMapEnabled; public LookupLevels( Levels levels, @@ -79,7 +80,8 @@ public LookupLevels( LookupStoreFactory lookupStoreFactory, Duration fileRetention, MemorySize maxDiskSize, - Function bfGenerator) { + Function bfGenerator, + boolean deleteMapEnabled) { this.levels = levels; this.keyComparator = keyComparator; this.keySerializer = new RowCompactedSerializer(keyType); @@ -96,6 +98,7 @@ public LookupLevels( .executor(MoreExecutors.directExecutor()) .build(); this.bfGenerator = bfGenerator; + this.deleteMapEnabled = deleteMapEnabled; levels.addDropFileCallback(this); } @@ -145,11 +148,19 @@ private KeyValue lookup(InternalRow key, DataFileMeta file) throws IOException { return null; } InternalRow value = valueSerializer.deserialize(valueBytes); - long sequenceNumber = MemorySegment.wrap(valueBytes).getLong(valueBytes.length - 9); + long sequenceNumber = + MemorySegment.wrap(valueBytes).getLongBigEndian(valueBytes.length - 9); RowKind rowKind = RowKind.fromByteValue(valueBytes[valueBytes.length - 1]); - return new KeyValue() - .replace(key, sequenceNumber, rowKind, value) - .setLevel(lookupFile.remoteFile().level()); + KeyValue keyValue = + new KeyValue() + .replace(key, sequenceNumber, rowKind, value) + .setLevel(lookupFile.remoteFile().level()); + if (deleteMapEnabled) { + keyValue.setPosition( + MemorySegment.wrap(valueBytes).getLongBigEndian(valueBytes.length - 17)) + .setFileName(file.fileName()); + } + return keyValue; } private int fileWeigh(String file, LookupFile lookupFile) { @@ -183,6 +194,9 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException { byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); valueOut.clear(); valueOut.write(valueSerializer.serializeToBytes(kv.value())); + if (deleteMapEnabled) { + valueOut.writeLong(kv.position()); + } valueOut.writeLong(kv.sequenceNumber()); valueOut.writeByte(kv.valueKind().toByteValue()); byte[] valueBytes = valueOut.getCopyOfBuffer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index 7be5fc74119e0..77f8b905de7ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -23,6 +23,8 @@ import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -32,6 +34,8 @@ import org.apache.paimon.mergetree.SortedRun; import 
org.apache.paimon.reader.RecordReaderIterator; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -47,6 +51,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite protected final MergeEngine mergeEngine; protected final RecordEqualiser valueEqualiser; protected final boolean changelogRowDeduplicate; + protected final @Nullable IndexMaintainer deleteMapMaintainer; public ChangelogMergeTreeRewriter( int maxLevel, @@ -57,12 +62,14 @@ public ChangelogMergeTreeRewriter( MergeFunctionFactory mfFactory, MergeSorter mergeSorter, RecordEqualiser valueEqualiser, - boolean changelogRowDeduplicate) { + boolean changelogRowDeduplicate, + @Nullable IndexMaintainer deleteMapMaintainer) { super(readerFactory, writerFactory, keyComparator, mfFactory, mergeSorter); this.maxLevel = maxLevel; this.mergeEngine = mergeEngine; this.valueEqualiser = valueEqualiser; this.changelogRowDeduplicate = changelogRowDeduplicate; + this.deleteMapMaintainer = deleteMapMaintainer; } protected abstract boolean rewriteChangelog( @@ -159,6 +166,11 @@ private CompactResult rewriteChangelogCompaction( .map(x -> x.upgrade(outputLevel)) .collect(Collectors.toList()); + if (deleteMapMaintainer != null) { + for (DataFileMeta dataFileMeta : before) { + deleteMapMaintainer.delete(dataFileMeta.fileName()); + } + } return new CompactResult(before, after, changelogFileWriter.result()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java index 24f872da11889..adad07d68ed8a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java @@ -23,6 +23,8 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -31,6 +33,8 @@ import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.utils.Filter; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.Comparator; @@ -59,7 +63,8 @@ public FirstRowMergeTreeCompactRewriter( MergeFunctionFactory mfFactory, MergeSorter mergeSorter, RecordEqualiser valueEqualiser, - boolean changelogRowDeduplicate) { + boolean changelogRowDeduplicate, + @Nullable IndexMaintainer deleteMapMaintainer) { super( maxLevel, mergeEngine, @@ -69,7 +74,8 @@ public FirstRowMergeTreeCompactRewriter( mfFactory, mergeSorter, valueEqualiser, - changelogRowDeduplicate); + changelogRowDeduplicate, + deleteMapMaintainer); this.containsLevels = containsLevels; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java index b8d64fa869ac3..1c62438b50dad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java @@ -58,7 +58,8 @@ public FullChangelogMergeTreeCompactRewriter( mfFactory, mergeSorter, valueComparator, - changelogRowDeduplicate); + changelogRowDeduplicate, + null); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java index bad97e8bb1f6f..4595de465394d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java @@ -21,8 +21,12 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.types.RowKind; +import javax.annotation.Nullable; + import java.util.function.Function; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -53,12 +57,14 @@ public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper private final KeyValue reusedAfter = new KeyValue(); private final RecordEqualiser valueEqualiser; private final boolean changelogRowDeduplicate; + private final @Nullable IndexMaintainer deleteMapMaintainer; public LookupChangelogMergeFunctionWrapper( MergeFunctionFactory mergeFunctionFactory, Function lookup, RecordEqualiser valueEqualiser, - boolean changelogRowDeduplicate) { + boolean changelogRowDeduplicate, + @Nullable IndexMaintainer deleteMapMaintainer) { MergeFunction mergeFunction = mergeFunctionFactory.create(); checkArgument( mergeFunction instanceof LookupMergeFunction, @@ -69,6 +75,7 @@ public LookupChangelogMergeFunctionWrapper( this.lookup = lookup; this.valueEqualiser = valueEqualiser; this.changelogRowDeduplicate = changelogRowDeduplicate; + this.deleteMapMaintainer = deleteMapMaintainer; } @Override @@ -113,6 +120,9 @@ public ChangelogResult getResult() { mergeFunction2.add(result); result = mergeFunction2.getResult(); setChangelog(highLevel, result); + if (highLevel.isAdd() && deleteMapMaintainer != null) { + deleteMapMaintainer.notifyNewRecord(highLevel); + } } else { setChangelog(null, result); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java index c78ded5ff0b18..f221a53d2f8ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java @@ -22,6 +22,8 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -29,6 +31,8 @@ import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.SortedRun; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.Comparator; @@ -56,7 +60,8 @@ 
public LookupMergeTreeCompactRewriter( MergeFunctionFactory mfFactory, MergeSorter mergeSorter, RecordEqualiser valueEqualiser, - boolean changelogRowDeduplicate) { + boolean changelogRowDeduplicate, + @Nullable IndexMaintainer deleteMapMaintainer) { super( maxLevel, mergeEngine, @@ -66,7 +71,8 @@ public LookupMergeTreeCompactRewriter( mfFactory, mergeSorter, valueEqualiser, - changelogRowDeduplicate); + changelogRowDeduplicate, + deleteMapMaintainer); this.lookupLevels = lookupLevels; } @@ -110,7 +116,8 @@ protected MergeFunctionWrapper createMergeWrapper(int outputLev } }, valueEqualiser, - changelogRowDeduplicate); + changelogRowDeduplicate, + deleteMapMaintainer); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 25f93cd82f297..ebe65f6d5bc0c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.ManifestEntry; @@ -69,6 +70,7 @@ public abstract class AbstractFileStoreWrite implements FileStoreWrite { private final FileStoreScan scan; private final int writerNumberMax; @Nullable private final IndexMaintainer.Factory indexFactory; + @Nullable private final IndexMaintainer.Factory deleteMapFactory; @Nullable protected IOManager ioManager; @@ -88,6 +90,7 @@ protected AbstractFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory indexFactory, + @Nullable IndexMaintainer.Factory deleteMapFactory, String tableName, FileStorePathFactory pathFactory, int writerNumberMax) { @@ -95,7 +98,7 @@ protected AbstractFileStoreWrite( this.snapshotManager = snapshotManager; this.scan = scan; this.indexFactory = indexFactory; - + this.deleteMapFactory = deleteMapFactory; this.writers = new HashMap<>(); this.tableName = tableName; this.pathFactory = pathFactory; @@ -200,7 +203,10 @@ public List prepareCommit(boolean waitCompaction, long commitIden CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction); List newIndexFiles = new ArrayList<>(); if (writerContainer.indexMaintainer != null) { - newIndexFiles = writerContainer.indexMaintainer.prepareCommit(); + newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit()); + } + if (writerContainer.deleteMapMaintainer != null) { + newIndexFiles.addAll(writerContainer.deleteMapMaintainer.prepareCommit()); } CommitMessageImpl committable = new CommitMessageImpl( @@ -292,6 +298,7 @@ public List> checkpoint() { writerContainer.lastModifiedCommitIdentifier, dataFiles, writerContainer.indexMaintainer, + writerContainer.deleteMapMaintainer, increment)); } } @@ -311,10 +318,15 @@ public void restore(List> states) { state.bucket, state.dataFiles, state.commitIncrement, - compactExecutor()); + compactExecutor(), + state.deleteMapMaintainer); notifyNewWriter(writer); WriterContainer writerContainer = - new WriterContainer<>(writer, state.indexMaintainer, state.baseSnapshotId); + new WriterContainer<>( + writer, + state.indexMaintainer, + state.deleteMapMaintainer, + state.baseSnapshotId); 
writerContainer.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier; writers.computeIfAbsent(state.partition, k -> new HashMap<>()) .put(state.bucket, writerContainer); @@ -360,10 +372,23 @@ public WriterContainer createWriterContainer( ? null : indexFactory.createOrRestore( ignorePreviousFiles ? null : latestSnapshotId, partition, bucket); + IndexMaintainer deleteMapMaintainer = + deleteMapFactory == null + ? null + : deleteMapFactory.createOrRestore( + ignorePreviousFiles ? null : latestSnapshotId, partition, bucket); + RecordWriter writer = - createWriter(partition.copy(), bucket, restoreFiles, null, compactExecutor()); + createWriter( + partition.copy(), + bucket, + restoreFiles, + null, + compactExecutor(), + deleteMapMaintainer); notifyNewWriter(writer); - return new WriterContainer<>(writer, indexMaintainer, latestSnapshotId); + return new WriterContainer<>( + writer, indexMaintainer, deleteMapMaintainer, latestSnapshotId); } @Override @@ -436,7 +461,8 @@ protected abstract RecordWriter createWriter( int bucket, List restoreFiles, @Nullable CommitIncrement restoreIncrement, - ExecutorService compactExecutor); + ExecutorService compactExecutor, + @Nullable IndexMaintainer deleteMapMaintainer); // force buffer spill to avoid out of memory in batch mode protected void forceBufferSpill() throws Exception {} @@ -449,15 +475,18 @@ protected void forceBufferSpill() throws Exception {} public static class WriterContainer { public final RecordWriter writer; @Nullable public final IndexMaintainer indexMaintainer; + @Nullable public final IndexMaintainer deleteMapMaintainer; protected final long baseSnapshotId; protected long lastModifiedCommitIdentifier; protected WriterContainer( RecordWriter writer, @Nullable IndexMaintainer indexMaintainer, + @Nullable IndexMaintainer deleteMapMaintainer, Long baseSnapshotId) { this.writer = writer; this.indexMaintainer = indexMaintainer; + this.deleteMapMaintainer = deleteMapMaintainer; this.baseSnapshotId = baseSnapshotId == null ? 
Snapshot.FIRST_SNAPSHOT_ID - 1 : baseSnapshotId; this.lastModifiedCommitIdentifier = Long.MIN_VALUE; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index a76848b5e6fa8..97f76202638d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -28,6 +28,8 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.RowDataRollingFileWriter; @@ -85,7 +87,7 @@ public AppendOnlyFileStoreWrite( FileStoreScan scan, CoreOptions options, String tableName) { - super(commitUser, snapshotManager, scan, options, null, tableName, pathFactory); + super(commitUser, snapshotManager, scan, options, null, null, tableName, pathFactory); this.fileIO = fileIO; this.read = read; this.schemaId = schemaId; @@ -110,7 +112,8 @@ protected RecordWriter createWriter( int bucket, List restoredFiles, @Nullable CommitIncrement restoreIncrement, - ExecutorService compactExecutor) { + ExecutorService compactExecutor, + @Nullable IndexMaintainer noUse) { // let writer and compact manager hold the same reference // and make restore files mutable to update long maxSequenceNumber = getMaxSequenceNumber(restoredFiles); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java index 4630ce2609dac..ae38dfaf8dd65 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.memory.MemorySegmentPool; @@ -147,6 +148,7 @@ class State { protected final long lastModifiedCommitIdentifier; protected final List dataFiles; @Nullable protected final IndexMaintainer indexMaintainer; + @Nullable protected final IndexMaintainer deleteMapMaintainer; protected final CommitIncrement commitIncrement; protected State( @@ -156,6 +158,7 @@ protected State( long lastModifiedCommitIdentifier, Collection dataFiles, @Nullable IndexMaintainer indexMaintainer, + @Nullable IndexMaintainer deleteMapMaintainer, CommitIncrement commitIncrement) { this.partition = partition; this.bucket = bucket; @@ -163,6 +166,7 @@ protected State( this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier; this.dataFiles = new ArrayList<>(dataFiles); this.indexMaintainer = indexMaintainer; + this.deleteMapMaintainer = deleteMapMaintainer; this.commitIncrement = commitIncrement; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 955908b8a0fd9..5a25480178faf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -31,6 +31,7 @@ import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -103,10 +104,19 @@ public KeyValueFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory indexFactory, + @Nullable IndexMaintainer.Factory deleteMapFactory, CoreOptions options, KeyValueFieldsExtractor extractor, String tableName) { - super(commitUser, snapshotManager, scan, options, indexFactory, tableName, pathFactory); + super( + commitUser, + snapshotManager, + scan, + options, + indexFactory, + deleteMapFactory, + tableName, + pathFactory); this.fileIO = fileIO; this.keyType = keyType; this.valueType = valueType; @@ -142,7 +152,8 @@ protected MergeTreeWriter createWriter( int bucket, List restoreFiles, @Nullable CommitIncrement restoreIncrement, - ExecutorService compactExecutor) { + ExecutorService compactExecutor, + @Nullable IndexMaintainer deleteMapMaintainer) { if (LOG.isDebugEnabled()) { LOG.debug( "Creating merge tree writer for partition {} bucket {} from restored files {}", @@ -166,7 +177,13 @@ protected MergeTreeWriter createWriter( ? new ForceUpLevel0Compaction(universalCompaction) : universalCompaction; CompactManager compactManager = - createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels); + createCompactManager( + partition, + bucket, + compactStrategy, + compactExecutor, + levels, + deleteMapMaintainer); return new MergeTreeWriter( bufferSpillable(), @@ -193,12 +210,14 @@ private CompactManager createCompactManager( int bucket, CompactStrategy compactStrategy, ExecutorService compactExecutor, - Levels levels) { + Levels levels, + @Nullable IndexMaintainer deleteMapMaintainer) { if (options.writeOnly()) { return new NoopCompactManager(); } else { Comparator keyComparator = keyComparatorSupplier.get(); - CompactRewriter rewriter = createRewriter(partition, bucket, keyComparator, levels); + CompactRewriter rewriter = + createRewriter(partition, bucket, keyComparator, levels, deleteMapMaintainer); return new MergeTreeCompactManager( compactExecutor, levels, @@ -212,13 +231,22 @@ private CompactManager createCompactManager( } private MergeTreeCompactRewriter createRewriter( - BinaryRow partition, int bucket, Comparator keyComparator, Levels levels) { + BinaryRow partition, + int bucket, + Comparator keyComparator, + Levels levels, + @Nullable IndexMaintainer deleteMapMaintainer) { KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket, options); MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager); int maxLevel = options.numLevels() - 1; CoreOptions.MergeEngine mergeEngine = options.mergeEngine(); + + // todo: decouple changelog and deleteMap creation + // 1. check all usages of options.changelogProducer() + // 2. ... 
+ switch (options.changelogProducer()) { case FULL_COMPACTION: return new FullChangelogMergeTreeCompactRewriter( @@ -249,7 +277,8 @@ private MergeTreeCompactRewriter createRewriter( mfFactory, mergeSorter, valueEqualiserSupplier.get(), - options.changelogRowDeduplicate()); + options.changelogRowDeduplicate(), + deleteMapMaintainer); } LookupLevels lookupLevels = createLookupLevels(levels, readerFactory); return new LookupMergeTreeCompactRewriter( @@ -262,7 +291,8 @@ private MergeTreeCompactRewriter createRewriter( mfFactory, mergeSorter, valueEqualiserSupplier.get(), - options.changelogRowDeduplicate()); + options.changelogRowDeduplicate(), + deleteMapMaintainer); default: return new MergeTreeCompactRewriter( readerFactory, writerFactory, keyComparator, mfFactory, mergeSorter); @@ -291,7 +321,8 @@ private LookupLevels createLookupLevels( options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)), options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), - bfGenerator(options)); + bfGenerator(options), + this.options.deleteMapEnabled()); } private ContainsLevels createContainsLevels( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index a1b39a992c9d1..651f88c03b995 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.index.IndexMaintainer; +import org.apache.paimon.index.delete.DeleteIndex; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryOwner; @@ -62,6 +63,7 @@ public MemoryFileStoreWrite( FileStoreScan scan, CoreOptions options, @Nullable IndexMaintainer.Factory indexFactory, + @Nullable IndexMaintainer.Factory deleteMapFactory, String tableName, FileStorePathFactory pathFactory) { super( @@ -69,6 +71,7 @@ public MemoryFileStoreWrite( snapshotManager, scan, indexFactory, + deleteMapFactory, tableName, pathFactory, options.writeMaxWritersToSpill()); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 68e6b05c10c48..1108d40040085 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; +import org.apache.paimon.CoreOptions.FileFormatType; import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.data.BinaryString; @@ -475,9 +476,15 @@ private static void validateDefaultValues(TableSchema schema) { } private static void validateForDeleteMap(CoreOptions options) { - if (!options.formatType().equals(CoreOptions.FileFormatType.PARQUET)) { + if (!options.formatType().equals(FileFormatType.PARQUET)) { throw new IllegalArgumentException( "Delete map is only supported for parquet file format now."); } + + // todo: decouple changelog and deleteMap creation + if (options.changelogProducer() != ChangelogProducer.LOOKUP) { + throw new IllegalArgumentException( + "Delete map is only supported for lookup changelog producer now."); + } } } diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 2009932c5f97a..3e64d8a00607a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -145,7 +145,8 @@ private void newLookupLevels(BinaryRow partition, int bucket, List hashLookupStoreFactory, options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), - bfGenerator(options)); + bfGenerator(options), + this.options.deleteMapEnabled()); tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 836f8f874db4f..5f1f3e17b10cc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -229,7 +229,8 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxDiskSize) { new HashLookupStoreFactory(new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75), Duration.ofHours(1), maxDiskSize, - rowCount -> BloomFilter.builder(rowCount, 0.05)); + rowCount -> BloomFilter.builder(rowCount, 0.05), + false); } private KeyValue kv(int key, int value) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java index df51eecffbcb8..982d141238699 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java @@ -69,7 +69,8 @@ public void testDeduplicate(boolean changelogRowDeduplicate) { RowType.of(DataTypes.INT())), highLevel::get, EQUALISER, - changelogRowDeduplicate); + changelogRowDeduplicate, + null); // Without level-0 function.reset(); @@ -225,7 +226,8 @@ public void testSum(boolean changelogRowDeduplicate) { RowType.of(DataTypes.INT())), key -> null, EQUALISER, - changelogRowDeduplicate); + changelogRowDeduplicate, + null); // Without level-0 function.reset(); @@ -384,7 +386,8 @@ public void testPartialUpdateIgnoreDelete() { RowType.of(DataTypes.INT())), key -> null, EQUALISER, - false); + false, + null); function.reset(); function.add(new KeyValue().replace(row(1), 1, DELETE, row(1)).setLevel(2)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 55a67187945bf..7708b2f3f9335 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -183,6 +183,7 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc snapshotManager, null, // not used, we only create an empty writer null, + null, options, EXTRACTOR, tablePath.getName()) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteMapModeTest.scala 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteMapModeTest.scala
new file mode 100644
index 0000000000000..d9a09f9847162
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteMapModeTest.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.index.delete.DeleteMapIndexMaintainer
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+class DeleteMapModeTest extends PaimonSparkTestBase {
+
+  test("Paimon deleteMap: deleteMap write") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'id',
+                 |  'changelog-producer' = 'lookup',
+                 |  'file.format' = 'parquet',
+                 |  'delete-map.enabled' = 'true')
+                 |""".stripMargin)
+    val table = loadTable("T")
+
+    spark.sql("INSERT INTO T VALUES (1, 'aaaaaaaaaaaaaaaaaaa'), (2, 'b'), (3, 'c')")
+    spark.sql("INSERT INTO T VALUES (1, 'a_new1'), (3, 'c_new1')")
+    checkAnswer(
+      spark.sql(s"SELECT * from T ORDER BY id"),
+      Row(1, "a_new1") :: Row(2, "b") :: Row(3, "c_new1") :: Nil)
+
+    val deleteMap1 = new DeleteMapIndexMaintainer(
+      table.store().newIndexFileHandler(),
+      table.snapshotManager().latestSnapshotId(),
+      BinaryRow.EMPTY_ROW,
+      0).deleteMap()
+    // keys 1 and 3 were overwritten; their row positions 0 and 2 are marked deleted
+    Assertions.assertEquals(1, deleteMap1.size())
+    deleteMap1
+      .entrySet()
+      .forEach(
+        e => {
+          Assertions.assertTrue(e.getValue.isDeleted(0))
+          Assertions.assertTrue(e.getValue.isDeleted(2))
+        })
+
+    spark.sql("INSERT INTO T VALUES (1, 'a_new1'), (2, 'b_new1')")
+    checkAnswer(
+      spark.sql(s"SELECT * from T ORDER BY id"),
+      Row(1, "a_new1") :: Row(2, "b_new1") :: Row(3, "c_new1") :: Nil)
+
+    val deleteMap2 = new DeleteMapIndexMaintainer(
+      table.store().newIndexFileHandler(),
+      table.snapshotManager().latestSnapshotId(),
+      BinaryRow.EMPTY_ROW,
+      0).deleteMap()
+    // this write triggers compaction, so the delete map should be empty
+    Assertions.assertTrue(deleteMap2.isEmpty)
+
+    spark.sql("INSERT INTO T VALUES (2, 'b_new2')")
+    checkAnswer(
+      spark.sql(s"SELECT * from T ORDER BY id"),
+      Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)
+
+    val deleteMap3 = new DeleteMapIndexMaintainer(
+      table.store().newIndexFileHandler(),
+      table.snapshotManager().latestSnapshotId(),
+      BinaryRow.EMPTY_ROW,
+      0).deleteMap()
+    // key 2 was overwritten; its row position 1 is marked deleted
+    Assertions.assertEquals(1, deleteMap3.size())
+    deleteMap3
+      .entrySet()
+      .forEach(
+        e => {
+          Assertions.assertTrue(e.getValue.isDeleted(1))
+        })
+  }
+}
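
The part of this patch that is easiest to misread is the value layout in LookupLevels: when 'delete-map.enabled' is on, createLookupFile appends the row position of each record before the sequence number and the RowKind byte, and lookup() recovers all three with big-endian reads at fixed offsets from the end of the value bytes (length - 17, length - 9, length - 1). Below is a minimal standalone sketch of that layout in plain java.nio rather than Paimon's MemorySegment and serializer APIs; the names LookupValueLayout, encode and decode are illustrative only and do not appear in the patch.

import java.nio.ByteBuffer;

public class LookupValueLayout {

    // Layout written by createLookupFile (per the diff):
    //   [serialized value][position: 8 bytes][sequence number: 8 bytes][row kind: 1 byte]
    // The position field is only written when delete-map.enabled = true.
    static byte[] encode(byte[] serializedValue, long position, long sequenceNumber, byte rowKind) {
        ByteBuffer buf = ByteBuffer.allocate(serializedValue.length + 8 + 8 + 1); // big-endian by default
        buf.put(serializedValue);
        buf.putLong(position);
        buf.putLong(sequenceNumber);
        buf.put(rowKind);
        return buf.array();
    }

    // Mirrors lookup(): fixed offsets from the end, assuming the position field is present.
    static void decode(byte[] valueBytes) {
        ByteBuffer buf = ByteBuffer.wrap(valueBytes);
        byte rowKind = valueBytes[valueBytes.length - 1];
        long sequenceNumber = buf.getLong(valueBytes.length - 9);
        long position = buf.getLong(valueBytes.length - 17);
        int valueLength = valueBytes.length - 17; // the remainder is the serialized value
        System.out.printf("rowKind=%d seq=%d pos=%d valueLength=%d%n",
                rowKind, sequenceNumber, position, valueLength);
    }

    public static void main(String[] args) {
        decode(encode(new byte[] {1, 2, 3}, 0L, 42L, (byte) 0));
    }
}

As the rest of the diff suggests, this recovered position (together with the data file name) is what feeds the delete map: LookupChangelogMergeFunctionWrapper calls notifyNewRecord on the high-level record once a newer level-0 record supersedes it, and ChangelogMergeTreeRewriter calls delete(fileName) for every file removed by a compaction, so rewritten files no longer carry stale delete-map entries.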