diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index 4626d8fe0cf3..7be5fc74119e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -18,7 +18,7 @@ package org.apache.paimon.mergetree.compact; -import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.compact.CompactResult; @@ -44,13 +44,13 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewriter { protected final int maxLevel; - protected final CoreOptions.MergeEngine mergeEngine; + protected final MergeEngine mergeEngine; protected final RecordEqualiser valueEqualiser; protected final boolean changelogRowDeduplicate; public ChangelogMergeTreeRewriter( int maxLevel, - CoreOptions.MergeEngine mergeEngine, + MergeEngine mergeEngine, KeyValueFileReaderFactory readerFactory, KeyValueFileWriterFactory writerFactory, Comparator keyComparator, @@ -68,7 +68,7 @@ public ChangelogMergeTreeRewriter( protected abstract boolean rewriteChangelog( int outputLevel, boolean dropDelete, List> sections); - protected abstract boolean upgradeChangelog(int outputLevel, DataFileMeta file); + protected abstract UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file); protected abstract MergeFunctionWrapper createMergeWrapper(int outputLevel); @@ -164,26 +164,30 @@ private CompactResult rewriteChangelogCompaction( @Override public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception { - if (upgradeChangelog(outputLevel, file)) { + UpgradeStrategy strategy = upgradeChangelog(outputLevel, file); + if (strategy.changelog) { return 
rewriteChangelogCompaction( outputLevel, Collections.singletonList( Collections.singletonList(SortedRun.fromSingle(file))), - !rewriteCompactFileForUpgrade(outputLevel)); + strategy.rewrite); } else { return super.upgrade(outputLevel, file); } } - /** - * For upgrade, there are two scenarios can skip rewrite compact file. - * - *
<ul> - *
<li>mergeEngine == DEDUPLICATE, no need to consider previous records - *
<li>outputLevel == maxLevel, no previous records - *
</ul>
- */ - private boolean rewriteCompactFileForUpgrade(int outputLevel) { - return mergeEngine == CoreOptions.MergeEngine.DEDUPLICATE || outputLevel == maxLevel; + /** Strategy for upgrade. */ + protected enum UpgradeStrategy { + NO_CHANGELOG(false, false), + CHANGELOG_NO_REWRITE(true, false), + CHANGELOG_WITH_REWRITE(true, true); + + private final boolean changelog; + private final boolean rewrite; + + UpgradeStrategy(boolean changelog, boolean rewrite) { + this.changelog = changelog; + this.rewrite = rewrite; + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java index 7336ade38dcb..24f872da1188 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java @@ -36,6 +36,9 @@ import java.util.Comparator; import java.util.List; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -77,8 +80,19 @@ protected boolean rewriteChangelog( } @Override - protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) { - return file.level() == 0; + protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) { + if (file.level() != 0) { + return NO_CHANGELOG; + } + + if (outputLevel == maxLevel) { + return CHANGELOG_NO_REWRITE; + } + + // FIRST_ROW must rewrite file, because some records that are already at higher level may be + // skipped + // See LookupMergeFunction, it just 
returns newly records. + return CHANGELOG_WITH_REWRITE; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java index a0f387b24ed5..b8d64fa869ac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java @@ -33,6 +33,9 @@ import java.util.Comparator; import java.util.List; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG; + /** A {@link MergeTreeCompactRewriter} which produces changelog files for each full compaction. */ public class FullChangelogMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter { @@ -71,8 +74,8 @@ protected boolean rewriteChangelog( } @Override - protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) { - return outputLevel == maxLevel; + protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) { + return outputLevel == maxLevel ? 
CHANGELOG_NO_REWRITE : NO_CHANGELOG; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java index 68df5593de9b..c78ded5ff0b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java @@ -18,7 +18,7 @@ package org.apache.paimon.mergetree.compact; -import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; @@ -34,6 +34,10 @@ import java.util.Comparator; import java.util.List; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE; +import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG; + /** * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction * involving level 0 files. 
@@ -44,7 +48,7 @@ public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter { public LookupMergeTreeCompactRewriter( int maxLevel, - CoreOptions.MergeEngine mergeEngine, + MergeEngine mergeEngine, LookupLevels lookupLevels, KeyValueFileReaderFactory readerFactory, KeyValueFileWriterFactory writerFactory, @@ -73,8 +77,25 @@ protected boolean rewriteChangelog( } @Override - protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) { - return file.level() == 0; + protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) { + if (file.level() != 0) { + return NO_CHANGELOG; + } + + if (outputLevel == maxLevel) { + return CHANGELOG_NO_REWRITE; + } + + // DEDUPLICATE retains the latest records as the final result, so merging has no impact on + // it at all + if (mergeEngine == MergeEngine.DEDUPLICATE) { + return CHANGELOG_NO_REWRITE; + } + + // other merge engines must rewrite file, because some records that are already at higher + // level may be merged + // See LookupMergeFunction, it just returns newly records. + return CHANGELOG_WITH_REWRITE; } @Override