Skip to content

Commit

Permalink
Integrating onHeap store aware cache with IRC
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Dec 29, 2023
1 parent 39c4108 commit 4eed280
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 23 deletions.
21 changes: 21 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/CacheType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

public enum CacheType {

ON_HEAP("on_heap"),
TIERED("tiered");

private final String cacheType;

CacheType(String cacheType) {
this.cacheType = cacheType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.cache.CacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification;
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
import org.opensearch.common.cache.store.enums.CacheStoreType;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -78,7 +81,7 @@
*
* @opensearch.internal
*/
public final class IndicesRequestCache implements RemovalListener<IndicesRequestCache.Key, BytesReference>, Closeable {
public final class IndicesRequestCache implements StoreAwareCacheEventListener<IndicesRequestCache.Key, BytesReference>, Closeable {

private static final Logger logger = LogManager.getLogger(IndicesRequestCache.class);

Expand All @@ -103,24 +106,58 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
Property.NodeScope
);

public static final Setting<String> INDICES_REQUEST_CACHE_TYPE = Setting.simpleString(
"indices.requests.cache" + ".type",
Property.NodeScope
);

private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
private final ByteSizeValue size;
private final TimeValue expire;
private final Cache<Key, BytesReference> cache;
private final ICache<Key, BytesReference> cache;

IndicesRequestCache(Settings settings) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
CacheBuilder<Key, BytesReference> cacheBuilder = CacheBuilder.<Key, BytesReference>builder()
.setMaximumWeight(sizeInBytes)
.weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed())
.removalListener(this);
String cacheTypeInString = INDICES_REQUEST_CACHE_TYPE.get(settings);
StoreAwareCacheBuilder<Key, BytesReference> openSearchOnHeapCacheBuilder = new OpenSearchOnHeapCache.Builder<Key, BytesReference>()
.setMaximumWeightInBytes(sizeInBytes)
.setWeigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed());
if (expire != null) {
cacheBuilder.setExpireAfterAccess(expire);
openSearchOnHeapCacheBuilder.setExpireAfterAccess(expire);
}
cache = cacheBuilder.build();
openSearchOnHeapCacheBuilder.setEventListener(this);
cache = openSearchOnHeapCacheBuilder.build();
// if (cacheTypeInString == null || cacheTypeInString.isBlank()) {
// openSearchOnHeapCacheBuilder.setEventListener(this);
// cache = openSearchOnHeapCacheBuilder.build();
// } else {
// CacheType cacheType;
// try {
// cacheType = CacheType.valueOf(cacheTypeInString);
// } catch (IllegalArgumentException ex) {
// openSearchOnHeapCacheBuilder.setEventListener(this);
// cache = openSearchOnHeapCacheBuilder.build();
// return;
// }
// switch (cacheType) {
// case ON_HEAP:
// openSearchOnHeapCacheBuilder.setEventListener(this);
// cache = openSearchOnHeapCacheBuilder.build();
// break;
// case TIERED:
// cache =
// new TieredSpilloverCache.Builder<Key, BytesReference>()
// .setOnHeapCacheBuilder(openSearchOnHeapCacheBuilder)
// .setListener(this)
// .build();
// default:
// throw new IllegalArgumentException("Unknown cache type");
//
// }
// }
}

@Override
Expand All @@ -133,11 +170,6 @@ void clear(CacheEntity entity) {
cleanCache();
}

@Override
public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
notification.getKey().entity.onRemoval(notification);
}

BytesReference getOrCompute(
CacheEntity cacheEntity,
CheckedSupplier<BytesReference, IOException> loader,
Expand All @@ -149,7 +181,6 @@ BytesReference getOrCompute(
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
key.entity.onMiss();
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Expand All @@ -158,8 +189,6 @@ BytesReference getOrCompute(
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
}
}
} else {
key.entity.onHit();
}
return value;
}
Expand All @@ -175,12 +204,32 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference
cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey));
}

@Override
public void onMiss(Key key, CacheStoreType cacheStoreType) {
key.entity.onMiss();
}

@Override
public void onRemoval(StoreAwareCacheRemovalNotification<Key, BytesReference> notification) {
notification.getKey().entity.onRemoval(notification);
}

@Override
public void onHit(Key key, BytesReference value, CacheStoreType cacheStoreType) {
key.entity.onHit();
}

@Override
public void onCached(Key key, BytesReference value, CacheStoreType cacheStoreType) {
key.entity.onCached(key, value);
}

/**
* Loader for the request cache
*
* @opensearch.internal
*/
private static class Loader implements CacheLoader<Key, BytesReference> {
private static class Loader implements LoadAwareCacheLoader<Key, BytesReference> {

private final CacheEntity entity;
private final CheckedSupplier<BytesReference, IOException> loader;
Expand All @@ -191,14 +240,14 @@ private static class Loader implements CacheLoader<Key, BytesReference> {
this.loader = loader;
}

@Override
public boolean isLoaded() {
return this.loaded;
}

@Override
public BytesReference load(Key key) throws Exception {
BytesReference value = loader.get();
entity.onCached(key, value);
loaded = true;
return value;
}
Expand Down Expand Up @@ -364,7 +413,7 @@ synchronized void cleanCache() {
/**
* Returns the current size of the cache
*/
int count() {
long count() {
return cache.count();
}

Expand Down

0 comments on commit 4eed280

Please sign in to comment.