diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md index 973a8fcfe7ba..5950c518ed50 100644 --- a/docs/content/maintenance/metrics.md +++ b/docs/content/maintenance/metrics.md @@ -28,7 +28,7 @@ under the License. Paimon has built a metrics system to measure the behaviours of reading and writing, such as how many manifest files it scanned in the last planning, how long the last commit operation took, and how many files it deleted in the last compact operation. -In Paimon's metrics system, metrics are updated and reported at different levels of granularity. Currently, the levels of **table** and **bucket** are provided, which means you can get metrics per table or bucket. +In Paimon's metrics system, metrics are updated and reported at table granularity. There are three types of metrics provided in the Paimon metric system: `Gauge`, `Counter`, and `Histogram`. - `Gauge`: Provides a value of any type at a point in time. @@ -47,7 +47,6 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca Metrics Name - Level Type Description @@ -55,49 +54,41 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca lastScanDuration - Table Gauge The time it took to complete the last scan. scanDuration - Table Histogram Distributions of the time taken by the last few scans. lastScannedManifests - Table Gauge Number of scanned manifest files in the last scan. lastSkippedByPartitionAndStats - Table Gauge Skipped table files by partition filter and value / key stats information in the last scan. lastSkippedByBucketAndLevelFilter - Table Gauge Skipped table files by bucket, bucket key and level filter in the last scan. lastSkippedByWholeBucketFilesFilter - Table Gauge Skipped table files by bucket level value filter (only primary key table) in the last scan. lastScanSkippedTableFiles - Table Gauge Total skipped table files in the last scan. lastScanResultedTableFiles - Table Gauge Resulted table files in the last scan. @@ -110,7 +101,6 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca Metrics Name - Level Type Description @@ -118,143 +108,93 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca lastCommitDuration - Table Gauge The time it took to complete the last commit. commitDuration - Table Histogram Distributions of the time taken by the last few commits. lastCommitAttempts - Table Gauge The number of attempts the last commit made. lastTableFilesAdded - Table Gauge Number of added table files in the last commit, including newly created data files and compacted after. lastTableFilesDeleted - Table Gauge Number of deleted table files in the last commit, which comes from compacted before. lastTableFilesAppended - Table Gauge Number of appended table files in the last commit, which means the newly created data files. lastTableFilesCommitCompacted - Table Gauge Number of compacted table files in the last commit, including compacted before and after. lastChangelogFilesAppended - Table Gauge Number of appended changelog files in last commit. lastChangelogFileCommitCompacted - Table Gauge Number of compacted changelog files in last commit. lastGeneratedSnapshots - Table Gauge Number of snapshot files generated in the last commit, either 1 snapshot or 2 snapshots. lastDeltaRecordsAppended - Table Gauge Delta records count in last commit with APPEND commit kind. lastChangelogRecordsAppended - Table Gauge Changelog records count in last commit with APPEND commit kind. 
lastDeltaRecordsCommitCompacted - Table Gauge Delta records count in last commit with COMPACT commit kind. lastChangelogRecordsCommitCompacted - Table Gauge Changelog records count in last commit with COMPACT commit kind. lastPartitionsWritten - Table Gauge Number of partitions written in the last commit. lastBucketsWritten - Table Gauge Number of buckets written in the last commit. -### Write Metrics - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Metrics NameLevelTypeDescription
writeRecordCountBucketCounterTotal number of records written into the bucket.
flushCostMillisBucketHistogramDistributions of the time taken by the last few write buffer flushes.
prepareCommitCostMillisBucketHistogramDistributions of the time taken by the last few calls of `prepareCommit`.
- ### Write Buffer Metrics - @@ -262,19 +202,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca - - - @@ -287,65 +224,25 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + - - + - + - - + - +
Metrics NameLevel Type Description
bufferPreemptCountTable Gauge The total number of memory preemptions.
usedWriteBufferSizeByteTable Gauge Currently used write buffer size in byte.
totalWriteBufferSizeByteTable Gauge The total write buffer size configured in byte.
Metrics NameLevel Type Description
level0FileCountBucketGaugeThe level 0 file count will become larger if asynchronous compaction cannot be done in time.
lastCompactionDurationBucketGaugeThe time it took to complete the last compaction.
compactionDurationBucketHistogramDistributions of the time taken by the last few compactions.
lastTableFilesCompactedBeforeBucketGaugeNumber of deleted files in the last compaction.
lastTableFilesCompactedAfterBucketGaugeNumber of added files in the last compaction.
lastChangelogFilesCompactedBucketGaugeNumber of changelog files compacted in last compaction.
lastRewriteInputFileSizeBucketmaxLevel0FileCount GaugeSize of deleted files in the last compaction.The maximum number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time.
lastRewriteOutputFileSizeBucketavgLevel0FileCount GaugeSize of added files in the last compaction.The average number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time.
lastRewriteChangelogFileSizeBucketcompactionThreadBusy GaugeSize of changelog files compacted in last compaction.The maximum busyness of compaction threads in this parallelism. Currently, there is only one compaction thread in each parallelism, so the busyness value ranges from 0 (idle) to 100 (compaction running all the time).
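
The `compactionThreadBusy` gauge above is backed by the sliding-window `CompactTimer` introduced in this PR (see `CompactTimer.java` below): each compaction task brackets its work with `start()`/`finish()`, and the gauge divides the timed running length within the last `BUSY_MEASURE_MILLIS` (one minute) by the window size. A minimal sketch of that computation using the classes added in this diff — the `Demo` harness and the sleep are hypothetical stand-ins for real compaction work:

```java
import org.apache.paimon.operation.metrics.CompactTimer;

public class Demo {
    // Same one-minute window that CompactionMetrics uses for BUSY_MEASURE_MILLIS.
    private static final long BUSY_MEASURE_MILLIS = 60_000;

    public static void main(String[] args) throws InterruptedException {
        CompactTimer timer = new CompactTimer(BUSY_MEASURE_MILLIS);

        timer.start();       // CompactTask.call() starts the timer before doCompact()
        Thread.sleep(6_000); // stand-in for the actual compaction work
        timer.finish();      // ... and stops it in the finally block

        // calculateLength() returns how long the timer ran within the last minute,
        // so a 6 second compaction reports roughly 10 (percent busy).
        double busyPercent = 100.0 * timer.calculateLength() / BUSY_MEASURE_MILLIS;
        System.out.println("compactionThreadBusy ~= " + busyPercent);
    }
}
```

`CompactionMetrics` keeps one such timer per compaction thread (keyed by thread id) and the gauge sums the per-thread percentages, which is why a single compaction thread yields values between 0 and 100.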
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java index 58106e070373..7d4346bdb782 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java @@ -25,6 +25,7 @@ import org.apache.paimon.compact.CompactTask; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.operation.metrics.CompactionMetrics; +import org.apache.paimon.operation.metrics.MetricUtils; import org.apache.paimon.utils.Preconditions; import org.slf4j.Logger; @@ -59,7 +60,7 @@ public class AppendOnlyCompactManager extends CompactFutureManager { private List compacting; - @Nullable private final CompactionMetrics metrics; + @Nullable private final CompactionMetrics.Reporter metricsReporter; public AppendOnlyCompactManager( ExecutorService executor, @@ -68,7 +69,7 @@ public AppendOnlyCompactManager( int maxFileNum, long targetFileSize, CompactRewriter rewriter, - @Nullable CompactionMetrics metrics) { + @Nullable CompactionMetrics.Reporter metricsReporter) { this.executor = executor; this.toCompact = new TreeSet<>(fileComparator(false)); this.toCompact.addAll(restored); @@ -76,7 +77,7 @@ public AppendOnlyCompactManager( this.maxFileNum = maxFileNum; this.targetFileSize = targetFileSize; this.rewriter = rewriter; - this.metrics = metrics; + this.metricsReporter = metricsReporter; } @Override @@ -98,7 +99,8 @@ private void triggerFullCompaction() { } taskFuture = - executor.submit(new FullCompactTask(toCompact, targetFileSize, rewriter, metrics)); + executor.submit( + new FullCompactTask(toCompact, targetFileSize, rewriter, metricsReporter)); compacting = new ArrayList<>(toCompact); toCompact.clear(); } @@ -110,7 +112,8 @@ private void triggerCompactionWithBestEffort() { Optional> picked = pickCompactBefore(); if (picked.isPresent()) { compacting = picked.get(); - taskFuture = executor.submit(new AutoCompactTask(compacting, rewriter, metrics)); + taskFuture = + executor.submit(new AutoCompactTask(compacting, rewriter, metricsReporter)); } } @@ -196,8 +199,8 @@ TreeSet getToCompact() { @Override public void close() throws IOException { - if (metrics != null) { - metrics.close(); + if (metricsReporter != null) { + MetricUtils.safeCall(metricsReporter::unregister, LOG); } } @@ -212,8 +215,8 @@ public FullCompactTask( Collection inputs, long targetFileSize, CompactRewriter rewriter, - @Nullable CompactionMetrics metrics) { - super(metrics); + @Nullable CompactionMetrics.Reporter metricsReporter) { + super(metricsReporter); this.inputs = new LinkedList<>(inputs); this.targetFileSize = targetFileSize; this.rewriter = rewriter; @@ -268,8 +271,8 @@ public static class AutoCompactTask extends CompactTask { public AutoCompactTask( List toCompact, CompactRewriter rewriter, - @Nullable CompactionMetrics metrics) { - super(metrics); + @Nullable CompactionMetrics.Reporter metricsReporter) { + super(metricsReporter); this.toCompact = toCompact; this.rewriter = rewriter; } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 0810f023bf64..b8344b30119b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -33,7 +33,6 @@ import 
org.apache.paimon.io.RowDataRollingFileWriter; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; -import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -75,7 +74,6 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final IOManager ioManager; private MemorySegmentPool memorySegmentPool; - private WriterMetrics writerMetrics; public AppendOnlyWriter( FileIO fileIO, @@ -92,8 +90,7 @@ public AppendOnlyWriter( boolean useWriteBuffer, boolean spillable, String fileCompression, - FieldStatsCollector.Factory[] statsCollectors, - WriterMetrics writerMetrics) { + FieldStatsCollector.Factory[] statsCollectors) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -118,7 +115,6 @@ public AppendOnlyWriter( compactBefore.addAll(increment.compactIncrement().compactBefore()); compactAfter.addAll(increment.compactIncrement().compactAfter()); } - this.writerMetrics = writerMetrics; } @Override @@ -138,10 +134,6 @@ public void write(InternalRow rowData) throws Exception { throw new RuntimeException("Mem table is too small to hold a single element."); } } - - if (writerMetrics != null) { - writerMetrics.incWriteRecordNum(); - } } @Override @@ -161,14 +153,9 @@ public Collection dataFiles() { @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { - long start = System.currentTimeMillis(); flush(false, false); trySyncLatestCompaction(waitCompaction || forceCompact); - CommitIncrement increment = drainIncrement(); - if (writerMetrics != null) { - writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start); - } - return increment; + return drainIncrement(); } @Override @@ -178,7 +165,6 @@ public boolean isCompacting() { @VisibleForTesting void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception { - long start = System.currentTimeMillis(); List flushedFiles = sinkWriter.flush(); // add new generated files @@ -186,9 +172,6 @@ void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws trySyncLatestCompaction(waitForLatestCompaction); compactManager.triggerCompaction(forcedFullCompaction); newFiles.addAll(flushedFiles); - if (writerMetrics != null) { - writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start); - } } @Override @@ -198,9 +181,6 @@ public void sync() throws Exception { @Override public void close() throws Exception { - if (writerMetrics != null) { - writerMetrics.close(); - } // cancel compaction so that it does not block job cancelling compactManager.cancelCompaction(); sync(); diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java index 7a8dc8da0434..8b8f7e533664 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java @@ -20,9 +20,6 @@ import org.apache.paimon.annotation.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -31,8 +28,6 @@ /** Base implementation of {@link CompactManager} which runs compaction in a separate thread. 
*/ public abstract class CompactFutureManager implements CompactManager { - private static final Logger LOG = LoggerFactory.getLogger(CompactFutureManager.class); - protected Future taskFuture; @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java index dad4aa374a46..dac1005172c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java @@ -20,14 +20,13 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.operation.metrics.CompactionMetrics; -import org.apache.paimon.operation.metrics.CompactionStats; +import org.apache.paimon.operation.metrics.MetricUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; @@ -35,40 +34,37 @@ public abstract class CompactTask implements Callable { private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class); - @Nullable private final CompactionMetrics metrics; - public CompactTask(@Nullable CompactionMetrics metrics) { - this.metrics = metrics; + @Nullable private final CompactionMetrics.Reporter metricsReporter; + + public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter) { + this.metricsReporter = metricsReporter; } @Override public CompactResult call() throws Exception { - long startMillis = System.currentTimeMillis(); - CompactResult result = null; + MetricUtils.safeCall(this::startTimer, LOG); try { - result = doCompact(); - + long startMillis = System.currentTimeMillis(); + CompactResult result = doCompact(); if (LOG.isDebugEnabled()) { logMetric(startMillis, result.before(), result.after()); } return result; } finally { - if (metrics != null) { - long duration = System.currentTimeMillis() - startMillis; - CompactionStats compactionStats = - result == null - ? 
new CompactionStats( - duration, - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList()) - : new CompactionStats( - duration, - result.before(), - result.after(), - result.changelog()); - metrics.reportCompaction(compactionStats); - } + MetricUtils.safeCall(this::stopTimer, LOG); + } + } + + private void startTimer() { + if (metricsReporter != null) { + metricsReporter.getCompactTimer().start(); + } + } + + private void stopTimer() { + if (metricsReporter != null) { + metricsReporter.getCompactTimer().finish(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index de5cbf14bed2..046fad893f2e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -33,7 +33,6 @@ import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.mergetree.compact.MergeFunction; -import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.table.sink.SequenceGenerator; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; @@ -78,8 +77,6 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { private long newSequenceNumber; private WriteBuffer writeBuffer; - private final WriterMetrics writerMetrics; - public MergeTreeWriter( boolean writeBufferSpillable, int sortMaxFan, @@ -93,8 +90,7 @@ public MergeTreeWriter( boolean commitForceCompact, ChangelogProducer changelogProducer, @Nullable CommitIncrement increment, - @Nullable SequenceGenerator sequenceGenerator, - WriterMetrics writerMetrics) { + @Nullable SequenceGenerator sequenceGenerator) { this.writeBufferSpillable = writeBufferSpillable; this.sortMaxFan = sortMaxFan; this.sortCompression = sortCompression; @@ -125,7 +121,6 @@ public MergeTreeWriter( compactAfter.addAll(increment.compactIncrement().compactAfter()); compactChangelog.addAll(increment.compactIncrement().changelogFiles()); } - this.writerMetrics = writerMetrics; } private long newSequenceNumber() { @@ -164,10 +159,6 @@ public void write(KeyValue kv) throws Exception { throw new RuntimeException("Mem table is too small to hold a single element."); } } - - if (writerMetrics != null) { - writerMetrics.incWriteRecordNum(); - } } @Override @@ -200,7 +191,6 @@ public void flushMemory() throws Exception { private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception { - long start = System.currentTimeMillis(); if (writeBuffer.size() > 0) { if (compactManager.shouldWaitForLatestCompaction()) { waitForLatestCompaction = true; @@ -240,26 +230,16 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul trySyncLatestCompaction(waitForLatestCompaction); compactManager.triggerCompaction(forcedFullCompaction); - if (writerMetrics != null) { - writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start); - } } @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { - long start = System.currentTimeMillis(); flushWriteBuffer(waitCompaction, false); trySyncLatestCompaction( waitCompaction || commitForceCompact || compactManager.shouldWaitForPreparingCheckpoint()); - - CommitIncrement increment = drainIncrement(); - - if (writerMetrics != null) { - 
writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start); - } - return increment; + return drainIncrement(); } @Override @@ -320,9 +300,6 @@ private void trySyncLatestCompaction(boolean blocking) throws Exception { @Override public void close() throws Exception { - if (writerMetrics != null) { - writerMetrics.close(); - } // cancel compaction so that it does not block job cancelling compactManager.cancelCompaction(); sync(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index 4da1e2038772..6d94ad574565 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -28,6 +28,7 @@ import org.apache.paimon.mergetree.LevelSortedRun; import org.apache.paimon.mergetree.Levels; import org.apache.paimon.operation.metrics.CompactionMetrics; +import org.apache.paimon.operation.metrics.MetricUtils; import org.apache.paimon.utils.Preconditions; import org.slf4j.Logger; @@ -56,7 +57,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { private final int numSortedRunStopTrigger; private final CompactRewriter rewriter; - @Nullable private final CompactionMetrics metrics; + @Nullable private final CompactionMetrics.Reporter metricsReporter; public MergeTreeCompactManager( ExecutorService executor, @@ -66,7 +67,7 @@ public MergeTreeCompactManager( long compactionFileSize, int numSortedRunStopTrigger, CompactRewriter rewriter, - @Nullable CompactionMetrics metrics) { + @Nullable CompactionMetrics.Reporter metricsReporter) { this.executor = executor; this.levels = levels; this.strategy = strategy; @@ -74,8 +75,9 @@ public MergeTreeCompactManager( this.numSortedRunStopTrigger = numSortedRunStopTrigger; this.keyComparator = keyComparator; this.rewriter = rewriter; - this.metrics = metrics; - reportLevel0FileCount(); + this.metricsReporter = metricsReporter; + + MetricUtils.safeCall(this::reportLevel0FileCount, LOG); } @Override @@ -92,7 +94,7 @@ public boolean shouldWaitForPreparingCheckpoint() { @Override public void addNewFile(DataFileMeta file) { levels.addLevel0File(file); - reportLevel0FileCount(); + MetricUtils.safeCall(this::reportLevel0FileCount, LOG); } @Override @@ -171,7 +173,12 @@ public Levels levels() { private void submitCompaction(CompactUnit unit, boolean dropDelete) { MergeTreeCompactTask task = new MergeTreeCompactTask( - keyComparator, compactionFileSize, rewriter, unit, dropDelete, metrics); + keyComparator, + compactionFileSize, + rewriter, + unit, + dropDelete, + metricsReporter); if (LOG.isDebugEnabled()) { LOG.debug( "Pick these files (name, level, size) for compaction: {}", @@ -200,7 +207,7 @@ public Optional getCompactionResult(boolean blocking) r.after()); } levels.update(r.before(), r.after()); - reportLevel0FileCount(); + MetricUtils.safeCall(this::reportLevel0FileCount, LOG); if (LOG.isDebugEnabled()) { LOG.debug( "Levels in compact manager updated. 
Current runs are\n{}", @@ -211,16 +218,16 @@ public Optional getCompactionResult(boolean blocking) } private void reportLevel0FileCount() { - if (metrics != null) { - metrics.reportLevel0FileCount(levels.level0().size()); + if (metricsReporter != null) { + metricsReporter.reportLevel0FileCount(levels.level0().size()); } } @Override public void close() throws IOException { rewriter.close(); - if (metrics != null) { - metrics.close(); + if (metricsReporter != null) { + MetricUtils.safeCall(metricsReporter::unregister, LOG); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java index 4fc11b0760af..7299fbb5c95d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java @@ -54,8 +54,8 @@ public MergeTreeCompactTask( CompactRewriter rewriter, CompactUnit unit, boolean dropDelete, - @Nullable CompactionMetrics metrics) { - super(metrics); + @Nullable CompactionMetrics.Reporter metricsReporter) { + super(metricsReporter); this.minFileSize = minFileSize; this.rewriter = rewriter; this.outputLevel = unit.outputLevel(); diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java index 60f0e2a07852..25bcac4cbb02 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java @@ -25,8 +25,6 @@ public abstract class MetricRegistry { private static final String KEY_TABLE = "table"; - private static final String KEY_PARTITION = "partition"; - private static final String KEY_BUCKET = "bucket"; public MetricGroup tableMetricGroup(String groupName, String tableName) { Map variables = new LinkedHashMap<>(); @@ -35,16 +33,6 @@ public MetricGroup tableMetricGroup(String groupName, String tableName) { return createMetricGroup(groupName, variables); } - public MetricGroup bucketMetricGroup( - String groupName, String tableName, String partition, int bucket) { - Map variables = new LinkedHashMap<>(); - variables.put(KEY_TABLE, tableName); - variables.put(KEY_PARTITION, partition); - variables.put(KEY_BUCKET, String.valueOf(bucket)); - - return createMetricGroup(groupName, variables); - } - protected abstract MetricGroup createMetricGroup( String groupName, Map variables); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 7b119b8583a3..af78e34632e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.index.IndexMaintainer; import org.apache.paimon.io.DataFileMeta; @@ -31,12 +30,10 @@ import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.metrics.CompactionMetrics; -import org.apache.paimon.operation.metrics.WriterMetrics; import 
org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExecutorThreadFactory; -import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -77,10 +74,9 @@ public abstract class AbstractFileStoreWrite implements FileStoreWrite { private boolean closeCompactExecutorWhenLeaving = true; private boolean ignorePreviousFiles = false; protected boolean isStreamingMode = false; - private MetricRegistry metricRegistry = null; + protected CompactionMetrics compactionMetrics = null; protected final String tableName; - private final FileStorePathFactory pathFactory; protected AbstractFileStoreWrite( String commitUser, @@ -88,7 +84,6 @@ protected AbstractFileStoreWrite( FileStoreScan scan, @Nullable IndexMaintainer.Factory indexFactory, String tableName, - FileStorePathFactory pathFactory, int writerNumberMax) { this.commitUser = commitUser; this.snapshotManager = snapshotManager; @@ -97,7 +92,6 @@ protected AbstractFileStoreWrite( this.writers = new HashMap<>(); this.tableName = tableName; - this.pathFactory = pathFactory; this.writerNumberMax = writerNumberMax; } @@ -259,6 +253,9 @@ public void close() throws Exception { if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) { lazyCompactExecutor.shutdownNow(); } + if (compactionMetrics != null) { + compactionMetrics.close(); + } } @Override @@ -376,37 +373,10 @@ public void withExecutionMode(boolean isStreamingMode) { @Override public FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry) { - this.metricRegistry = metricRegistry; + this.compactionMetrics = new CompactionMetrics(metricRegistry, tableName); return this; } - @Nullable - public CompactionMetrics getCompactionMetrics(BinaryRow partition, int bucket) { - if (metricRegistry != null) { - return new CompactionMetrics( - metricRegistry, tableName, getPartitionString(pathFactory, partition), bucket); - } - return null; - } - - @Nullable - public WriterMetrics getWriterMetrics(BinaryRow partition, int bucket) { - if (this.metricRegistry != null) { - return new WriterMetrics( - metricRegistry, tableName, getPartitionString(pathFactory, partition), bucket); - } - return null; - } - - private String getPartitionString(FileStorePathFactory pathFactory, BinaryRow partition) { - String partitionStr = - pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_"); - if (partitionStr.length() > 0) { - return partitionStr.substring(0, partitionStr.length() - 1); - } - return "_"; - } - private List scanExistingFileMetas( long snapshotId, BinaryRow partition, int bucket) { List existingFileMetas = new ArrayList<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index a76848b5e6fa..283df7b07583 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -85,7 +85,7 @@ public AppendOnlyFileStoreWrite( FileStoreScan scan, CoreOptions options, String tableName) { - super(commitUser, snapshotManager, scan, options, null, tableName, pathFactory); + super(commitUser, snapshotManager, scan, options, null, tableName); this.fileIO = fileIO; this.read = read; this.schemaId = schemaId; @@ -125,7 +125,9 @@ protected 
RecordWriter createWriter( compactionMaxFileNum, targetFileSize, compactRewriter(partition, bucket), - getCompactionMetrics(partition, bucket)); + compactionMetrics == null + ? null + : compactionMetrics.createReporter(partition, bucket)); return new AppendOnlyWriter( fileIO, @@ -142,8 +144,7 @@ protected RecordWriter createWriter( useWriteBuffer || forceBufferSpill, spillable || forceBufferSpill, fileCompression, - statsCollectors, - getWriterMetrics(partition, bucket)); + statsCollectors); } public AppendOnlyCompactManager.CompactRewriter compactRewriter( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 6a18a34dc047..269ff2dc5dc2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -110,7 +110,7 @@ public KeyValueFileStoreWrite( CoreOptions options, KeyValueFieldsExtractor extractor, String tableName) { - super(commitUser, snapshotManager, scan, options, indexFactory, tableName, pathFactory); + super(commitUser, snapshotManager, scan, options, indexFactory, tableName); this.fileIO = fileIO; this.keyType = keyType; this.valueType = valueType; @@ -186,8 +186,7 @@ protected MergeTreeWriter createWriter( options.commitForceCompact(), options.changelogProducer(), restoreIncrement, - SequenceGenerator.create(schema, options), - getWriterMetrics(partition, bucket)); + SequenceGenerator.create(schema, options)); } @VisibleForTesting @@ -214,7 +213,9 @@ private CompactManager createCompactManager( options.compactionFileSize(), options.numSortedRunStopTrigger(), rewriter, - getCompactionMetrics(partition, bucket)); + compactionMetrics == null + ? null + : compactionMetrics.createReporter(partition, bucket)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index b49824f01c59..9159965d9f9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -26,7 +26,6 @@ import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.metrics.WriterBufferMetric; -import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -61,15 +60,13 @@ public MemoryFileStoreWrite( FileStoreScan scan, CoreOptions options, @Nullable IndexMaintainer.Factory indexFactory, - String tableName, - FileStorePathFactory pathFactory) { + String tableName) { super( commitUser, snapshotManager, scan, indexFactory, tableName, - pathFactory, options.writeMaxWritersToSpill()); this.options = options; this.cacheManager = new CacheManager(options.lookupCacheMaxMemory()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactTimer.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactTimer.java new file mode 100644 index 000000000000..47036218f7cc --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactTimer.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.metrics; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.utils.Preconditions; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.LinkedList; + +/** + * A timer which supports the following operations in O(1) amortized time complexity. + * + *
<ul>
+ *   <li>Start the timer.
+ *   <li>Stop the timer.
+ *   <li>Query how long the timer is running in the last <code>queryLengthMillis</code>
+ *       milliseconds, where <code>queryLengthMillis</code> is a constant.
+ * </ul>
+ */ +@ThreadSafe +public class CompactTimer { + + private final long queryLengthMillis; + private final LinkedList intervals; + // innerSum is the total length of intervals, except the first and the last one + private long innerSum; + private long lastCallMillis; + + public CompactTimer(long queryLengthMillis) { + this.queryLengthMillis = queryLengthMillis; + this.intervals = new LinkedList<>(); + this.innerSum = 0; + this.lastCallMillis = -1; + } + + public void start() { + start(System.currentTimeMillis()); + } + + @VisibleForTesting + void start(long millis) { + synchronized (intervals) { + removeExpiredIntervals(millis - queryLengthMillis); + Preconditions.checkArgument( + intervals.isEmpty() || intervals.getLast().finished(), + "There is an unfinished interval. This is unexpected."); + Preconditions.checkArgument(lastCallMillis <= millis, "millis must not decrease."); + lastCallMillis = millis; + + if (intervals.size() > 1) { + innerSum += intervals.getLast().totalLength(); + } + intervals.add(new TimeInterval(millis)); + } + } + + public void finish() { + finish(System.currentTimeMillis()); + } + + @VisibleForTesting + void finish(long millis) { + synchronized (intervals) { + removeExpiredIntervals(millis - queryLengthMillis); + Preconditions.checkArgument( + intervals.size() > 0 && !intervals.getLast().finished(), + "There is no unfinished interval. This is unexpected."); + Preconditions.checkArgument(lastCallMillis <= millis, "millis must not decrease."); + lastCallMillis = millis; + + intervals.getLast().finish(millis); + } + } + + public long calculateLength() { + return calculateLength(System.currentTimeMillis()); + } + + @VisibleForTesting + long calculateLength(long toMillis) { + synchronized (intervals) { + Preconditions.checkArgument(lastCallMillis <= toMillis, "millis must not decrease."); + lastCallMillis = toMillis; + + long fromMillis = toMillis - queryLengthMillis; + removeExpiredIntervals(fromMillis); + + if (intervals.isEmpty()) { + return 0; + } else if (intervals.size() == 1) { + return intervals.getFirst().calculateLength(fromMillis, toMillis); + } else { + // only the first and the last interval may not be complete, + // so we calculate them separately + return innerSum + + intervals.getFirst().calculateLength(fromMillis, toMillis) + + intervals.getLast().calculateLength(fromMillis, toMillis); + } + } + } + + private void removeExpiredIntervals(long expireMillis) { + while (intervals.size() > 0 + && intervals.getFirst().finished() + && intervals.getFirst().finishMillis <= expireMillis) { + intervals.removeFirst(); + if (intervals.size() > 1) { + innerSum -= intervals.getFirst().totalLength(); + } + } + } + + private static class TimeInterval { + + private final long startMillis; + private Long finishMillis; + + private TimeInterval(long startMillis) { + this.startMillis = startMillis; + this.finishMillis = null; + } + + private void finish(long finishMillis) { + this.finishMillis = finishMillis; + } + + private boolean finished() { + return finishMillis != null; + } + + private long totalLength() { + return finishMillis - startMillis; + } + + private long calculateLength(long fromMillis, long toMillis) { + if (finishMillis == null) { + return toMillis - Math.min(Math.max(startMillis, fromMillis), toMillis); + } else { + long l = Math.max(fromMillis, startMillis); + long r = Math.min(toMillis, finishMillis); + return Math.max(0, r - l); + } + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java index eca2053d4d92..4ca54a6c32d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java @@ -19,21 +19,36 @@ package org.apache.paimon.operation.metrics; import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metrics.MetricGroup; import org.apache.paimon.metrics.MetricRegistry; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.DoubleStream; +import java.util.stream.LongStream; + /** Metrics to measure a compaction. */ public class CompactionMetrics { - private static final int HISTOGRAM_WINDOW_SIZE = 100; private static final String GROUP_NAME = "compaction"; + public static final String MAX_LEVEL0_FILE_COUNT = "maxLevel0FileCount"; + public static final String AVG_LEVEL0_FILE_COUNT = "avgLevel0FileCount"; + public static final String COMPACTION_THREAD_BUSY = "compactionThreadBusy"; + private static final long BUSY_MEASURE_MILLIS = 60_000; + private final MetricGroup metricGroup; + private final Map reporters; + private final Map compactTimers; + + public CompactionMetrics(MetricRegistry registry, String tableName) { + this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName); + this.reporters = new HashMap<>(); + this.compactTimers = new ConcurrentHashMap<>(); - public CompactionMetrics( - MetricRegistry registry, String tableName, String partition, int bucket) { - this.metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName, partition, bucket); registerGenericCompactionMetrics(); } @@ -42,78 +57,97 @@ public MetricGroup getMetricGroup() { return metricGroup; } - private Histogram durationHistogram; - private CompactionStats latestCompaction; - private long level0FileCount = -1; + private void registerGenericCompactionMetrics() { + metricGroup.gauge(MAX_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().max().orElse(-1)); + metricGroup.gauge( + AVG_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().average().orElse(-1)); - @VisibleForTesting static final String LAST_COMPACTION_DURATION = "lastCompactionDuration"; - @VisibleForTesting static final String COMPACTION_DURATION = "compactionDuration"; + metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum()); + } - @VisibleForTesting - static final String LAST_TABLE_FILES_COMPACTED_BEFORE = "lastTableFilesCompactedBefore"; + private LongStream getLevel0FileCountStream() { + return reporters.values().stream().mapToLong(r -> r.level0FileCount); + } - @VisibleForTesting - static final String LAST_TABLE_FILES_COMPACTED_AFTER = "lastTableFilesCompactedAfter"; + private DoubleStream getCompactBusyStream() { + return compactTimers.values().stream() + .mapToDouble(t -> 100.0 * t.calculateLength() / BUSY_MEASURE_MILLIS); + } - @VisibleForTesting - static final String LAST_CHANGELOG_FILES_COMPACTED = "lastChangelogFilesCompacted"; + public void close() { + metricGroup.close(); + } - @VisibleForTesting - static final String LAST_REWRITE_INPUT_FILE_SIZE = "lastRewriteInputFileSize"; + /** Report metrics value to the {@link CompactionMetrics} object. 
*/ + public interface Reporter { - @VisibleForTesting - static final String LAST_REWRITE_OUTPUT_FILE_SIZE = "lastRewriteOutputFileSize"; + CompactTimer getCompactTimer(); - @VisibleForTesting - static final String LAST_REWRITE_CHANGELOG_FILE_SIZE = "lastRewriteChangelogFileSize"; + void reportLevel0FileCount(long count); - @VisibleForTesting static final String LEVEL_0_FILE_COUNT = "level0FileCount"; - - private void registerGenericCompactionMetrics() { - metricGroup.gauge( - LAST_COMPACTION_DURATION, - () -> latestCompaction == null ? 0L : latestCompaction.getDuration()); - durationHistogram = metricGroup.histogram(COMPACTION_DURATION, HISTOGRAM_WINDOW_SIZE); - metricGroup.gauge( - LAST_TABLE_FILES_COMPACTED_BEFORE, - () -> - latestCompaction == null - ? 0L - : latestCompaction.getCompactedDataFilesBefore()); - metricGroup.gauge( - LAST_TABLE_FILES_COMPACTED_AFTER, - () -> - latestCompaction == null - ? 0L - : latestCompaction.getCompactedDataFilesAfter()); - metricGroup.gauge( - LAST_CHANGELOG_FILES_COMPACTED, - () -> latestCompaction == null ? 0L : latestCompaction.getCompactedChangelogs()); - metricGroup.gauge( - LAST_REWRITE_INPUT_FILE_SIZE, - () -> latestCompaction == null ? 0L : latestCompaction.getRewriteInputFileSize()); - metricGroup.gauge( - LAST_REWRITE_OUTPUT_FILE_SIZE, - () -> latestCompaction == null ? 0L : latestCompaction.getRewriteOutputFileSize()); - metricGroup.gauge( - LAST_REWRITE_CHANGELOG_FILE_SIZE, - () -> - latestCompaction == null - ? 0L - : latestCompaction.getRewriteChangelogFileSize()); - metricGroup.gauge(LEVEL_0_FILE_COUNT, () -> level0FileCount); + void unregister(); } - public void reportCompaction(CompactionStats compactionStats) { - latestCompaction = compactionStats; - durationHistogram.update(compactionStats.getDuration()); + private class ReporterImpl implements Reporter { + + private final PartitionAndBucket key; + private long level0FileCount; + + private ReporterImpl(PartitionAndBucket key) { + this.key = key; + this.level0FileCount = 0; + } + + @Override + public CompactTimer getCompactTimer() { + return compactTimers.computeIfAbsent( + Thread.currentThread().getId(), + ignore -> new CompactTimer(BUSY_MEASURE_MILLIS)); + } + + @Override + public void reportLevel0FileCount(long count) { + this.level0FileCount = count; + } + + @Override + public void unregister() { + reporters.remove(key); + } } - public void reportLevel0FileCount(long count) { - this.level0FileCount = count; + public Reporter createReporter(BinaryRow partition, int bucket) { + PartitionAndBucket key = new PartitionAndBucket(partition, bucket); + ReporterImpl reporter = new ReporterImpl(key); + reporters.put(key, reporter); + return reporter; } - public void close() { - metricGroup.close(); + private static class PartitionAndBucket { + + private final BinaryRow partition; + private final int bucket; + + private PartitionAndBucket(BinaryRow partition, int bucket) { + this.partition = partition; + this.bucket = bucket; + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PartitionAndBucket)) { + return false; + } + PartitionAndBucket other = (PartitionAndBucket) o; + return Objects.equals(partition, other.partition) && bucket == other.bucket; + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java deleted file 
mode 100644 index ba71a2dad09c..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.operation.metrics; - -import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.io.DataFileMeta; - -import java.util.List; - -/** Statistics for a compaction. */ -public class CompactionStats { - private final long duration; - private final long compactedDataFilesBefore; - private final long compactedDataFilesAfter; - private final long compactedChangelogs; - private final long rewriteInputFileSize; - private final long rewriteOutputFileSize; - private final long rewriteChangelogFileSize; - - public CompactionStats( - long compactionDuration, - List compactBefore, - List compactAfter, - List compactChangelog) { - this.duration = compactionDuration; - this.compactedDataFilesBefore = compactBefore.size(); - this.compactedDataFilesAfter = compactAfter.size(); - this.compactedChangelogs = compactChangelog.size(); - this.rewriteInputFileSize = rewriteFileSize(compactBefore); - this.rewriteOutputFileSize = rewriteFileSize(compactAfter); - this.rewriteChangelogFileSize = rewriteFileSize(compactChangelog); - } - - @VisibleForTesting - protected long getDuration() { - return duration; - } - - protected long getCompactedDataFilesBefore() { - return compactedDataFilesBefore; - } - - protected long getCompactedDataFilesAfter() { - return compactedDataFilesAfter; - } - - protected long getCompactedChangelogs() { - return compactedChangelogs; - } - - protected long getRewriteInputFileSize() { - return rewriteInputFileSize; - } - - protected long getRewriteOutputFileSize() { - return rewriteOutputFileSize; - } - - protected long getRewriteChangelogFileSize() { - return rewriteChangelogFileSize; - } - - private long rewriteFileSize(List files) { - return files.stream().mapToLong(DataFileMeta::fileSize).sum(); - } - - @Override - public String toString() { - return "CompactionStats{" - + "duration=" - + duration - + ", compactedDataFilesBefore=" - + compactedDataFilesBefore - + ", compactedDataFilesAfter=" - + compactedDataFilesAfter - + ", compactedChangelogs=" - + compactedChangelogs - + ", rewriteInputFileSize=" - + rewriteInputFileSize - + ", rewriteOutputFileSize=" - + rewriteOutputFileSize - + ", rewriteChangelogFileSize=" - + rewriteChangelogFileSize - + '}'; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/MetricUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/MetricUtils.java new file mode 100644 index 000000000000..cc49d5a622d5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/MetricUtils.java 
@@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.metrics; + +import org.slf4j.Logger; + +/** Utils for metrics. */ +public class MetricUtils { + + public static void safeCall(Runnable runnable, Logger logger) { + try { + runnable.run(); + } catch (Throwable t) { + logger.warn("Exception occurs when reporting metrics", t); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java index 3a0c2e87b68a..a89531bc4d49 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java @@ -33,7 +33,7 @@ public class WriterBufferMetric { private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte"; private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte"; - private MetricGroup metricGroup; + private final MetricGroup metricGroup; public WriterBufferMetric( Supplier memoryPoolFactorySupplier, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java deleted file mode 100644 index 2bd0b940b752..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.operation.metrics; - -import org.apache.paimon.metrics.Counter; -import org.apache.paimon.metrics.Histogram; -import org.apache.paimon.metrics.MetricGroup; -import org.apache.paimon.metrics.MetricRegistry; - -/** Metrics for writer. 
*/ -public class WriterMetrics { - - private static final String GROUP_NAME = "writer"; - - private static final int WINDOW_SAMPLE_SIZE = 100; - - private static final String WRITE_RECORD_NUM = "writeRecordCount"; - private static final String FLUSH_COST_MILLIS = "flushCostMillis"; - public static final String PREPARE_COMMIT_COST_MILLIS = "prepareCommitCostMillis"; - - private final Counter writeRecordNumCounter; - - private final Histogram bufferFlushCostMillis; - - private final Histogram prepareCommitCostMillis; - - private final MetricGroup metricGroup; - - public WriterMetrics(MetricRegistry registry, String tableName, String partition, int bucket) { - metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName, partition, bucket); - writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM); - bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS, WINDOW_SAMPLE_SIZE); - prepareCommitCostMillis = - metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS, WINDOW_SAMPLE_SIZE); - } - - public void incWriteRecordNum() { - writeRecordNumCounter.inc(); - } - - public void updateBufferFlushCostMillis(long bufferFlushCost) { - bufferFlushCostMillis.update(bufferFlushCost); - } - - public void updatePrepareCommitCostMillis(long cost) { - this.prepareCommitCostMillis.update(cost); - } - - public void close() { - metricGroup.close(); - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 5ab6b8537c45..c954aec1f114 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -602,8 +602,7 @@ private Pair> createWriter( spillable, CoreOptions.FILE_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( - options, AppendOnlyWriterTest.SCHEMA.getFieldNames()), - null); + options, AppendOnlyWriterTest.SCHEMA.getFieldNames())); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 219609ffb5bd..dcc02cde63de 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -85,8 +85,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception false, CoreOptions.FILE_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( - options, SCHEMA.getFieldNames()), - null); + options, SCHEMA.getFieldNames())); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); appendOnlyWriter.write( diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index d3cfc2af71a2..1aa708306039 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -426,7 +426,6 @@ private MergeTreeWriter createMergeTreeWriter( options.commitForceCompact(), ChangelogProducer.NONE, null, - null, null); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize())); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactTimerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactTimerTest.java new file mode 100644 index 000000000000..44b15715b744 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactTimerTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.metrics; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CompactTimer}. */ +public class CompactTimerTest { + + @Test + public void testRandom() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + int totalLength = random.nextInt(1, 1000); + int queryLength; + if (random.nextBoolean() && totalLength > 100) { + queryLength = random.nextInt(1, 20); + } else { + queryLength = random.nextInt(1, totalLength + 1); + } + boolean[] running = new boolean[totalLength]; + CompactTimer timer = new CompactTimer(queryLength); + + boolean now = false; + for (int i = 0; i < totalLength; i++) { + if (random.nextInt(10) == 0) { + // change state + now = !now; + if (now) { + timer.start(i); + } else { + timer.finish(i); + } + } + running[i] = now; + + if (random.nextInt(10) == 0) { + // query + int expected = 0; + for (int j = 1; j <= queryLength; j++) { + if (i - j < 0) { + break; + } + if (running[i - j]) { + expected += 1; + } + } + assertThat(timer.calculateLength(i)).isEqualTo(expected); + } + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java index 507a2b4fe13d..b18250c9e327 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java @@ -18,166 +18,45 @@ package org.apache.paimon.operation.metrics; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.DataFileTestUtils; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metrics.Gauge; -import org.apache.paimon.metrics.Histogram; -import org.apache.paimon.metrics.Metric; import org.apache.paimon.metrics.MetricRegistryImpl; import org.junit.jupiter.api.Test; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.offset; /** Tests for {@link CompactionMetrics}. 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
index 507a2b4fe13d..b18250c9e327 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -18,166 +18,45 @@
 package org.apache.paimon.operation.metrics;
 
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metrics.Gauge;
-import org.apache.paimon.metrics.Histogram;
-import org.apache.paimon.metrics.Metric;
 import org.apache.paimon.metrics.MetricRegistryImpl;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.offset;
 
 /** Tests for {@link CompactionMetrics}. */
 public class CompactionMetricsTest {
 
-    private static final String TABLE_NAME = "myTable";
-    private static final String PARTITION = "date=20230623";
-    private static final int BUCKET = 5;
-
-    /** Tests that the metrics are updated properly. */
-    @SuppressWarnings("unchecked")
     @Test
-    public void testMetricsAreUpdated() {
-        CompactionMetrics compactionMetrics = getCompactionMetrics();
-        Map<String, Metric> registeredGenericMetrics =
-                compactionMetrics.getMetricGroup().getMetrics();
-
-        // Check initial values
-        Gauge<Long> lastCompactionDuration =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(CompactionMetrics.LAST_COMPACTION_DURATION);
-        Histogram compactionDuration =
-                (Histogram) registeredGenericMetrics.get(CompactionMetrics.COMPACTION_DURATION);
-        Gauge<Long> lastTableFilesCompactedBefore =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                CompactionMetrics.LAST_TABLE_FILES_COMPACTED_BEFORE);
-        Gauge<Long> lastTableFilesCompactedAfter =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                CompactionMetrics.LAST_TABLE_FILES_COMPACTED_AFTER);
-        Gauge<Long> lastChangelogFilesCompacted =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                CompactionMetrics.LAST_CHANGELOG_FILES_COMPACTED);
-        Gauge<Long> lastRewriteInputFileSize =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                CompactionMetrics.LAST_REWRITE_INPUT_FILE_SIZE);
-        Gauge<Long> lastRewriteOutputFileSize =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                CompactionMetrics.LAST_REWRITE_OUTPUT_FILE_SIZE);
-        Gauge<Long> lastRewriteChangelogFileSize =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                CompactionMetrics.LAST_REWRITE_CHANGELOG_FILE_SIZE);
-        Gauge<Long> level0FileCount =
-                (Gauge<Long>) registeredGenericMetrics.get(CompactionMetrics.LEVEL_0_FILE_COUNT);
-
-        assertThat(lastCompactionDuration.getValue()).isEqualTo(0);
-        assertThat(compactionDuration.getCount()).isEqualTo(0);
-        assertThat(compactionDuration.getStatistics().size()).isEqualTo(0);
-        assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(0);
-        assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(0);
-        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(0);
-        assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(0);
-        assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(0);
-        assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(0);
-
-        // report once
-        reportOnce(compactionMetrics);
-
-        // generic metrics value updated
-        assertThat(lastCompactionDuration.getValue()).isEqualTo(3000);
-        assertThat(compactionDuration.getCount()).isEqualTo(1);
-        assertThat(compactionDuration.getStatistics().size()).isEqualTo(1);
-        assertThat(compactionDuration.getStatistics().getValues()[0]).isEqualTo(3000L);
-        assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
-        assertThat(compactionDuration.getStatistics().getQuantile(0.5))
-                .isCloseTo(3000.0, offset(0.001));
-        assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(3000);
-        assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(3000);
-        assertThat(compactionDuration.getStatistics().getStdDev()).isEqualTo(0);
-        assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
-        assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
-        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
-        assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
-        assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1101);
-        assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(3001);
-
-        // report again
-        reportAgain(compactionMetrics);
-
-        // generic metrics value updated
-        assertThat(lastCompactionDuration.getValue()).isEqualTo(6000);
-        assertThat(compactionDuration.getCount()).isEqualTo(2);
-        assertThat(compactionDuration.getStatistics().size()).isEqualTo(2);
-        assertThat(compactionDuration.getStatistics().getValues()[1]).isEqualTo(6000L);
-        assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
-        assertThat(compactionDuration.getStatistics().getQuantile(0.5))
-                .isCloseTo(4500, offset(0.001));
-        assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(4500);
-        assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(6000);
-        assertThat(compactionDuration.getStatistics().getStdDev())
-                .isCloseTo(2121.320, offset(0.001));
-        assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
-        assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
-        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
-        assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
-        assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1201);
-        assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(2501);
-
-        // test level0FileCount
-        compactionMetrics.reportLevel0FileCount(10);
-        assertThat(level0FileCount.getValue()).isEqualTo(10);
-
-        compactionMetrics.reportLevel0FileCount(20);
-        assertThat(level0FileCount.getValue()).isEqualTo(20);
-    }
-
-    private void reportOnce(CompactionMetrics compactionMetrics) {
-        List<DataFileMeta> compactBefore = new ArrayList<>();
-        List<DataFileMeta> compactAfter = new ArrayList<>();
-        List<DataFileMeta> compactChangelog = new ArrayList<>();
-        compactBefore.add(DataFileTestUtils.newFile(0, 1000));
-        compactBefore.add(DataFileTestUtils.newFile(1001, 2000));
-        compactAfter.add(DataFileTestUtils.newFile(400, 1000));
-        compactAfter.add(DataFileTestUtils.newFile(1001, 1500));
-        compactChangelog.add(DataFileTestUtils.newFile(0, 2000));
-        compactChangelog.add(DataFileTestUtils.newFile(2001, 3000));
-
-        CompactionStats compactionStats =
-                new CompactionStats(3000, compactBefore, compactAfter, compactChangelog);
-
-        compactionMetrics.reportCompaction(compactionStats);
-    }
-
-    private void reportAgain(CompactionMetrics compactionMetrics) {
-        List<DataFileMeta> compactBefore = new ArrayList<>();
-        List<DataFileMeta> compactAfter = new ArrayList<>();
-        List<DataFileMeta> compactChangelog = new ArrayList<>();
-        compactBefore.add(DataFileTestUtils.newFile(2000, 3000));
-        compactBefore.add(DataFileTestUtils.newFile(3001, 4000));
-        compactAfter.add(DataFileTestUtils.newFile(600, 1200));
-        compactAfter.add(DataFileTestUtils.newFile(1201, 1800));
-        compactChangelog.add(DataFileTestUtils.newFile(0, 1500));
-        compactChangelog.add(DataFileTestUtils.newFile(1501, 2500));
-
-        CompactionStats compactionStats =
-                new CompactionStats(6000, compactBefore, compactAfter, compactChangelog);
-
-        compactionMetrics.reportCompaction(compactionStats);
+    public void testReportMetrics() {
+        CompactionMetrics metrics = new CompactionMetrics(new MetricRegistryImpl(), "myTable");
+        assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(-1L);
+        assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(-1.0);
+        assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
+
+        CompactionMetrics.Reporter[] reporters = new CompactionMetrics.Reporter[3];
+        for (int i = 0; i < reporters.length; i++) {
+            reporters[i] = metrics.createReporter(BinaryRow.EMPTY_ROW, i);
+        }
+
+        assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(0L);
+        assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(0.0);
+        assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
+
+        reporters[0].reportLevel0FileCount(5);
+        reporters[1].reportLevel0FileCount(3);
+        reporters[2].reportLevel0FileCount(4);
+        assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(5L);
+        assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(4.0);
+
+        reporters[0].reportLevel0FileCount(8);
+        assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(8L);
+        assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(5.0);
     }
 
-    private CompactionMetrics getCompactionMetrics() {
-        return new CompactionMetrics(new MetricRegistryImpl(), TABLE_NAME, PARTITION, BUCKET);
+    private Object getMetric(CompactionMetrics metrics, String metricName) {
+        return ((Gauge<?>) metrics.getMetricGroup().getMetrics().get(metricName)).getValue();
     }
 }
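The rewritten test captures the new shape of `CompactionMetrics`: gauges are registered once per table and aggregate over per-bucket reporters, reading `-1` until the first reporter is created. A sketch of that aggregation, consistent with the assertions above — the partition argument and the real gauge registration are omitted here, and all names except `createReporter`/`reportLevel0FileCount` are assumptions:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of table-level aggregation over per-bucket reporters, matching the asserts above. */
public class CompactionMetricsSketch {

    private final List<ReporterImpl> reporters = new ArrayList<>();

    /** One reporter per (partition, bucket) writer; the partition key is omitted in this sketch. */
    public Reporter createReporter() {
        ReporterImpl reporter = new ReporterImpl();
        reporters.add(reporter);
        return reporter;
    }

    /** Backs the max-level-0-file-count gauge; -1 until the first reporter exists. */
    public long maxLevel0FileCount() {
        return reporters.stream().mapToLong(r -> r.level0FileCount).max().orElse(-1L);
    }

    /** Backs the avg-level-0-file-count gauge; -1 until the first reporter exists. */
    public double avgLevel0FileCount() {
        return reporters.stream().mapToLong(r -> r.level0FileCount).average().orElse(-1.0);
    }

    public interface Reporter {
        void reportLevel0FileCount(long count);
    }

    private static class ReporterImpl implements Reporter {
        private volatile long level0FileCount = 0;

        @Override
        public void reportLevel0FileCount(long count) {
            this.level0FileCount = count;
        }
    }
}
```

The `COMPACTION_THREAD_BUSY` gauge asserted above is presumably where `CompactTimer` comes in, measuring how busy the shared compaction threads were over a recent window; it starts at `0.0`, as the first assertions show.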
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
deleted file mode 100644
index c72a1f637e15..000000000000
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.CommittableTypeInfo;
-import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.flink.utils.MetricUtils;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.TraceableFileIO;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.JavaSerializer;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Predicate;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link CdcRecordStoreWriteOperator}. */
-public class CdcDynamicBucketWriteOperatorTest {
-
-    @TempDir java.nio.file.Path tempDir;
-
-    private Path tablePath;
-    private String commitUser;
-
-    @BeforeEach
-    public void before() {
-        tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
-        commitUser = UUID.randomUUID().toString();
-    }
-
-    @AfterEach
-    public void after() {
-        // assert all connections are closed
-        Predicate<Path> pathPredicate = path -> path.toString().contains(tempDir.toString());
-        assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
-        assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
-    }
-
-    @Test
-    public void testCompactionMetrics() throws Exception {
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {DataTypes.INT(), DataTypes.INT()},
-                        new String[] {"pk", "col1"});
-        FileStoreTable table =
-                createFileStoreTable(
-                        rowType, Collections.emptyList(), Collections.singletonList("pk"));
-        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable> harness =
-                createTestHarness(table);
-        CdcDynamicBucketWriteOperator operator =
-                (CdcDynamicBucketWriteOperator) harness.getOneInputOperator();
-        harness.open();
-
-        MetricGroup compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", table.name())
-                        .addGroup("partition", "_")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-
-        long timestamp = 0;
-        long cpId = 1L;
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pk", "1");
-        fields.put("col1", "2");
-        harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT, fields), 0), timestamp++);
-        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.getWrite().prepareCommit(true, cpId++);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        fields.put("pk", "1");
-        fields.put("col1", "3");
-        harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT, fields), 0), timestamp);
-        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.getWrite().prepareCommit(true, cpId);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        // operator closed, metric groups should be unregistered
-        harness.close();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
-                .isNull();
-    }
-
-    private OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable>
-            createTestHarness(FileStoreTable table) throws Exception {
-        CdcDynamicBucketWriteOperator operator =
-                new CdcDynamicBucketWriteOperator(
-                        table,
-                        (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
-                                new StoreSinkWriteImpl(
-                                        t,
-                                        commitUser,
-                                        state,
-                                        ioManager,
-                                        false,
-                                        false,
-                                        true,
-                                        memoryPool,
-                                        metricGroup),
-                        commitUser);
-        TypeSerializer<Tuple2<CdcRecord, Integer>> inputSerializer = new JavaSerializer<>();
-        TypeSerializer<Committable> outputSerializer =
-                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
-        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
-        harness.setup(outputSerializer);
-        return harness;
-    }
-
-    private FileStoreTable createFileStoreTable(
-            RowType rowType, List<String> partitions, List<String> primaryKeys) throws Exception {
-        Options conf = new Options();
-        conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10));
-
-        TableSchema tableSchema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(rowType.getFields(), partitions, primaryKeys, conf.toMap(), ""));
-        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
-    }
-}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index ff4132103a89..08d5fa72c2e6 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -27,7 +27,6 @@
 import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.options.CatalogOptions;
@@ -46,7 +45,6 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -71,7 +69,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 
-import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link CdcRecordStoreMultiWriteOperator}. */
@@ -710,91 +707,6 @@ public void testUsingTheSameCompactExecutor() throws Exception {
         harness.close();
     }
 
-    @Test
-    public void testSingleTableCompactionMetrics() throws Exception {
-        Identifier tableId = firstTable;
-        FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
-
-        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> testHarness =
-                createTestHarness(catalogLoader);
-
-        testHarness.open();
-
-        CdcRecordStoreMultiWriteOperator operator =
-                (CdcRecordStoreMultiWriteOperator) testHarness.getOneInputOperator();
-
-        MetricGroup compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", table.name())
-                        .addGroup("partition", "pt=0")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-
-        long cpId = 1L;
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "1");
-        fields.put("v", "10");
-
-        CdcMultiplexRecord record =
-                CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
-
-        testHarness.processElement(record, 0);
-        operator.writes().get(tableId).compact(row(0), 0, true);
-        operator.writes().get(tableId).prepareCommit(true, cpId++);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        fields.put("pt", "0");
-        fields.put("k", "2");
-        fields.put("v", "12");
-
-        CdcMultiplexRecord record1 =
-                CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
-
-        testHarness.processElement(record1, 1);
-        operator.writes().get(tableId).compact(row(0), 0, true);
-        operator.writes().get(tableId).prepareCommit(true, cpId);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        // operator closed, metric groups should be unregistered
-        testHarness.close();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
-                .isNull();
-    }
-
     private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable>
             createTestHarness(Catalog.Loader catalogLoader) throws Exception {
         CdcRecordStoreMultiWriteOperator operator =
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 8d03466f681c..9af7eabdaaad 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -19,11 +19,9 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableTypeInfo;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
@@ -42,7 +40,6 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.jupiter.api.AfterEach;
@@ -254,78 +251,6 @@ public void testUpdateColumnType() throws Exception {
         harness.close();
     }
 
-    @Test
-    public void testCompactionMetrics() throws Exception {
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {DataTypes.INT(), DataTypes.INT()},
-                        new String[] {"pk", "col1"});
-        FileStoreTable table =
-                createFileStoreTable(
-                        rowType, Collections.emptyList(), Collections.singletonList("pk"));
-        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
-                createTestHarness(table);
-        CdcRecordStoreWriteOperator operator =
-                (CdcRecordStoreWriteOperator) harness.getOneInputOperator();
-        harness.open();
-
-        MetricGroup compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", table.name())
-                        .addGroup("partition", "_")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-
-        long timestamp = 0;
-        long cpId = 1L;
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pk", "1");
-        fields.put("col1", "2");
-        harness.processElement(new CdcRecord(RowKind.INSERT, fields), timestamp++);
-        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.getWrite().prepareCommit(true, cpId++);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        fields.put("pk", "1");
-        fields.put("col1", "3");
-        harness.processElement(new CdcRecord(RowKind.INSERT, fields), timestamp);
-        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.getWrite().prepareCommit(true, cpId);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        // operator closed, metric groups should be unregistered
-        harness.close();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
-                .isNull();
-    }
-
     private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarness(
             FileStoreTable table) throws Exception {
         CdcRecordStoreWriteOperator operator =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 6c4b46ce3782..44ceab84b563 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -23,7 +23,7 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.NewFilesIncrement;
@@ -577,14 +577,19 @@ public void testCommitMetrics() throws Exception {
                         .addGroup("paimon")
                         .addGroup("table", table.name())
                         .addGroup("commit");
-        assertThat(MetricUtils.getGauge(commitMetricGroup, "lastTableFilesAdded").getValue())
+        assertThat(TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAdded").getValue())
                 .isEqualTo(1L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup, "lastTableFilesDeleted").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesDeleted")
+                                .getValue())
                 .isEqualTo(0L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup, "lastTableFilesAppended").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAppended")
+                                .getValue())
                 .isEqualTo(1L);
         assertThat(
-                        MetricUtils.getGauge(commitMetricGroup, "lastTableFilesCommitCompacted")
+                        TestingMetricUtils.getGauge(
+                                        commitMetricGroup, "lastTableFilesCommitCompacted")
                                 .getValue())
                 .isEqualTo(0L);
 
@@ -602,14 +607,19 @@
         testHarness.snapshot(cpId, timestamp++);
         testHarness.notifyOfCompletedCheckpoint(cpId);
-        assertThat(MetricUtils.getGauge(commitMetricGroup, "lastTableFilesAdded").getValue())
+        assertThat(TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAdded").getValue())
                 .isEqualTo(3L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup, "lastTableFilesDeleted").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesDeleted")
+                                .getValue())
                 .isEqualTo(3L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup, "lastTableFilesAppended").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAppended")
+                                .getValue())
                 .isEqualTo(2L);
         assertThat(
-                        MetricUtils.getGauge(commitMetricGroup, "lastTableFilesCommitCompacted")
+                        TestingMetricUtils.getGauge(
+                                        commitMetricGroup, "lastTableFilesCommitCompacted")
                                 .getValue())
                 .isEqualTo(4L);
 
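From here on the changes are largely mechanical: the test-only `MetricUtils` becomes `TestingMetricUtils`, and the assertions re-wrap to fit the formatter. Every commit-metric assertion resolves gauges through the same table-level group path, which could be factored as below; the helper class itself is illustrative only, while the `addGroup` chain and `getGauge` call (including its null return after the operator closes) are exactly as used in the hunks above and below:

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import org.apache.paimon.flink.utils.TestingMetricUtils;

/** Hypothetical helper capturing the commit-metric lookup pattern shared by the updated tests. */
class CommitMetricReader {

    /** Returns the gauge value, or null once the operator (and thus the group) is closed. */
    static Object commitGaugeValue(MetricGroup operatorMetricGroup, String tableName, String name) {
        MetricGroup commitGroup =
                operatorMetricGroup
                        .addGroup("paimon")
                        .addGroup("table", tableName)
                        .addGroup("commit");
        Gauge<?> gauge = TestingMetricUtils.getGauge(commitGroup, name);
        return gauge == null ? null : gauge.getValue();
    }
}
```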
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 9a71cc2eab29..68c3c42b0a50 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -20,22 +20,16 @@
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.flink.util.AbstractTestBase;
-import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -52,12 +46,10 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -67,7 +59,6 @@
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -187,188 +178,6 @@ public void testCompactParallelism() throws Exception {
                 .isEqualTo(sinkParalellism);
     }
 
-    @Test
-    public void testCompactionMetrics() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        prepareDataFile(table);
-
-        StoreCompactOperator operator = createCompactOperator(table);
-        OneInputStreamOperatorTestHarness<RowData, Committable> testHarness =
-                createTestHarness(operator);
-        testHarness.open();
-
-        MetricGroup compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", table.name())
-                        .addGroup("partition", "dt=20221208_hh=15")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-        DataFileMetaSerializer fileMetaSerializer = new DataFileMetaSerializer();
-        RowData record =
-                new FlinkRowData(
-                        GenericRow.of(
-                                1L,
-                                partition("20221208", 15),
-                                0,
-                                fileMetaSerializer.serializeList(Collections.emptyList())));
-
-        long timestamp = 0;
-        testHarness.processElement(record, timestamp++);
-        testHarness.endInput();
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", table.name())
-                        .addGroup("partition", "dt=20221208_hh=16")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-        record =
-                new FlinkRowData(
-                        GenericRow.of(
-                                2L,
-                                partition("20221208", 16),
-                                0,
-                                fileMetaSerializer.serializeList(Collections.emptyList())));
-
-        testHarness.processElement(record, timestamp);
-        testHarness.endInput();
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        // operator closed, metric groups should be unregistered
-        testHarness.close();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
-                .isNull();
-    }
-
-    @Test
-    public void testMultiTablesCompactionMetrics(@TempDir java.nio.file.Path tempDir)
-            throws Exception {
-        Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempDir);
-        Options catalogOptions = new Options();
-        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.toString());
-        catalogOptions.set(CatalogOptions.URI, "");
-        Catalog.Loader catalogLoader =
-                () -> CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
-        Catalog catalog = catalogLoader.load();
-        String databaseName = "test_db";
-        catalog.createDatabase(databaseName, true);
-        Identifier firstTableId = Identifier.create(databaseName, "test_table1");
-        Identifier secondTableId = Identifier.create(databaseName, "test_table2");
-        FileStoreTable firstTable = createCatalogTable(catalog, firstTableId);
-        FileStoreTable secondTable = createCatalogTable(catalog, secondTableId);
-        prepareDataFile(firstTable);
-        prepareDataFile(secondTable);
-
-        MultiTablesStoreCompactOperator operator = createMultiTablesCompactOperator(catalogLoader);
-        OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable> testHarness =
-                createMultiTablesTestHarness(operator);
-        testHarness.open();
-
-        MetricGroup compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", firstTable.name())
-                        .addGroup("partition", "_")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-        DataFileMetaSerializer fileMetaSerializer = new DataFileMetaSerializer();
-        RowData record =
-                new FlinkRowData(
-                        GenericRow.of(
-                                1L,
-                                serializeBinaryRow(BinaryRow.EMPTY_ROW),
-                                0,
-                                fileMetaSerializer.serializeList(Collections.emptyList()),
-                                BinaryString.fromString(databaseName),
-                                BinaryString.fromString(firstTableId.getObjectName())));
-
-        long timestamp = 0;
-        testHarness.processElement(record, timestamp++);
-        testHarness.endInput();
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", secondTable.name())
-                        .addGroup("partition", "_")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-        record =
-                new FlinkRowData(
-                        GenericRow.of(
-                                2L,
-                                serializeBinaryRow(BinaryRow.EMPTY_ROW),
-                                0,
-                                fileMetaSerializer.serializeList(Collections.emptyList()),
-                                BinaryString.fromString(databaseName),
-                                BinaryString.fromString(secondTableId.getObjectName())));
-
-        testHarness.processElement(record, timestamp);
-        testHarness.endInput();
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        // operator closed, metric groups should be unregistered
-        testHarness.close();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
-                .isNull();
-    }
-
     private List<Map<String, String>> getSpecifiedPartitions() {
         Map<String, String> partition1 = new HashMap<>();
         partition1.put("dt", "20221208");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 608b659eb07b..23b983b06238 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -19,10 +19,8 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
@@ -45,7 +43,6 @@
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -55,15 +52,12 @@
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -88,142 +82,6 @@ public void testOptimizeKeyValueWriterForBatch() throws Exception {
         assertThat(testSpillable(streamExecutionEnvironment, fileStoreTable)).isFalse();
     }
 
-    @Test
-    public void testCompactionMetrics() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        RowDataStoreWriteOperator operator = createWriteOperator(table);
-        OneInputStreamOperatorTestHarness<InternalRow, Committable> testHarness =
-                createTestHarness(operator);
-        MetricGroup compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", table.name())
-                        .addGroup("partition", "_")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-        testHarness.open();
-
-        final GenericRow row1 = GenericRow.of(1, 2);
-        final GenericRow row2 = GenericRow.of(2, 3);
-        final GenericRow row3 = GenericRow.of(3, 4);
-        final GenericRow row4 = GenericRow.of(4, 5);
-
-        List<StreamRecord<InternalRow>> streamRecords = new ArrayList<>();
-        streamRecords.add(new StreamRecord<>(row1));
-        streamRecords.add(new StreamRecord<>(row2));
-        streamRecords.add(new StreamRecord<>(row3));
-
-        long cpId = 1L;
-        testHarness.processElements(streamRecords);
-        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.write.prepareCommit(true, cpId++);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        testHarness.processElement(row4, 0);
-        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.write.prepareCommit(true, cpId);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        // operator closed, metric groups should be unregistered
-        testHarness.close();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
-                .isNull();
-    }
-
-    @Test
-    public void testDynamicBucketCompactionMetrics() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        DynamicBucketRowWriteOperator operator = createDynamicBucketWriteOperator(table);
-        OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable> testHarness =
-                createDynamicBucketTestHarness(operator);
-        MetricGroup compactionMetricGroup =
-                operator.getMetricGroup()
-                        .addGroup("paimon")
-                        .addGroup("table", table.name())
-                        .addGroup("partition", "_")
-                        .addGroup("bucket", "0")
-                        .addGroup("compaction");
-        testHarness.open();
-
-        final GenericRow row1 = GenericRow.of(1, 2);
-        final GenericRow row2 = GenericRow.of(2, 3);
-        final GenericRow row3 = GenericRow.of(3, 4);
-        final GenericRow row4 = GenericRow.of(4, 5);
-
-        List<StreamRecord<Tuple2<InternalRow, Integer>>> streamRecords = new ArrayList<>();
-        streamRecords.add(new StreamRecord<>(Tuple2.of(row1, 0)));
-        streamRecords.add(new StreamRecord<>(Tuple2.of(row2, 1)));
-        streamRecords.add(new StreamRecord<>(Tuple2.of(row3, 2)));
-
-        long cpId = 1L;
-        testHarness.processElements(streamRecords);
-        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.write.prepareCommit(true, cpId++);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        testHarness.processElement(Tuple2.of(row4, 0), 0);
-        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
-        operator.write.prepareCommit(true, cpId);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore")
-                                .getValue())
-                .isEqualTo(2L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter")
-                                .getValue())
-                .isEqualTo(1L);
-        assertThat(
-                        MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted")
-                                .getValue())
-                .isEqualTo(0L);
-
-        // operator closed, metric groups should be unregistered
-        testHarness.close();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedBefore"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastTableFilesCompactedAfter"))
-                .isNull();
-        assertThat(MetricUtils.getGauge(compactionMetricGroup, "lastChangelogFilesCompacted"))
-                .isNull();
-    }
-
     private boolean testSpillable(
             StreamExecutionEnvironment streamExecutionEnvironment, FileStoreTable fileStoreTable)
             throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 3730a1930cea..cc7bb27a90d6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -28,7 +28,7 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -586,25 +586,39 @@ public void testCommitMetrics() throws Exception {
                         .addGroup("table", table2.name())
                         .addGroup("commit");
 
-        assertThat(MetricUtils.getGauge(commitMetricGroup1, "lastTableFilesAdded").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup1, "lastTableFilesAdded")
+                                .getValue())
                 .isEqualTo(1L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup1, "lastTableFilesDeleted").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup1, "lastTableFilesDeleted")
+                                .getValue())
                 .isEqualTo(0L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup1, "lastTableFilesAppended").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup1, "lastTableFilesAppended")
+                                .getValue())
                 .isEqualTo(1L);
         assertThat(
-                        MetricUtils.getGauge(commitMetricGroup1, "lastTableFilesCommitCompacted")
+                        TestingMetricUtils.getGauge(
+                                        commitMetricGroup1, "lastTableFilesCommitCompacted")
                                 .getValue())
                 .isEqualTo(0L);
 
-        assertThat(MetricUtils.getGauge(commitMetricGroup2, "lastTableFilesAdded").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup2, "lastTableFilesAdded")
+                                .getValue())
                 .isEqualTo(4L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup2, "lastTableFilesDeleted").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup2, "lastTableFilesDeleted")
+                                .getValue())
                 .isEqualTo(3L);
-        assertThat(MetricUtils.getGauge(commitMetricGroup2, "lastTableFilesAppended").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(commitMetricGroup2, "lastTableFilesAppended")
+                                .getValue())
                 .isEqualTo(3L);
         assertThat(
-                        MetricUtils.getGauge(commitMetricGroup2, "lastTableFilesCommitCompacted")
+                        TestingMetricUtils.getGauge(
+                                        commitMetricGroup2, "lastTableFilesCommitCompacted")
                                 .getValue())
                 .isEqualTo(4L);
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
index 82d7b0e3e30a..1fc6e8e615f9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
@@ -23,7 +23,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
 import org.apache.paimon.flink.utils.InternalTypeInfo;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.ConfigOption;
@@ -38,9 +38,7 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -92,25 +90,6 @@ public void testMetric() throws Exception {
         harness.notifyOfCompletedCheckpoint(1);
 
         OperatorMetricGroup metricGroup = rowDataStoreWriteOperator.getMetricGroup();
-        MetricGroup writerMetricGroup =
-                metricGroup
-                        .addGroup("paimon")
-                        .addGroup("table", tableName)
-                        .addGroup("partition", "_")
-                        .addGroup("bucket", "0")
-                        .addGroup("writer");
-
-        Counter writeRecordCount = MetricUtils.getCounter(writerMetricGroup, "writeRecordCount");
-        Assertions.assertThat(writeRecordCount.getCount()).isEqualTo(size);
-
-        // test histogram has sample
-        Histogram flushCostMS = MetricUtils.getHistogram(writerMetricGroup, "flushCostMillis");
-        Assertions.assertThat(flushCostMS.getCount()).isGreaterThan(0);
-
-        Histogram prepareCommitCostMS =
-                MetricUtils.getHistogram(writerMetricGroup, "prepareCommitCostMillis");
-        Assertions.assertThat(prepareCommitCostMS.getCount()).isGreaterThan(0);
-
         MetricGroup writerBufferMetricGroup =
                 metricGroup
                         .addGroup("paimon")
@@ -118,17 +97,17 @@
                         .addGroup("writerBuffer");
 
         Gauge bufferPreemptCount =
-                MetricUtils.getGauge(writerBufferMetricGroup, "bufferPreemptCount");
+                TestingMetricUtils.getGauge(writerBufferMetricGroup, "bufferPreemptCount");
         Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
 
         Gauge totalWriteBufferSizeByte =
-                MetricUtils.getGauge(writerBufferMetricGroup, "totalWriteBufferSizeByte");
+                TestingMetricUtils.getGauge(writerBufferMetricGroup, "totalWriteBufferSizeByte");
         Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
 
         GenericRow row = GenericRow.of(1, 1);
         harness.processElement(row, 1);
 
         Gauge usedWriteBufferSizeByte =
-                MetricUtils.getGauge(writerBufferMetricGroup, "usedWriteBufferSizeByte");
+                TestingMetricUtils.getGauge(writerBufferMetricGroup, "usedWriteBufferSizeByte");
         Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
index 27039a6ee459..13613dab063a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -87,9 +87,11 @@ public void staticFileStoreSourceScanMetricsTest() throws Exception {
                         1,
                         FlinkConnectorOptions.SplitAssignMode.FAIR);
         staticFileStoreSource.restoreEnumerator(context, null);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
+        assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
                 .isEqualTo(1L);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles")
+                                .getValue())
                 .isEqualTo(1L);
     }
 
@@ -102,20 +104,24 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
                 (ContinuousFileSplitEnumerator)
                         continuousFileStoreSource.restoreEnumerator(context, null);
         enumerator.scanNextSnapshot();
-        assertThat(MetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
+        assertThat(TestingMetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
                 .isEqualTo(1);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
+        assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
                 .isEqualTo(1L);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles")
+                                .getValue())
                 .isEqualTo(1L);
 
         writeAgain();
         enumerator.scanNextSnapshot();
-        assertThat(MetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
+        assertThat(TestingMetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
                 .isEqualTo(2);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
+        assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
                 .isEqualTo(1L);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles")
+                                .getValue())
                 .isEqualTo(1L);
     }
 
@@ -125,9 +131,11 @@ public void logHybridFileStoreSourceScanMetricsTest() throws Exception {
         FlinkSource logHybridFileStoreSource =
                 LogHybridSourceFactory.buildHybridFirstSource(table, null, null);
         logHybridFileStoreSource.restoreEnumerator(context, null);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
+        assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
                 .isEqualTo(1L);
-        assertThat(MetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles").getValue())
+        assertThat(
+                        TestingMetricUtils.getGauge(scanMetricGroup, "lastScanResultedTableFiles")
+                                .getValue())
                 .isEqualTo(1L);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 3c046457e97b..0f6d66815d41 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -23,7 +23,7 @@
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -194,24 +194,26 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
         MetricGroup readerOperatorMetricGroup = readOperator.getMetricGroup();
         harness.open();
         assertThat(
-                        MetricUtils.getGauge(readerOperatorMetricGroup, "currentFetchEventTimeLag")
+                        TestingMetricUtils.getGauge(
+                                        readerOperatorMetricGroup, "currentFetchEventTimeLag")
                                 .getValue())
                 .isEqualTo(-1L);
         assertThat(
-                        MetricUtils.getGauge(readerOperatorMetricGroup, "currentEmitEventTimeLag")
+                        TestingMetricUtils.getGauge(
+                                        readerOperatorMetricGroup, "currentEmitEventTimeLag")
                                 .getValue())
                 .isEqualTo(-1L);
 
         harness.processElement(new StreamRecord<>(splits.get(0)));
         assertThat(
                         (Long)
-                                MetricUtils.getGauge(
+                                TestingMetricUtils.getGauge(
                                                 readerOperatorMetricGroup,
                                                 "currentFetchEventTimeLag")
                                         .getValue())
                 .isGreaterThan(0);
         assertThat(
                         (Long)
-                                MetricUtils.getGauge(
+                                TestingMetricUtils.getGauge(
                                                 readerOperatorMetricGroup,
                                                 "currentEmitEventTimeLag")
                                         .getValue())
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
index b37e4db805d3..39ab5ae30bcb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
@@ -29,7 +29,7 @@
 import java.util.Map;
 
 /** Test utils for Flink's {@link Metric}s. */
-public class MetricUtils {
+public class TestingMetricUtils {
 
     public static Gauge getGauge(MetricGroup group, String metricName) {
         return (Gauge) getMetric(group, metricName);