Skip to content

Commit

Permalink
[core] Add avg/max/total compaction io size metric (apache#4392)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Oct 30, 2024
1 parent e7010ee commit 0797e81
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 5 deletions.
36 changes: 33 additions & 3 deletions docs/content/maintenance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<td>Gauge</td>
<td>Number of buckets written in the last commit.</td>
</tr>
<tr>
<td>lastCompactionInputFileSize</td>
<td>Gauge</td>
<td>Total size of the input files for the last compaction.</td>
</tr>
<tr>
<td>lastCompactionOutputFileSize</td>
<td>Gauge</td>
<td>Total size of the output files for the last compaction.</td>
</tr>
</tbody>
</table>

Expand Down Expand Up @@ -232,17 +242,17 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<tr>
<td>maxLevel0FileCount</td>
<td>Gauge</td>
<td>The maximum number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time.</td>
<td>The maximum number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time.</td>
</tr>
<tr>
<td>avgLevel0FileCount</td>
<td>Gauge</td>
<td>The average number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time.</td>
<td>The average number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time.</td>
</tr>
<tr>
<td>compactionThreadBusy</td>
<td>Gauge</td>
<td>The maximum business of compaction threads in this parallelism. Currently, there is only one compaction thread in each parallelism, so value of business ranges from 0 (idle) to 100 (compaction running all the time).</td>
<td>The maximum busyness of compaction threads in this task. Currently, there is only one compaction thread in each parallelism, so the value ranges from 0 (idle) to 100 (compaction running all the time).</td>
</tr>
<tr>
<td>avgCompactionTime</td>
Expand All @@ -259,6 +269,26 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<td>Counter</td>
<td>The total number of compactions that are queued/running.</td>
</tr>
<tr>
<td>maxCompactionInputSize</td>
<td>Gauge</td>
<td>The maximum input file size for this task's compaction.</td>
</tr>
<tr>
<td>avgCompactionInputSize</td>
<td>Gauge</td>
<td>The average input file size for this task's compaction.</td>
</tr>
<tr>
<td>maxCompactionOutputSize</td>
<td>Gauge</td>
<td>The maximum output file size for this task's compaction.</td>
</tr>
<tr>
<td>avgCompactionOutputSize</td>
<td>Gauge</td>
<td>The average output file size for this task's compaction.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ public CompactResult call() throws Exception {
metricsReporter.reportCompactionTime(
System.currentTimeMillis() - startMillis);
metricsReporter.increaseCompactionsCompletedCount();
metricsReporter.reportCompactionInputSize(
result.before().stream()
.map(DataFileMeta::fileSize)
.reduce(Long::sum)
.orElse(0L));
metricsReporter.reportCompactionOutputSize(
result.after().stream()
.map(DataFileMeta::fileSize)
.reduce(Long::sum)
.orElse(0L));
}
},
LOG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public MetricGroup getMetricGroup() {
@VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten";
@VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten";

static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize";
static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize";

private void registerGenericCommitMetrics() {
metricGroup.gauge(
LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : latestCommit.getDuration());
Expand Down Expand Up @@ -121,6 +124,12 @@ private void registerGenericCommitMetrics() {
metricGroup.gauge(
LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted());
metricGroup.gauge(
LAST_COMPACTION_INPUT_FILE_SIZE,
() -> latestCommit == null ? 0L : latestCommit.getCompactionInputFileSize());
metricGroup.gauge(
LAST_COMPACTION_OUTPUT_FILE_SIZE,
() -> latestCommit == null ? 0L : latestCommit.getCompactionOutputFileSize());
}

public void reportCommit(CommitStats commitStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;

Expand All @@ -41,6 +42,8 @@ public class CommitStats {
private final long tableFilesAppended;
private final long tableFilesDeleted;
private final long changelogFilesAppended;
private final long compactionInputFileSize;
private final long compactionOutputFileSize;
private final long changelogFilesCompacted;
private final long changelogRecordsCompacted;

Expand All @@ -61,15 +64,28 @@ public CommitStats(
int generatedSnapshots,
int attempts) {
List<ManifestEntry> addedTableFiles = new ArrayList<>(appendTableFiles);
addedTableFiles.addAll(
List<ManifestEntry> compactAfterFiles =
compactTableFiles.stream()
.filter(f -> FileKind.ADD.equals(f.kind()))
.collect(Collectors.toList()));
.collect(Collectors.toList());
addedTableFiles.addAll(compactAfterFiles);
List<ManifestEntry> deletedTableFiles =
compactTableFiles.stream()
.filter(f -> FileKind.DELETE.equals(f.kind()))
.collect(Collectors.toList());

this.compactionInputFileSize =
deletedTableFiles.stream()
.map(ManifestEntry::file)
.map(DataFileMeta::fileSize)
.reduce(Long::sum)
.orElse(0L);
this.compactionOutputFileSize =
compactAfterFiles.stream()
.map(ManifestEntry::file)
.map(DataFileMeta::fileSize)
.reduce(Long::sum)
.orElse(0L);
this.tableFilesAdded = addedTableFiles.size();
this.tableFilesAppended = appendTableFiles.size();
this.tableFilesDeleted = deletedTableFiles.size();
Expand Down Expand Up @@ -203,4 +219,12 @@ protected long getDuration() {
protected int getAttempts() {
return attempts;
}

/**
 * Returns the total size of the input files (the files deleted by compaction, summed via
 * {@code DataFileMeta#fileSize}) for the compactions included in this commit. 0 if none.
 */
public long getCompactionInputFileSize() {
    return compactionInputFileSize;
}

/**
 * Returns the total size of the output files (the files added by compaction, summed via
 * {@code DataFileMeta#fileSize}) for the compactions included in this commit. 0 if none.
 */
public long getCompactionOutputFileSize() {
    return compactionOutputFileSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public class CompactionMetrics {
public static final String AVG_COMPACTION_TIME = "avgCompactionTime";
public static final String COMPACTION_COMPLETED_COUNT = "compactionCompletedCount";
public static final String COMPACTION_QUEUED_COUNT = "compactionQueuedCount";
public static final String MAX_COMPACTION_INPUT_SIZE = "maxCompactionInputSize";
public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize";
public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize";
public static final String AVG_COMPACTION_OUTPUT_SIZE = "avgCompactionOutputSize";
private static final long BUSY_MEASURE_MILLIS = 60_000;
private static final int COMPACTION_TIME_WINDOW = 100;

Expand Down Expand Up @@ -72,6 +76,17 @@ private void registerGenericCompactionMetrics() {
metricGroup.gauge(MAX_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().max().orElse(-1));
metricGroup.gauge(
AVG_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().average().orElse(-1));
metricGroup.gauge(
MAX_COMPACTION_INPUT_SIZE, () -> getCompactionInputSizeStream().max().orElse(-1));
metricGroup.gauge(
MAX_COMPACTION_OUTPUT_SIZE, () -> getCompactionOutputSizeStream().max().orElse(-1));
metricGroup.gauge(
AVG_COMPACTION_INPUT_SIZE,
() -> getCompactionInputSizeStream().average().orElse(-1));
metricGroup.gauge(
AVG_COMPACTION_OUTPUT_SIZE,
() -> getCompactionOutputSizeStream().average().orElse(-1));

metricGroup.gauge(
AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0));
metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum());
Expand All @@ -84,6 +99,14 @@ private LongStream getLevel0FileCountStream() {
return reporters.values().stream().mapToLong(r -> r.level0FileCount);
}

/**
 * Streams the last-reported compaction input size of every registered reporter,
 * one value per partition/bucket, for aggregation into the max/avg gauges.
 */
private LongStream getCompactionInputSizeStream() {
    return reporters.values().stream()
            .mapToLong(reporter -> reporter.compactionInputSize);
}

/**
 * Streams the last-reported compaction output size of every registered reporter,
 * one value per partition/bucket, for aggregation into the max/avg gauges.
 */
private LongStream getCompactionOutputSizeStream() {
    return reporters.values().stream()
            .mapToLong(reporter -> reporter.compactionOutputSize);
}

private DoubleStream getCompactBusyStream() {
return compactTimers.values().stream()
.mapToDouble(t -> 100.0 * t.calculateLength() / BUSY_MEASURE_MILLIS);
Expand Down Expand Up @@ -112,13 +135,19 @@ public interface Reporter {

void decreaseCompactionsQueuedCount();

void reportCompactionInputSize(long bytes);

void reportCompactionOutputSize(long bytes);

void unregister();
}

private class ReporterImpl implements Reporter {

private final PartitionAndBucket key;
private long level0FileCount;
private long compactionInputSize = 0;
private long compactionOutputSize = 0;

private ReporterImpl(PartitionAndBucket key) {
this.key = key;
Expand All @@ -142,6 +171,16 @@ public void reportCompactionTime(long time) {
}
}

/**
 * Records the total input size of the most recent compaction for this partition/bucket;
 * overwrites the previous value (last-value semantics, not accumulated).
 */
// NOTE(review): plain (non-volatile) long written here and read by the metric-gauge
// thread via getCompactionInputSizeStream(); a torn/stale read is possible under the
// JMM — confirm whether volatile is needed, matching how level0FileCount is handled.
@Override
public void reportCompactionInputSize(long bytes) {
    this.compactionInputSize = bytes;
}

/**
 * Records the total output size of the most recent compaction for this partition/bucket;
 * overwrites the previous value (last-value semantics, not accumulated).
 */
// NOTE(review): plain (non-volatile) long written here and read by the metric-gauge
// thread via getCompactionOutputSizeStream(); a torn/stale read is possible under the
// JMM — confirm whether volatile is needed, matching how level0FileCount is handled.
@Override
public void reportCompactionOutputSize(long bytes) {
    this.compactionOutputSize = bytes;
}

@Override
public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
Expand Down

0 comments on commit 0797e81

Please sign in to comment.