Skip to content

Commit

Permalink
[core] Add level0 file count metric (apache#2671)
Browse files Browse the repository at this point in the history
This closes apache#2671.
  • Loading branch information
JingsongLi authored Jan 12, 2024
1 parent d1696aa commit b7b25a1
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/content/maintenance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
</tr>
</thead>
<tbody>
<tr>
<td>level0FileCount</td>
<td>Bucket</td>
<td>Gauge</td>
<td>The level 0 file count will become larger if asynchronous compaction cannot be done in time.</td>
</tr>
<tr>
<td>lastCompactionDuration</td>
<td>Bucket</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private long newSequenceNumber;
private WriteBuffer writeBuffer;

private WriterMetrics writerMetrics;
private final WriterMetrics writerMetrics;

public MergeTreeWriter(
boolean writeBufferSpillable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public MergeTreeCompactManager(
this.keyComparator = keyComparator;
this.rewriter = rewriter;
this.metrics = metrics;
reportLevel0FileCount();
}

@Override
Expand All @@ -91,6 +92,7 @@ public boolean shouldWaitForPreparingCheckpoint() {
@Override
public void addNewFile(DataFileMeta file) {
levels.addLevel0File(file);
reportLevel0FileCount();
}

@Override
Expand Down Expand Up @@ -198,6 +200,7 @@ public Optional<CompactResult> getCompactionResult(boolean blocking)
r.after());
}
levels.update(r.before(), r.after());
reportLevel0FileCount();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Levels in compact manager updated. Current runs are\n{}",
Expand All @@ -207,6 +210,12 @@ public Optional<CompactResult> getCompactionResult(boolean blocking)
return result;
}

private void reportLevel0FileCount() {
if (metrics != null) {
metrics.reportLevel0FileCount(levels.level0().size());
}
}

@Override
public void close() throws IOException {
rewriter.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public MetricGroup getMetricGroup() {

private Histogram durationHistogram;
private CompactionStats latestCompaction;
private long level0FileCount = -1;

@VisibleForTesting static final String LAST_COMPACTION_DURATION = "lastCompactionDuration";
@VisibleForTesting static final String COMPACTION_DURATION = "compactionDuration";
Expand All @@ -66,6 +67,8 @@ public MetricGroup getMetricGroup() {
@VisibleForTesting
static final String LAST_REWRITE_CHANGELOG_FILE_SIZE = "lastRewriteChangelogFileSize";

@VisibleForTesting static final String LEVEL_0_FILE_COUNT = "level0FileCount";

private void registerGenericCompactionMetrics() {
metricGroup.gauge(
LAST_COMPACTION_DURATION,
Expand Down Expand Up @@ -98,13 +101,18 @@ private void registerGenericCompactionMetrics() {
latestCompaction == null
? 0L
: latestCompaction.getRewriteChangelogFileSize());
metricGroup.gauge(LEVEL_0_FILE_COUNT, () -> level0FileCount);
}

public void reportCompaction(CompactionStats compactionStats) {
latestCompaction = compactionStats;
durationHistogram.update(compactionStats.getDuration());
}

public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
}

public void close() {
metricGroup.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ public class WriterMetrics {
private static final String GROUP_NAME = "writer";

private static final int WINDOW_SAMPLE_SIZE = 100;
private static final String WRITE_RECORD_NUM = "writeRecordCount";

private static final String WRITE_RECORD_NUM = "writeRecordCount";
private static final String FLUSH_COST_MILLIS = "flushCostMillis";

public static final String PREPARE_COMMIT_COST_MILLIS = "prepareCommitCostMillis";

private final Counter writeRecordNumCounter;
Expand All @@ -41,14 +40,12 @@ public class WriterMetrics {

private final Histogram prepareCommitCostMillis;

private MetricGroup metricGroup;
private final MetricGroup metricGroup;

public WriterMetrics(MetricRegistry registry, String tableName, String parition, int bucket) {
metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName, parition, bucket);
public WriterMetrics(MetricRegistry registry, String tableName, String partition, int bucket) {
metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName, partition, bucket);
writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM);

bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS, WINDOW_SAMPLE_SIZE);

prepareCommitCostMillis =
metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS, WINDOW_SAMPLE_SIZE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public void testMetricsAreUpdated() {
(Gauge<Long>)
registeredGenericMetrics.get(
CompactionMetrics.LAST_REWRITE_CHANGELOG_FILE_SIZE);
Gauge<Long> level0FileCount =
(Gauge<Long>) registeredGenericMetrics.get(CompactionMetrics.LEVEL_0_FILE_COUNT);

assertThat(lastCompactionDuration.getValue()).isEqualTo(0);
assertThat(compactionDuration.getCount()).isEqualTo(0);
Expand Down Expand Up @@ -132,6 +134,13 @@ public void testMetricsAreUpdated() {
assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1201);
assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(2501);

// test level0FileCount
compactionMetrics.reportLevel0FileCount(10);
assertThat(level0FileCount.getValue()).isEqualTo(10);

compactionMetrics.reportLevel0FileCount(20);
assertThat(level0FileCount.getValue()).isEqualTo(20);
}

private void reportOnce(CompactionMetrics compactionMetrics) {
Expand Down

0 comments on commit b7b25a1

Please sign in to comment.