Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce UpgradeStrategy to simplify logic #2781

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactResult;
Expand All @@ -44,13 +44,13 @@
public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewriter {

protected final int maxLevel;
protected final CoreOptions.MergeEngine mergeEngine;
protected final MergeEngine mergeEngine;
protected final RecordEqualiser valueEqualiser;
protected final boolean changelogRowDeduplicate;

public ChangelogMergeTreeRewriter(
int maxLevel,
CoreOptions.MergeEngine mergeEngine,
MergeEngine mergeEngine,
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
Expand All @@ -68,7 +68,7 @@ public ChangelogMergeTreeRewriter(
protected abstract boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections);

protected abstract boolean upgradeChangelog(int outputLevel, DataFileMeta file);
protected abstract UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file);

protected abstract MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel);

Expand Down Expand Up @@ -164,26 +164,30 @@ private CompactResult rewriteChangelogCompaction(

@Override
public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception {
if (upgradeChangelog(outputLevel, file)) {
UpgradeStrategy strategy = upgradeChangelog(outputLevel, file);
if (strategy.changelog) {
return rewriteChangelogCompaction(
outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))),
!rewriteCompactFileForUpgrade(outputLevel));
strategy.rewrite);
} else {
return super.upgrade(outputLevel, file);
}
}

/**
* For upgrade, there are two scenarios in which rewriting the compact file can be skipped.
*
* <ul>
* <li>mergeEngine == DEDUPLICATE, no need to consider previous records
* <li>outputLevel == maxLevel, no previous records
* </ul>
*/
private boolean rewriteCompactFileForUpgrade(int outputLevel) {
return mergeEngine == CoreOptions.MergeEngine.DEDUPLICATE || outputLevel == maxLevel;
/**
 * Strategy for upgrade, as returned by {@code upgradeChangelog}.
 *
 * <p>Each constant carries two flags that {@code upgrade} reads: {@code changelog} selects
 * between {@code rewriteChangelogCompaction} and {@code super.upgrade}, and {@code rewrite} is
 * forwarded as the last argument of {@code rewriteChangelogCompaction}.
 */
protected enum UpgradeStrategy {
// changelog = false: upgrade() delegates to super.upgrade().
NO_CHANGELOG(false, false),
// changelog = true, rewrite = false: upgrade() goes through rewriteChangelogCompaction with
// the rewrite flag off.
CHANGELOG_NO_REWRITE(true, false),
// changelog = true, rewrite = true: upgrade() goes through rewriteChangelogCompaction with
// the rewrite flag on.
CHANGELOG_WITH_REWRITE(true, true);

// Whether upgrade() should take the changelog-producing path.
private final boolean changelog;
// Passed through to rewriteChangelogCompaction; presumably controls whether the compact file
// is rewritten as well -- NOTE(review): confirm against that method's parameter name.
private final boolean rewrite;

UpgradeStrategy(boolean changelog, boolean rewrite) {
this.changelog = changelog;
this.rewrite = rewrite;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
Expand Down Expand Up @@ -77,8 +80,19 @@ protected boolean rewriteChangelog(
}

@Override
protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
return file.level() == 0;
protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this override is not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class does not extend the lookup rewriter.

if (file.level() != 0) {
return NO_CHANGELOG;
}

if (outputLevel == maxLevel) {
return CHANGELOG_NO_REWRITE;
}

// FIRST_ROW must rewrite the file, because some records that are already at a higher level
// may be skipped.
// See LookupMergeFunction, it only returns new records.
return CHANGELOG_WITH_REWRITE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;

/** A {@link MergeTreeCompactRewriter} which produces changelog files for each full compaction. */
public class FullChangelogMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter {

Expand Down Expand Up @@ -71,8 +74,8 @@ protected boolean rewriteChangelog(
}

@Override
protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
return outputLevel == maxLevel;
protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
    // Changelog is requested only when the file is upgraded to the max level, and the strategy
    // chosen there does not ask for a rewrite; every other upgrade produces no changelog.
    if (outputLevel == maxLevel) {
        return CHANGELOG_NO_REWRITE;
    }
    return NO_CHANGELOG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
Expand All @@ -34,6 +34,10 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;

/**
* A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction
* involving level 0 files.
Expand All @@ -44,7 +48,7 @@ public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter {

public LookupMergeTreeCompactRewriter(
int maxLevel,
CoreOptions.MergeEngine mergeEngine,
MergeEngine mergeEngine,
LookupLevels lookupLevels,
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Expand Down Expand Up @@ -73,8 +77,25 @@ protected boolean rewriteChangelog(
}

@Override
protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
return file.level() == 0;
protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
    // Files that are not at level 0 are upgraded without producing changelog.
    if (file.level() != 0) {
        return NO_CHANGELOG;
    }

    // The rewrite can be skipped in two cases:
    //  - the file is upgraded to the max level;
    //  - the merge engine is DEDUPLICATE, which retains the latest records as the final
    //    result, so merging has no impact on it at all.
    boolean skipRewrite = outputLevel == maxLevel || mergeEngine == MergeEngine.DEDUPLICATE;

    // Other merge engines must rewrite the file, because some records that are already at a
    // higher level may be merged. See LookupMergeFunction, it only returns new records.
    return skipRewrite ? CHANGELOG_NO_REWRITE : CHANGELOG_WITH_REWRITE;
}

@Override
Expand Down
Loading