Skip to content

Commit

Permalink
writer metric
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Oct 18, 2023
1 parent d2ddaa9 commit 4bb3013
Show file tree
Hide file tree
Showing 22 changed files with 285 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metrics.MetricRepository;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
Expand Down Expand Up @@ -57,6 +58,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
protected final long schemaId;
protected final CoreOptions options;
protected final RowType partitionType;
protected final MetricRepository metricRepository;

@Nullable private final SegmentsCache<String> writeManifestCache;

Expand All @@ -65,7 +67,8 @@ public AbstractFileStore(
SchemaManager schemaManager,
long schemaId,
CoreOptions options,
RowType partitionType) {
RowType partitionType,
MetricRepository metricRepository) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -76,6 +79,7 @@ public AbstractFileStore(
writeManifestCache.getBytes() == 0
? null
: new SegmentsCache<>(options.pageSize(), writeManifestCache);
this.metricRepository = metricRepository;
}

public FileStorePathFactory pathFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.metrics.MetricRepository;
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
Expand Down Expand Up @@ -51,8 +52,9 @@ public AppendOnlyFileStore(
CoreOptions options,
RowType partitionType,
RowType bucketKeyType,
RowType rowType) {
super(fileIO, schemaManager, schemaId, options, partitionType);
RowType rowType,
MetricRepository metricRepository) {
super(fileIO, schemaManager, schemaId, options, partitionType, metricRepository);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
}
Expand Down Expand Up @@ -95,7 +97,8 @@ public AppendOnlyFileStoreWrite newWrite(
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
options);
options,
metricRepository);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.metrics.MetricRepository;
import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
Expand Down Expand Up @@ -77,8 +78,9 @@ public KeyValueFileStore(
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory) {
super(fileIO, schemaManager, schemaId, options, partitionType);
MergeFunctionFactory<KeyValue> mfFactory,
MetricRepository metricRepository) {
super(fileIO, schemaManager, schemaId, options, partitionType, metricRepository);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
Expand Down Expand Up @@ -147,7 +149,8 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor);
keyValueFieldsExtractor,
metricRepository);
}

private Map<String, FileStorePathFactory> format2PathFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class MemoryPoolFactory {

private Iterable<MemoryOwner> owners;

private long memoryPreemptCount;

public MemoryPoolFactory(MemorySegmentPool innerPool) {
this.innerPool = innerPool;
this.totalPages = innerPool.freePages();
Expand All @@ -57,6 +59,10 @@ public void notifyNewOwner(MemoryOwner owner) {
owner.setMemoryPool(createSubPool(owner));
}

public long getMemoryPreemptCount() {
return memoryPreemptCount;
}

@VisibleForTesting
public Iterable<MemoryOwner> memoryOwners() {
return owners;
Expand All @@ -81,6 +87,7 @@ private void preemptMemory(MemoryOwner owner) {
if (max != null) {
try {
max.flushMemory();
memoryPreemptCount++;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public abstract class AbstractMetricGroup implements MetricGroup {

public AbstractMetricGroup(Map<String, String> tags) {
this.tags = tags;
Metrics.getInstance().addGroup(this);
}

@Override
Expand Down Expand Up @@ -183,7 +182,6 @@ public void close() {
if (!closed) {
closed = true;
metrics.clear();
Metrics.getInstance().removeGroup(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** A Table level repostory for metricGroup. */
public class MetricRepository {
private final Queue<MetricGroup> metricGroups = new ConcurrentLinkedQueue<>();
private final String tableName;
private final String metricName;

private final String repositoryName;

public MetricRepository(String tableName, String metricName) {
this.tableName = tableName;
this.metricName = metricName;
this.repositoryName = String.format("%s-%s", tableName, metricName);
Metrics.getInstance().addMetricRepository(repositoryName, this);
}

public String getTableName() {
return tableName;
}

public String getMetricName() {
return metricName;
}

public void registerMetricGroup(MetricGroup metricGroup) {
metricGroups.offer(metricGroup);
}

public Queue<MetricGroup> getMetricGroups() {
return metricGroups;
}

public void close() {
Metrics.getInstance().removeMetricRepository(repositoryName);
metricGroups.clear();
}
}
18 changes: 10 additions & 8 deletions paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package org.apache.paimon.metrics;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Core of Paimon metrics system. */
public class Metrics {
Expand All @@ -28,7 +29,7 @@ public class Metrics {
* The metrics groups. All the commit & compaction & scan metric groups are collected in this
* group container, there is no need to distinguish the groups by group name for reporters.
*/
private final ConcurrentLinkedQueue<MetricGroup> metricGroups = new ConcurrentLinkedQueue<>();
private final Map<String, MetricRepository> metricRepositorys = new ConcurrentHashMap<>();

private Metrics() {}

Expand All @@ -40,17 +41,18 @@ public static Metrics getInstance() {
* Add a metric group. Which is called by metrics instances, like commit / compaction metrics
* instances.
*/
public void addGroup(AbstractMetricGroup group) {
metricGroups.add(group);
public void addMetricRepository(
String metricRepositoryName, MetricRepository metricRepository) {
metricRepositorys.put(metricRepositoryName, metricRepository);
}

/** Remove a metric group. Called when closing the corresponding instances, like committer. */
public void removeGroup(AbstractMetricGroup group) {
metricGroups.remove(group);
public void removeMetricRepository(String metricRepositoryName) {
metricRepositorys.remove(metricRepositoryName);
}

/** Get metric groups. */
public ConcurrentLinkedQueue<MetricGroup> getMetricGroups() {
return metricGroups;
public Map<String, MetricRepository> getMetricRepositorys() {
return metricRepositorys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics.groups;

import org.apache.paimon.metrics.AbstractMetricGroup;

import java.util.HashMap;
import java.util.Map;

/** A Writer MetricGroup {@link org.apache.paimon.metrics.MetricGroup}. */
public class WriterMetricGroup extends AbstractMetricGroup {

private final String groupName;

WriterMetricGroup(final Map<String, String> tags, final String groupName) {
super(tags);
this.groupName = groupName;
}

public static WriterMetricGroup createWriterMetricGroup(
final String table, final String groupName, final Map<String, String> externTags) {
Map<String, String> tags = new HashMap<>(externTags);
tags.put("table", table);
return new WriterMetricGroup(tags, groupName);
}

@Override
public String getGroupName() {
return groupName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRepository;
import org.apache.paimon.metrics.groups.WriterMetricGroup;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
Expand All @@ -36,6 +38,8 @@
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,17 +78,32 @@ public abstract class AbstractFileStoreWrite<T>
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;

protected MetricRepository metricRepository;
protected MetricGroup writeMetricGroup;

protected AbstractFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<T> indexFactory) {
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable MetricRepository metricRepository) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;

this.writers = new HashMap<>();
this.metricRepository = metricRepository;
if (metricRepository != null) {
Map<String, String> externTags = Maps.newHashMap();
externTags.put("commitUser", commitUser);
writeMetricGroup =
WriterMetricGroup.createWriterMetricGroup(
metricRepository.getTableName(),
metricRepository.getMetricName(),
externTags);
metricRepository.registerMetricGroup(writeMetricGroup);
}
}

@Override
Expand All @@ -93,11 +112,6 @@ public FileStoreWrite<T> withIOManager(IOManager ioManager) {
return this;
}

@Override
public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
return this;
}

@Override
public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
this.ignorePreviousFiles = ignorePreviousFiles;
Expand Down Expand Up @@ -368,6 +382,10 @@ public ExecutorService getCompactExecutor() {
return lazyCompactExecutor;
}

protected boolean needCollectMetric() {
return metricRepository != null;
}

protected void notifyNewWriter(RecordWriter<T> writer) {}

protected abstract RecordWriter<T> createWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.metrics.MetricRepository;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.table.BucketMode;
Expand Down Expand Up @@ -81,8 +82,9 @@ public AppendOnlyFileStoreWrite(
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options) {
super(commitUser, snapshotManager, scan, options, null);
CoreOptions options,
MetricRepository metricRepository) {
super(commitUser, snapshotManager, scan, options, null, metricRepository);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
Expand Down
Loading

0 comments on commit 4bb3013

Please sign in to comment.