Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce compactionCompletedCount/compactionQueuedCount metrics. #4391

Merged
merged 3 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/content/maintenance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<td>Gauge</td>
<td>The average runtime of compaction threads, calculated based on recorded compaction time data in milliseconds. The value represents the average duration of compaction operations. Higher values indicate longer average compaction times, which may suggest the need for performance optimization.</td>
</tr>
<tr>
<td>compactionCompletedCount</td>
<td>Counter</td>
<td>The total number of compactions that have completed.</td>
</tr>
<tr>
<td>compactionQueuedCount</td>
<td>Counter</td>
<td>The total number of compactions that are queued/running.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,17 @@ private void triggerFullCompaction() {
targetFileSize,
rewriter,
metricsReporter));
recordCompactionsQueuedRequest();
compacting = new ArrayList<>(toCompact);
toCompact.clear();
}

private void recordCompactionsQueuedRequest() {
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
}
}

private void triggerCompactionWithBestEffort() {
if (taskFuture != null) {
return;
Expand All @@ -130,6 +137,7 @@ private void triggerCompactionWithBestEffort() {
executor.submit(
new AutoCompactTask(
dvMaintainer, compacting, rewriter, metricsReporter));
recordCompactionsQueuedRequest();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public CompactResult call() throws Exception {
if (metricsReporter != null) {
metricsReporter.reportCompactionTime(
System.currentTimeMillis() - startMillis);
metricsReporter.increaseCompactionsCompletedCount();
}
},
LOG);
Expand All @@ -63,6 +64,13 @@ public CompactResult call() throws Exception {
return result;
} finally {
MetricUtils.safeCall(this::stopTimer, LOG);
MetricUtils.safeCall(this::decreaseCompactionsQueuedCount, LOG);
}
}

private void decreaseCompactionsQueuedCount() {
if (metricsReporter != null) {
metricsReporter.decreaseCompactionsQueuedCount();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
.collect(Collectors.joining(", ")));
}
taskFuture = executor.submit(task);
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
}
}

/** Finish current task, and update result files to {@link Levels}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;

Expand All @@ -41,13 +42,17 @@ public class CompactionMetrics {
public static final String AVG_LEVEL0_FILE_COUNT = "avgLevel0FileCount";
public static final String COMPACTION_THREAD_BUSY = "compactionThreadBusy";
public static final String AVG_COMPACTION_TIME = "avgCompactionTime";
public static final String COMPACTION_COMPLETED_COUNT = "compactionCompletedCount";
public static final String COMPACTION_QUEUED_COUNT = "compactionQueuedCount";
private static final long BUSY_MEASURE_MILLIS = 60_000;
private static final int COMPACTION_TIME_WINDOW = 100;

private final MetricGroup metricGroup;
private final Map<PartitionAndBucket, ReporterImpl> reporters;
private final Map<Long, CompactTimer> compactTimers;
private final Queue<Long> compactionTimes;
private Counter compactionsCompletedCounter;
private Counter compactionsQueuedCounter;

public CompactionMetrics(MetricRegistry registry, String tableName) {
this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
Expand All @@ -70,6 +75,9 @@ private void registerGenericCompactionMetrics() {
metricGroup.gauge(
AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0));
metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum());

compactionsCompletedCounter = metricGroup.counter(COMPACTION_COMPLETED_COUNT);
compactionsQueuedCounter = metricGroup.counter(COMPACTION_QUEUED_COUNT);
}

private LongStream getLevel0FileCountStream() {
Expand Down Expand Up @@ -98,6 +106,12 @@ public interface Reporter {

void reportCompactionTime(long time);

void increaseCompactionsCompletedCount();

void increaseCompactionsQueuedCount();

void decreaseCompactionsQueuedCount();

void unregister();
}

Expand Down Expand Up @@ -133,6 +147,21 @@ public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
}

@Override
public void increaseCompactionsCompletedCount() {
compactionsCompletedCounter.inc();
}

@Override
public void increaseCompactionsQueuedCount() {
compactionsQueuedCounter.inc();
}

@Override
public void decreaseCompactionsQueuedCount() {
compactionsQueuedCounter.dec();
}

@Override
public void unregister() {
reporters.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.paimon.operation.metrics;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricRegistryImpl;

import org.junit.jupiter.api.Test;
Expand All @@ -36,6 +38,8 @@ public void testReportMetrics() {
assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(-1.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.AVG_COMPACTION_TIME)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_COMPLETED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_QUEUED_COUNT)).isEqualTo(0L);
CompactionMetrics.Reporter[] reporters = new CompactionMetrics.Reporter[3];
for (int i = 0; i < reporters.length; i++) {
reporters[i] = metrics.createReporter(BinaryRow.EMPTY_ROW, i);
Expand All @@ -44,6 +48,8 @@ public void testReportMetrics() {
assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_COMPLETED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_QUEUED_COUNT)).isEqualTo(0L);

reporters[0].reportLevel0FileCount(5);
reporters[1].reportLevel0FileCount(3);
Expand All @@ -60,9 +66,28 @@ public void testReportMetrics() {
reporters[0].reportCompactionTime(270000);
assertThat(getMetric(metrics, CompactionMetrics.AVG_COMPACTION_TIME))
.isEqualTo(273333.3333333333);

// enqueue compaction request
reporters[0].increaseCompactionsQueuedCount();
reporters[1].increaseCompactionsQueuedCount();
reporters[2].increaseCompactionsQueuedCount();
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_COMPLETED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_QUEUED_COUNT)).isEqualTo(3L);

// completed compactions and remove them from queue
reporters[0].increaseCompactionsCompletedCount();
reporters[0].decreaseCompactionsQueuedCount();
reporters[1].decreaseCompactionsQueuedCount();
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_COMPLETED_COUNT)).isEqualTo(1L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_QUEUED_COUNT)).isEqualTo(1L);
}

private Object getMetric(CompactionMetrics metrics, String metricName) {
return ((Gauge<?>) metrics.getMetricGroup().getMetrics().get(metricName)).getValue();
Metric metric = metrics.getMetricGroup().getMetrics().get(metricName);
if (metric instanceof Gauge) {
return ((Gauge<?>) metric).getValue();
} else {
return ((Counter) metric).getCount();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,31 @@ public void processElement(UnawareAppendCompactionTask task) throws Exception {
metricsReporter.reportCompactionTime(
System.currentTimeMillis()
- startMillis);
metricsReporter
.increaseCompactionsCompletedCount();
}
},
LOG);
return commitMessage;
} finally {
MetricUtils.safeCall(this::stopTimer, LOG);
MetricUtils.safeCall(
this::decreaseCompactionsQueuedCount, LOG);
}
}));
recordCompactionsQueuedRequest();
}

private void recordCompactionsQueuedRequest() {
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
}
}

private void decreaseCompactionsQueuedCount() {
if (metricsReporter != null) {
metricsReporter.decreaseCompactionsQueuedCount();
}
}

private void startTimer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.ExecutorThreadFactory;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -48,6 +50,8 @@
import java.util.concurrent.Executors;

import static org.apache.paimon.operation.metrics.CompactionMetrics.AVG_COMPACTION_TIME;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTION_COMPLETED_COUNT;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTION_QUEUED_COUNT;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTION_THREAD_BUSY;

/** Test for {@link UnawareBucketCompactor}. */
Expand All @@ -65,7 +69,8 @@ public void testGaugeCollection() throws Exception {
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory(
Thread.currentThread().getName() + "-append-only-compact-worker"));
Map<String, Gauge> map = new HashMap<>();
Map<String, Gauge> gaugeMap = new HashMap<>();
Map<String, Counter> counterMap = new HashMap<>();
UnawareBucketCompactor unawareBucketCompactor =
new UnawareBucketCompactor(
(FileStoreTable) catalog.getTable(identifier()),
Expand All @@ -74,7 +79,7 @@ public void testGaugeCollection() throws Exception {
new FileStoreSourceReaderTest.DummyMetricGroup() {
@Override
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
map.put(name, gauge);
gaugeMap.put(name, gauge);
return null;
}

Expand All @@ -87,18 +92,29 @@ public MetricGroup addGroup(String name) {
public MetricGroup addGroup(String key, String value) {
return this;
}

@Override
public Counter counter(String name) {
SimpleCounter counter = new SimpleCounter();
counterMap.put(name, counter);
return counter;
}
});

for (int i = 0; i < 320; i++) {
unawareBucketCompactor.processElement(new MockCompactionTask());
Thread.sleep(250);
}

double compactionThreadBusy = (double) map.get(COMPACTION_THREAD_BUSY).getValue();
double compactionAvrgTime = (double) map.get(AVG_COMPACTION_TIME).getValue();
double compactionThreadBusy = (double) gaugeMap.get(COMPACTION_THREAD_BUSY).getValue();
double compactionAvgTime = (double) gaugeMap.get(AVG_COMPACTION_TIME).getValue();
long compactionsCompletedCount = counterMap.get(COMPACTION_COMPLETED_COUNT).getCount();
long compactionsQueuedCount = counterMap.get(COMPACTION_QUEUED_COUNT).getCount();

Assertions.assertThat(compactionThreadBusy).isGreaterThan(45).isLessThan(55);
Assertions.assertThat(compactionAvrgTime).isGreaterThan(120).isLessThan(140);
Assertions.assertThat(compactionAvgTime).isGreaterThan(120).isLessThan(140);
Assertions.assertThat(compactionsCompletedCount).isEqualTo(320L);
Assertions.assertThat(compactionsQueuedCount).isEqualTo(0L);
}

protected Catalog getCatalog() {
Expand Down
Loading