Skip to content

Commit

Permalink
[core] Skip rewrite when delete row count is 0 with dv (#3070)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Mar 21, 2024
1 parent c92b00d commit 9a2f0ba
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public ChangelogMergeTreeRewriter(
protected abstract boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections);

protected abstract UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file);
protected abstract UpgradeStrategy upgradeStrategy(int outputLevel, DataFileMeta file);

protected abstract MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel);

Expand All @@ -100,19 +100,19 @@ protected boolean rewriteLookupChangelog(int outputLevel, List<List<SortedRun>>
public CompactResult rewrite(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {
if (rewriteChangelog(outputLevel, dropDelete, sections)) {
return rewriteChangelogCompaction(outputLevel, sections, dropDelete, true);
return rewriteOrProduceChangelog(outputLevel, sections, dropDelete, true);
} else {
return rewriteCompaction(outputLevel, dropDelete, sections);
}
}

/**
* Rewrite and produce changelog at the same time.
* Rewrite or produce changelog at the same time.
*
* @param dropDelete whether to drop delete when rewrite compact file
* @param rewriteCompactFile whether to rewrite compact file
*/
private CompactResult rewriteChangelogCompaction(
private CompactResult rewriteOrProduceChangelog(
int outputLevel,
List<List<SortedRun>> sections,
boolean dropDelete,
Expand Down Expand Up @@ -168,7 +168,9 @@ private CompactResult rewriteChangelogCompaction(
.map(x -> x.upgrade(outputLevel))
.collect(Collectors.toList());

notifyCompactBefore(before);
if (rewriteCompactFile) {
notifyRewriteCompactBefore(before);
}

List<DataFileMeta> changelogFiles =
changelogFileWriter != null
Expand All @@ -179,9 +181,9 @@ private CompactResult rewriteChangelogCompaction(

@Override
public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception {
UpgradeStrategy strategy = upgradeChangelog(outputLevel, file);
UpgradeStrategy strategy = upgradeStrategy(outputLevel, file);
if (strategy.changelog) {
return rewriteChangelogCompaction(
return rewriteOrProduceChangelog(
outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))),
Expand All @@ -194,7 +196,7 @@ public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exceptio

/** Strategy for upgrade. */
protected enum UpgradeStrategy {
NO_CHANGELOG(false, false),
NO_CHANGELOG_NO_REWRITE(false, false),
CHANGELOG_NO_REWRITE(true, false),
CHANGELOG_WITH_REWRITE(true, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.List;

import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG_NO_REWRITE;

/** A {@link MergeTreeCompactRewriter} which produces changelog files for each full compaction. */
public class FullChangelogMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter {
Expand Down Expand Up @@ -84,8 +84,8 @@ protected boolean rewriteChangelog(
}

@Override
protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
return outputLevel == maxLevel ? CHANGELOG_NO_REWRITE : NO_CHANGELOG;
protected UpgradeStrategy upgradeStrategy(int outputLevel, DataFileMeta file) {
return outputLevel == maxLevel ? CHANGELOG_NO_REWRITE : NO_CHANGELOG_NO_REWRITE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG_NO_REWRITE;

/**
* A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction
Expand Down Expand Up @@ -83,7 +83,7 @@ public LookupMergeTreeCompactRewriter(
}

@Override
protected void notifyCompactBefore(List<DataFileMeta> files) {
protected void notifyRewriteCompactBefore(List<DataFileMeta> files) {
if (dvMaintainer != null) {
files.forEach(file -> dvMaintainer.removeDeletionVectorOf(file.fileName()));
}
Expand All @@ -96,15 +96,14 @@ protected boolean rewriteChangelog(
}

@Override
protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
protected UpgradeStrategy upgradeStrategy(int outputLevel, DataFileMeta file) {
if (file.level() != 0) {
return NO_CHANGELOG;
return NO_CHANGELOG_NO_REWRITE;
}

// TODO In deletionVector mode, since drop delete is required, rewrite is always required.
// TODO wait https://github.com/apache/incubator-paimon/pull/2962
// TODO but should be careful to not be deleted by DeletionVectorsMaintainer!
if (dvMaintainer != null) {
// In deletionVector mode, since drop delete is required, when delete row count > 0 rewrite
// is required.
if (dvMaintainer != null && file.deleteRowCount().map(cnt -> cnt > 0).orElse(true)) {
return CHANGELOG_WITH_REWRITE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected CompactResult rewriteCompaction(
writer.write(new RecordReaderIterator<>(reader));
writer.close();
List<DataFileMeta> before = extractFilesFromSections(sections);
notifyCompactBefore(before);
notifyRewriteCompactBefore(before);
return new CompactResult(before, writer.result());
}

Expand All @@ -98,5 +98,5 @@ protected <T> RecordReader<T> readerForMergeTree(
mergeSorter);
}

protected void notifyCompactBefore(List<DataFileMeta> files) {}
protected void notifyRewriteCompactBefore(List<DataFileMeta> files) {}
}

0 comments on commit 9a2f0ba

Please sign in to comment.