Skip to content

Commit

Permalink
Integrated with ehcache
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Mar 4, 2024
1 parent 810096f commit 775bd9b
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;

import java.util.HashMap;
import java.util.Map;

import static org.opensearch.common.settings.Setting.Property.Dynamic;
import static org.opensearch.common.settings.Setting.Property.NodeScope;

/**
Expand Down Expand Up @@ -110,6 +112,22 @@ public class EhcacheDiskCacheSettings {
(key) -> Setting.boolSetting(key, false, NodeScope)
);

/**
* Defines whether to use an in-memory keystore to check for probable presence of keys before having to go to disk.
*/
public static final Setting.AffixSetting<Boolean> USE_RBM_KEYSTORE_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + "use_keystore",
(key) -> Setting.boolSetting(key, true, NodeScope)
);

/**
* Defines the max size of the RBM keystore if used (as a percentage of heap memory)
*/
public static final Setting.AffixSetting<ByteSizeValue> RBM_KEYSTORE_SIZE_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + "keystore_size",
(key) -> Setting.memorySizeSetting(key, "0.05%", NodeScope)
);

/**
* Key for disk segment.
*/
Expand Down Expand Up @@ -150,29 +168,30 @@ public class EhcacheDiskCacheSettings {
* Key for listener mode
*/
public static final String DISK_LISTENER_MODE_SYNC_KEY = "disk_listener_mode";
/**
* Key for whether to use RBM keystore
*/
public static final String USE_RBM_KEYSTORE_KEY = "use_keystore";
/**
* Key for the keystore size in bytes
*/
public static final String RBM_KEYSTORE_SIZE_KEY = "keystore_size";

/**
* Map of key to setting.
*/
private static final Map<String, Setting.AffixSetting<?>> KEY_SETTING_MAP = Map.of(
DISK_SEGMENT_KEY,
DISK_SEGMENTS_SETTING,
DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY,
DISK_CACHE_EXPIRE_AFTER_ACCESS_SETTING,
DISK_CACHE_ALIAS_KEY,
DISK_CACHE_ALIAS_SETTING,
DISK_WRITE_CONCURRENCY_KEY,
DISK_WRITE_CONCURRENCY_SETTING,
DISK_WRITE_MAXIMUM_THREADS_KEY,
DISK_WRITE_MAXIMUM_THREADS_SETTING,
DISK_WRITE_MIN_THREADS_KEY,
DISK_WRITE_MINIMUM_THREADS_SETTING,
DISK_STORAGE_PATH_KEY,
DISK_STORAGE_PATH_SETTING,
DISK_MAX_SIZE_IN_BYTES_KEY,
DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING,
DISK_LISTENER_MODE_SYNC_KEY,
DISK_CACHE_LISTENER_MODE_SYNC_SETTING
private static final Map<String, Setting.AffixSetting<?>> KEY_SETTING_MAP = Map.ofEntries(
Map.entry(DISK_SEGMENT_KEY, DISK_SEGMENTS_SETTING),
Map.entry(DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY, DISK_CACHE_EXPIRE_AFTER_ACCESS_SETTING),
Map.entry(DISK_CACHE_ALIAS_KEY, DISK_CACHE_ALIAS_SETTING),
Map.entry(DISK_WRITE_CONCURRENCY_KEY, DISK_WRITE_CONCURRENCY_SETTING),
Map.entry(DISK_WRITE_MAXIMUM_THREADS_KEY, DISK_WRITE_MAXIMUM_THREADS_SETTING),
Map.entry(DISK_WRITE_MIN_THREADS_KEY, DISK_WRITE_MINIMUM_THREADS_SETTING),
Map.entry(DISK_STORAGE_PATH_KEY, DISK_STORAGE_PATH_SETTING),
Map.entry(DISK_MAX_SIZE_IN_BYTES_KEY, DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING),
Map.entry(DISK_LISTENER_MODE_SYNC_KEY, DISK_CACHE_LISTENER_MODE_SYNC_SETTING),
Map.entry(USE_RBM_KEYSTORE_KEY, USE_RBM_KEYSTORE_SETTING),
Map.entry(RBM_KEYSTORE_SIZE_KEY, RBM_KEYSTORE_SIZE_SETTING)
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.cache.keystore.RBMIntKeyLookupStore;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
Expand All @@ -20,6 +21,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.keystore.KeyLookupStore;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -57,6 +59,7 @@
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
import org.ehcache.spi.loaderwriter.CacheLoadingException;
import org.ehcache.spi.loaderwriter.CacheWritingException;
import org.opensearch.core.common.unit.ByteSizeValue;

import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_ALIAS_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY;
Expand All @@ -67,6 +70,8 @@
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_WRITE_CONCURRENCY_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_WRITE_MAXIMUM_THREADS_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_WRITE_MIN_THREADS_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.RBM_KEYSTORE_SIZE_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.USE_RBM_KEYSTORE_KEY;

/**
* This variant of disk cache uses Ehcache underneath.
Expand Down Expand Up @@ -111,6 +116,8 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
*/
Map<K, CompletableFuture<Tuple<K, V>>> completableFutureMap = new ConcurrentHashMap<>();

KeyLookupStore<Integer> keystore = null;

private EhcacheDiskCache(Builder<K, V> builder) {
this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null");
this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null");
Expand Down Expand Up @@ -140,6 +147,15 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.removalListener = builder.getRemovalListener();
this.ehCacheEventListener = new EhCacheEventListener<K, V>(builder.getRemovalListener());
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
boolean useRBMKeystore = (Boolean) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(USE_RBM_KEYSTORE_KEY)
.get(settings);
if (useRBMKeystore) {
long keystoreSize = ((ByteSizeValue) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(RBM_KEYSTORE_SIZE_KEY)
.get(settings)).getBytes();
this.keystore = new RBMIntKeyLookupStore(keystoreSize);
}
}

private Cache<K, V> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
Expand Down Expand Up @@ -236,11 +252,13 @@ public V get(K key) {
if (key == null) {
throw new IllegalArgumentException("Key passed to ehcache disk cache was null.");
}
V value;
try {
value = cache.get(key);
} catch (CacheLoadingException ex) {
throw new OpenSearchException("Exception occurred while trying to fetch item from ehcache disk cache");
V value = null;
if (keystore == null || keystore.contains(key.hashCode()) || keystore.isFull()) {
try {
value = cache.get(key);
} catch (CacheLoadingException ex) {
throw new OpenSearchException("Exception occurred while trying to fetch item from ehcache disk cache");
}
}
return value;
}
Expand All @@ -254,6 +272,9 @@ public V get(K key) {
public void put(K key, V value) {
try {
cache.put(key, value);
if (keystore != null) {
keystore.add(key.hashCode());
}
} catch (CacheWritingException ex) {
throw new OpenSearchException("Exception occurred while put item to ehcache disk cache");
}
Expand Down

0 comments on commit 775bd9b

Please sign in to comment.