From e814e4c7198068ba94a7a62e75344d8883b87db4 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Mon, 28 Oct 2024 18:05:34 +0800 Subject: [PATCH] [core] Add avg/max/total compaction io size metric --- docs/content/maintenance/metrics.md | 36 +++++++++++++++-- .../apache/paimon/compact/CompactTask.java | 10 +++++ .../operation/metrics/CommitMetrics.java | 9 +++++ .../paimon/operation/metrics/CommitStats.java | 28 ++++++++++++- .../operation/metrics/CompactionMetrics.java | 39 +++++++++++++++++++ 5 files changed, 117 insertions(+), 5 deletions(-) diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md index e8a16049868d..139bdfff6bfe 100644 --- a/docs/content/maintenance/metrics.md +++ b/docs/content/maintenance/metrics.md @@ -181,6 +181,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca Gauge Number of buckets written in the last commit. + + lastCompactionInputFileSize + Gauge + Total size of the input files for the last compaction. + + + lastCompactionOutputFileSize + Gauge + Total size of the output files for the last compaction. + @@ -232,17 +242,17 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca maxLevel0FileCount Gauge - The maximum number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time. + The maximum number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time. avgLevel0FileCount Gauge - The average number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time. + The average number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time. compactionThreadBusy Gauge - The maximum business of compaction threads in this parallelism. Currently, there is only one compaction thread in each parallelism, so value of business ranges from 0 (idle) to 100 (compaction running all the time). + The maximum business of compaction threads in this task. Currently, there is only one compaction thread in each parallelism, so value of business ranges from 0 (idle) to 100 (compaction running all the time). avgCompactionTime @@ -259,6 +269,26 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca Counter The total number of compactions that are queued/running. + + maxCompactionInputSize + Gauge + The maximum input file size for this task's compaction. + + + avgCompactionInputSize/td> + Gauge + The average input file size for this task's compaction. + + + maxCompactionOutputSize + Gauge + The maximum output file size for this task's compaction. + + + avgCompactionOutputSize + Gauge + The average output file size for this task's compaction. + diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java index 7da79aa5164a..c8da0f3ef2c5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java @@ -54,6 +54,16 @@ public CompactResult call() throws Exception { metricsReporter.reportCompactionTime( System.currentTimeMillis() - startMillis); metricsReporter.increaseCompactionsCompletedCount(); + metricsReporter.reportCompactionInputSize( + result.before().stream() + .map(DataFileMeta::fileSize) + .reduce(Long::sum) + .orElse(0L)); + metricsReporter.reportCompactionOutputSize( + result.after().stream() + .map(DataFileMeta::fileSize) + .reduce(Long::sum) + .orElse(0L)); } }, LOG); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java index 49476db7fad4..0f8ccbc65ce2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java @@ -76,6 +76,9 @@ public MetricGroup getMetricGroup() { @VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten"; @VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten"; + static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize"; + static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize"; + private void registerGenericCommitMetrics() { metricGroup.gauge( LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : latestCommit.getDuration()); @@ -121,6 +124,12 @@ private void registerGenericCommitMetrics() { metricGroup.gauge( LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED, () -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted()); + metricGroup.gauge( + LAST_COMPACTION_INPUT_FILE_SIZE, + () -> latestCommit == null ? 0L : latestCommit.getCompactionInputFileSize()); + metricGroup.gauge( + LAST_COMPACTION_OUTPUT_FILE_SIZE, + () -> latestCommit == null ? 0L : latestCommit.getCompactionOutputFileSize()); } public void reportCommit(CommitStats commitStats) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java index a19d93508a90..657bd6aae153 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; @@ -41,6 +42,8 @@ public class CommitStats { private final long tableFilesAppended; private final long tableFilesDeleted; private final long changelogFilesAppended; + private final long compactionInputFileSize; + private final long compactionOutputFileSize; private final long changelogFilesCompacted; private final long changelogRecordsCompacted; @@ -61,15 +64,28 @@ public CommitStats( int generatedSnapshots, int attempts) { List addedTableFiles = new ArrayList<>(appendTableFiles); - addedTableFiles.addAll( + List compactAfterFiles = compactTableFiles.stream() .filter(f -> FileKind.ADD.equals(f.kind())) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + addedTableFiles.addAll(compactAfterFiles); List deletedTableFiles = compactTableFiles.stream() .filter(f -> FileKind.DELETE.equals(f.kind())) .collect(Collectors.toList()); + this.compactionInputFileSize = + deletedTableFiles.stream() + .map(ManifestEntry::file) + .map(DataFileMeta::fileSize) + .reduce(Long::sum) + .orElse(0L); + this.compactionOutputFileSize = + compactAfterFiles.stream() + .map(ManifestEntry::file) + .map(DataFileMeta::fileSize) + .reduce(Long::sum) + .orElse(0L); this.tableFilesAdded = addedTableFiles.size(); this.tableFilesAppended = appendTableFiles.size(); this.tableFilesDeleted = deletedTableFiles.size(); @@ -203,4 +219,12 @@ protected long getDuration() { protected int getAttempts() { return attempts; } + + public long getCompactionInputFileSize() { + return compactionInputFileSize; + } + + public long getCompactionOutputFileSize() { + return compactionOutputFileSize; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java index b0940a939e18..a3074daebbde 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java @@ -44,6 +44,10 @@ public class CompactionMetrics { public static final String AVG_COMPACTION_TIME = "avgCompactionTime"; public static final String COMPACTION_COMPLETED_COUNT = "compactionCompletedCount"; public static final String COMPACTION_QUEUED_COUNT = "compactionQueuedCount"; + public static final String MAX_COMPACTION_INPUT_SIZE = "maxCompactionInputSize"; + public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize"; + public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize"; + public static final String AVG_COMPACTION_OUTPUT_SIZE = "avgCompactionOutputSize"; private static final long BUSY_MEASURE_MILLIS = 60_000; private static final int COMPACTION_TIME_WINDOW = 100; @@ -72,6 +76,17 @@ private void registerGenericCompactionMetrics() { metricGroup.gauge(MAX_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().max().orElse(-1)); metricGroup.gauge( AVG_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().average().orElse(-1)); + metricGroup.gauge( + MAX_COMPACTION_INPUT_SIZE, () -> getCompactionInputSizeStream().max().orElse(-1)); + metricGroup.gauge( + MAX_COMPACTION_OUTPUT_SIZE, () -> getCompactionOutputSizeStream().max().orElse(-1)); + metricGroup.gauge( + AVG_COMPACTION_INPUT_SIZE, + () -> getCompactionInputSizeStream().average().orElse(-1)); + metricGroup.gauge( + AVG_COMPACTION_OUTPUT_SIZE, + () -> getCompactionOutputSizeStream().average().orElse(-1)); + metricGroup.gauge( AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0)); metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum()); @@ -84,6 +99,14 @@ private LongStream getLevel0FileCountStream() { return reporters.values().stream().mapToLong(r -> r.level0FileCount); } + private LongStream getCompactionInputSizeStream() { + return reporters.values().stream().mapToLong(r -> r.compactionInputSize); + } + + private LongStream getCompactionOutputSizeStream() { + return reporters.values().stream().mapToLong(r -> r.compactionOutputSize); + } + private DoubleStream getCompactBusyStream() { return compactTimers.values().stream() .mapToDouble(t -> 100.0 * t.calculateLength() / BUSY_MEASURE_MILLIS); @@ -112,6 +135,10 @@ public interface Reporter { void decreaseCompactionsQueuedCount(); + void reportCompactionInputSize(long bytes); + + void reportCompactionOutputSize(long bytes); + void unregister(); } @@ -119,6 +146,8 @@ private class ReporterImpl implements Reporter { private final PartitionAndBucket key; private long level0FileCount; + private long compactionInputSize = 0; + private long compactionOutputSize = 0; private ReporterImpl(PartitionAndBucket key) { this.key = key; @@ -142,6 +171,16 @@ public void reportCompactionTime(long time) { } } + @Override + public void reportCompactionInputSize(long bytes) { + this.compactionInputSize = bytes; + } + + @Override + public void reportCompactionOutputSize(long bytes) { + this.compactionOutputSize = bytes; + } + @Override public void reportLevel0FileCount(long count) { this.level0FileCount = count;