diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index d78013ffd306..0259e7dc47bb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -26,6 +26,7 @@ import org.apache.paimon.manifest.IndexManifestFile; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.operation.FileStoreExpireImpl; import org.apache.paimon.operation.PartitionExpire; @@ -57,6 +58,7 @@ public abstract class AbstractFileStore implements FileStore { protected final long schemaId; protected final CoreOptions options; protected final RowType partitionType; + protected final MetricRepository metricRepository; @Nullable private final SegmentsCache writeManifestCache; @@ -65,7 +67,8 @@ public AbstractFileStore( SchemaManager schemaManager, long schemaId, CoreOptions options, - RowType partitionType) { + RowType partitionType, + MetricRepository metricRepository) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schemaId = schemaId; @@ -76,6 +79,7 @@ public AbstractFileStore( writeManifestCache.getBytes() == 0 ? null : new SegmentsCache<>(options.pageSize(), writeManifestCache); + this.metricRepository = metricRepository; } public FileStorePathFactory pathFactory() { diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index f5fe41617448..559beb55d55b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -22,6 +22,7 @@ import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestCacheFilter; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.operation.AppendOnlyFileStoreRead; import org.apache.paimon.operation.AppendOnlyFileStoreScan; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; @@ -51,8 +52,9 @@ public AppendOnlyFileStore( CoreOptions options, RowType partitionType, RowType bucketKeyType, - RowType rowType) { - super(fileIO, schemaManager, schemaId, options, partitionType); + RowType rowType, + MetricRepository metricRepository) { + super(fileIO, schemaManager, schemaId, options, partitionType, metricRepository); this.bucketKeyType = bucketKeyType; this.rowType = rowType; } @@ -95,7 +97,8 @@ public AppendOnlyFileStoreWrite newWrite( pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), - options); + options, + metricRepository); } private AppendOnlyFileStoreScan newScan(boolean forWrite) { diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index ef122ee78d81..8a208bfb1b6a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -26,6 +26,7 @@ import org.apache.paimon.index.IndexMaintainer; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.operation.KeyValueFileStoreRead; import org.apache.paimon.operation.KeyValueFileStoreScan; import org.apache.paimon.operation.KeyValueFileStoreWrite; @@ -77,8 +78,9 @@ public KeyValueFileStore( RowType keyType, RowType valueType, KeyValueFieldsExtractor keyValueFieldsExtractor, - MergeFunctionFactory mfFactory) { - super(fileIO, schemaManager, schemaId, options, partitionType); + MergeFunctionFactory mfFactory, + MetricRepository metricRepository) { + super(fileIO, schemaManager, schemaId, options, partitionType, metricRepository); this.crossPartitionUpdate = crossPartitionUpdate; this.bucketKeyType = bucketKeyType; this.keyType = keyType; @@ -147,7 +149,8 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma newScan(true).withManifestCacheFilter(manifestFilter), indexFactory, options, - keyValueFieldsExtractor); + keyValueFieldsExtractor, + metricRepository); } private Map format2PathFactory() { diff --git a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java index 8770348e4157..3faeac761390 100644 --- a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java @@ -37,6 +37,8 @@ public class MemoryPoolFactory { private Iterable owners; + private long memoryPreemptCount; + public MemoryPoolFactory(MemorySegmentPool innerPool) { this.innerPool = innerPool; this.totalPages = innerPool.freePages(); @@ -57,6 +59,10 @@ public void notifyNewOwner(MemoryOwner owner) { owner.setMemoryPool(createSubPool(owner)); } + public long getMemoryPreemptCount() { + return memoryPreemptCount; + } + @VisibleForTesting public Iterable memoryOwners() { return owners; @@ -81,6 +87,7 @@ private void preemptMemory(MemoryOwner owner) { if (max != null) { try { max.flushMemory(); + memoryPreemptCount++; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java index 78f293a50e05..b147bc2c088f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java @@ -52,7 +52,6 @@ public abstract class AbstractMetricGroup implements MetricGroup { public AbstractMetricGroup(Map tags) { this.tags = tags; - Metrics.getInstance().addGroup(this); } @Override @@ -183,7 +182,6 @@ public void close() { if (!closed) { closed = true; metrics.clear(); - Metrics.getInstance().removeGroup(this); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRepository.java b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRepository.java new file mode 100644 index 000000000000..bfe3efe9e2ec --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRepository.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** A Table level repostory for metricGroup. */ +public class MetricRepository { + private final Queue metricGroups = new ConcurrentLinkedQueue<>(); + private final String tableName; + private final String metricName; + + private final String repositoryName; + + public MetricRepository(String tableName, String metricName) { + this.tableName = tableName; + this.metricName = metricName; + this.repositoryName = String.format("%s-%s", tableName, metricName); + Metrics.getInstance().addMetricRepository(repositoryName, this); + } + + public String getTableName() { + return tableName; + } + + public String getMetricName() { + return metricName; + } + + public void registerMetricGroup(MetricGroup metricGroup) { + metricGroups.offer(metricGroup); + } + + public Queue getMetricGroups() { + return metricGroups; + } + + public void close() { + Metrics.getInstance().removeMetricRepository(repositoryName); + metricGroups.clear(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java index 6ef28749bb61..9464851da81b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java @@ -18,7 +18,8 @@ package org.apache.paimon.metrics; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** Core of Paimon metrics system. */ public class Metrics { @@ -28,7 +29,7 @@ public class Metrics { * The metrics groups. All the commit & compaction & scan metric groups are collected in this * group container, there is no need to distinguish the groups by group name for reporters. */ - private final ConcurrentLinkedQueue metricGroups = new ConcurrentLinkedQueue<>(); + private final Map metricRepositorys = new ConcurrentHashMap<>(); private Metrics() {} @@ -40,17 +41,18 @@ public static Metrics getInstance() { * Add a metric group. Which is called by metrics instances, like commit / compaction metrics * instances. */ - public void addGroup(AbstractMetricGroup group) { - metricGroups.add(group); + public void addMetricRepository( + String metricRepositoryName, MetricRepository metricRepository) { + metricRepositorys.put(metricRepositoryName, metricRepository); } /** Remove a metric group. Called when closing the corresponding instances, like committer. */ - public void removeGroup(AbstractMetricGroup group) { - metricGroups.remove(group); + public void removeMetricRepository(String metricRepositoryName) { + metricRepositorys.remove(metricRepositoryName); } /** Get metric groups. */ - public ConcurrentLinkedQueue getMetricGroups() { - return metricGroups; + public Map getMetricRepositorys() { + return metricRepositorys; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/WriterMetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/WriterMetricGroup.java new file mode 100644 index 000000000000..63f6eee058e3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/WriterMetricGroup.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics.groups; + +import org.apache.paimon.metrics.AbstractMetricGroup; + +import java.util.HashMap; +import java.util.Map; + +/** A Writer MetricGroup {@link org.apache.paimon.metrics.MetricGroup}. */ +public class WriterMetricGroup extends AbstractMetricGroup { + + private final String groupName; + + WriterMetricGroup(final Map tags, final String groupName) { + super(tags); + this.groupName = groupName; + } + + public static WriterMetricGroup createWriterMetricGroup( + final String table, final String groupName, final Map externTags) { + Map tags = new HashMap<>(externTags); + tags.put("table", table); + return new WriterMetricGroup(tags, groupName); + } + + @Override + public String getGroupName() { + return groupName; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 125756a01105..35ecb7eeffa9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -27,7 +27,9 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.metrics.MetricGroup; +import org.apache.paimon.metrics.MetricRepository; +import org.apache.paimon.metrics.groups.WriterMetricGroup; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.CommitIncrement; @@ -36,6 +38,8 @@ import org.apache.paimon.utils.Restorable; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,17 +78,32 @@ public abstract class AbstractFileStoreWrite private boolean ignorePreviousFiles = false; protected boolean isStreamingMode = false; + protected MetricRepository metricRepository; + protected MetricGroup writeMetricGroup; + protected AbstractFileStoreWrite( String commitUser, SnapshotManager snapshotManager, FileStoreScan scan, - @Nullable IndexMaintainer.Factory indexFactory) { + @Nullable IndexMaintainer.Factory indexFactory, + @Nullable MetricRepository metricRepository) { this.commitUser = commitUser; this.snapshotManager = snapshotManager; this.scan = scan; this.indexFactory = indexFactory; this.writers = new HashMap<>(); + this.metricRepository = metricRepository; + if (metricRepository != null) { + Map externTags = Maps.newHashMap(); + externTags.put("commitUser", commitUser); + writeMetricGroup = + WriterMetricGroup.createWriterMetricGroup( + metricRepository.getTableName(), + metricRepository.getMetricName(), + externTags); + metricRepository.registerMetricGroup(writeMetricGroup); + } } @Override @@ -93,11 +112,6 @@ public FileStoreWrite withIOManager(IOManager ioManager) { return this; } - @Override - public FileStoreWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) { - return this; - } - @Override public void withIgnorePreviousFiles(boolean ignorePreviousFiles) { this.ignorePreviousFiles = ignorePreviousFiles; @@ -368,6 +382,10 @@ public ExecutorService getCompactExecutor() { return lazyCompactExecutor; } + protected boolean needCollectMetric() { + return metricRepository != null; + } + protected void notifyNewWriter(RecordWriter writer) {} protected abstract RecordWriter createWriter( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index dc8b74433de3..8bf6a37a56d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -31,6 +31,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.RowDataRollingFileWriter; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.table.BucketMode; @@ -81,8 +82,9 @@ public AppendOnlyFileStoreWrite( FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, - CoreOptions options) { - super(commitUser, snapshotManager, scan, options, null); + CoreOptions options, + MetricRepository metricRepository) { + super(commitUser, snapshotManager, scan, options, null, metricRepository); this.fileIO = fileIO; this.read = read; this.schemaId = schemaId; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 6f4735be3d68..8b7327fc664e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -50,6 +50,7 @@ import org.apache.paimon.mergetree.compact.MergeTreeCompactManager; import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter; import org.apache.paimon.mergetree.compact.UniversalCompaction; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.types.RowType; @@ -101,8 +102,9 @@ public KeyValueFileStoreWrite( FileStoreScan scan, @Nullable IndexMaintainer.Factory indexFactory, CoreOptions options, - KeyValueFieldsExtractor extractor) { - super(commitUser, snapshotManager, scan, options, indexFactory); + KeyValueFieldsExtractor extractor, + MetricRepository metricRepository) { + super(commitUser, snapshotManager, scan, options, indexFactory, metricRepository); this.fileIO = fileIO; this.keyType = keyType; this.valueType = valueType; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 84c00756c265..c22739316202 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -24,6 +24,7 @@ import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -57,8 +58,9 @@ public MemoryFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, - @Nullable IndexMaintainer.Factory indexFactory) { - super(commitUser, snapshotManager, scan, indexFactory); + @Nullable IndexMaintainer.Factory indexFactory, + MetricRepository metricRepository) { + super(commitUser, snapshotManager, scan, indexFactory, metricRepository); this.options = options; this.cacheManager = new CacheManager( @@ -69,6 +71,9 @@ public MemoryFileStoreWrite( @Override public FileStoreWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) { this.writeBufferPool = memoryPoolFactory.addOwners(this::memoryOwners); + if (needCollectMetric()) { + writeMetricGroup.gauge("MemoryPreemptCount", memoryPoolFactory::getMemoryPreemptCount); + } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 916fe17f455c..725aab413bb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.metastore.AddPartitionCommitCallback; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.options.Options; @@ -77,6 +78,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable { protected final Path path; protected final TableSchema tableSchema; protected final CatalogEnvironment catalogEnvironment; + protected transient MetricRepository metricRepository; public AbstractFileStoreTable( FileIO fileIO, @@ -84,7 +86,7 @@ public AbstractFileStoreTable( TableSchema tableSchema, CatalogEnvironment catalogEnvironment) { this.fileIO = fileIO; - this.path = path; + this.path = Preconditions.checkNotNull(path); if (!tableSchema.options().containsKey(PATH.key())) { // make sure table is always available Map newOptions = new HashMap<>(tableSchema.options()); @@ -458,4 +460,10 @@ private RollbackHelper rollbackHelper() { store().newSnapshotDeletion(), store().newTagDeletion()); } + + @Override + public MetricRepository metricRepository(String tableName, String metricName) { + this.metricRepository = new MetricRepository(tableName, metricName); + return this.metricRepository; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 630a100e2dbf..a1acc8843eac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -82,7 +82,8 @@ public AppendOnlyFileStore store() { new CoreOptions(tableSchema.options()), tableSchema.logicalPartitionType(), tableSchema.logicalBucketKeyType(), - tableSchema.logicalRowType()); + tableSchema.logicalRowType(), + metricRepository); } return lazyStore; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java index f7a457f1e8e8..84071d977592 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java @@ -95,7 +95,8 @@ public KeyValueFileStore store() { new RowType(extractor.keyFields(tableSchema)), countType, extractor, - ValueCountMergeFunction.factory()); + ValueCountMergeFunction.factory(), + metricRepository); } return lazyStore; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java index 1a645ae6286b..a2806e20798c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java @@ -107,7 +107,8 @@ public KeyValueFileStore store() { new RowType(extractor.keyFields(tableSchema)), rowType, extractor, - mfFactory); + mfFactory, + metricRepository); } return lazyStore; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 860bb91be942..d8afefe92722 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCacheFilter; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.table.sink.TableCommitImpl; @@ -93,6 +94,8 @@ default Optional comment() { @Override TableCommitImpl newCommit(String commitUser); + MetricRepository metricRepository(String tableName, String metricGroupName); + default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) { return dataFileMeta.valueStats(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 6b3b3f8a0439..bf5416503417 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -33,6 +33,7 @@ import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.operation.AbstractFileStoreWrite; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreCommitImpl; @@ -100,7 +101,8 @@ private TestFileStore( RowType keyType, RowType valueType, KeyValueFieldsExtractor keyValueFieldsExtractor, - MergeFunctionFactory mfFactory) { + MergeFunctionFactory mfFactory, + MetricRepository metricRepository) { super( FileIOFinder.find(new Path(root)), new SchemaManager(FileIOFinder.find(new Path(root)), options.path()), @@ -112,7 +114,8 @@ private TestFileStore( keyType, valueType, keyValueFieldsExtractor, - mfFactory); + mfFactory, + metricRepository); this.root = root; this.fileIO = FileIOFinder.find(new Path(root)); this.keySerializer = new InternalRowSerializer(keyType); @@ -600,7 +603,7 @@ public TestFileStore build() { // disable dynamic-partition-overwrite in FileStoreCommit layer test conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false); - + String tableName = new Path(root).getName(); return new TestFileStore( root, new CoreOptions(conf), @@ -608,7 +611,8 @@ public TestFileStore build() { keyType, valueType, keyValueFieldsExtractor, - mfFactory); + mfFactory, + new MetricRepository(tableName, "testMetricGroup")); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java index 24f903de9b1a..929a23d6aa59 100644 --- a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java @@ -60,13 +60,4 @@ public void testTolerateMetricNameCollisions() { assertThat(group.counter(name)).isSameAs(counter1); group.close(); } - - @Test - public void testAddAndRemoveMetricGroups() { - AbstractMetricGroup metricGroup = - GenericMetricGroup.createGenericMetricGroup("myTable", "commit"); - assertThat(Metrics.getInstance().getMetricGroups()).containsExactly(metricGroup); - metricGroup.close(); - assertThat(Metrics.getInstance().getMetricGroups()).isEmpty(); - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricRepositoryTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricRepositoryTest.java new file mode 100644 index 000000000000..8b7d47705416 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricRepositoryTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link MetricRepository}. */ +public class MetricRepositoryTest { + @Test + public void testAddAndRemoveMetricGroups() { + MetricRepository metricRepository = new MetricRepository("test", "test"); + assertThat(Metrics.getInstance().getMetricRepositorys()) + .containsOnlyKeys(String.format("%s-%s", "test", "test")); + metricRepository.close(); + assertThat(Metrics.getInstance().getMetricRepositorys()).isEmpty(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index 860116081d98..8ea89c21c029 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -20,15 +20,22 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.metrics.MetricGroup; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Queue; /** An abstract class for table write operator. */ public abstract class TableWriteOperator extends PrepareCommitOperator { @@ -41,6 +48,8 @@ public abstract class TableWriteOperator extends PrepareCommitOperator { int task = containLogSystem ? ChannelComputer.select(bucket, numTasks) : ChannelComputer.select(partition, bucket, numTasks); - return task == getRuntimeContext().getIndexOfThisSubtask(); + return task == subtaskIndex; }; + metricRepository = + this.table.metricRepository( + table.name(), + String.format( + "%s-%d-%d", + "WriterMetric", + getRuntimeContext().getAttemptNumber(), + subtaskIndex)); initStateAndWriter( context, stateFilter, getContainingTask().getEnvironment().getIOManager(), commitUser); + registerMetrics(metricRepository); + } + + private void registerMetrics(MetricRepository metricRepository) { + OperatorMetricGroup flinkMetricGroup = getMetricGroup(); + Queue metricGroups = metricRepository.getMetricGroups(); + while (!metricGroups.isEmpty()) { + Map metrics = metricGroups.poll().getMetrics(); + for (Map.Entry metricWithName : metrics.entrySet()) { + String name = metricWithName.getKey(); + Metric metric = metricWithName.getValue(); + switch (metric.getMetricType()) { + case GAUGE: + Gauge gauge = (Gauge) metric; + flinkMetricGroup.gauge(name, gauge::getValue); + break; + default: + // waiting for completing. + } + } + } } @VisibleForTesting @@ -106,6 +145,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void close() throws Exception { + metricRepository.close(); super.close(); if (write != null) { write.close(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 16e1942cd5c9..0b3e4fe3ef8d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -31,6 +31,7 @@ import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.metrics.MetricRepository; import org.apache.paimon.operation.KeyValueFileStoreRead; import org.apache.paimon.operation.KeyValueFileStoreWrite; import org.apache.paimon.options.Options; @@ -194,7 +195,8 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc null, // not used, we only create an empty writer null, options, - EXTRACTOR) + EXTRACTOR, + new MetricRepository(tablePath.getName(), "default")) .createWriterContainer(partition, bucket, true) .writer; ((MemoryOwner) writer)