Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Jan 20, 2024
1 parent f67a432 commit 6c8c9a6
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ public class CoreOptions implements Serializable {
.intType()
.noDefaultValue()
.withDescription(
"Total level number, for example, there are 3 levels, including 0,1,2 levels.");
"Total level number, for example, there are 3 levels, including 0,1,2 levels,"
+ " the default value is 'num-sorted-run.compaction-trigger' + 1.");

public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
key("commit.force-compact")
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public RowKind valueKind() {
return valueKind;
}

/**
 * Returns whether the value kind of this record is an "add" kind.
 *
 * <p>Simply delegates to {@code valueKind.isAdd()}. NOTE(review): presumably this is true for
 * INSERT and UPDATE_AFTER (that is what the private helpers replaced by this method checked) —
 * confirm against {@code RowKind}.
 *
 * @return the result of {@code valueKind.isAdd()}
 */
public boolean isAdd() {
return valueKind.isAdd();
}

public InternalRow value() {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class AppendOnlyCompactManager extends CompactFutureManager {
private final long targetFileSize;
private final CompactRewriter rewriter;

private List<DataFileMeta> compacting;
@Nullable private List<DataFileMeta> compacting;

@Nullable private final CompactionMetrics metrics;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public CompactResult(List<DataFileMeta> before, List<DataFileMeta> after) {
this(before, after, Collections.emptyList());
}

/**
 * Convenience constructor for a result with exactly one file before and one file after.
 *
 * <p>Wraps {@code before} and {@code after} into singleton lists and delegates to the list
 * based constructor; {@code changelog} is passed through as is.
 *
 * @param before the single file before the compaction
 * @param after the single file after the compaction
 * @param changelog changelog files belonging to this result
 */
public CompactResult(DataFileMeta before, DataFileMeta after, List<DataFileMeta> changelog) {
this(Collections.singletonList(before), Collections.singletonList(after), changelog);
}

public CompactResult(
List<DataFileMeta> before, List<DataFileMeta> after, List<DataFileMeta> changelog) {
this.before = new ArrayList<>(before);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** A {@link MergeTreeCompactRewriter} which produces changelog files for the compaction. */
/**
* A {@link MergeTreeCompactRewriter} which produces changelog files while performing compaction.
*/
public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewriter {

protected final RecordEqualiser valueEqualiser;
Expand All @@ -55,10 +58,10 @@ public ChangelogMergeTreeRewriter(
this.changelogRowDeduplicate = changelogRowDeduplicate;
}

protected abstract boolean rewriteChangelog(
protected abstract boolean needRewriteWithChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections);

protected abstract boolean upgradeChangelog(int outputLevel, DataFileMeta file);
protected abstract boolean needUpgradeWithChangelog(int outputLevel, DataFileMeta file);

protected abstract MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel);

Expand All @@ -82,15 +85,16 @@ protected boolean rewriteLookupChangelog(int outputLevel, List<List<SortedRun>>
@Override
public CompactResult rewrite(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {
if (rewriteChangelog(outputLevel, dropDelete, sections)) {
return rewriteChangelogCompaction(outputLevel, sections);
if (needRewriteWithChangelog(outputLevel, dropDelete, sections)) {
return rewriteWithChangelog(outputLevel, sections);
} else {
return rewriteCompaction(outputLevel, dropDelete, sections);
return super.rewrite(outputLevel, dropDelete, sections);
}
}

private CompactResult rewriteChangelogCompaction(
int outputLevel, List<List<SortedRun>> sections) throws Exception {
/** Rewrite and produce changelog at the same time. */
private CompactResult rewriteWithChangelog(int outputLevel, List<List<SortedRun>> sections)
throws Exception {
List<ConcatRecordReader.ReaderSupplier<ChangelogResult>> sectionReaders = new ArrayList<>();
for (List<SortedRun> section : sections) {
sectionReaders.add(
Expand Down Expand Up @@ -141,13 +145,47 @@ private CompactResult rewriteChangelogCompaction(

@Override
public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception {
if (upgradeChangelog(outputLevel, file)) {
return rewriteChangelogCompaction(
outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))));
if (needUpgradeWithChangelog(outputLevel, file)) {
return updateWithChangelog(outputLevel, file);
} else {
return super.upgrade(outputLevel, file);
}
}

/**
 * Upgrade the file to {@code outputLevel} and produce changelog at the same time.
 *
 * <p>The records of {@code file} are read through the merge wrapper created by {@link
 * #createMergeWrapper(int)}, and only the changelog records of each {@link ChangelogResult} are
 * written to the changelog files. No new data file is written here: the returned result pairs
 * the original file with {@code file.upgrade(outputLevel)}.
 *
 * @param outputLevel level the file is upgraded to
 * @param file file to be upgraded
 * @return compaction result holding the original file, the upgraded file and the produced
 *     changelog files
 * @throws Exception if reading the file, writing the changelog or closing a resource fails
 */
private CompactResult updateWithChangelog(int outputLevel, DataFileMeta file) throws Exception {
    RecordReader<ChangelogResult> sectionReader =
            MergeTreeReaders.readerForSection(
                    Collections.singletonList(SortedRun.fromSingle(file)),
                    readerFactory,
                    keyComparator,
                    createMergeWrapper(outputLevel),
                    mergeSorter);

    RecordReaderIterator<ChangelogResult> iterator = null;
    RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;

    try {
        iterator =
                new RecordReaderIterator<>(
                        ConcatRecordReader.create(
                                Collections.singletonList(() -> sectionReader)));
        changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel);

        while (iterator.hasNext()) {
            ChangelogResult result = iterator.next();
            for (KeyValue kv : result.changelogs()) {
                changelogFileWriter.write(kv);
            }
        }
    } finally {
        // Close the writer even if closing the iterator throws; otherwise the changelog
        // writer would be left open.
        try {
            if (iterator != null) {
                iterator.close();
            }
        } finally {
            if (changelogFileWriter != null) {
                changelogFileWriter.close();
            }
        }
    }
    // The writer is closed at this point; its result is read only after close().
    return new CompactResult(file, file.upgrade(outputLevel), changelogFileWriter.result());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,30 @@
import java.io.Closeable;
import java.util.List;

/** Rewrite sections to the files. */
/** Rewrite sections to new level. */
public interface CompactRewriter extends Closeable {

/**
 * Rewrite sections to new level.
 *
 * @param outputLevel new level
 * @param dropDelete whether to drop the deletion, see {@link
 *     MergeTreeCompactManager#triggerCompaction}
 * @param sections list of sections (section is a list of {@link SortedRun}s, and key intervals
 *     between sections do not overlap)
 * @return compaction result
 * @throws Exception if the rewrite fails
 */
CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
throws Exception;

/**
 * Upgrade file to new level, usually file data is not rewritten, only the metadata is updated.
 *
 * @param outputLevel new level
 * @param file file to be upgraded
 * @return compaction result
 * @throws Exception if the upgrade fails
 */
CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface CompactStrategy {
* <li>level 0 is special, one run per file; all other levels are one run per level.
* <li>compaction is sequential from small level to large level.
* </ul>
*
* @return the picked {@link CompactUnit}, if any; when the returned {@link Optional} is
*     present, the unit must contain all runs in level 0.
*/
Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ public FirstRowMergeTreeCompactRewriter(
}

@Override
protected boolean rewriteChangelog(
protected boolean needRewriteWithChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) {
return rewriteLookupChangelog(outputLevel, sections);
}

@Override
protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
protected boolean needUpgradeWithChangelog(int outputLevel, DataFileMeta file) {
return file.level() == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ public ChangelogResult getResult() {
if (isInitialized) {
KeyValue merged = mergeFunction.getResult();
if (topLevelKv == null) {
if (merged != null && isAdd(merged)) {
if (merged != null && merged.isAdd()) {
reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, merged));
}
} else {
if (merged == null || !isAdd(merged)) {
if (merged == null || !merged.isAdd()) {
reusedResult.addChangelog(replace(reusedBefore, RowKind.DELETE, topLevelKv));
} else if (!changelogRowDeduplicate
|| !valueEqualiser.equals(topLevelKv.value(), merged.value())) {
Expand All @@ -115,7 +115,7 @@ public ChangelogResult getResult() {
}
return reusedResult.setResultIfNotRetract(merged);
} else {
if (topLevelKv == null && isAdd(initialKv)) {
if (topLevelKv == null && initialKv.isAdd()) {
reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, initialKv));
}
// either topLevelKv is not null, but there is only one kv,
Expand All @@ -129,8 +129,4 @@ public ChangelogResult getResult() {
/**
 * Fills {@code reused} with the key, sequence number and value of {@code from}, but with the
 * given {@code valueKind} instead of the kind of {@code from}.
 *
 * @param reused the instance {@code replace} is called on
 * @param valueKind the row kind to use for the changelog record
 * @param from the record whose key, sequence number and value are taken
 * @return whatever {@code reused.replace(...)} returns (presumably {@code reused} itself —
 *     confirm against {@code KeyValue#replace})
 */
private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
return reused.replace(from.key(), from.sequenceNumber(), valueKind, from.value());
}

private boolean isAdd(KeyValue kv) {
return kv.valueKind() == RowKind.INSERT || kv.valueKind() == RowKind.UPDATE_AFTER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public FullChangelogMergeTreeCompactRewriter(
}

@Override
protected boolean rewriteChangelog(
protected boolean needRewriteWithChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) {
boolean changelog = outputLevel == maxLevel;
if (changelog) {
Expand All @@ -70,7 +70,7 @@ protected boolean rewriteChangelog(
}

@Override
protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
protected boolean needUpgradeWithChangelog(int outputLevel, DataFileMeta file) {
return outputLevel == maxLevel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public ChangelogResult getResult() {

// 2. With level 0, with the latest high level, return changelog
if (highLevel != null) {
// For first row, we should just return old value. And produce no changelog.
setChangelog(highLevel, result);
return reusedResult.setResult(result);
}
Expand All @@ -121,12 +120,12 @@ public ChangelogResult getResult() {
}

private void setChangelog(KeyValue before, KeyValue after) {
if (before == null || !isAdd(before)) {
if (isAdd(after)) {
if (before == null || !before.isAdd()) {
if (after.isAdd()) {
reusedResult.addChangelog(replaceAfter(RowKind.INSERT, after));
}
} else {
if (!isAdd(after)) {
if (!after.isAdd()) {
reusedResult.addChangelog(replaceBefore(RowKind.DELETE, before));
} else if (!changelogRowDeduplicate
|| !valueEqualiser.equals(before.value(), after.value())) {
Expand All @@ -148,8 +147,4 @@ private KeyValue replaceAfter(RowKind valueKind, KeyValue from) {
/**
 * Fills {@code reused} with the key, sequence number and value of {@code from}, but with the
 * given {@code valueKind} instead of the kind of {@code from}.
 *
 * @param reused the instance {@code replace} is called on
 * @param valueKind the row kind to use for the changelog record
 * @param from the record whose key, sequence number and value are taken
 * @return whatever {@code reused.replace(...)} returns (presumably {@code reused} itself —
 *     confirm against {@code KeyValue#replace})
 */
private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
return reused.replace(from.key(), from.sequenceNumber(), valueKind, from.value());
}

private boolean isAdd(KeyValue kv) {
return kv.valueKind() == RowKind.INSERT || kv.valueKind() == RowKind.UPDATE_AFTER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ public LookupMergeTreeCompactRewriter(
}

@Override
protected boolean rewriteChangelog(
protected boolean needRewriteWithChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) {
return rewriteLookupChangelog(outputLevel, sections);
}

@Override
protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
protected boolean needUpgradeWithChangelog(int outputLevel, DataFileMeta file) {
return file.level() == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -202,6 +203,12 @@ CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
outputLevel = maxLevel;
}

if (runCount > 0) {
    // A non-empty unit must never be written back to level 0.
    Preconditions.checkState(outputLevel > 0, "output level must be greater than 0");
    // Either every run was picked, or the first run that was NOT picked is above level 0,
    // i.e. no level 0 run is left out of the unit.
    Preconditions.checkState(
            runCount == runs.size() || runs.get(runCount).level() > 0,
            "next run's level must be greater than 0 (all files in level 0 must be picked)");
}
return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
}

Expand Down

0 comments on commit 6c8c9a6

Please sign in to comment.