Skip to content

Commit

Permalink
writer metric
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Oct 16, 2023
1 parent d2ddaa9 commit fd121e3
Show file tree
Hide file tree
Showing 19 changed files with 210 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
Expand Down Expand Up @@ -57,6 +58,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
protected final long schemaId;
protected final CoreOptions options;
protected final RowType partitionType;
protected final MetricRegister metricRegister;

@Nullable private final SegmentsCache<String> writeManifestCache;

Expand All @@ -65,7 +67,8 @@ public AbstractFileStore(
SchemaManager schemaManager,
long schemaId,
CoreOptions options,
RowType partitionType) {
RowType partitionType,
MetricRegister metricRegister) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -76,6 +79,7 @@ public AbstractFileStore(
writeManifestCache.getBytes() == 0
? null
: new SegmentsCache<>(options.pageSize(), writeManifestCache);
this.metricRegister = metricRegister;
}

public FileStorePathFactory pathFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
Expand Down Expand Up @@ -51,8 +52,9 @@ public AppendOnlyFileStore(
CoreOptions options,
RowType partitionType,
RowType bucketKeyType,
RowType rowType) {
super(fileIO, schemaManager, schemaId, options, partitionType);
RowType rowType,
MetricRegister metricRegister) {
super(fileIO, schemaManager, schemaId, options, partitionType, metricRegister);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
}
Expand Down Expand Up @@ -95,7 +97,8 @@ public AppendOnlyFileStoreWrite newWrite(
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
options);
options,
metricRegister);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
Expand Down Expand Up @@ -77,8 +78,9 @@ public KeyValueFileStore(
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory) {
super(fileIO, schemaManager, schemaId, options, partitionType);
MergeFunctionFactory<KeyValue> mfFactory,
MetricRegister metricRegister) {
super(fileIO, schemaManager, schemaId, options, partitionType, metricRegister);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
Expand Down Expand Up @@ -147,7 +149,8 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor);
keyValueFieldsExtractor,
metricRegister);
}

private Map<String, FileStorePathFactory> format2PathFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class MemoryPoolFactory {

private Iterable<MemoryOwner> owners;

private long memoryPreemptCount;

public MemoryPoolFactory(MemorySegmentPool innerPool) {
this.innerPool = innerPool;
this.totalPages = innerPool.freePages();
Expand All @@ -57,6 +59,10 @@ public void notifyNewOwner(MemoryOwner owner) {
owner.setMemoryPool(createSubPool(owner));
}

public long getMemoryPreemptCount() {
return memoryPreemptCount;
}

@VisibleForTesting
public Iterable<MemoryOwner> memoryOwners() {
return owners;
Expand All @@ -81,6 +87,7 @@ private void preemptMemory(MemoryOwner owner) {
if (max != null) {
try {
max.flushMemory();
memoryPreemptCount++;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public abstract class AbstractMetricGroup implements MetricGroup {

public AbstractMetricGroup(Map<String, String> tags) {
this.tags = tags;
Metrics.getInstance().addGroup(this);
// Metrics.getInstance().addGroup(this); we add table level repository instance.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.apache.paimon.metrics;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MetricRegister {
private final Queue<MetricGroup> metricGroups = new ConcurrentLinkedQueue<>();
private final String tableName;
private final String metricGroupName;

public MetricRegister(String tableName, String metricGroupName) {
this.tableName = tableName;
this.metricGroupName = metricGroupName;
}

public String getTableName() {
return tableName;
}

public String getMetricGroupName() {
return metricGroupName;
}

public void registerMetricGroup(MetricGroup metricGroup) {
metricGroups.offer(metricGroup);
}

public Queue<MetricGroup> getMetricGroups() {
return metricGroups;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics.groups;

import org.apache.paimon.metrics.AbstractMetricGroup;

import java.util.HashMap;
import java.util.Map;

/** A Writer MetricGroup {@link org.apache.paimon.metrics.MetricGroup} */
public class WriterMetricGroup extends AbstractMetricGroup {

private final String groupName;

WriterMetricGroup(final Map<String, String> tags, final String groupName) {
super(tags);
this.groupName = groupName;
}

public static WriterMetricGroup createWriterMetricGroup(
final String table, final String groupName, final Map<String, String> externTags) {
Map<String, String> tags = new HashMap<>(externTags);
tags.put("table", table);
return new WriterMetricGroup(tags, groupName);
}

@Override
public String getGroupName() {
return groupName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.metrics.groups.WriterMetricGroup;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
Expand All @@ -36,6 +38,8 @@
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,17 +78,32 @@ public abstract class AbstractFileStoreWrite<T>
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;

protected MetricRegister metricRegister;
protected MetricGroup writeMetricGroup;

protected AbstractFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<T> indexFactory) {
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable MetricRegister metricRegister) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;

this.writers = new HashMap<>();
this.metricRegister = metricRegister;
if (metricRegister != null) {
Map<String, String> externTags = Maps.newHashMap();
externTags.put("commitUser", commitUser);
writeMetricGroup =
WriterMetricGroup.createWriterMetricGroup(
metricRegister.getTableName(),
metricRegister.getMetricGroupName(),
externTags);
metricRegister.registerMetricGroup(writeMetricGroup);
}
}

@Override
Expand All @@ -93,11 +112,6 @@ public FileStoreWrite<T> withIOManager(IOManager ioManager) {
return this;
}

@Override
public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
return this;
}

@Override
public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
this.ignorePreviousFiles = ignorePreviousFiles;
Expand Down Expand Up @@ -368,6 +382,10 @@ public ExecutorService getCompactExecutor() {
return lazyCompactExecutor;
}

protected boolean needCollectMetric() {
return metricRegister != null;
}

protected void notifyNewWriter(RecordWriter<T> writer) {}

protected abstract RecordWriter<T> createWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.table.BucketMode;
Expand Down Expand Up @@ -81,8 +82,9 @@ public AppendOnlyFileStoreWrite(
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options) {
super(commitUser, snapshotManager, scan, options, null);
CoreOptions options,
MetricRegister metricRegister) {
super(commitUser, snapshotManager, scan, options, null, metricRegister);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -101,8 +102,9 @@ public KeyValueFileStoreWrite(
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
CoreOptions options,
KeyValueFieldsExtractor extractor) {
super(commitUser, snapshotManager, scan, options, indexFactory);
KeyValueFieldsExtractor extractor,
MetricRegister metricRegister) {
super(commitUser, snapshotManager, scan, options, indexFactory, metricRegister);
this.fileIO = fileIO;
this.keyType = keyType;
this.valueType = valueType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;

Expand Down Expand Up @@ -57,8 +58,9 @@ public MemoryFileStoreWrite(
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
@Nullable IndexMaintainer.Factory<T> indexFactory) {
super(commitUser, snapshotManager, scan, indexFactory);
@Nullable IndexMaintainer.Factory<T> indexFactory,
MetricRegister metricRegister) {
super(commitUser, snapshotManager, scan, indexFactory, metricRegister);
this.options = options;
this.cacheManager =
new CacheManager(
Expand All @@ -69,6 +71,9 @@ public MemoryFileStoreWrite(
@Override
public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
this.writeBufferPool = memoryPoolFactory.addOwners(this::memoryOwners);
if (needCollectMetric()) {
writeMetricGroup.gauge("MemoryPreemptCount", memoryPoolFactory::getMemoryPreemptCount);
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
import org.apache.paimon.metrics.MetricRegister;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -77,14 +78,15 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
protected final Path path;
protected final TableSchema tableSchema;
protected final CatalogEnvironment catalogEnvironment;
protected transient MetricRegister metricRegister;

public AbstractFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.path = path;
this.path = Preconditions.checkNotNull(path);
if (!tableSchema.options().containsKey(PATH.key())) {
// make sure table is always available
Map<String, String> newOptions = new HashMap<>(tableSchema.options());
Expand Down Expand Up @@ -458,4 +460,15 @@ private RollbackHelper rollbackHelper() {
store().newSnapshotDeletion(),
store().newTagDeletion());
}

@Override
public MetricRegister metricRegister(String tableName, String metricGroupName) {
this.metricRegister = new MetricRegister(tableName, metricGroupName);
// todo shall we add a register logic for process level metric repository
// like Metrics.instance.register(table+metricGroupName,metricRegister)
return this.metricRegister;
}

// and then deregister from process level metric repository

}
Loading

0 comments on commit fd121e3

Please sign in to comment.