Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce CacheStats and expose ScanStats #4678

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Controls whether the catalog will cache databases, tables and manifests.</td>
<td>Controls whether the catalog will cache databases, tables, manifests and partitions.</td>
</tr>
<tr>
<td><h5>cache.expiration-interval</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class CatalogOptions {
.booleanType()
.defaultValue(true)
.withDescription(
"Controls whether the catalog will cache databases, tables and manifests.");
"Controls whether the catalog will cache databases, tables, manifests and partitions.");

public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
key("cache.expiration-interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,30 @@ public void invalidateTable(Identifier identifier) {
}
}

public CacheSizes estimatedCacheSizes() {
long databaseCacheSize = databaseCache.estimatedSize();
long tableCacheSize = tableCache.estimatedSize();
long manifestCacheSize = 0L;
long manifestCacheBytes = 0L;
if (manifestCache != null) {
manifestCacheSize = manifestCache.getSegmentCacheSize();
manifestCacheBytes = manifestCache.getSegmentCacheBytes();
}
long partitionCacheSize = 0L;
if (partitionCache != null) {
for (Map.Entry<Identifier, List<PartitionEntry>> entry :
partitionCache.asMap().entrySet()) {
partitionCacheSize += entry.getValue().size();
}
}
return new CacheSizes(
databaseCacheSize,
tableCacheSize,
manifestCacheSize,
manifestCacheBytes,
partitionCacheSize);
}

// ================================== refresh ================================================
// following caches will affect the latency of table, so refresh method is provided for engine

Expand All @@ -314,4 +338,46 @@ public void refreshPartitions(Identifier identifier) throws TableNotExistExcepti
partitionCache.put(identifier, result);
}
}

/** Cache sizes of a caching catalog. */
public static class CacheSizes {
private final long databaseCacheSize;
private final long tableCacheSize;
private final long manifestCacheSize;
private final long manifestCacheBytes;
private final long partitionCacheSize;

public CacheSizes(
long databaseCacheSize,
long tableCacheSize,
long manifestCacheSize,
long manifestCacheBytes,
long partitionCacheSize) {
this.databaseCacheSize = databaseCacheSize;
this.tableCacheSize = tableCacheSize;
this.manifestCacheSize = manifestCacheSize;
this.manifestCacheBytes = manifestCacheBytes;
this.partitionCacheSize = partitionCacheSize;
}

public long databaseCacheSize() {
return databaseCacheSize;
}

public long tableCacheSize() {
return tableCacheSize;
}

public long manifestCacheSize() {
return manifestCacheSize;
}

public long manifestCacheBytes() {
return manifestCacheBytes;
}

public long partitionCacheSize() {
return partitionCacheSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
Expand Down Expand Up @@ -425,9 +426,11 @@ private List<ManifestEntry> readManifest(
ManifestFileMeta manifest,
@Nullable Filter<InternalRow> additionalFilter,
@Nullable Filter<ManifestEntry> additionalTFilter) {
CacheMetrics cacheMetrics = scanMetrics != null ? new CacheMetrics() : null;
List<ManifestEntry> entries =
manifestFileFactory
.create()
.withCacheMetrics(cacheMetrics)
.read(
manifest.fileName(),
manifest.fileSize(),
Expand All @@ -445,6 +448,9 @@ private List<ManifestEntry> readManifest(
}
entries = copied;
}
if (scanMetrics != null) {
scanMetrics.reportCache(cacheMetrics);
}
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.metrics;

import java.util.concurrent.atomic.AtomicLong;

/** Metrics for manifest file cache of a caching catalog. */
public class CacheMetrics {

private final AtomicLong hitObject;
private final AtomicLong missedObject;

public CacheMetrics() {
this.hitObject = new AtomicLong(0);
this.missedObject = new AtomicLong(0);
}

public AtomicLong getHitObject() {
return hitObject;
}

public void increaseHitObject() {
this.hitObject.incrementAndGet();
}

public AtomicLong getMissedObject() {
return missedObject;
}

public void increaseMissedObject() {
this.missedObject.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ public class ScanMetrics {
public static final String GROUP_NAME = "scan";

private final MetricGroup metricGroup;
private Histogram durationHistogram;

private ScanStats latestScan;
private CacheMetrics cacheMetrics;

public ScanMetrics(MetricRegistry registry, String tableName) {
this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
this.cacheMetrics = new CacheMetrics();
registerGenericScanMetrics();
}

Expand All @@ -41,17 +46,13 @@ public MetricGroup getMetricGroup() {
return metricGroup;
}

private Histogram durationHistogram;

private ScanStats latestScan;

public static final String LAST_SCAN_DURATION = "lastScanDuration";
public static final String SCAN_DURATION = "scanDuration";
public static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";

public static final String LAST_SCAN_SKIPPED_TABLE_FILES = "lastScanSkippedTableFiles";

public static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles";
public static final String MANIFEST_HIT_CACHE = "manifestHitCache";
public static final String MANIFEST_MISSED_CACHE = "manifestMissedCache";

private void registerGenericScanMetrics() {
metricGroup.gauge(
Expand All @@ -66,10 +67,25 @@ private void registerGenericScanMetrics() {
metricGroup.gauge(
LAST_SCAN_RESULTED_TABLE_FILES,
() -> latestScan == null ? 0L : latestScan.getResultedTableFiles());
metricGroup.gauge(
MANIFEST_HIT_CACHE,
() -> cacheMetrics == null ? 0L : cacheMetrics.getHitObject().get());
metricGroup.gauge(
MANIFEST_MISSED_CACHE,
() -> cacheMetrics == null ? 0L : cacheMetrics.getMissedObject().get());
}

public void reportScan(ScanStats scanStats) {
latestScan = scanStats;
durationHistogram.update(scanStats.getDuration());
}

public void reportCache(CacheMetrics cacheMetrics) {
this.cacheMetrics = cacheMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set cacheMetrics again? You should use this cacheMetrics...

Copy link
Contributor Author

@mxdzs0612 mxdzs0612 Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to make the code unified....

}

@VisibleForTesting
public CacheMetrics getCacheMetrics() {
return cacheMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentSource;
import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;
Expand All @@ -47,6 +48,7 @@ public class ObjectsCache<K, V> {
private final ThreadLocal<InternalRowSerializer> formatSerializer;
private final FunctionWithIOException<K, Long> fileSizeFunction;
private final BiFunctionWithIOE<K, Long, CloseableIterator<InternalRow>> reader;
private CacheMetrics cacheMetrics;

public ObjectsCache(
SegmentsCache<K> cache,
Expand All @@ -71,8 +73,14 @@ public List<V> read(
throws IOException {
Segments segments = cache.getIfPresents(key);
if (segments != null) {
if (cacheMetrics != null) {
cacheMetrics.increaseHitObject();
}
return readFromSegments(segments, readFilter, readVFilter);
} else {
if (cacheMetrics != null) {
cacheMetrics.increaseMissedObject();
}
if (fileSize == null) {
fileSize = fileSizeFunction.apply(key);
}
Expand Down Expand Up @@ -130,4 +138,8 @@ private Segments readSegments(K key, @Nullable Long fileSize, Filter<InternalRow
throw new RuntimeException(e);
}
}

public void withCacheMetrics(CacheMetrics cacheMetrics) {
this.cacheMetrics = cacheMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -207,4 +208,11 @@ public static <V> List<V> readFromIterator(
throw new RuntimeException(e);
}
}

public ObjectsFile<T> withCacheMetrics(CacheMetrics cacheMetrics) {
if (cache != null) {
cache.withCacheMetrics(cacheMetrics);
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@ public static <T> SegmentsCache<T> create(

return new SegmentsCache<>(pageSize, maxMemorySize, maxElementSize);
}

public long getSegmentCacheSize() {
return cache.estimatedSize();
}

public long getSegmentCacheBytes() {
return cache.asMap().entrySet().stream()
.mapToLong(entry -> weigh(entry.getKey(), entry.getValue()))
.sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link MetricRegistryImpl}. */
public class MetricRegistryImplTest {
/** Tests for the {@link MetricGroup}. */
public class MetricGroupTest {

@Test
public void testGroupRegisterMetrics() {
MetricRegistryImpl registry = new MetricRegistryImpl();
TestMetricRegistry registry = new TestMetricRegistry();
MetricGroup group = registry.tableMetricGroup("commit", "myTable");

// these will fail is the registration is propagated
Expand All @@ -50,7 +50,7 @@ public void testGroupRegisterMetrics() {
@Test
public void testTolerateMetricNameCollisions() {
final String name = "abctestname";
MetricRegistryImpl registry = new MetricRegistryImpl();
TestMetricRegistry registry = new TestMetricRegistry();
MetricGroup group = registry.tableMetricGroup("commit", "myTable");

Counter counter = group.counter(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.util.Map;

/** Default implementation of {@link MetricRegistry}. */
public class MetricRegistryImpl extends MetricRegistry {
/** Implementation of {@link MetricRegistry} for tests. */
public class TestMetricRegistry extends MetricRegistry {

@Override
protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricRegistryImpl;
import org.apache.paimon.metrics.TestMetricRegistry;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -234,6 +234,6 @@ private void reportAgain(CommitMetrics commitMetrics) {
}

private CommitMetrics getCommitMetrics() {
return new CommitMetrics(new MetricRegistryImpl(), TABLE_NAME);
return new CommitMetrics(new TestMetricRegistry(), TABLE_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricRegistryImpl;
import org.apache.paimon.metrics.TestMetricRegistry;

import org.junit.jupiter.api.Test;

Expand All @@ -33,7 +33,7 @@ public class CompactionMetricsTest {

@Test
public void testReportMetrics() {
CompactionMetrics metrics = new CompactionMetrics(new MetricRegistryImpl(), "myTable");
CompactionMetrics metrics = new CompactionMetrics(new TestMetricRegistry(), "myTable");
assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(-1L);
assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(-1.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
Expand Down
Loading
Loading