Skip to content

Commit

Permalink
[core] Support compaction metrics (apache#2224)
Browse files Browse the repository at this point in the history
This closes apache#2224.
  • Loading branch information
schnappi17 authored Nov 8, 2023
1 parent c3cadc0 commit f09f63f
Show file tree
Hide file tree
Showing 27 changed files with 588 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {

private final RowType bucketKeyType;
private final RowType rowType;
private final String tableName;

public AppendOnlyFileStore(
FileIO fileIO,
Expand All @@ -51,10 +52,12 @@ public AppendOnlyFileStore(
CoreOptions options,
RowType partitionType,
RowType bucketKeyType,
RowType rowType) {
RowType rowType,
String tableName) {
super(fileIO, schemaManager, schemaId, options, partitionType);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
this.tableName = tableName;
}

@Override
Expand Down Expand Up @@ -95,7 +98,8 @@ public AppendOnlyFileStoreWrite newWrite(
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
options);
options,
tableName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<RecordEqualiser> valueEqualiserSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final String tableName;

public KeyValueFileStore(
FileIO fileIO,
Expand All @@ -77,7 +78,8 @@ public KeyValueFileStore(
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory) {
MergeFunctionFactory<KeyValue> mfFactory,
String tableName) {
super(fileIO, schemaManager, schemaId, options, partitionType);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
Expand All @@ -87,6 +89,7 @@ public KeyValueFileStore(
this.mfFactory = mfFactory;
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
this.valueEqualiserSupplier = new ValueEqualiserSupplier(valueType);
this.tableName = tableName;
}

@Override
Expand Down Expand Up @@ -147,7 +150,8 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor);
keyValueFieldsExtractor,
tableName);
}

private Map<String, FileStorePathFactory> format2PathFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -56,20 +59,24 @@ public class AppendOnlyCompactManager extends CompactFutureManager {

private List<DataFileMeta> compacting;

@Nullable private final CompactionMetrics metrics;

public AppendOnlyCompactManager(
ExecutorService executor,
List<DataFileMeta> restored,
int minFileNum,
int maxFileNum,
long targetFileSize,
CompactRewriter rewriter) {
CompactRewriter rewriter,
@Nullable CompactionMetrics metrics) {
this.executor = executor;
this.toCompact = new TreeSet<>(fileComparator());
this.toCompact.addAll(restored);
this.minFileNum = minFileNum;
this.maxFileNum = maxFileNum;
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
this.metrics = metrics;
}

@Override
Expand All @@ -90,7 +97,8 @@ private void triggerFullCompaction() {
return;
}

taskFuture = executor.submit(new FullCompactTask(toCompact, targetFileSize, rewriter));
taskFuture =
executor.submit(new FullCompactTask(toCompact, targetFileSize, rewriter, metrics));
compacting = new ArrayList<>(toCompact);
toCompact.clear();
}
Expand All @@ -102,7 +110,7 @@ private void triggerCompactionWithBestEffort() {
Optional<List<DataFileMeta>> picked = pickCompactBefore();
if (picked.isPresent()) {
compacting = picked.get();
taskFuture = executor.submit(new AutoCompactTask(compacting, rewriter));
taskFuture = executor.submit(new AutoCompactTask(compacting, rewriter, metrics));
}
}

Expand Down Expand Up @@ -197,7 +205,11 @@ public static class FullCompactTask extends CompactTask {
private final CompactRewriter rewriter;

public FullCompactTask(
Collection<DataFileMeta> inputs, long targetFileSize, CompactRewriter rewriter) {
Collection<DataFileMeta> inputs,
long targetFileSize,
CompactRewriter rewriter,
@Nullable CompactionMetrics metrics) {
super(metrics);
this.inputs = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
Expand Down Expand Up @@ -249,7 +261,11 @@ public static class AutoCompactTask extends CompactTask {
private final List<DataFileMeta> toCompact;
private final CompactRewriter rewriter;

public AutoCompactTask(List<DataFileMeta> toCompact, CompactRewriter rewriter) {
public AutoCompactTask(
List<DataFileMeta> toCompact,
CompactRewriter rewriter,
@Nullable CompactionMetrics metrics) {
super(metrics);
this.toCompact = toCompact;
this.rewriter = rewriter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.CompactionStats;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.Preconditions;
Expand Down Expand Up @@ -61,15 +63,27 @@ public List<DataFileMeta> compactAfter() {
}

public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws Exception {
compactAfter.addAll(write.compactRewriter(partition, 0).rewrite(compactBefore));
CompactIncrement compactIncrement =
new CompactIncrement(compactBefore, compactAfter, Collections.emptyList());
return new CommitMessageImpl(
partition,
0, // bucket 0 is bucket for unaware-bucket table for compatibility with the old
// design
NewFilesIncrement.emptyIncrement(),
compactIncrement);
CompactionMetrics metrics = write.getCompactionMetrics(partition, 0);
long startMillis = System.currentTimeMillis();
try {
compactAfter.addAll(write.compactRewriter(partition, 0).rewrite(compactBefore));
CompactIncrement compactIncrement =
new CompactIncrement(compactBefore, compactAfter, Collections.emptyList());
return new CommitMessageImpl(
partition,
0, // bucket 0 is bucket for unaware-bucket table for compatibility with the old
// design
NewFilesIncrement.emptyIncrement(),
compactIncrement);
} finally {
if (metrics != null) {
long duration = System.currentTimeMillis() - startMillis;
CompactionStats compactionStats =
new CompactionStats(
duration, compactBefore, compactAfter, Collections.emptyList());
metrics.reportCompaction(compactionStats);
}
}
}

public int hashCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,57 @@
package org.apache.paimon.compact;

import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.CompactionStats;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

/** Compact task. */
public abstract class CompactTask implements Callable<CompactResult> {

private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
@Nullable private final CompactionMetrics metrics;

public CompactTask(@Nullable CompactionMetrics metrics) {
this.metrics = metrics;
}

@Override
public CompactResult call() throws Exception {
long startMillis = System.currentTimeMillis();
CompactResult result = doCompact();
CompactResult result = null;
try {
result = doCompact();

if (LOG.isDebugEnabled()) {
logMetric(startMillis, result.before(), result.after());
if (LOG.isDebugEnabled()) {
logMetric(startMillis, result.before(), result.after());
}
return result;
} finally {
if (metrics != null) {
long duration = System.currentTimeMillis() - startMillis;
CompactionStats compactionStats =
result == null
? new CompactionStats(
duration,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList())
: new CompactionStats(
duration,
result.before(),
result.after(),
result.changelog());
metrics.reportCompaction(compactionStats);
}
}

return result;
}

protected String logMetric(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
Expand All @@ -53,21 +56,25 @@ public class MergeTreeCompactManager extends CompactFutureManager {
private final int numSortedRunStopTrigger;
private final CompactRewriter rewriter;

@Nullable private final CompactionMetrics metrics;

public MergeTreeCompactManager(
ExecutorService executor,
Levels levels,
CompactStrategy strategy,
Comparator<InternalRow> keyComparator,
long compactionFileSize,
int numSortedRunStopTrigger,
CompactRewriter rewriter) {
CompactRewriter rewriter,
@Nullable CompactionMetrics metrics) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
this.compactionFileSize = compactionFileSize;
this.numSortedRunStopTrigger = numSortedRunStopTrigger;
this.keyComparator = keyComparator;
this.rewriter = rewriter;
this.metrics = metrics;
}

@Override
Expand Down Expand Up @@ -162,7 +169,7 @@ public Levels levels() {
private void submitCompaction(CompactUnit unit, boolean dropDelete) {
MergeTreeCompactTask task =
new MergeTreeCompactTask(
keyComparator, compactionFileSize, rewriter, unit, dropDelete);
keyComparator, compactionFileSize, rewriter, unit, dropDelete, metrics);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for compaction: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.operation.metrics.CompactionMetrics;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -50,7 +53,9 @@ public MergeTreeCompactTask(
long minFileSize,
CompactRewriter rewriter,
CompactUnit unit,
boolean dropDelete) {
boolean dropDelete,
@Nullable CompactionMetrics metrics) {
super(metrics);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
this.outputLevel = unit.outputLevel();
Expand Down
Loading

0 comments on commit f09f63f

Please sign in to comment.