[core] Introduce compactionCompletedCount/compactionQueuedCount metrics. #4391

Merged 3 commits on Oct 30, 2024
Changes from 1 commit
@@ -115,10 +115,17 @@ private void triggerFullCompaction() {
targetFileSize,
rewriter,
metricsReporter));
recordCompactionsQueuedRequest();
compacting = new ArrayList<>(toCompact);
toCompact.clear();
}

private void recordCompactionsQueuedRequest() {
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
}
}

private void triggerCompactionWithBestEffort() {
if (taskFuture != null) {
return;
@@ -130,6 +137,7 @@ private void triggerCompactionWithBestEffort() {
executor.submit(
new AutoCompactTask(
dvMaintainer, compacting, rewriter, metricsReporter));
recordCompactionsQueuedRequest();
}
}

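The changes above follow a single counter lifecycle (my reading of this commit, not something the PR states explicitly): the queued counter is incremented whenever a compaction task is handed to the executor, and the task itself later moves that unit of work to completed or failed and takes it back off the queued count. A minimal, self-contained sketch of that pattern, using hypothetical stand-in counters rather than Paimon's CompactionMetrics, could look like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class QueuedCounterLifecycleSketch {
    // Hypothetical stand-ins for compactionsQueuedCount / CompletedCount / FailedCount.
    private static final AtomicLong QUEUED = new AtomicLong();
    private static final AtomicLong COMPLETED = new AtomicLong();
    private static final AtomicLong FAILED = new AtomicLong();

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        QUEUED.incrementAndGet(); // on submit: the request is now queued
        executor.submit(
                () -> {
                    try {
                        // ... run the compaction here ...
                        COMPLETED.incrementAndGet();
                    } catch (RuntimeException e) {
                        FAILED.incrementAndGet();
                        throw e;
                    } finally {
                        QUEUED.decrementAndGet(); // success or failure, it left the queue
                    }
                });
        executor.shutdown();
    }
}
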
@@ -53,6 +53,7 @@ public CompactResult call() throws Exception {
if (metricsReporter != null) {
metricsReporter.reportCompactionTime(
System.currentTimeMillis() - startMillis);
metricsReporter.increaseCompactionsCompletedCount();
}
},
LOG);
@@ -61,8 +62,24 @@ public CompactResult call() throws Exception {
logMetric(startMillis, result.before(), result.after());
}
return result;
} catch (Exception e) {
MetricUtils.safeCall(this::increaseCompactionsFailedCount, LOG);
throw e;
} finally {
MetricUtils.safeCall(this::stopTimer, LOG);
MetricUtils.safeCall(this::decreaseCompactionsQueuedCount, LOG);
}
}

private void decreaseCompactionsQueuedCount() {
if (metricsReporter != null) {
metricsReporter.decreaseCompactionsQueuedCount();
}
}

private void increaseCompactionsFailedCount() {
if (metricsReporter != null) {
metricsReporter.increaseCompactionsFailedCount();
}
}

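Every counter update in this task is routed through MetricUtils.safeCall(...), so a failing metrics reporter cannot fail or mask the compaction result itself. I have not verified Paimon's actual MetricUtils implementation; a guard in that spirit would be roughly:

import org.slf4j.Logger;

final class SafeMetricCallSketch {
    private SafeMetricCallSketch() {}

    // Illustrative only: run a metrics update and log, rather than propagate,
    // anything it throws, so metric bookkeeping can never break compaction.
    static void safeCall(Runnable metricUpdate, Logger log) {
        try {
            metricUpdate.run();
        } catch (Throwable t) {
            log.warn("Failed to update compaction metrics", t);
        }
    }
}
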
@@ -211,6 +211,9 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
.collect(Collectors.joining(", ")));
}
taskFuture = executor.submit(task);
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
}
}

/** Finish current task, and update result files to {@link Levels}. */
@@ -20,6 +20,7 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;

@@ -41,13 +42,19 @@ public class CompactionMetrics {
public static final String AVG_LEVEL0_FILE_COUNT = "avgLevel0FileCount";
public static final String COMPACTION_THREAD_BUSY = "compactionThreadBusy";
public static final String AVG_COMPACTION_TIME = "avgCompactionTime";
public static final String COMPACTIONS_COMPLETED_COUNT = "compactionsCompletedCount";
public static final String COMPACTIONS_FAILED_COUNT = "compactionsFailedCount";
public static final String COMPACTIONS_QUEUED_COUNT = "compactionsQueuedCount";
private static final long BUSY_MEASURE_MILLIS = 60_000;
private static final int COMPACTION_TIME_WINDOW = 100;

private final MetricGroup metricGroup;
private final Map<PartitionAndBucket, ReporterImpl> reporters;
private final Map<Long, CompactTimer> compactTimers;
private final Queue<Long> compactionTimes;
private Counter compactionsCompletedCounter;
private Counter compactionsFailedCounter;
private Counter compactionsQueuedCounter;

public CompactionMetrics(MetricRegistry registry, String tableName) {
this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
@@ -70,6 +77,10 @@ private void registerGenericCompactionMetrics() {
metricGroup.gauge(
AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0));
metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum());

compactionsCompletedCounter = metricGroup.counter(COMPACTIONS_COMPLETED_COUNT);
compactionsFailedCounter = metricGroup.counter(COMPACTIONS_FAILED_COUNT);
compactionsQueuedCounter = metricGroup.counter(COMPACTIONS_QUEUED_COUNT);
}

private LongStream getLevel0FileCountStream() {
@@ -98,6 +109,14 @@ public interface Reporter {

void reportCompactionTime(long time);

void increaseCompactionsCompletedCount();

void increaseCompactionsFailedCount();

void increaseCompactionsQueuedCount();

void decreaseCompactionsQueuedCount();

void unregister();
}

@@ -133,6 +152,26 @@ public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
}

@Override
public void increaseCompactionsCompletedCount() {
compactionsCompletedCounter.inc();
}

@Override
public void increaseCompactionsFailedCount() {
compactionsFailedCounter.inc();
}

@Override
public void increaseCompactionsQueuedCount() {
compactionsQueuedCounter.inc();
}

@Override
public void decreaseCompactionsQueuedCount() {
compactionsQueuedCounter.dec();
}

@Override
public void unregister() {
reporters.remove(key);
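For context, the new Reporter methods simply delegate to the three table-level counters registered on the metric group. A short usage sketch, mirroring the updated CompactionMetricsTest below and assuming MetricRegistryImpl can be constructed directly the way the test does:

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.MetricRegistryImpl;
import org.apache.paimon.operation.metrics.CompactionMetrics;

public class CompactionCounterUsageSketch {
    public static void main(String[] args) {
        CompactionMetrics metrics = new CompactionMetrics(new MetricRegistryImpl(), "T");
        CompactionMetrics.Reporter reporter = metrics.createReporter(BinaryRow.EMPTY_ROW, 0);

        // One request is queued, then finishes successfully.
        reporter.increaseCompactionsQueuedCount();
        reporter.increaseCompactionsCompletedCount();
        reporter.decreaseCompactionsQueuedCount();

        Counter completed =
                (Counter)
                        metrics.getMetricGroup()
                                .getMetrics()
                                .get(CompactionMetrics.COMPACTIONS_COMPLETED_COUNT);
        System.out.println(completed.getCount()); // expected: 1
    }
}
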
@@ -19,7 +19,9 @@
package org.apache.paimon.operation.metrics;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricRegistryImpl;

import org.junit.jupiter.api.Test;
@@ -36,6 +38,9 @@ public void testReportMetrics() {
assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(-1.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.AVG_COMPACTION_TIME)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_COMPLETED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_FAILED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_QUEUED_COUNT)).isEqualTo(0L);
CompactionMetrics.Reporter[] reporters = new CompactionMetrics.Reporter[3];
for (int i = 0; i < reporters.length; i++) {
reporters[i] = metrics.createReporter(BinaryRow.EMPTY_ROW, i);
@@ -44,6 +49,9 @@
assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_COMPLETED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_FAILED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_QUEUED_COUNT)).isEqualTo(0L);

reporters[0].reportLevel0FileCount(5);
reporters[1].reportLevel0FileCount(3);
@@ -60,9 +68,31 @@
reporters[0].reportCompactionTime(270000);
assertThat(getMetric(metrics, CompactionMetrics.AVG_COMPACTION_TIME))
.isEqualTo(273333.3333333333);

// enqueue compaction request
reporters[0].increaseCompactionsQueuedCount();
reporters[1].increaseCompactionsQueuedCount();
reporters[2].increaseCompactionsQueuedCount();
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_COMPLETED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_FAILED_COUNT)).isEqualTo(0L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_QUEUED_COUNT)).isEqualTo(3L);

// completed compactions and failed compactions
reporters[0].increaseCompactionsCompletedCount();
reporters[0].decreaseCompactionsQueuedCount();
reporters[1].increaseCompactionsFailedCount();
reporters[1].decreaseCompactionsQueuedCount();
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_COMPLETED_COUNT)).isEqualTo(1L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_FAILED_COUNT)).isEqualTo(1L);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTIONS_QUEUED_COUNT)).isEqualTo(1L);
}

private Object getMetric(CompactionMetrics metrics, String metricName) {
return ((Gauge<?>) metrics.getMetricGroup().getMetrics().get(metricName)).getValue();
Metric metric = metrics.getMetricGroup().getMetrics().get(metricName);
if (metric instanceof Gauge) {
return ((Gauge<?>) metric).getValue();
} else {
return ((Counter) metric).getCount();
}
}
}
@@ -100,14 +100,41 @@ public void processElement(UnawareAppendCompactionTask task) throws Exception {
metricsReporter.reportCompactionTime(
System.currentTimeMillis()
- startMillis);
metricsReporter
.increaseCompactionsCompletedCount();
}
},
LOG);
return commitMessage;
} catch (Exception e) {
MetricUtils.safeCall(
this::increaseCompactionsFailedCount, LOG);
throw e;
} finally {
MetricUtils.safeCall(this::stopTimer, LOG);
MetricUtils.safeCall(
this::decreaseCompactionsQueuedCount, LOG);
}
}));
recordCompactionsQueuedRequest();
}

private void recordCompactionsQueuedRequest() {
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
}
}

private void decreaseCompactionsQueuedCount() {
if (metricsReporter != null) {
metricsReporter.decreaseCompactionsQueuedCount();
}
}

private void increaseCompactionsFailedCount() {
if (metricsReporter != null) {
metricsReporter.increaseCompactionsFailedCount();
}
}

private void startTimer() {
@@ -34,8 +34,10 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.ExecutorThreadFactory;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -48,6 +50,9 @@
import java.util.concurrent.Executors;

import static org.apache.paimon.operation.metrics.CompactionMetrics.AVG_COMPACTION_TIME;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTIONS_COMPLETED_COUNT;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTIONS_FAILED_COUNT;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTIONS_QUEUED_COUNT;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTION_THREAD_BUSY;

/** Test for {@link UnawareBucketCompactor}. */
@@ -65,7 +70,8 @@ public void testGaugeCollection() throws Exception {
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory(
Thread.currentThread().getName() + "-append-only-compact-worker"));
Map<String, Gauge> map = new HashMap<>();
Map<String, Gauge> gaugeMap = new HashMap<>();
Map<String, Counter> counterMap = new HashMap<>();
UnawareBucketCompactor unawareBucketCompactor =
new UnawareBucketCompactor(
(FileStoreTable) catalog.getTable(identifier()),
@@ -74,7 +80,7 @@
new FileStoreSourceReaderTest.DummyMetricGroup() {
@Override
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
map.put(name, gauge);
gaugeMap.put(name, gauge);
return null;
}

@@ -87,18 +93,31 @@ public MetricGroup addGroup(String name) {
public MetricGroup addGroup(String key, String value) {
return this;
}

@Override
public Counter counter(String name) {
SimpleCounter counter = new SimpleCounter();
counterMap.put(name, counter);
return counter;
}
});

for (int i = 0; i < 320; i++) {
unawareBucketCompactor.processElement(new MockCompactionTask());
Thread.sleep(250);
}

double compactionThreadBusy = (double) map.get(COMPACTION_THREAD_BUSY).getValue();
double compactionAvrgTime = (double) map.get(AVG_COMPACTION_TIME).getValue();
double compactionThreadBusy = (double) gaugeMap.get(COMPACTION_THREAD_BUSY).getValue();
double compactionAvgTime = (double) gaugeMap.get(AVG_COMPACTION_TIME).getValue();
long compactionsCompletedCount = counterMap.get(COMPACTIONS_COMPLETED_COUNT).getCount();
long compactionsFailedCount = counterMap.get(COMPACTIONS_FAILED_COUNT).getCount();
long compactionsQueuedCount = counterMap.get(COMPACTIONS_QUEUED_COUNT).getCount();

Assertions.assertThat(compactionThreadBusy).isGreaterThan(45).isLessThan(55);
Assertions.assertThat(compactionAvrgTime).isGreaterThan(120).isLessThan(140);
Assertions.assertThat(compactionAvgTime).isGreaterThan(120).isLessThan(140);
Assertions.assertThat(compactionsCompletedCount).isEqualTo(320L);
Assertions.assertThat(compactionsFailedCount).isEqualTo(0L);
Assertions.assertThat(compactionsQueuedCount).isEqualTo(0L);
}

protected Catalog getCatalog() {