From f622d80aed2102c9b17b40fd09a741093ff1160f Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 11 Mar 2024 12:35:59 +0800 Subject: [PATCH] [core] Refactor ChangelogMergeTreeRewriter to lookup free --- .../compact/ChangelogMergeTreeRewriter.java | 43 ++++++++----------- ...FullChangelogMergeTreeCompactRewriter.java | 5 +-- .../LookupMergeTreeCompactRewriter.java | 27 ++++++------ .../compact/MergeTreeCompactRewriter.java | 14 ++---- .../operation/KeyValueFileStoreWrite.java | 18 +++----- 5 files changed, 45 insertions(+), 62 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index c807fca23464..0024d79ce973 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -22,12 +22,10 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; -import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.MergeTreeReaders; import org.apache.paimon.mergetree.SortedRun; @@ -49,7 +47,8 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite protected final int maxLevel; protected final MergeEngine mergeEngine; - protected final LookupStrategy lookupStrategy; + private final boolean produceChangelog; + private final boolean forceDropDelete; public ChangelogMergeTreeRewriter( int maxLevel, @@ -60,19 +59,19 @@ public ChangelogMergeTreeRewriter( @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, MergeSorter mergeSorter, - LookupStrategy lookupStrategy, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + boolean produceChangelog, + boolean forceDropDelete) { super( readerFactory, writerFactory, keyComparator, userDefinedSeqComparator, mfFactory, - mergeSorter, - deletionVectorsMaintainer); + mergeSorter); this.maxLevel = maxLevel; this.mergeEngine = mergeEngine; - this.lookupStrategy = lookupStrategy; + this.produceChangelog = produceChangelog; + this.forceDropDelete = forceDropDelete; } protected abstract boolean rewriteChangelog( @@ -143,17 +142,19 @@ private CompactResult rewriteChangelogCompaction( if (rewriteCompactFile) { compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel); } - if (lookupStrategy.produceChangelog) { + if (produceChangelog) { changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel); } while (iterator.hasNext()) { ChangelogResult result = iterator.next(); KeyValue keyValue = result.result(); - if (rewriteCompactFile && keyValue != null && (!dropDelete || keyValue.isAdd())) { + if (compactFileWriter != null + && keyValue != null + && (!dropDelete || keyValue.isAdd())) { compactFileWriter.write(keyValue); } - if (lookupStrategy.produceChangelog) { + if (produceChangelog) { for (KeyValue kv : result.changelogs()) { changelogFileWriter.write(kv); } @@ -173,24 +174,19 @@ private CompactResult rewriteChangelogCompaction( List before = extractFilesFromSections(sections); List after = - rewriteCompactFile + compactFileWriter != null ? compactFileWriter.result() : before.stream() .map(x -> x.upgrade(outputLevel)) .collect(Collectors.toList()); - if (deletionVectorsMaintainer != null) { - for (DataFileMeta dataFileMeta : before) { - deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName()); - } - } + notifyCompactBefore(before); - return new CompactResult( - before, - after, - lookupStrategy.produceChangelog + List changelogFiles = + changelogFileWriter != null ? changelogFileWriter.result() - : Collections.emptyList()); + : Collections.emptyList(); + return new CompactResult(before, after, changelogFiles); } @Override @@ -201,8 +197,7 @@ public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exceptio outputLevel, Collections.singletonList( Collections.singletonList(SortedRun.fromSingle(file))), - // In deletion vector mode, we always drop deletion - lookupStrategy.deletionVector, + forceDropDelete, strategy.rewrite); } else { return super.upgrade(outputLevel, file); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java index 1eaa76cf4ac4..5415427f4245 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java @@ -25,7 +25,6 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; -import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.utils.FieldsComparator; @@ -66,8 +65,8 @@ public FullChangelogMergeTreeCompactRewriter( userDefinedSeqComparator, mfFactory, mergeSorter, - LookupStrategy.CHANGELOG_ONLY, - null); + true, + false); this.valueEqualiser = valueEqualiser; this.changelogRowDeduplicate = changelogRowDeduplicate; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java index 95b7ab78c055..e001e083e41a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java @@ -42,7 +42,6 @@ import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE; import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE; import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG; -import static org.apache.paimon.utils.Preconditions.checkArgument; /** * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction @@ -52,6 +51,7 @@ public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewrite private final LookupLevels lookupLevels; private final MergeFunctionWrapperFactory wrapperFactory; + @Nullable private final DeletionVectorsMaintainer dvMaintainer; public LookupMergeTreeCompactRewriter( int maxLevel, @@ -64,8 +64,8 @@ public LookupMergeTreeCompactRewriter( MergeFunctionFactory mfFactory, MergeSorter mergeSorter, MergeFunctionWrapperFactory wrapperFactory, - LookupStrategy lookupStrategy, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + boolean produceChangelog, + @Nullable DeletionVectorsMaintainer dvMaintainer) { super( maxLevel, mergeEngine, @@ -75,17 +75,18 @@ public LookupMergeTreeCompactRewriter( userDefinedSeqComparator, mfFactory, mergeSorter, - lookupStrategy, - deletionVectorsMaintainer); - if (lookupStrategy.deletionVector) { - checkArgument( - deletionVectorsMaintainer != null, - "deletionVectorsMaintainer should not be null, there is a bug."); - } + produceChangelog, + dvMaintainer != null); + this.dvMaintainer = dvMaintainer; this.lookupLevels = lookupLevels; this.wrapperFactory = wrapperFactory; } + @Override + protected void notifyCompactBefore(List files) { + super.notifyCompactBefore(files); + } + @Override protected boolean rewriteChangelog( int outputLevel, boolean dropDelete, List> sections) { @@ -99,7 +100,8 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) { } // In deletionVector mode, since drop delete is required, rewrite is always required. - if (lookupStrategy.deletionVector) { + // TODO wait https://github.com/apache/incubator-paimon/pull/2962 + if (dvMaintainer != null) { return CHANGELOG_WITH_REWRITE; } @@ -121,8 +123,7 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) { @Override protected MergeFunctionWrapper createMergeWrapper(int outputLevel) { - return wrapperFactory.create( - mfFactory, outputLevel, lookupLevels, deletionVectorsMaintainer); + return wrapperFactory.create(mfFactory, outputLevel, lookupLevels, dvMaintainer); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index 8e8f67207317..b4659db48c0f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -21,7 +21,6 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -47,7 +46,6 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter { @Nullable protected final FieldsComparator userDefinedSeqComparator; protected final MergeFunctionFactory mfFactory; protected final MergeSorter mergeSorter; - @Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer; public MergeTreeCompactRewriter( KeyValueFileReaderFactory readerFactory, @@ -55,15 +53,13 @@ public MergeTreeCompactRewriter( Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, - MergeSorter mergeSorter, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + MergeSorter mergeSorter) { this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.keyComparator = keyComparator; this.userDefinedSeqComparator = userDefinedSeqComparator; this.mfFactory = mfFactory; this.mergeSorter = mergeSorter; - this.deletionVectorsMaintainer = deletionVectorsMaintainer; } @Override @@ -88,11 +84,9 @@ protected CompactResult rewriteCompaction( writer.write(new RecordReaderIterator<>(sectionsReader)); writer.close(); List before = extractFilesFromSections(sections); - if (deletionVectorsMaintainer != null) { - for (DataFileMeta dataFileMeta : before) { - deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName()); - } - } + notifyCompactBefore(before); return new CompactResult(before, writer.result()); } + + protected void notifyCompactBefore(List files) {} } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index b37069fe485b..ce4a27813923 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -163,7 +163,7 @@ protected MergeTreeWriter createWriter( List restoreFiles, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + @Nullable DeletionVectorsMaintainer dvMaintainer) { if (LOG.isDebugEnabled()) { LOG.debug( "Creating merge tree writer for partition {} bucket {} from restored files {}", @@ -188,12 +188,7 @@ protected MergeTreeWriter createWriter( : universalCompaction; CompactManager compactManager = createCompactManager( - partition, - bucket, - compactStrategy, - compactExecutor, - levels, - deletionVectorsMaintainer); + partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer); return new MergeTreeWriter( bufferSpillable(), @@ -222,7 +217,7 @@ private CompactManager createCompactManager( CompactStrategy compactStrategy, ExecutorService compactExecutor, Levels levels, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + @Nullable DeletionVectorsMaintainer dvMaintainer) { if (options.writeOnly()) { return new NoopCompactManager(); } else { @@ -235,7 +230,7 @@ private CompactManager createCompactManager( keyComparator, userDefinedSeqComparator, levels, - deletionVectorsMaintainer); + dvMaintainer); return new MergeTreeCompactManager( compactExecutor, levels, @@ -321,7 +316,7 @@ private MergeTreeCompactRewriter createRewriter( mfFactory, mergeSorter, wrapperFactory, - lookupStrategy, + lookupStrategy.produceChangelog, deletionVectorsMaintainer); } else { return new MergeTreeCompactRewriter( @@ -330,8 +325,7 @@ private MergeTreeCompactRewriter createRewriter( keyComparator, userDefinedSeqComparator, mfFactory, - mergeSorter, - deletionVectorsMaintainer); + mergeSorter); } }