Skip to content

Commit

Permalink
[Tiered Caching] Stats rework (1/3): Interfaces and implementations f…
Browse files Browse the repository at this point in the history
…or individual tiers (opensearch-project#12531)

As part of tiered caching stats, changes the common ICache interface to use ICacheKey as its key. This key contains
dimensions (for example, shard ID, index name, or tier) that can be used to aggregate stats. Also changes the
CacheStats interface to store the necessary cache stats, and to support getting stats either as a total or aggregated by
these dimensions.

Integrates these changes with OpenSearchOnHeapCache and EhcacheDiskCache. The stats implementation for the
TieredSpilloverCache will be in a followup PR.

---------

Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
peteralfonsi and Peter Alfonsi committed Aug 30, 2024
1 parent 40a8d34 commit b0de406
Showing 24 changed files with 2,410 additions and 331 deletions.
Original file line number Diff line number Diff line change
@@ -12,11 +12,13 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
@@ -54,7 +56,11 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;

// The listener for removals from the spillover cache as a whole
// TODO: In TSC stats PR, each tier will have its own separate removal listener.
private final RemovalListener<ICacheKey<K>, V> removalListener;
private final List<String> dimensionNames;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
@@ -70,9 +76,9 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
@@ -87,6 +93,7 @@ && evaluatePolicies(notification.getValue())) {
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
@@ -97,7 +104,7 @@ && evaluatePolicies(notification.getValue())) {
);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

@@ -112,19 +119,19 @@ ICache<K, V> getDiskCache() {
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
@@ -141,7 +148,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
@@ -167,9 +174,9 @@ public void invalidateAll() {
*/
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<K> keys() {
Iterable<K>[] iterables = (Iterable<K>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<K>(iterables);
public Iterable<ICacheKey<K>> keys() {
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<ICacheKey<K>>(iterables);
}

@Override
@@ -197,7 +204,12 @@ public void close() throws IOException {
}
}

private Function<K, V> getValueFromTieredCache() {
@Override
public ImmutableCacheStatsHolder stats() {
return null; // TODO: in TSC stats PR
}

private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
@@ -354,7 +366,7 @@ public String getCacheName() {
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private RemovalListener<ICacheKey<K>, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
@@ -390,7 +402,7 @@ public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
public Builder<K, V> setRemovalListener(RemovalListener<ICacheKey<K>, V> removalListener) {
this.removalListener = removalListener;
return this;
}
Original file line number Diff line number Diff line change
@@ -10,11 +10,13 @@

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

@@ -25,27 +27,27 @@

public class MockDiskCache<K, V> implements ICache<K, V> {

Map<K, V> cache;
Map<ICacheKey<K>, V> cache;
int maxSize;
long delay;

private final RemovalListener<K, V> removalListener;
private final RemovalListener<ICacheKey<K>, V> removalListener;

public MockDiskCache(int maxSize, long delay, RemovalListener<K, V> removalListener) {
public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<K, V>();
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
V value = cache.get(key);
return value;
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
}
@@ -58,7 +60,7 @@ public void put(K key, V value) {
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) {
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
@@ -70,7 +72,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
this.cache.remove(key);
}

@@ -80,7 +82,7 @@ public void invalidateAll() {
}

@Override
public Iterable<K> keys() {
public Iterable<ICacheKey<K>> keys() {
return () -> new CacheKeyIterator<>(cache, removalListener);
}

@@ -92,6 +94,11 @@ public long count() {
@Override
public void refresh() {}

@Override
public ImmutableCacheStatsHolder stats() {
return null;
}

@Override
public void close() {

Loading

0 comments on commit b0de406

Please sign in to comment.