[spark][core] spark compact with deletion vector #3971

Merged: 1 commit, Aug 20, 2024
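In outline, this change lets compaction of an append-only table rewrite data files together with their deletion vectors: the store only builds a DeletionVectorsMaintainer.Factory when deletion vectors are enabled, AppendOnlyWriter carries the resulting index-file changes as an IndexIncrement, and BucketedAppendCompactManager consults an AppendDeletionFileMaintainer while picking and rewriting files. As a minimal sketch of flipping that switch, assuming the option constant behind CoreOptions#deletionVectorsEnabled() is CoreOptions.DELETION_VECTORS_ENABLED and that org.apache.paimon.options.Options is used as in current Paimon (only deletionVectorsEnabled() itself appears in the diff below):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    public class DeletionVectorSwitchSketch {
        public static void main(String[] args) {
            // Assumed option constant; the diff only shows CoreOptions#deletionVectorsEnabled().
            Options conf = new Options();
            conf.set(CoreOptions.DELETION_VECTORS_ENABLED, true);

            CoreOptions options = new CoreOptions(conf);
            // When this returns true, the store's newWrite(...) (first changed file below)
            // creates a DeletionVectorsMaintainer.Factory instead of passing null.
            System.out.println(options.deletionVectorsEnabled()); // true
        }
    }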
Changed file 1 of 3:
@@ -19,6 +19,7 @@
package org.apache.paimon;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -93,6 +94,11 @@ public AppendOnlyFileStoreWrite newWrite(String commitUser) {
@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null;
if (options.deletionVectorsEnabled()) {
deletionVectorsMaintainerFactory =
new DeletionVectorsMaintainer.Factory(newIndexFileHandler());
}
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
@@ -103,6 +109,8 @@ public AppendOnlyFileStoreWrite newWrite(
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
options,
bucketMode(),
deletionVectorsMaintainerFactory,
tableName);
}

Changed file 2 of 3 (AppendOnlyWriter):
@@ -27,10 +27,12 @@
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
@@ -75,6 +77,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final List<IndexFileMeta> indexFilesBefore;
private final List<IndexFileMeta> indexFilesAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
private final String spillCompression;
@@ -121,6 +125,8 @@ public AppendOnlyWriter(
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
this.indexFilesBefore = new ArrayList<>();
this.indexFilesAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
this.spillCompression = spillCompression;
@@ -139,6 +145,10 @@ public AppendOnlyWriter(
deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
if (increment.indexIncrement() != null) {
indexFilesBefore.addAll(increment.indexIncrement().deletedIndexFiles());
indexFilesAfter.addAll(increment.indexIncrement().newIndexFiles());
}
}
}

@@ -276,6 +286,11 @@ private void trySyncLatestCompaction(boolean blocking)
result -> {
compactBefore.addAll(result.before());
compactAfter.addAll(result.after());
if (result.indexIncrement() != null) {
indexFilesBefore.addAll(
result.indexIncrement().deletedIndexFiles());
indexFilesAfter.addAll(result.indexIncrement().newIndexFiles());
}
});
}

@@ -291,12 +306,21 @@ private CommitIncrement drainIncrement() {
new ArrayList<>(compactAfter),
Collections.emptyList());

IndexIncrement indexIncrement = null;
if (!indexFilesBefore.isEmpty() || !indexFilesAfter.isEmpty()) {
indexIncrement =
new IndexIncrement(
new ArrayList<>(indexFilesAfter), new ArrayList<>(indexFilesBefore));
}

newFiles.clear();
deletedFiles.clear();
compactBefore.clear();
compactAfter.clear();
indexFilesBefore.clear();
indexFilesAfter.clear();

return new CommitIncrement(dataIncrement, compactIncrement, null);
return new CommitIncrement(dataIncrement, compactIncrement, indexIncrement, null);
}

@Override
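The writer-side changes above follow an accumulate-and-drain pattern: index files reported by restored increments and by finished compactions are collected into indexFilesBefore / indexFilesAfter, and drainIncrement only wraps them into an IndexIncrement when at least one of the lists is non-empty. A self-contained sketch of that pattern, using IndexIncrement's constructor and accessors exactly as in the diff (the buffer class itself is hypothetical, not part of this PR):

    import org.apache.paimon.index.IndexFileMeta;
    import org.apache.paimon.io.IndexIncrement;

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper: buffers index-file changes and drains them once,
    // mirroring how AppendOnlyWriter handles indexFilesBefore/indexFilesAfter.
    class IndexChangeBuffer {
        private final List<IndexFileMeta> before = new ArrayList<>();
        private final List<IndexFileMeta> after = new ArrayList<>();

        void absorb(IndexIncrement increment) {
            if (increment != null) {
                before.addAll(increment.deletedIndexFiles());
                after.addAll(increment.newIndexFiles());
            }
        }

        // Returns null when nothing changed, matching drainIncrement above.
        IndexIncrement drain() {
            if (before.isEmpty() && after.isEmpty()) {
                return null;
            }
            IndexIncrement result =
                    new IndexIncrement(new ArrayList<>(after), new ArrayList<>(before));
            before.clear();
            after.clear();
            return result;
        }
    }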
Changed file 3 of 3 (BucketedAppendCompactManager):
@@ -23,9 +23,15 @@
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.MetricUtils;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
@@ -36,6 +42,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -52,6 +59,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
private static final int FULL_COMPACT_MIN_FILE = 3;

private final ExecutorService executor;
private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
private final TreeSet<DataFileMeta> toCompact;
private final int minFileNum;
private final int maxFileNum;
@@ -65,12 +73,14 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
public BucketedAppendCompactManager(
ExecutorService executor,
List<DataFileMeta> restored,
@Nullable AppendDeletionFileMaintainer dvIndexFileMaintainer,
int minFileNum,
int maxFileNum,
long targetFileSize,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
this.executor = executor;
this.dvIndexFileMaintainer = dvIndexFileMaintainer;
this.toCompact = new TreeSet<>(fileComparator(false));
this.toCompact.addAll(restored);
this.minFileNum = minFileNum;
@@ -94,13 +104,20 @@ private void triggerFullCompaction() {
taskFuture == null,
"A compaction task is still running while the user "
+ "forces a new compaction. This is unexpected.");
if (toCompact.size() < FULL_COMPACT_MIN_FILE) {
// if deletion vectors are enabled, always trigger compaction.
if (toCompact.isEmpty()
|| (dvIndexFileMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE)) {
return;
}

taskFuture =
executor.submit(
new FullCompactTask(toCompact, targetFileSize, rewriter, metricsReporter));
new FullCompactTask(
dvIndexFileMaintainer,
toCompact,
targetFileSize,
rewriter,
metricsReporter));
compacting = new ArrayList<>(toCompact);
toCompact.clear();
}
@@ -113,7 +130,9 @@ private void triggerCompactionWithBestEffort() {
if (picked.isPresent()) {
compacting = picked.get();
taskFuture =
executor.submit(new AutoCompactTask(compacting, rewriter, metricsReporter));
executor.submit(
new AutoCompactTask(
dvIndexFileMaintainer, compacting, rewriter, metricsReporter));
}
}

@@ -207,52 +226,63 @@ public void close() throws IOException {
/** A {@link CompactTask} impl for full compaction of append-only table. */
public static class FullCompactTask extends CompactTask {

private final LinkedList<DataFileMeta> inputs;
private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
private final LinkedList<DataFileMeta> toCompact;
private final long targetFileSize;
private final CompactRewriter rewriter;

public FullCompactTask(
AppendDeletionFileMaintainer dvIndexFileMaintainer,
Collection<DataFileMeta> inputs,
long targetFileSize,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
this.inputs = new LinkedList<>(inputs);
this.dvIndexFileMaintainer = dvIndexFileMaintainer;
this.toCompact = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
}

@Override
protected CompactResult doCompact() throws Exception {
// remove large files
while (!inputs.isEmpty()) {
DataFileMeta file = inputs.peekFirst();
if (file.fileSize() >= targetFileSize) {
inputs.poll();
while (!toCompact.isEmpty()) {
DataFileMeta file = toCompact.peekFirst();
// data files with deletion files always need to be compacted.
if (file.fileSize() >= targetFileSize && !hasDeletionFile(file)) {
toCompact.poll();
continue;
}
break;
}

// compute small files
int big = 0;
int small = 0;
for (DataFileMeta file : inputs) {
if (file.fileSize() >= targetFileSize) {
big++;
// do compaction
if (dvIndexFileMaintainer != null) {
// if deletion vectors are enabled, always trigger compaction.
return compact(dvIndexFileMaintainer, toCompact, rewriter);
} else {
// compute small files
int big = 0;
int small = 0;
for (DataFileMeta file : toCompact) {
if (file.fileSize() >= targetFileSize) {
big++;
} else {
small++;
}
}
if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) {
return compact(dvIndexFileMaintainer, toCompact, rewriter);
} else {
small++;
return result(Collections.emptyList(), Collections.emptyList());
}
}
}

// do compaction
List<DataFileMeta> compactBefore = new ArrayList<>();
List<DataFileMeta> compactAfter = new ArrayList<>();
if (small > big && inputs.size() >= FULL_COMPACT_MIN_FILE) {
compactBefore = new ArrayList<>(inputs);
compactAfter = rewriter.rewrite(inputs);
}
return result(new ArrayList<>(compactBefore), compactAfter);
private boolean hasDeletionFile(DataFileMeta file) {
return dvIndexFileMaintainer != null
&& dvIndexFileMaintainer.getDeletionFile(file.fileName()) != null;
}
}
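Two modes drive doCompact above: with a deletion-vector maintainer, any remaining candidate set is compacted so the deletion vectors can be folded into the rewritten files; without one, oversized files at the head of the queue are skipped and compaction only runs when small files outnumber large ones and at least FULL_COMPACT_MIN_FILE (3) candidates remain. A self-contained sketch of that size-based check with concrete numbers (the 128 MiB target size is an assumption for illustration, not a value from the diff):

    // Sketch of the small-vs-big decision on a concrete candidate list.
    public class FullCompactDecisionSketch {
        private static final int FULL_COMPACT_MIN_FILE = 3;

        public static void main(String[] args) {
            long targetFileSize = 128L << 20; // assumed 128 MiB, for illustration only
            long[] candidateSizes = {200L << 20, 40L << 20, 30L << 20, 20L << 20};

            int big = 0;
            int small = 0;
            for (long size : candidateSizes) {
                if (size >= targetFileSize) {
                    big++;
                } else {
                    small++;
                }
            }

            // big = 1, small = 3, and 4 candidates >= FULL_COMPACT_MIN_FILE,
            // so the size-based branch would trigger a full compaction.
            boolean compact = small > big && candidateSizes.length >= FULL_COMPACT_MIN_FILE;
            System.out.println("compact = " + compact); // prints: compact = true
        }
    }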

@@ -265,36 +295,75 @@ protected CompactResult doCompact() throws Exception {
*/
public static class AutoCompactTask extends CompactTask {

private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
private final List<DataFileMeta> toCompact;
private final CompactRewriter rewriter;

public AutoCompactTask(
AppendDeletionFileMaintainer dvIndexFileMaintainer,
List<DataFileMeta> toCompact,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
this.dvIndexFileMaintainer = dvIndexFileMaintainer;
this.toCompact = toCompact;
this.rewriter = rewriter;
}

@Override
protected CompactResult doCompact() throws Exception {
return compact(dvIndexFileMaintainer, toCompact, rewriter);
}
}

private static CompactResult compact(
AppendDeletionFileMaintainer dvIndexFileMaintainer,
List<DataFileMeta> toCompact,
CompactRewriter rewriter)
throws Exception {
if (dvIndexFileMaintainer == null) {
return result(toCompact, rewriter.rewrite(toCompact));
} else {
List<DeletionFile> deletionFiles = new ArrayList<>();
for (DataFileMeta dataFile : toCompact) {
deletionFiles.add(dvIndexFileMaintainer.getDeletionFile(dataFile.fileName()));
}
List<DataFileMeta> compactAfter = rewriter.rewrite(toCompact);
toCompact.forEach(f -> dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName()));

List<IndexManifestEntry> indexManifestEntries = dvIndexFileMaintainer.persist();
if (indexManifestEntries.isEmpty()) {
return result(toCompact, compactAfter);
} else {
List<IndexFileMeta> indexFilesBefore = new ArrayList<>();
List<IndexFileMeta> indexFilesAfter = new ArrayList<>();
for (IndexManifestEntry entry : indexManifestEntries) {
if (entry.kind() == FileKind.ADD) {
indexFilesAfter.add(entry.indexFile());
} else {
indexFilesBefore.add(entry.indexFile());
}
}
return result(toCompact, indexFilesBefore, compactAfter, indexFilesAfter);
}
}
}

private static CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
return new CompactResult() {
@Override
public List<DataFileMeta> before() {
return before;
}
return new CompactResult(before, after);
}

@Override
public List<DataFileMeta> after() {
return after;
}
};
private static CompactResult result(
List<DataFileMeta> before,
@Nullable List<IndexFileMeta> indexFilesBefore,
List<DataFileMeta> after,
@Nullable List<IndexFileMeta> indexFilesAfter) {
CompactResult result = new CompactResult(before, after);
if (indexFilesBefore != null || indexFilesAfter != null) {
IndexIncrement indexIncrement = new IndexIncrement(indexFilesAfter, indexFilesBefore);
result.setIndexIncrement(indexIncrement);
}
return result;
}
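With a maintainer present, the compact helper above records the deletion files of the inputs, rewrites the data files, notifies the maintainer that those deletion vectors are gone, and persists it; the returned IndexManifestEntry list is then split by FileKind into removed and added index files and attached to the CompactResult as an IndexIncrement. A self-contained sketch of just that splitting step, reusing the accessors shown in the diff (the builder class is hypothetical, not part of this PR):

    import org.apache.paimon.index.IndexFileMeta;
    import org.apache.paimon.io.IndexIncrement;
    import org.apache.paimon.manifest.FileKind;
    import org.apache.paimon.manifest.IndexManifestEntry;

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper: turn the entries returned by
    // AppendDeletionFileMaintainer#persist() into an IndexIncrement.
    final class IndexIncrementBuilder {

        // Returns null when there is nothing to report.
        static IndexIncrement fromManifestEntries(List<IndexManifestEntry> entries) {
            if (entries.isEmpty()) {
                return null;
            }
            List<IndexFileMeta> indexFilesBefore = new ArrayList<>();
            List<IndexFileMeta> indexFilesAfter = new ArrayList<>();
            for (IndexManifestEntry entry : entries) {
                if (entry.kind() == FileKind.ADD) {
                    indexFilesAfter.add(entry.indexFile());
                } else {
                    indexFilesBefore.add(entry.indexFile());
                }
            }
            // Constructor order matches the diff: new index files first, deleted second.
            return new IndexIncrement(indexFilesAfter, indexFilesBefore);
        }
    }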

/** Compact rewriter for append-only table. */