From e1bf33e2caea0c221377a3c38acd81d84237b373 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 12 Jan 2024 09:27:05 -0800 Subject: [PATCH] Changes to add computeIfAbsent functionality for ehcache disk cache Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/common/cache/ICache.java | 5 + .../cache/store/OpenSearchOnHeapCache.java | 20 +- .../common/cache/tier/EhCacheDiskCache.java | 208 +++++++++++++--- .../cache/tier/TieredSpilloverCache.java | 32 ++- .../cache/tier/EhCacheDiskCacheTests.java | 232 ++++++++++++++++-- .../cache/tier/TieredSpilloverCacheTests.java | 6 + 6 files changed, 440 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/ICache.java b/server/src/main/java/org/opensearch/common/cache/ICache.java index c6ea5fca1a8fe..a66a266e5d5c8 100644 --- a/server/src/main/java/org/opensearch/common/cache/ICache.java +++ b/server/src/main/java/org/opensearch/common/cache/ICache.java @@ -8,6 +8,8 @@ package org.opensearch.common.cache; +import org.opensearch.common.cache.stats.CacheStats; + /** * Represents a cache interface. * @param Type of key. @@ -31,4 +33,7 @@ public interface ICache { long count(); void refresh(); + void close(); + + CacheStats stats(); } diff --git a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java index c81f9f5e3b809..da8acb48936a8 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java @@ -13,6 +13,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.stats.CacheStats; import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; import org.opensearch.common.cache.store.enums.CacheStoreType; import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; @@ -30,6 +31,8 @@ public class OpenSearchOnHeapCache implements StoreAwareCache, Remov private final StoreAwareCacheEventListener eventListener; + private final CacheStats stats = new OpenSearchOnHeapCacheStats(); + public OpenSearchOnHeapCache(Builder builder) { CacheBuilder cacheBuilder = CacheBuilder.builder() .setMaximumWeight(builder.getMaxWeightInBytes()) @@ -88,7 +91,7 @@ public Iterable keys() { @Override public long count() { - return cache.count(); + return stats.count(); } @Override @@ -104,6 +107,11 @@ public CacheStoreType getTierType() { @Override public void close() {} + @Override + public CacheStats stats() { + return stats; + } + @Override public void onRemoval(RemovalNotification notification) { eventListener.onRemoval( @@ -116,6 +124,16 @@ public void onRemoval(RemovalNotification notification) { ); } + /** + * Stats for opensearch on heap cache. + */ + class OpenSearchOnHeapCacheStats implements CacheStats { + @Override + public long count() { + return cache.count(); + } + } + /** * Builder object * @param Type of key diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCache.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCache.java index 02c1103adcc85..1c15d33ce6d94 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCache.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCache.java @@ -8,14 +8,20 @@ package org.opensearch.common.cache.tier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.ehcache.spi.loaderwriter.CacheLoadingException; +import org.ehcache.spi.loaderwriter.CacheWritingException; import org.opensearch.OpenSearchException; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.stats.CacheStats; import org.opensearch.common.cache.store.StoreAwareCache; import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; import org.opensearch.common.cache.store.enums.CacheStoreType; import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -24,8 +30,15 @@ import java.io.File; import java.time.Duration; import java.util.Iterator; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.function.Supplier; import org.ehcache.Cache; @@ -42,9 +55,20 @@ import org.ehcache.event.EventType; import org.ehcache.expiry.ExpiryPolicy; import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; +import org.opensearch.common.util.concurrent.ReleasableLock; +/** + * This variant of disk cache uses Ehcache underneath. + * @param Type of key. + * @param Type of value. + * + * @opensearch.experimental + * + */ public class EhCacheDiskCache implements StoreAwareCache { + private static final Logger logger = LogManager.getLogger(EhCacheDiskCache.class); + // A Cache manager can create many caches. private final PersistentCacheManager cacheManager; @@ -59,14 +83,14 @@ public class EhCacheDiskCache implements StoreAwareCache { private final TimeValue expireAfterAccess; + private final DiskCacheStats stats = new DiskCacheStats(); + private final EhCacheEventListener ehCacheEventListener; private final String threadPoolAlias; private final Settings settings; - private CounterMetric count = new CounterMetric(); - private final static String DISK_CACHE_ALIAS = "ehDiskCache"; private final static String THREAD_POOL_ALIAS_PREFIX = "ehcachePool"; @@ -86,11 +110,17 @@ public class EhCacheDiskCache implements StoreAwareCache { public final Setting DISK_WRITE_CONCURRENCY; // Defines how many segments the disk cache is separated into. Higher number achieves greater concurrency but - // will hold that many file pointers. + // will hold that many file pointers. Default is 16. public final Setting DISK_SEGMENTS; private final StoreAwareCacheEventListener eventListener; + /** + * Used in computeIfAbsent to synchronize loading of a given key. This is needed as ehcache doesn't provide a + * computeIfAbsent method. + */ + Map>> completableFutureMap = new ConcurrentHashMap<>(); + private EhCacheDiskCache(Builder builder) { this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null"); this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null"); @@ -107,12 +137,12 @@ private EhCacheDiskCache(Builder builder) { } this.settings = Objects.requireNonNull(builder.settings, "Settings objects shouldn't be null"); Objects.requireNonNull(builder.settingPrefix, "Setting prefix shouldn't be null"); - this.DISK_WRITE_MINIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.min_threads", 2, 1, 5); - this.DISK_WRITE_MAXIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.max_threads", 2, 1, 20); + this.DISK_WRITE_MINIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tier.disk.ehcache.min_threads", 2, 1, 5); + this.DISK_WRITE_MAXIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tier.disk.ehcache.max_threads", 2, 1, 20); // Default value is 1 within EhCache. - this.DISK_WRITE_CONCURRENCY = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.concurrency", 2, 1, 3); + this.DISK_WRITE_CONCURRENCY = Setting.intSetting(builder.settingPrefix + ".tier.disk.ehcache.concurrency", 2, 1, 3); // Default value is 16 within Ehcache. - this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + ".ehcache.disk.segments", 16, 1, 32); + this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + "tier.disk.ehcache.segments", 16, 1, 32); this.cacheManager = buildCacheManager(); Objects.requireNonNull(builder.getEventListener(), "Listener can't be null"); this.eventListener = builder.getEventListener(); @@ -138,25 +168,25 @@ private Cache buildCache(Duration expireAfterAccess, Builder builder return this.cacheManager.createCache( DISK_CACHE_ALIAS, CacheConfigurationBuilder.newCacheConfigurationBuilder( - this.keyType, - this.valueType, - ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) - ).withExpiry(new ExpiryPolicy() { - @Override - public Duration getExpiryForCreation(K key, V value) { - return INFINITE; - } - - @Override - public Duration getExpiryForAccess(K key, Supplier value) { - return expireAfterAccess; - } - - @Override - public Duration getExpiryForUpdate(K key, Supplier oldValue, V newValue) { - return INFINITE; - } - }) + this.keyType, + this.valueType, + ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) + ).withExpiry(new ExpiryPolicy<>() { + @Override + public Duration getExpiryForCreation(K key, V value) { + return INFINITE; + } + + @Override + public Duration getExpiryForAccess(K key, Supplier value) { + return expireAfterAccess; + } + + @Override + public Duration getExpiryForUpdate(K key, Supplier oldValue, V newValue) { + return INFINITE; + } + }) .withService(getListenerConfiguration(builder)) .withService( new OffHeapDiskStoreConfiguration( @@ -184,10 +214,23 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder< } } + // Package private for testing + Map>> getCompletableFutureMap() { + return completableFutureMap; + } + @Override public V get(K key) { + if (key == null) { + throw new IllegalArgumentException("Key passed to ehcache disk cache was null."); + } // Optimize it by adding key store. - V value = cache.get(key); + V value; + try { + value = cache.get(key); + } catch (CacheLoadingException ex) { + throw new OpenSearchException("Exception occurred while trying to fetch item from ehcache disk cache"); + } if (value != null) { eventListener.onHit(key, value, CacheStoreType.DISK); } else { @@ -198,19 +241,93 @@ public V get(K key) { @Override public void put(K key, V value) { - cache.put(key, value); + try { + cache.put(key, value); + } catch (CacheWritingException ex) { + throw new OpenSearchException("Exception occurred while put item to ehcache disk cache"); + } } @Override public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { - // Ehcache doesn't offer any such function. Will have to implement our own if needed later on. - throw new UnsupportedOperationException(); + // Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is + // not performant in case there are multiple concurrent request for same key. Below is our own custom + // implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation. + V value = get(key); + if (value == null) { + value = compute(key, loader); + } + if (!loader.isLoaded()) { + eventListener.onHit(key, value, CacheStoreType.DISK); + } else { + eventListener.onMiss(key, CacheStoreType.DISK); + eventListener.onCached(key, value, CacheStoreType.DISK); + } + return value; + } + + private V compute(K key, LoadAwareCacheLoader loader) throws Exception { + // A future that returns a pair of key/value. + CompletableFuture> completableFuture = new CompletableFuture<>(); + // Only one of the threads will succeed putting a future into map for the same key. + // Rest will fetch existing future. + CompletableFuture> future = completableFutureMap.putIfAbsent(key, completableFuture); + // Handler to handle results post processing. Takes a tuple or exception as an input and returns + // the value. Also before returning value, puts the value in cache. + BiFunction, Throwable, V> handler = (pair, ex) -> { + V value = null; + if (pair != null) { + put(pair.v1(), pair.v2()); + value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should + // be safe to assume if we got no exception and reached here. + } + completableFutureMap.remove(key); // Remove key from map as not needed anymore. + return value; + }; + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V value; + try { + value = loader.load(key); + } catch (Exception ex) { + future.completeExceptionally(ex); + throw new ExecutionException(ex); + } + if (value == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Tuple<>(key, value)); + } + + } else { + completableValue = future.handle(handler); + } + V value; + try { + value = completableValue.get(); + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("Future completed exceptionally but no error thrown"); + } + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + return value; } @Override public void invalidate(K key) { // There seems to be an thread leak issue while calling this and then closing cache. - cache.remove(key); + try { + cache.remove(key); + } catch (CacheWritingException ex) { + // Handle + throw new RuntimeException(ex); + } } @Override @@ -225,12 +342,12 @@ public Iterable keys() { @Override public long count() { - return count.count(); + return stats.count(); } @Override public void refresh() { - // TODO + // TODO: ehcache doesn't provide a way to refresh a cache. } @Override @@ -249,6 +366,23 @@ public void close() { } } + @Override + public CacheStats stats() { + return stats; + } + + /** + * Stats related to disk cache. + */ + class DiskCacheStats implements CacheStats { + private CounterMetric count = new CounterMetric(); + + @Override + public long count() { + return count.count(); + } + } + /** * Wrapper over Ehcache original listener to listen to desired events and notify desired subscribers. * @param Type of key @@ -266,7 +400,7 @@ class EhCacheEventListener implements CacheEventListener { public void onEvent(CacheEvent event) { switch (event.getType()) { case CREATED: - count.inc(); + stats.count.inc(); this.eventListener.onCached(event.getKey(), event.getNewValue(), CacheStoreType.DISK); assert event.getOldValue() == null; break; @@ -279,11 +413,11 @@ public void onEvent(CacheEvent event) { CacheStoreType.DISK ) ); - count.dec(); + stats.count.dec(); assert event.getNewValue() == null; break; case REMOVED: - count.dec(); + stats.count.dec(); this.eventListener.onRemoval( new StoreAwareCacheRemovalNotification<>( event.getKey(), @@ -303,7 +437,7 @@ public void onEvent(CacheEvent event) { CacheStoreType.DISK ) ); - count.dec(); + stats.count.dec(); assert event.getNewValue() == null; break; case UPDATED: diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java index 3857083d630ee..d694932514f6a 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java @@ -11,6 +11,7 @@ import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.stats.CacheStats; import org.opensearch.common.cache.store.StoreAwareCache; import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; import org.opensearch.common.cache.store.StoreAwareCacheValue; @@ -45,10 +46,12 @@ public class TieredSpilloverCache implements ICache, StoreAwareCache private final Optional> onDiskCache; private final StoreAwareCache onHeapCache; private final StoreAwareCacheEventListener listener; + private final CacheStats stats = new TieredSpillOverCacheStats(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); + /** * Maintains caching tiers in ascending order of cache latency. */ @@ -162,11 +165,7 @@ public Iterable keys() { @Override public long count() { - long totalCount = 0; - for (StoreAwareCache storeAwareCache : cacheList) { - totalCount += storeAwareCache.count(); - } - return totalCount; + return stats.count(); } @Override @@ -180,6 +179,14 @@ public void refresh() { @Override public void close() { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.close(); + } + } + + @Override + public CacheStats stats() { + return stats; } @Override @@ -238,6 +245,21 @@ private Function> getValueFromTieredCache(boolean tri }; } + /** + * Stats for tiered spillover cache. + */ + class TieredSpillOverCacheStats implements CacheStats { + + @Override + public long count() { + long totalCount = 0; + for (StoreAwareCache storeAwareCache : cacheList) { + totalCount += storeAwareCache.count(); + } + return totalCount; + } + } + /** * Builder object for tiered spillover cache. * @param Type of key diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCacheTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCacheTests.java index c699e6f454412..2757c0f485d05 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCacheTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCacheTests.java @@ -8,6 +8,7 @@ package org.opensearch.common.cache.tier; +import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.store.StoreAwareCache; import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; import org.opensearch.common.cache.store.enums.CacheStoreType; @@ -18,15 +19,21 @@ import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.EnumMap; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.CoreMatchers.instanceOf; + public class EhCacheDiskCacheTests extends OpenSearchSingleNodeTestCase { private static final int CACHE_SIZE_IN_BYTES = 1024 * 101; @@ -58,8 +65,8 @@ public void testBasicGetAndPut() throws IOException { String value = ehcacheTest.get(entry.getKey()); assertEquals(entry.getValue(), value); } - assertEquals(randomKeys, mockEventListener.enumMap.get(EventType.ON_CACHED).get()); - assertEquals(randomKeys, mockEventListener.enumMap.get(EventType.ON_HIT).get()); + assertEquals(randomKeys, mockEventListener.onCachedCount.get()); + assertEquals(randomKeys, mockEventListener.onHitCount.get()); // Validate misses int expectedNumberOfMisses = randomIntBetween(10, 200); @@ -67,7 +74,7 @@ public void testBasicGetAndPut() throws IOException { ehcacheTest.get(UUID.randomUUID().toString()); } - assertEquals(expectedNumberOfMisses, mockEventListener.enumMap.get(EventType.ON_MISS).get()); + assertEquals(expectedNumberOfMisses, mockEventListener.onMissCount.get()); ehcacheTest.close(); } } @@ -110,7 +117,7 @@ public void testConcurrentPut() throws Exception { String value = ehcacheTest.get(entry.getKey()); assertEquals(entry.getValue(), value); } - assertEquals(randomKeys, mockEventListener.enumMap.get(EventType.ON_CACHED).get()); + assertEquals(randomKeys, mockEventListener.onCachedCount.get()); ehcacheTest.close(); } } @@ -154,7 +161,7 @@ public void testEhcacheParallelGets() throws Exception { } phaser.arriveAndAwaitAdvance(); // Will trigger parallel puts above. countDownLatch.await(); // Wait for all threads to finish - assertEquals(randomKeys, mockEventListener.enumMap.get(EventType.ON_HIT).get()); + assertEquals(randomKeys, mockEventListener.onHitCount.get()); ehcacheTest.close(); } } @@ -218,11 +225,201 @@ public void testEvictions() throws Exception { String key = "Key" + i; ehcacheTest.put(key, value); } - assertTrue(mockEventListener.enumMap.get(EventType.ON_REMOVAL).get() > 0); + assertTrue(mockEventListener.onRemovalCount.get() > 0); + ehcacheTest.close(); + } + } + + public void testComputeIfAbsentConcurrently() throws Exception { + Settings settings = Settings.builder().build(); + MockEventListener mockEventListener = new MockEventListener<>(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + StoreAwareCache ehcacheTest = new EhCacheDiskCache.Builder().setKeyType(String.class) + .setValueType(String.class) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setSettingPrefix(SETTING_PREFIX) + .setIsEventListenerModeSync(true) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setEventListener(mockEventListener) + .build(); + + int numberOfRequest = randomIntBetween(200, 400); + String key = UUID.randomUUID().toString(); + String value = "dummy"; + Thread[] threads = new Thread[numberOfRequest]; + Phaser phaser = new Phaser(numberOfRequest + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfRequest); + + List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + // Try to hit different request with the same key concurrently. Verify value is only loaded once. + for(int i = 0; i < numberOfRequest; i++) { + threads[i] = new Thread(() -> { + LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(String key) { + isLoaded = true; + return value; + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + phaser.arriveAndAwaitAdvance(); + try { + assertEquals(value, ehcacheTest.computeIfAbsent(key, loadAwareCacheLoader)); + } catch (Exception e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + int numberOfTimesValueLoaded = 0; + for (int i = 0; i < numberOfRequest; i++) { + if (loadAwareCacheLoaderList.get(i).isLoaded()) { + numberOfTimesValueLoaded++; + } + } + assertEquals(1, numberOfTimesValueLoaded); + assertEquals(0, ((EhCacheDiskCache) ehcacheTest).getCompletableFutureMap().size()); + assertEquals(1, mockEventListener.onMissCount.get()); + assertEquals(1, mockEventListener.onCachedCount.get()); + assertEquals(numberOfRequest - 1, mockEventListener.onHitCount.get()); ehcacheTest.close(); } } + public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception { + Settings settings = Settings.builder().build(); + MockEventListener mockEventListener = new MockEventListener<>(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + StoreAwareCache ehcacheTest = new EhCacheDiskCache.Builder().setKeyType(String.class) + .setValueType(String.class) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setSettingPrefix(SETTING_PREFIX) + .setIsEventListenerModeSync(true) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setEventListener(mockEventListener) + .build(); + + int numberOfRequest = randomIntBetween(200, 400); + String key = UUID.randomUUID().toString(); + Thread[] threads = new Thread[numberOfRequest]; + Phaser phaser = new Phaser(numberOfRequest + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfRequest); + + List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + // Try to hit different request with the same key concurrently. Loader throws exception. + for(int i = 0; i < numberOfRequest; i++) { + threads[i] = new Thread(() -> { + LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(String key) throws Exception { + isLoaded = true; + throw new RuntimeException("Exception"); + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + phaser.arriveAndAwaitAdvance(); + assertThrows(ExecutionException.class, () -> ehcacheTest.computeIfAbsent(key, + loadAwareCacheLoader)); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + assertEquals(0, ((EhCacheDiskCache) ehcacheTest).getCompletableFutureMap().size()); + ehcacheTest.close(); + } + } + + public void testComputeIfAbsentWithNullValueLoading() throws Exception { + Settings settings = Settings.builder().build(); + MockEventListener mockEventListener = new MockEventListener<>(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + StoreAwareCache ehcacheTest = new EhCacheDiskCache.Builder().setKeyType(String.class) + .setValueType(String.class) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setSettingPrefix(SETTING_PREFIX) + .setIsEventListenerModeSync(true) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setEventListener(mockEventListener) + .build(); + + int numberOfRequest = randomIntBetween(200, 400); + String key = UUID.randomUUID().toString(); + Thread[] threads = new Thread[numberOfRequest]; + Phaser phaser = new Phaser(numberOfRequest + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfRequest); + + List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + // Try to hit different request with the same key concurrently. Loader throws exception. + for(int i = 0; i < numberOfRequest; i++) { + threads[i] = new Thread(() -> { + LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(String key) throws Exception { + isLoaded = true; + return null; + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + phaser.arriveAndAwaitAdvance(); + try { + ehcacheTest.computeIfAbsent(key, loadAwareCacheLoader); + } catch (Exception ex) { + assertThat(ex.getCause(), instanceOf(NullPointerException.class)); + } + assertThrows(ExecutionException.class, () -> ehcacheTest.computeIfAbsent(key, + loadAwareCacheLoader)); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + assertEquals(0, ((EhCacheDiskCache) ehcacheTest).getCompletableFutureMap().size()); + ehcacheTest.close(); + } + } + + private static String generateRandomString(int length) { String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; StringBuilder randomString = new StringBuilder(length); @@ -245,41 +442,36 @@ enum EventType { class MockEventListener implements StoreAwareCacheEventListener { - EnumMap enumMap; + AtomicInteger onMissCount = new AtomicInteger(); + AtomicInteger onHitCount = new AtomicInteger(); + AtomicInteger onCachedCount = new AtomicInteger(); + AtomicInteger onRemovalCount = new AtomicInteger(); MockEventListener() { - enumMap = new EnumMap<>(EventType.class); - for (EventType eventType : EventType.values()) { - enumMap.put(eventType, new AtomicInteger()); - } } @Override public void onMiss(K key, CacheStoreType cacheStoreType) { assert cacheStoreType.equals(CacheStoreType.DISK); - AtomicInteger count = enumMap.get(EventType.ON_MISS); - count.incrementAndGet(); + onMissCount.incrementAndGet(); } @Override - public void onRemoval(StoreAwareCacheRemovalNotification notification) { + public void onRemoval(StoreAwareCacheRemovalNotification notification) { assert notification.getCacheStoreType().equals(CacheStoreType.DISK); - AtomicInteger count = enumMap.get(EventType.ON_REMOVAL); - count.incrementAndGet(); + onRemovalCount.incrementAndGet(); } @Override public void onHit(K key, V value, CacheStoreType cacheStoreType) { assert cacheStoreType.equals(CacheStoreType.DISK); - AtomicInteger count = enumMap.get(EventType.ON_HIT); - count.incrementAndGet(); + onHitCount.incrementAndGet(); } @Override public void onCached(K key, V value, CacheStoreType cacheStoreType) { assert cacheStoreType.equals(CacheStoreType.DISK); - AtomicInteger count = enumMap.get(EventType.ON_CACHED); - count.incrementAndGet(); + onCachedCount.incrementAndGet(); } } } diff --git a/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java index b9336e99d1d7a..a2a44e5224d9d 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.stats.CacheStats; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.StoreAwareCache; import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; @@ -766,6 +767,11 @@ public CacheStoreType getTierType() { @Override public void close() {} + @Override + public CacheStats stats() { + return null; + } + public static class Builder extends StoreAwareCacheBuilder { int maxSize;