Skip to content

Commit

Permalink
[core] Introduce CacheStats and expose scanStats
Browse files Browse the repository at this point in the history
  • Loading branch information
mxdzs0612 committed Dec 13, 2024
1 parent 00a36e3 commit 1213bad
Show file tree
Hide file tree
Showing 19 changed files with 278 additions and 34 deletions.
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Controls whether the catalog will cache databases, tables and manifests.</td>
<td>Controls whether the catalog will cache databases, tables, manifests and partitions.</td>
</tr>
<tr>
<td><h5>cache.expiration-interval</h5></td>
Expand Down Expand Up @@ -74,6 +74,12 @@
<td>Integer</td>
<td>Controls the max number for snapshots per table in the catalog are cached.</td>
</tr>
<tr>
<td><h5>cache.cache-stats-enabled</h5></td>
<td style="word-wrap: break-word;">20</td>
<td>Integer</td>
<td>Controls whether the catalog cache stats are enabled.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class CatalogOptions {
.booleanType()
.defaultValue(true)
.withDescription(
"Controls whether the catalog will cache databases, tables and manifests.");
"Controls whether the catalog will cache databases, tables, manifests and partitions.");

public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
key("cache.expiration-interval")
Expand Down Expand Up @@ -128,6 +128,12 @@ public class CatalogOptions {
.withDescription(
"Controls the max number for snapshots per table in the catalog are cached.");

public static final ConfigOption<Boolean> CACHE_STATS_ENABLED =
key("cache.cache-stats-enabled")
.booleanType()
.defaultValue(false)
.withDescription("Controls whether the catalog cache stats are enabled.");

public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
ConfigOptions.key("allow-upper-case")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected AbstractFileStore(
this.catalogEnvironment = catalogEnvironment;
this.writeManifestCache =
SegmentsCache.create(
options.pageSize(), options.writeManifestCache(), Long.MAX_VALUE);
options.pageSize(), options.writeManifestCache(), Long.MAX_VALUE, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.metrics.CatalogCacheStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -48,6 +49,7 @@
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.apache.paimon.options.CatalogOptions.CACHE_STATS_ENABLED;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** A {@link Catalog} to cache databases and tables and manifests. */
Expand All @@ -63,14 +65,17 @@ public class CachingCatalog extends DelegateCatalog {
// partition cache will affect data latency
@Nullable protected final Cache<Identifier, List<PartitionEntry>> partitionCache;

private final CatalogCacheStats catalogCacheStats;

public CachingCatalog(Catalog wrapped) {
this(
wrapped,
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
CACHE_PARTITION_MAX_NUM.defaultValue(),
CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue());
CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue(),
CACHE_STATS_ENABLED.defaultValue());
}

public CachingCatalog(
Expand All @@ -79,14 +84,16 @@ public CachingCatalog(
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
long cachedPartitionMaxNum,
int snapshotMaxNumPerTable) {
int snapshotMaxNumPerTable,
boolean cacheStatsEnabled) {
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
cachedPartitionMaxNum,
snapshotMaxNumPerTable,
cacheStatsEnabled,
Ticker.systemTicker());
}

Expand All @@ -97,6 +104,7 @@ public CachingCatalog(
long manifestCacheThreshold,
long cachedPartitionMaxNum,
int snapshotMaxNumPerTable,
boolean cacheStatsEnabled,
Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
Expand All @@ -121,7 +129,8 @@ public CachingCatalog(
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
this.manifestCache =
SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold, cacheStatsEnabled);
this.partitionCache =
cachedPartitionMaxNum == 0
? null
Expand All @@ -135,6 +144,7 @@ public CachingCatalog(
.maximumWeight(cachedPartitionMaxNum)
.ticker(ticker)
.build();
this.catalogCacheStats = cacheStatsEnabled ? new CatalogCacheStats() : null;
}

public static Catalog tryToCreate(Catalog catalog, Options options) {
Expand All @@ -156,7 +166,8 @@ public static Catalog tryToCreate(Catalog catalog, Options options) {
manifestMaxMemory,
manifestThreshold,
options.get(CACHE_PARTITION_MAX_NUM),
options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE));
options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE),
options.get(CACHE_STATS_ENABLED));
}

@Override
Expand Down Expand Up @@ -305,6 +316,23 @@ public void invalidateTable(Identifier identifier) {
}
}

public CatalogCacheStats getCacheStats() {
catalogCacheStats.setDatabaseCacheSize(databaseCache.estimatedSize());
catalogCacheStats.setTableCacheSize(tableCache.estimatedSize());
if (manifestCache != null) {
catalogCacheStats.setManifestCacheSize(manifestCache.getSegmentCacheSize());
}
if (partitionCache != null) {
int partitionSize = 0;
for (Map.Entry<Identifier, List<PartitionEntry>> entry :
partitionCache.asMap().entrySet()) {
partitionSize += entry.getValue().size();
}
catalogCacheStats.setPartitionCacheSize(partitionSize);
}
return catalogCacheStats;
}

// ================================== refresh ================================================
// following caches will affect the latency of table, so refresh method is provided for engine

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ private ManifestsReader.Result readManifests() {
return manifestsReader.read(specifiedSnapshot, scanMode);
}

public ScanMetrics getScanMetrics() {
return scanMetrics;
}

// ------------------------------------------------------------------------
// Start Thread Safe Methods: The following methods need to be thread safe because they will be
// called by multiple threads
Expand Down Expand Up @@ -425,26 +429,32 @@ private List<ManifestEntry> readManifest(
ManifestFileMeta manifest,
@Nullable Filter<InternalRow> additionalFilter,
@Nullable Filter<ManifestEntry> additionalTFilter) {
ManifestFile manifestFile = manifestFileFactory.create();
List<ManifestEntry> entries =
manifestFileFactory
.create()
.read(
manifest.fileName(),
manifest.fileSize(),
createCacheRowFilter(),
createEntryRowFilter().and(additionalFilter),
entry ->
(additionalTFilter == null || additionalTFilter.test(entry))
&& (manifestEntryFilter == null
|| manifestEntryFilter.test(entry))
&& filterByStats(entry));
manifestFile.read(
manifest.fileName(),
manifest.fileSize(),
createCacheRowFilter(),
createEntryRowFilter().and(additionalFilter),
entry ->
(additionalTFilter == null || additionalTFilter.test(entry))
&& (manifestEntryFilter == null
|| manifestEntryFilter.test(entry))
&& filterByStats(entry));
if (dropStats) {
List<ManifestEntry> copied = new ArrayList<>(entries.size());
for (ManifestEntry entry : entries) {
copied.add(dropStats(entry));
}
entries = copied;
}
if (scanMetrics != null && manifestFile.hitCache() != null) {
if (manifestFile.hitCache()) {
this.scanMetrics.getObjectCacheStats().increaseHitObject();
} else {
this.scanMetrics.getObjectCacheStats().increaseMissedObject();
}
}
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.metrics;

/** Statistics for cache of a caching catalog. */
public class CatalogCacheStats {
private long databaseCacheSize;
private long tableCacheSize;
private long manifestCacheSize;
private long partitionCacheSize;

public CatalogCacheStats() {}

public long getDatabaseCacheSize() {
return databaseCacheSize;
}

public void setDatabaseCacheSize(long databaseCacheSize) {
this.databaseCacheSize = databaseCacheSize;
}

public long getTableCacheSize() {
return tableCacheSize;
}

public void setTableCacheSize(long tableCacheSize) {
this.tableCacheSize = tableCacheSize;
}

public long getManifestCacheSize() {
return manifestCacheSize;
}

public void setManifestCacheSize(long manifestCacheSize) {
this.manifestCacheSize = manifestCacheSize;
}

public long getPartitionCacheSize() {
return partitionCacheSize;
}

public void setPartitionCacheSize(long partitionCacheSize) {
this.partitionCacheSize = partitionCacheSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.metrics;

import java.util.concurrent.atomic.AtomicLong;

/** Statistics for object cache of a caching catalog. */
public class ObjectCacheStats {
private final AtomicLong hitObject;
private final AtomicLong missedObject;

public ObjectCacheStats() {
this.hitObject = new AtomicLong(0);
this.missedObject = new AtomicLong(0);
}

public AtomicLong getHitObject() {
return hitObject;
}

public void increaseHitObject() {
this.hitObject.incrementAndGet();
}

public AtomicLong getMissedObject() {
return missedObject;
}

public void increaseMissedObject() {
this.missedObject.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ public class ScanMetrics {
public static final String GROUP_NAME = "scan";

private final MetricGroup metricGroup;
private Histogram durationHistogram;

private ScanStats latestScan;
private ObjectCacheStats objectCacheStats;

public ScanMetrics(MetricRegistry registry, String tableName) {
this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
this.objectCacheStats = new ObjectCacheStats();
registerGenericScanMetrics();
}

Expand All @@ -41,10 +46,6 @@ public MetricGroup getMetricGroup() {
return metricGroup;
}

private Histogram durationHistogram;

private ScanStats latestScan;

public static final String LAST_SCAN_DURATION = "lastScanDuration";
public static final String SCAN_DURATION = "scanDuration";
public static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";
Expand Down Expand Up @@ -72,4 +73,16 @@ public void reportScan(ScanStats scanStats) {
latestScan = scanStats;
durationHistogram.update(scanStats.getDuration());
}

public ScanStats getLatestScan() {
return latestScan;
}

public ObjectCacheStats getObjectCacheStats() {
return objectCacheStats;
}

public void setObjectCacheStats(ObjectCacheStats objectCacheStats) {
this.objectCacheStats = objectCacheStats;
}
}
Loading

0 comments on commit 1213bad

Please sign in to comment.