Skip to content

Commit

Permalink
[core] Refactor ChangelogMergeTreeRewriter to lookup free
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Mar 11, 2024
1 parent d8d737f commit f622d80
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
Expand All @@ -49,7 +47,8 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite

protected final int maxLevel;
protected final MergeEngine mergeEngine;
protected final LookupStrategy lookupStrategy;
private final boolean produceChangelog;
private final boolean forceDropDelete;

public ChangelogMergeTreeRewriter(
int maxLevel,
Expand All @@ -60,19 +59,19 @@ public ChangelogMergeTreeRewriter(
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
LookupStrategy lookupStrategy,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
boolean produceChangelog,
boolean forceDropDelete) {
super(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter,
deletionVectorsMaintainer);
mergeSorter);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
this.lookupStrategy = lookupStrategy;
this.produceChangelog = produceChangelog;
this.forceDropDelete = forceDropDelete;
}

protected abstract boolean rewriteChangelog(
Expand Down Expand Up @@ -143,17 +142,19 @@ private CompactResult rewriteChangelogCompaction(
if (rewriteCompactFile) {
compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel);
}
if (lookupStrategy.produceChangelog) {
if (produceChangelog) {
changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel);
}

while (iterator.hasNext()) {
ChangelogResult result = iterator.next();
KeyValue keyValue = result.result();
if (rewriteCompactFile && keyValue != null && (!dropDelete || keyValue.isAdd())) {
if (compactFileWriter != null
&& keyValue != null
&& (!dropDelete || keyValue.isAdd())) {
compactFileWriter.write(keyValue);
}
if (lookupStrategy.produceChangelog) {
if (produceChangelog) {
for (KeyValue kv : result.changelogs()) {
changelogFileWriter.write(kv);
}
Expand All @@ -173,24 +174,19 @@ private CompactResult rewriteChangelogCompaction(

List<DataFileMeta> before = extractFilesFromSections(sections);
List<DataFileMeta> after =
rewriteCompactFile
compactFileWriter != null
? compactFileWriter.result()
: before.stream()
.map(x -> x.upgrade(outputLevel))
.collect(Collectors.toList());

if (deletionVectorsMaintainer != null) {
for (DataFileMeta dataFileMeta : before) {
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
}
}
notifyCompactBefore(before);

return new CompactResult(
before,
after,
lookupStrategy.produceChangelog
List<DataFileMeta> changelogFiles =
changelogFileWriter != null
? changelogFileWriter.result()
: Collections.emptyList());
: Collections.emptyList();
return new CompactResult(before, after, changelogFiles);
}

@Override
Expand All @@ -201,8 +197,7 @@ public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exceptio
outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))),
// In deletion vector mode, we always drop deletion
lookupStrategy.deletionVector,
forceDropDelete,
strategy.rewrite);
} else {
return super.upgrade(outputLevel, file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.FieldsComparator;
Expand Down Expand Up @@ -66,8 +65,8 @@ public FullChangelogMergeTreeCompactRewriter(
userDefinedSeqComparator,
mfFactory,
mergeSorter,
LookupStrategy.CHANGELOG_ONLY,
null);
true,
false);
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction
Expand All @@ -52,6 +51,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite

private final LookupLevels<T> lookupLevels;
private final MergeFunctionWrapperFactory<T> wrapperFactory;
@Nullable private final DeletionVectorsMaintainer dvMaintainer;

public LookupMergeTreeCompactRewriter(
int maxLevel,
Expand All @@ -64,8 +64,8 @@ public LookupMergeTreeCompactRewriter(
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
MergeFunctionWrapperFactory<T> wrapperFactory,
LookupStrategy lookupStrategy,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
boolean produceChangelog,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
super(
maxLevel,
mergeEngine,
Expand All @@ -75,17 +75,18 @@ public LookupMergeTreeCompactRewriter(
userDefinedSeqComparator,
mfFactory,
mergeSorter,
lookupStrategy,
deletionVectorsMaintainer);
if (lookupStrategy.deletionVector) {
checkArgument(
deletionVectorsMaintainer != null,
"deletionVectorsMaintainer should not be null, there is a bug.");
}
produceChangelog,
dvMaintainer != null);
this.dvMaintainer = dvMaintainer;
this.lookupLevels = lookupLevels;
this.wrapperFactory = wrapperFactory;
}

/**
 * Called with the input files of a compaction once they have been rewritten.
 *
 * <p>When a deletion vectors maintainer is configured, the deletion vector of every input file
 * is removed here. This restores the cleanup that the rewriters used to perform inline before
 * it was moved behind this hook; delegating only to the (empty) base implementation would leave
 * the deletion vectors of compacted-away files in place.
 *
 * @param files the files that were used as compaction input
 */
@Override
protected void notifyCompactBefore(List<DataFileMeta> files) {
    super.notifyCompactBefore(files);
    // dvMaintainer is null when deletion vectors are not enabled for this rewriter.
    if (dvMaintainer != null) {
        for (DataFileMeta file : files) {
            dvMaintainer.removeDeletionVectorOf(file.fileName());
        }
    }
}

@Override
protected boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) {
Expand All @@ -99,7 +100,8 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
}

// In deletionVector mode, since drop delete is required, rewrite is always required.
if (lookupStrategy.deletionVector) {
// TODO wait https://github.com/apache/incubator-paimon/pull/2962
if (dvMaintainer != null) {
return CHANGELOG_WITH_REWRITE;
}

Expand All @@ -121,8 +123,7 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {

@Override
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel) {
return wrapperFactory.create(
mfFactory, outputLevel, lookupLevels, deletionVectorsMaintainer);
return wrapperFactory.create(mfFactory, outputLevel, lookupLevels, dvMaintainer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
Expand All @@ -47,23 +46,20 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final MergeFunctionFactory<KeyValue> mfFactory;
protected final MergeSorter mergeSorter;
@Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer;

public MergeTreeCompactRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
MergeSorter mergeSorter) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.keyComparator = keyComparator;
this.userDefinedSeqComparator = userDefinedSeqComparator;
this.mfFactory = mfFactory;
this.mergeSorter = mergeSorter;
this.deletionVectorsMaintainer = deletionVectorsMaintainer;
}

@Override
Expand All @@ -88,11 +84,9 @@ protected CompactResult rewriteCompaction(
writer.write(new RecordReaderIterator<>(sectionsReader));
writer.close();
List<DataFileMeta> before = extractFilesFromSections(sections);
if (deletionVectorsMaintainer != null) {
for (DataFileMeta dataFileMeta : before) {
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
}
}
notifyCompactBefore(before);
return new CompactResult(before, writer.result());
}

/**
 * Hook invoked by the compaction rewrite with the files that were used as its input, just
 * before the {@link CompactResult} is returned.
 *
 * <p>The default implementation does nothing; subclasses may override it to react to the input
 * files being compacted away.
 *
 * @param files the input files extracted from the compacted sections
 */
protected void notifyCompactBefore(List<DataFileMeta> files) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected MergeTreeWriter createWriter(
List<DataFileMeta> restoreFiles,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Creating merge tree writer for partition {} bucket {} from restored files {}",
Expand All @@ -188,12 +188,7 @@ protected MergeTreeWriter createWriter(
: universalCompaction;
CompactManager compactManager =
createCompactManager(
partition,
bucket,
compactStrategy,
compactExecutor,
levels,
deletionVectorsMaintainer);
partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);

return new MergeTreeWriter(
bufferSpillable(),
Expand Down Expand Up @@ -222,7 +217,7 @@ private CompactManager createCompactManager(
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
Levels levels,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
Expand All @@ -235,7 +230,7 @@ private CompactManager createCompactManager(
keyComparator,
userDefinedSeqComparator,
levels,
deletionVectorsMaintainer);
dvMaintainer);
return new MergeTreeCompactManager(
compactExecutor,
levels,
Expand Down Expand Up @@ -321,7 +316,7 @@ private MergeTreeCompactRewriter createRewriter(
mfFactory,
mergeSorter,
wrapperFactory,
lookupStrategy,
lookupStrategy.produceChangelog,
deletionVectorsMaintainer);
} else {
return new MergeTreeCompactRewriter(
Expand All @@ -330,8 +325,7 @@ private MergeTreeCompactRewriter createRewriter(
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter,
deletionVectorsMaintainer);
mergeSorter);
}
}

Expand Down

0 comments on commit f622d80

Please sign in to comment.