diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md
index e8a16049868d..139bdfff6bfe 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -181,6 +181,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
Gauge |
Number of buckets written in the last commit. |
+
+ lastCompactionInputFileSize |
+ Gauge |
+ Total size of the input files for the last compaction. |
+
+
+ lastCompactionOutputFileSize |
+ Gauge |
+ Total size of the output files for the last compaction. |
+
@@ -232,17 +242,17 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
maxLevel0FileCount |
Gauge |
- The maximum number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time. |
+ The maximum number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time. |
avgLevel0FileCount |
Gauge |
- The average number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time. |
+ The average number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time. |
compactionThreadBusy |
Gauge |
- The maximum business of compaction threads in this parallelism. Currently, there is only one compaction thread in each parallelism, so value of business ranges from 0 (idle) to 100 (compaction running all the time). |
+ The maximum busyness of compaction threads in this task. Currently, there is only one compaction thread in each parallelism, so the value of busyness ranges from 0 (idle) to 100 (compaction running all the time). |
avgCompactionTime |
@@ -259,6 +269,26 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
Counter |
The total number of compactions that are queued/running. |
+
+ maxCompactionInputSize |
+ Gauge |
+ The maximum input file size for this task's compaction. |
+
+
+ avgCompactionInputSize |
+ Gauge |
+ The average input file size for this task's compaction. |
+
+
+ maxCompactionOutputSize |
+ Gauge |
+ The maximum output file size for this task's compaction. |
+
+
+ avgCompactionOutputSize |
+ Gauge |
+ The average output file size for this task's compaction. |
+
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
index 7da79aa5164a..c8da0f3ef2c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
@@ -54,6 +54,16 @@ public CompactResult call() throws Exception {
metricsReporter.reportCompactionTime(
System.currentTimeMillis() - startMillis);
metricsReporter.increaseCompactionsCompletedCount();
+ metricsReporter.reportCompactionInputSize(
+ result.before().stream()
+ .map(DataFileMeta::fileSize)
+ .reduce(Long::sum)
+ .orElse(0L));
+ metricsReporter.reportCompactionOutputSize(
+ result.after().stream()
+ .map(DataFileMeta::fileSize)
+ .reduce(Long::sum)
+ .orElse(0L));
}
},
LOG);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
index 49476db7fad4..0f8ccbc65ce2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
@@ -76,6 +76,9 @@ public MetricGroup getMetricGroup() {
@VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten";
@VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten";
+ static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize";
+ static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize";
+
private void registerGenericCommitMetrics() {
metricGroup.gauge(
LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : latestCommit.getDuration());
@@ -121,6 +124,12 @@ private void registerGenericCommitMetrics() {
metricGroup.gauge(
LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted());
+ metricGroup.gauge(
+ LAST_COMPACTION_INPUT_FILE_SIZE,
+ () -> latestCommit == null ? 0L : latestCommit.getCompactionInputFileSize());
+ metricGroup.gauge(
+ LAST_COMPACTION_OUTPUT_FILE_SIZE,
+ () -> latestCommit == null ? 0L : latestCommit.getCompactionOutputFileSize());
}
public void reportCommit(CommitStats commitStats) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
index a19d93508a90..657bd6aae153 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
@@ -20,6 +20,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
@@ -41,6 +42,8 @@ public class CommitStats {
private final long tableFilesAppended;
private final long tableFilesDeleted;
private final long changelogFilesAppended;
+ private final long compactionInputFileSize;
+ private final long compactionOutputFileSize;
private final long changelogFilesCompacted;
private final long changelogRecordsCompacted;
@@ -61,15 +64,28 @@ public CommitStats(
int generatedSnapshots,
int attempts) {
List addedTableFiles = new ArrayList<>(appendTableFiles);
- addedTableFiles.addAll(
+ List compactAfterFiles =
compactTableFiles.stream()
.filter(f -> FileKind.ADD.equals(f.kind()))
- .collect(Collectors.toList()));
+ .collect(Collectors.toList());
+ addedTableFiles.addAll(compactAfterFiles);
List deletedTableFiles =
compactTableFiles.stream()
.filter(f -> FileKind.DELETE.equals(f.kind()))
.collect(Collectors.toList());
+ this.compactionInputFileSize =
+ deletedTableFiles.stream()
+ .map(ManifestEntry::file)
+ .map(DataFileMeta::fileSize)
+ .reduce(Long::sum)
+ .orElse(0L);
+ this.compactionOutputFileSize =
+ compactAfterFiles.stream()
+ .map(ManifestEntry::file)
+ .map(DataFileMeta::fileSize)
+ .reduce(Long::sum)
+ .orElse(0L);
this.tableFilesAdded = addedTableFiles.size();
this.tableFilesAppended = appendTableFiles.size();
this.tableFilesDeleted = deletedTableFiles.size();
@@ -203,4 +219,12 @@ protected long getDuration() {
protected int getAttempts() {
return attempts;
}
+
+ public long getCompactionInputFileSize() {
+ return compactionInputFileSize;
+ }
+
+ public long getCompactionOutputFileSize() {
+ return compactionOutputFileSize;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index b0940a939e18..a3074daebbde 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -44,6 +44,10 @@ public class CompactionMetrics {
public static final String AVG_COMPACTION_TIME = "avgCompactionTime";
public static final String COMPACTION_COMPLETED_COUNT = "compactionCompletedCount";
public static final String COMPACTION_QUEUED_COUNT = "compactionQueuedCount";
+ public static final String MAX_COMPACTION_INPUT_SIZE = "maxCompactionInputSize";
+ public static final String MAX_COMPACTION_OUTPUT_SIZE = "maxCompactionOutputSize";
+ public static final String AVG_COMPACTION_INPUT_SIZE = "avgCompactionInputSize";
+ public static final String AVG_COMPACTION_OUTPUT_SIZE = "avgCompactionOutputSize";
private static final long BUSY_MEASURE_MILLIS = 60_000;
private static final int COMPACTION_TIME_WINDOW = 100;
@@ -72,6 +76,17 @@ private void registerGenericCompactionMetrics() {
metricGroup.gauge(MAX_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().max().orElse(-1));
metricGroup.gauge(
AVG_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().average().orElse(-1));
+ metricGroup.gauge(
+ MAX_COMPACTION_INPUT_SIZE, () -> getCompactionInputSizeStream().max().orElse(-1));
+ metricGroup.gauge(
+ MAX_COMPACTION_OUTPUT_SIZE, () -> getCompactionOutputSizeStream().max().orElse(-1));
+ metricGroup.gauge(
+ AVG_COMPACTION_INPUT_SIZE,
+ () -> getCompactionInputSizeStream().average().orElse(-1));
+ metricGroup.gauge(
+ AVG_COMPACTION_OUTPUT_SIZE,
+ () -> getCompactionOutputSizeStream().average().orElse(-1));
+
metricGroup.gauge(
AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0));
metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum());
@@ -84,6 +99,14 @@ private LongStream getLevel0FileCountStream() {
return reporters.values().stream().mapToLong(r -> r.level0FileCount);
}
+ private LongStream getCompactionInputSizeStream() {
+ return reporters.values().stream().mapToLong(r -> r.compactionInputSize);
+ }
+
+ private LongStream getCompactionOutputSizeStream() {
+ return reporters.values().stream().mapToLong(r -> r.compactionOutputSize);
+ }
+
private DoubleStream getCompactBusyStream() {
return compactTimers.values().stream()
.mapToDouble(t -> 100.0 * t.calculateLength() / BUSY_MEASURE_MILLIS);
@@ -112,6 +135,10 @@ public interface Reporter {
void decreaseCompactionsQueuedCount();
+ void reportCompactionInputSize(long bytes);
+
+ void reportCompactionOutputSize(long bytes);
+
void unregister();
}
@@ -119,6 +146,8 @@ private class ReporterImpl implements Reporter {
private final PartitionAndBucket key;
private long level0FileCount;
+ private long compactionInputSize = 0;
+ private long compactionOutputSize = 0;
private ReporterImpl(PartitionAndBucket key) {
this.key = key;
@@ -142,6 +171,16 @@ public void reportCompactionTime(long time) {
}
}
+ @Override
+ public void reportCompactionInputSize(long bytes) {
+ this.compactionInputSize = bytes;
+ }
+
+ @Override
+ public void reportCompactionOutputSize(long bytes) {
+ this.compactionOutputSize = bytes;
+ }
+
@Override
public void reportLevel0FileCount(long count) {
this.level0FileCount = count;