diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 51ddbbed0d49..9c993c09df2e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -34,6 +34,7 @@ import org.apache.paimon.io.RowDataRollingFileWriter; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -74,6 +75,8 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final FieldStatsCollector.Factory[] statsCollectors; private final IOManager ioManager; + private WriterMetrics writerMetrics; + public AppendOnlyWriter( FileIO fileIO, IOManager ioManager, @@ -89,7 +92,8 @@ public AppendOnlyWriter( boolean useWriteBuffer, boolean spillable, String fileCompression, - FieldStatsCollector.Factory[] statsCollectors) { + FieldStatsCollector.Factory[] statsCollectors, + WriterMetrics writerMetrics) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -114,10 +118,12 @@ public AppendOnlyWriter( compactBefore.addAll(increment.compactIncrement().compactBefore()); compactAfter.addAll(increment.compactIncrement().compactAfter()); } + this.writerMetrics = writerMetrics; } @Override public void write(InternalRow rowData) throws Exception { + long start = System.currentTimeMillis(); Preconditions.checkArgument( rowData.getRowKind() == RowKind.INSERT, "Append-only writer can only accept insert row kind, but current row kind is: %s", @@ -133,6 +139,11 @@ public void write(InternalRow rowData) throws Exception { throw new RuntimeException("Mem table is too small to hold a single element."); } } + + if (writerMetrics != null) { + writerMetrics.incWriteRecordNum(); + writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start); + } } @Override @@ -152,13 +163,19 @@ public Collection dataFiles() { @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { + long start = System.currentTimeMillis(); flush(false, false); trySyncLatestCompaction(waitCompaction || forceCompact); - return drainIncrement(); + CommitIncrement increment = drainIncrement(); + if (writerMetrics != null) { + writerMetrics.updatePrepareCommitCostMS(System.currentTimeMillis() - start); + } + return increment; } private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception { + long start = System.currentTimeMillis(); List flushedFiles = sinkWriter.flush(); // add new generated files @@ -166,6 +183,9 @@ private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction trySyncLatestCompaction(waitForLatestCompaction); compactManager.triggerCompaction(forcedFullCompaction); newFiles.addAll(flushedFiles); + if (writerMetrics != null) { + writerMetrics.updateBufferFlushCostMS(System.currentTimeMillis() - start); + } } @Override @@ -204,6 +224,7 @@ private RowDataRollingFileWriter createRollingRowWriter() { private void trySyncLatestCompaction(boolean blocking) throws ExecutionException, InterruptedException { + long start = System.currentTimeMillis(); compactManager .getCompactionResult(blocking) .ifPresent( @@ -211,6 +232,7 @@ private void trySyncLatestCompaction(boolean blocking) compactBefore.addAll(result.before()); compactAfter.addAll(result.after()); }); + writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start); } private CommitIncrement drainIncrement() { diff --git a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java index 73bb66de230f..1b53ec711f84 100644 --- a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java @@ -37,9 +37,13 @@ public class MemoryPoolFactory { private Iterable owners; + private final long totalBufferSize; + private long bufferPreemptCount; + public MemoryPoolFactory(MemorySegmentPool innerPool) { this.innerPool = innerPool; this.totalPages = innerPool.freePages(); + this.totalBufferSize = (long) totalPages * innerPool.pageSize(); } public MemoryPoolFactory addOwners(Iterable newOwners) { @@ -81,12 +85,38 @@ private void preemptMemory(MemoryOwner owner) { if (max != null) { try { max.flushMemory(); + ++bufferPreemptCount; } catch (Exception e) { throw new RuntimeException(e); } } } + public BufferStat bufferStat() { + return new BufferStat(); + } + + /** stat for buffer metric. */ + public class BufferStat { + public long bufferPreemptCount() { + return bufferPreemptCount; + } + + public long usedBufferSize() { + long usedBufferSize = 0L; + if (owners != null) { + for (MemoryOwner owner : owners) { + usedBufferSize += owner.memoryOccupancy(); + } + } + return usedBufferSize; + } + + public long totalBufferSize() { + return totalBufferSize; + } + } + private class OwnerMemoryPool implements MemorySegmentPool { private final MemoryOwner owner; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index c43f97604757..13aa71094dd0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -33,6 +33,7 @@ import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.mergetree.compact.MergeFunction; +import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.RecordWriter; @@ -74,6 +75,8 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { private long newSequenceNumber; private WriteBuffer writeBuffer; + private WriterMetrics writerMetrics; + public MergeTreeWriter( boolean writeBufferSpillable, int sortMaxFan, @@ -85,7 +88,8 @@ public MergeTreeWriter( KeyValueFileWriterFactory writerFactory, boolean commitForceCompact, ChangelogProducer changelogProducer, - @Nullable CommitIncrement increment) { + @Nullable CommitIncrement increment, + WriterMetrics writerMetrics) { this.writeBufferSpillable = writeBufferSpillable; this.sortMaxFan = sortMaxFan; this.ioManager = ioManager; @@ -114,6 +118,7 @@ public MergeTreeWriter( compactAfter.addAll(increment.compactIncrement().compactAfter()); compactChangelog.addAll(increment.compactIncrement().changelogFiles()); } + this.writerMetrics = writerMetrics; } private long newSequenceNumber() { @@ -139,10 +144,12 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { @Override public void write(KeyValue kv) throws Exception { + long start = System.currentTimeMillis(); long sequenceNumber = kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE ? newSequenceNumber() : kv.sequenceNumber(); + boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value()); if (!success) { flushWriteBuffer(false, false); @@ -151,6 +158,11 @@ public void write(KeyValue kv) throws Exception { throw new RuntimeException("Mem table is too small to hold a single element."); } } + + if (writerMetrics != null) { + writerMetrics.incWriteRecordNum(); + writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start); + } } @Override @@ -183,6 +195,7 @@ public void flushMemory() throws Exception { private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception { + long start = System.currentTimeMillis(); if (writeBuffer.size() > 0) { if (compactManager.shouldWaitForLatestCompaction()) { waitForLatestCompaction = true; @@ -222,16 +235,26 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul trySyncLatestCompaction(waitForLatestCompaction); compactManager.triggerCompaction(forcedFullCompaction); + if (writerMetrics != null) { + writerMetrics.updateBufferFlushCostMS(System.currentTimeMillis() - start); + } } @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { + long start = System.currentTimeMillis(); flushWriteBuffer(waitCompaction, false); trySyncLatestCompaction( waitCompaction || commitForceCompact || compactManager.shouldWaitForPreparingCheckpoint()); - return drainIncrement(); + + CommitIncrement increment = drainIncrement(); + + if (writerMetrics != null) { + writerMetrics.updatePrepareCommitCostMS(System.currentTimeMillis() - start); + } + return increment; } @Override @@ -281,8 +304,10 @@ private void updateCompactResult(CompactResult result) { } private void trySyncLatestCompaction(boolean blocking) throws Exception { + long start = System.currentTimeMillis(); Optional result = compactManager.getCompactionResult(blocking); result.ifPresent(this::updateCompactResult); + writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java index 60f0e2a07852..ac9cc7cdaa0b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java @@ -27,10 +27,18 @@ public abstract class MetricRegistry { private static final String KEY_TABLE = "table"; private static final String KEY_PARTITION = "partition"; private static final String KEY_BUCKET = "bucket"; + private static final String COMMIT_USER = "commit_user"; public MetricGroup tableMetricGroup(String groupName, String tableName) { + return tableMetricGroup(groupName, tableName, null); + } + + public MetricGroup tableMetricGroup(String groupName, String tableName, String commitUser) { Map variables = new LinkedHashMap<>(); variables.put(KEY_TABLE, tableName); + if (commitUser != null) { + variables.put(COMMIT_USER, commitUser); + } return createMetricGroup(groupName, variables); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index a5f64562bf0d..6e014b806b4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -31,6 +31,7 @@ import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.metrics.CompactionMetrics; +import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.CommitIncrement; @@ -77,7 +78,10 @@ public abstract class AbstractFileStoreWrite private boolean closeCompactExecutorWhenLeaving = true; private boolean ignorePreviousFiles = false; protected boolean isStreamingMode = false; - private MetricRegistry metricRegistry = null; + private MetricRegistry metricRegistry; + + protected WriterMetrics writerMetrics; + private final String tableName; private final FileStorePathFactory pathFactory; @@ -369,6 +373,14 @@ public CompactionMetrics getCompactionMetrics(BinaryRow partition, int bucket) { return null; } + @Nullable + public WriterMetrics getWriterMetrics() { + if (metricRegistry != null && writerMetrics == null) { + writerMetrics = new WriterMetrics(metricRegistry, tableName, commitUser); + } + return writerMetrics; + } + private String getPartitionString(FileStorePathFactory pathFactory, BinaryRow partition) { String partitionStr = pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_"); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index eb4353f8c42d..145241010ce0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -31,6 +31,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.RowDataRollingFileWriter; +import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.table.BucketMode; @@ -124,6 +125,9 @@ protected RecordWriter createWriter( targetFileSize, compactRewriter(partition, bucket), getCompactionMetrics(partition, bucket)); + WriterMetrics writerMetrics = getWriterMetrics(); + registerMemoryPoolMetric(writerMetrics); + return new AppendOnlyWriter( fileIO, ioManager, @@ -139,7 +143,8 @@ protected RecordWriter createWriter( useWriteBuffer, spillable, fileCompression, - statsCollectors); + statsCollectors, + writerMetrics); } public AppendOnlyCompactManager.CompactRewriter compactRewriter( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java index 96fe0e1f589b..f68344484821 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java @@ -25,6 +25,7 @@ import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.utils.RecordWriter; @@ -125,4 +126,6 @@ List prepareCommit(boolean waitCompaction, long commitIdentifier) /** With metrics to measure compaction. */ FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry); + + WriterMetrics getWriterMetrics(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index f6097c18314e..69c8a0ad4cbc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -51,6 +51,7 @@ import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter; import org.apache.paimon.mergetree.compact.UniversalCompaction; import org.apache.paimon.operation.metrics.CompactionMetrics; +import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.types.RowType; @@ -170,6 +171,10 @@ protected MergeTreeWriter createWriter( compactExecutor, levels, getCompactionMetrics(partition, bucket)); + + WriterMetrics writerMetrics = getWriterMetrics(); + registerMemoryPoolMetric(writerMetrics); + return new MergeTreeWriter( bufferSpillable(), options.localSortMaxNumFileHandles(), @@ -181,7 +186,8 @@ protected MergeTreeWriter createWriter( writerFactory, options.commitForceCompact(), options.changelogProducer(), - restoreIncrement); + restoreIncrement, + writerMetrics); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 9fbff8efa012..6a1aa3234f98 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -24,6 +24,7 @@ import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.operation.metrics.WriterMetrics; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -51,7 +52,7 @@ public abstract class MemoryFileStoreWrite extends AbstractFileStoreWrite private final CoreOptions options; protected final CacheManager cacheManager; - private MemoryPoolFactory writeBufferPool; + protected MemoryPoolFactory writeBufferPool; public MemoryFileStoreWrite( String commitUser, @@ -114,4 +115,13 @@ protected void notifyNewWriter(RecordWriter writer) { } writeBufferPool.notifyNewOwner((MemoryOwner) writer); } + + protected void registerMemoryPoolMetric(WriterMetrics writerMetrics) { + if (writeBufferPool != null && writerMetrics != null) { + MemoryPoolFactory.BufferStat bufferStat = writeBufferPool.bufferStat(); + writerMetrics.setMemoryPreemptCount(bufferStat::bufferPreemptCount); + writerMetrics.setUsedWriteBufferSize(bufferStat::usedBufferSize); + writerMetrics.setTotaldWriteBufferSize(bufferStat::totalBufferSize); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java new file mode 100644 index 000000000000..6bb803b4704d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java @@ -0,0 +1,115 @@ +package org.apache.paimon.operation.metrics; + +import org.apache.paimon.metrics.Counter; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.metrics.MetricGroup; +import org.apache.paimon.metrics.MetricRegistry; + +import java.util.function.Supplier; + +/** Metrics for writer. */ +public class WriterMetrics { + + private static final String GROUP_NAME = "writer"; + + private static final int WINDOW_SAMPLE_SIZE = 10000; + private static final String WRITE_RECORD_NUM = "writeRecordCount"; + + private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount"; + + private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte"; + + private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte"; + + public static final String WRITE_COST_MS = "writeCostMS"; + + private static final String FLUSH_COST_MS = "flushCostMS"; + + public static final String PREPARE_COMMIT_COST = "prepareCommitCostMS"; + + public static final String SYNC_LASTEST_COMPACTION_COST_MS = "syncLastestCompactionCostMS"; + + private final Counter writeRecordNumCounter; + + private final Gauge memoryPreemptCount; + + private final Gauge usedWriteBufferSizeGauge; + + private final Gauge totalWriteBufferSizeGauge; + + private final Histogram writeCostMS; + + private final Histogram bufferFlushCostMS; + + private final Histogram prepareCommitCostMS; + + private final Histogram syncLastestCompactionCostMS; + + private Stats stats; + + public WriterMetrics(MetricRegistry registry, String tableName, String commitUser) { + stats = new Stats(); + MetricGroup metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName, commitUser); + writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM); + + // buffer + memoryPreemptCount = + metricGroup.gauge(BUFFER_PREEMPT_COUNT, () -> stats.bufferPreemptCount.get()); + + usedWriteBufferSizeGauge = + metricGroup.gauge(USED_WRITE_BUFFER_SIZE, () -> stats.usedWriteBufferSize.get()); + + totalWriteBufferSizeGauge = + metricGroup.gauge(TOTAL_WRITE_BUFFER_SIZE, () -> stats.totalWriteBufferSize.get()); + + // cost + writeCostMS = metricGroup.histogram(WRITE_COST_MS, WINDOW_SAMPLE_SIZE); + bufferFlushCostMS = metricGroup.histogram(FLUSH_COST_MS, WINDOW_SAMPLE_SIZE); + + // prepareCommittime + prepareCommitCostMS = metricGroup.histogram(PREPARE_COMMIT_COST, WINDOW_SAMPLE_SIZE); + + syncLastestCompactionCostMS = + metricGroup.histogram(SYNC_LASTEST_COMPACTION_COST_MS, WINDOW_SAMPLE_SIZE); + } + + public void incWriteRecordNum() { + writeRecordNumCounter.inc(); + } + + public void updateWriteCostMS(long bufferAppendCost) { + writeCostMS.update(bufferAppendCost); + } + + public void updateBufferFlushCostMS(long bufferFlushCost) { + bufferFlushCostMS.update(bufferFlushCost); + } + + public void updatePrepareCommitCostMS(long cost) { + this.prepareCommitCostMS.update(cost); + } + + public void updateSyncLastestCompactionCostMS(long cost) { + this.syncLastestCompactionCostMS.update(cost); + } + + public void setMemoryPreemptCount(Supplier bufferPreemptNumSupplier) { + this.stats.bufferPreemptCount = bufferPreemptNumSupplier; + } + + public void setUsedWriteBufferSize(Supplier usedWriteBufferSize) { + this.stats.usedWriteBufferSize = usedWriteBufferSize; + } + + public void setTotaldWriteBufferSize(Supplier totaldWriteBufferSize) { + this.stats.totalWriteBufferSize = totaldWriteBufferSize; + } + + /** buffer stat for metric. */ + public class Stats { + private Supplier bufferPreemptCount = () -> -1L; + private Supplier totalWriteBufferSize = () -> -1L; + private Supplier usedWriteBufferSize = () -> -1L; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 272870cc4e1f..6264980390fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -157,6 +157,7 @@ record -> { "Append only writer can not accept row with RowKind %s", record.row().getRowKind()); return record.row(); - }); + }, + name()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 02c6a36c2e67..9ce53df587cf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -204,6 +204,7 @@ record -> { sequenceNumber, record.row().getRowKind(), record.row()); - }); + }, + name()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index f34a38a44931..c8809125b88e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -50,13 +50,17 @@ public class TableWriteImpl private boolean batchCommitted = false; + private String tableName; + public TableWriteImpl( FileStoreWrite write, KeyAndBucketExtractor keyAndBucketExtractor, - RecordExtractor recordExtractor) { + RecordExtractor recordExtractor, + String tableName) { this.write = (AbstractFileStoreWrite) write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; + this.tableName = tableName; } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 4eefdfef64f1..e18eaf066f87 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -547,7 +547,8 @@ private Pair> createWriter( spillable, CoreOptions.FILE_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( - options, AppendOnlyWriterTest.SCHEMA.getFieldNames())); + options, AppendOnlyWriterTest.SCHEMA.getFieldNames()), + null); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index c8f5ebd00f5c..2c24e7135d9e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -85,7 +85,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception false, CoreOptions.FILE_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( - options, SCHEMA.getFieldNames())); + options, SCHEMA.getFieldNames()), + null); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); appendOnlyWriter.write( diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 4e544b0e656a..34c0eb4d9283 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -424,6 +424,7 @@ private MergeTreeWriter createMergeTreeWriter( writerFactory, options.commitForceCompact(), ChangelogProducer.NONE, + null, null); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index 534ce439e414..7330e63c5afa 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -142,7 +142,8 @@ public void processElement(StreamRecord element) throws Exce commitUser, state, getContainingTask().getEnvironment().getIOManager(), - memoryPoolFactory)); + memoryPoolFactory, + getMetricGroup())); ((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 5eab1d1c4fc7..b3a2ad28705d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -78,7 +78,7 @@ public FlinkCdcMultiTableSink( private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() { // for now, no compaction for multiplexed sink - return (table, commitUser, state, ioManager, memoryPoolFactory) -> + return (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> new StoreSinkWriteImpl( table, commitUser, @@ -87,7 +87,8 @@ private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() { isOverwrite, false, true, - memoryPoolFactory); + memoryPoolFactory, + metricGroup); } public DataStreamSink sinkFrom( diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java index 59e61080bfa3..2036d351c597 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java @@ -707,7 +707,7 @@ public void testUsingTheSameCompactExecutor() throws Exception { CdcRecordStoreMultiWriteOperator operator = new CdcRecordStoreMultiWriteOperator( catalogLoader, - (t, commitUser, state, ioManager, memoryPoolFactory) -> + (t, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> new StoreSinkWriteImpl( t, commitUser, @@ -716,7 +716,8 @@ public void testUsingTheSameCompactExecutor() throws Exception { false, false, true, - memoryPoolFactory), + memoryPoolFactory, + metricGroup), commitUser, Options.fromMap(new HashMap<>()), new HashMap<>()); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java index 0f2161f58532..3a30af20493a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java @@ -255,7 +255,7 @@ private OneInputStreamOperatorTestHarness createTestHarn CdcRecordStoreWriteOperator operator = new CdcRecordStoreWriteOperator( table, - (t, commitUser, state, ioManager, memoryPool) -> + (t, commitUser, state, ioManager, memoryPool, metricGroup) -> new StoreSinkWriteImpl( t, commitUser, @@ -264,7 +264,8 @@ private OneInputStreamOperatorTestHarness createTestHarn false, false, true, - memoryPool), + memoryPool, + metricGroup), commitUser); TypeSerializer inputSerializer = new JavaSerializer<>(); TypeSerializer outputSerializer = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index d0bae779d585..01d77bb8b754 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -99,7 +99,7 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) { int finalDeltaCommits = Math.max(deltaCommits, 1); - return (table, commitUser, state, ioManager, memoryPool) -> + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new GlobalFullCompactionSinkWrite( table, commitUser, @@ -109,11 +109,12 @@ private StoreSinkWrite.Provider createWriteProvider( waitCompaction, finalDeltaCommits, isStreaming, - memoryPool); + memoryPool, + metricGroup); } } - return (table, commitUser, state, ioManager, memoryPool) -> + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new StoreSinkWriteImpl( table, commitUser, @@ -122,7 +123,8 @@ private StoreSinkWrite.Provider createWriteProvider( ignorePreviousFiles, waitCompaction, isStreaming, - memoryPool); + memoryPool, + metricGroup); } public DataStreamSink sinkFrom(DataStream input) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java index 8e0745afb9b4..317d80a384c5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java @@ -27,6 +27,7 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,8 @@ public GlobalFullCompactionSinkWrite( boolean waitCompaction, int deltaCommits, boolean isStreaming, - @Nullable MemorySegmentPool memoryPool) { + @Nullable MemorySegmentPool memoryPool, + MetricGroup metricGroup) { super( table, commitUser, @@ -82,7 +84,8 @@ public GlobalFullCompactionSinkWrite( ignorePreviousFiles, waitCompaction, isStreaming, - memoryPool); + memoryPool, + metricGroup); this.deltaCommits = deltaCommits; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 15948d6d58c0..6adbbd1280f6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -155,7 +155,8 @@ public void processElement(StreamRecord element) throws Exception { commitUser, state, getContainingTask().getEnvironment().getIOManager(), - memoryPool)); + memoryPool, + getMetricGroup())); if (write.streamingMode()) { write.notifyNewFiles(snapshotId, partition, bucket, files); @@ -257,7 +258,7 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) { int finalDeltaCommits = Math.max(deltaCommits, 1); - return (table, commitUser, state, ioManager, memoryPool) -> + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new GlobalFullCompactionSinkWrite( table, commitUser, @@ -267,11 +268,12 @@ private StoreSinkWrite.Provider createWriteProvider( waitCompaction, finalDeltaCommits, isStreaming, - memoryPool); + memoryPool, + metricGroup); } } - return (table, commitUser, state, ioManager, memoryPool) -> + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new StoreSinkWriteImpl( table, commitUser, @@ -280,6 +282,7 @@ private StoreSinkWrite.Provider createWriteProvider( ignorePreviousFiles, waitCompaction, isStreaming, - memoryPool); + memoryPool, + metricGroup); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index a2f398fd3677..2af731eb3a7e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -93,7 +93,8 @@ public void initializeState(StateInitializationContext context) throws Exception commitUser, state, getContainingTask().getEnvironment().getIOManager(), - memoryPool); + memoryPool, + getMetricGroup()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java index f7acabb812d9..28a82a6c4dd6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import javax.annotation.Nullable; @@ -75,7 +76,8 @@ StoreSinkWrite provide( String commitUser, StoreSinkWriteState state, IOManager ioManager, - @Nullable MemorySegmentPool memoryPool); + @Nullable MemorySegmentPool memoryPool, + @Nullable MetricGroup metricGroup); } /** Provider of {@link StoreSinkWrite} that uses given write buffer. */ @@ -87,6 +89,7 @@ StoreSinkWrite provide( String commitUser, StoreSinkWriteState state, IOManager ioManager, - @Nullable MemoryPoolFactory memoryPoolFactory); + @Nullable MemoryPoolFactory memoryPoolFactory, + MetricGroup metricGroup); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index 8cf7a64e03f3..c9418e46624f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryPoolFactory; @@ -32,6 +33,7 @@ import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { protected TableWriteImpl write; + private MetricGroup metricGroup; + public StoreSinkWriteImpl( FileStoreTable table, String commitUser, @@ -69,7 +73,8 @@ public StoreSinkWriteImpl( boolean ignorePreviousFiles, boolean waitCompaction, boolean isStreamingMode, - @Nullable MemorySegmentPool memoryPool) { + @Nullable MemorySegmentPool memoryPool, + @Nullable MetricGroup metricGroup) { this( table, commitUser, @@ -79,7 +84,8 @@ public StoreSinkWriteImpl( waitCompaction, isStreamingMode, memoryPool, - null); + null, + metricGroup); } public StoreSinkWriteImpl( @@ -90,7 +96,8 @@ public StoreSinkWriteImpl( boolean ignorePreviousFiles, boolean waitCompaction, boolean isStreamingMode, - MemoryPoolFactory memoryPoolFactory) { + MemoryPoolFactory memoryPoolFactory, + @Nullable MetricGroup metricGroup) { this( table, commitUser, @@ -100,7 +107,8 @@ public StoreSinkWriteImpl( waitCompaction, isStreamingMode, null, - memoryPoolFactory); + memoryPoolFactory, + metricGroup); } private StoreSinkWriteImpl( @@ -112,7 +120,8 @@ private StoreSinkWriteImpl( boolean waitCompaction, boolean isStreamingMode, @Nullable MemorySegmentPool memoryPool, - @Nullable MemoryPoolFactory memoryPoolFactory) { + @Nullable MemoryPoolFactory memoryPoolFactory, + @Nullable MetricGroup metricGroup) { this.commitUser = commitUser; this.state = state; this.paimonIOManager = new IOManagerImpl(ioManager.getSpillingDirectoriesPaths()); @@ -121,6 +130,7 @@ private StoreSinkWriteImpl( this.isStreamingMode = isStreamingMode; this.memoryPool = memoryPool; this.memoryPoolFactory = memoryPoolFactory; + this.metricGroup = metricGroup; this.write = newTableWrite(table); } @@ -128,7 +138,6 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { checkArgument( !(memoryPool != null && memoryPoolFactory != null), "memoryPool and memoryPoolFactory cannot be set at the same time."); - TableWriteImpl tableWrite = table.newWrite( commitUser, @@ -138,6 +147,10 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { .withIgnorePreviousFiles(ignorePreviousFiles) .isStreamingMode(isStreamingMode); + if (metricGroup != null) { + tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup)); + } + if (memoryPoolFactory != null) { return tableWrite.withMemoryPoolFactory(memoryPoolFactory); } else { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index 860116081d98..12e815b30ae4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -23,6 +23,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -91,7 +92,10 @@ void initStateAndWriter( // runtime context, we can test to construct a writer here state = new StoreSinkWriteState(context, stateFilter); - write = storeSinkWriteProvider.provide(table, commitUser, state, ioManager, memoryPool); + OperatorMetricGroup metricGroup = getMetricGroup(); + write = + storeSinkWriteProvider.provide( + table, commitUser, state, ioManager, memoryPool, metricGroup); } protected abstract boolean containLogSystem(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java new file mode 100644 index 000000000000..ecac40a57e24 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java @@ -0,0 +1,14 @@ +package org.apache.paimon.flink.sink; + +import org.apache.paimon.options.Options; + +/** test class for {@link TableWriteOperator} with append only writer. */ +public class AppendOnlyWriterOperatorTest extends WriterOperatorTestBase { + @Override + protected void setTableConfig(Options options) { + options.set("write-buffer-for-append", "true"); + options.set("write-buffer-size", "256 b"); + options.set("page-size", "32 b"); + options.set("write-buffer-spillable", "false"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java new file mode 100644 index 000000000000..4311a544e4e7 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java @@ -0,0 +1,14 @@ +package org.apache.paimon.flink.sink; + +import org.apache.paimon.options.Options; + +/** test class for {@link TableWriteOperator} with primarykey writer. */ +public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase { + @Override + protected void setTableConfig(Options options) { + options.set("primary-key", "a"); + options.set("bucket-key", "a"); + options.set("write-buffer-size", "256 b"); + options.set("page-size", "32 b"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java new file mode 100644 index 000000000000..27a4d726f598 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java @@ -0,0 +1,174 @@ +package org.apache.paimon.flink.sink; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.InternalRowTypeSerializer; +import org.apache.paimon.flink.utils.InternalTypeInfo; +import org.apache.paimon.flink.utils.MetricUtils; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** test class for {@link TableWriteOperator}. */ +public abstract class WriterOperatorTestBase { + private static final RowType ROW_TYPE = + RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"a", "b"}); + @TempDir public java.nio.file.Path tempDir; + protected Path tablePath; + + @BeforeEach + public void before() { + tablePath = new Path(tempDir.toString()); + } + + @Test + public void testMetric() throws Exception { + String tableName = tablePath.getName(); + FileStoreTable fileStoreTable = createFileStoreTable(); + RowDataStoreWriteOperator rowDataStoreWriteOperator = + getRowDataStoreWriteOperator(fileStoreTable); + + OneInputStreamOperatorTestHarness harness = + createWriteOperatorHarness(fileStoreTable, rowDataStoreWriteOperator); + + TypeSerializer serializer = + new CommittableTypeInfo().createSerializer(new ExecutionConfig()); + harness.setup(serializer); + harness.open(); + + int size = 10; + for (int i = 0; i < size; i++) { + GenericRow row = GenericRow.of(1, 1); + harness.processElement(row, 1); + } + harness.prepareSnapshotPreBarrier(1); + harness.snapshot(1, 2); + harness.notifyOfCompletedCheckpoint(1); + + MetricGroup metricGroup = + rowDataStoreWriteOperator + .getMetricGroup() + .addGroup("paimon") + .addGroup("table", tableName) + .addGroup("commit_user", "test") + .addGroup("writer"); + + Counter writeRecordCount = MetricUtils.getCounter(metricGroup, "writeRecordCount"); + Assertions.assertThat(writeRecordCount.getCount()).isEqualTo(size); + + // test histogram has sample + Histogram writeCostMS = MetricUtils.getHistogram(metricGroup, "writeCostMS"); + Assertions.assertThat(writeCostMS.getCount()).isEqualTo(size); + + Histogram flushCostMS = MetricUtils.getHistogram(metricGroup, "flushCostMS"); + Assertions.assertThat(flushCostMS.getCount()).isGreaterThan(0); + + Histogram prepareCommitCostMS = + MetricUtils.getHistogram(metricGroup, "prepareCommitCostMS"); + Assertions.assertThat(prepareCommitCostMS.getCount()).isGreaterThan(0); + + Histogram syncLastestCompactionCostMS = + MetricUtils.getHistogram(metricGroup, "syncLastestCompactionCostMS"); + Assertions.assertThat(syncLastestCompactionCostMS.getCount()).isGreaterThan(0); + + Gauge bufferPreemptCount = MetricUtils.getGauge(metricGroup, "bufferPreemptCount"); + Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0); + + Gauge totalWriteBufferSizeByte = + MetricUtils.getGauge(metricGroup, "totalWriteBufferSizeByte"); + Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256); + + GenericRow row = GenericRow.of(1, 1); + harness.processElement(row, 1); + Gauge usedWriteBufferSizeByte = + MetricUtils.getGauge(metricGroup, "usedWriteBufferSizeByte"); + Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0); + } + + @NotNull + private static OneInputStreamOperatorTestHarness + createWriteOperatorHarness( + FileStoreTable fileStoreTable, RowDataStoreWriteOperator operator) + throws Exception { + InternalTypeInfo internalRowInternalTypeInfo = + new InternalTypeInfo<>(new InternalRowTypeSerializer(ROW_TYPE)); + OneInputStreamOperatorTestHarness harness = + new OneInputStreamOperatorTestHarness<>( + operator, + internalRowInternalTypeInfo.createSerializer(new ExecutionConfig())); + return harness; + } + + @NotNull + private static RowDataStoreWriteOperator getRowDataStoreWriteOperator( + FileStoreTable fileStoreTable) { + StoreSinkWrite.Provider provider = + (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + new StoreSinkWriteImpl( + table, + commitUser, + state, + ioManager, + false, + false, + true, + memoryPool, + metricGroup); + RowDataStoreWriteOperator operator = + new RowDataStoreWriteOperator(fileStoreTable, null, provider, "test"); + return operator; + } + + abstract void setTableConfig(Options options); + + protected FileStoreTable createFileStoreTable() throws Exception { + Options conf = new Options(); + conf.set(CoreOptions.PATH, tablePath.toString()); + setTableConfig(conf); + SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); + + List primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY); + List paritionKeys = setKeys(conf, CoreOptions.PARTITION); + + schemaManager.createTable( + new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys, conf.toMap(), "")); + return FileStoreTableFactory.create(LocalFileIO.create(), conf); + } + + @NotNull + private static List setKeys(Options conf, ConfigOption primaryKey) { + List primaryKeys = + Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY)) + .map(key -> Arrays.asList(key.split(","))) + .orElse(Collections.emptyList()); + conf.remove(primaryKey.key()); + return primaryKeys; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java index 93451a1f7f9c..b37e4db805d3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java @@ -18,7 +18,9 @@ package org.apache.paimon.flink.utils; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; @@ -29,8 +31,16 @@ /** Test utils for Flink's {@link Metric}s. */ public class MetricUtils { - public static Gauge getGauge(MetricGroup group, String metricName) { - return (Gauge) getMetric(group, metricName); + public static Gauge getGauge(MetricGroup group, String metricName) { + return (Gauge) getMetric(group, metricName); + } + + public static Counter getCounter(MetricGroup group, String metricName) { + return (Counter) getMetric(group, metricName); + } + + public static Histogram getHistogram(MetricGroup group, String metricName) { + return (Histogram) getMetric(group, metricName); } @SuppressWarnings("unchecked")