From 1832df9c075b1f80a5007a48841db5002015fe59 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 2 Jan 2024 23:49:31 -0800 Subject: [PATCH] Adding javadoc and more test for cache.put Signed-off-by: Sagar Upadhyaya --- .../cache/store/StoreAwareCacheValue.java | 2 +- .../cache/tier/TieredSpilloverCache.java | 22 +- .../cache/tier/TieredSpilloverCacheTests.java | 207 +++++++----------- 3 files changed, 100 insertions(+), 131 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java index e911b1041ba31..4fbbbbfebfaa7 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java +++ b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java @@ -29,7 +29,7 @@ public V getValue() { return value; } - public CacheStoreType getSource() { + public CacheStoreType getCacheStoreType() { return source; } } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java index f9d7726af16f0..26fa12ee4b726 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java @@ -97,7 +97,7 @@ public void put(K key, V value) { public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { // We are skipping calling event listeners at this step as we do another get inside below computeIfAbsent. // Where we might end up calling onMiss twice for a key not present in onHeap cache. - // Similary we might end up calling both onMiss and onHit for a key, in case we are received concurrent + // Similary we might end up calling both onMiss and onHit for a key, in case we are receiving concurrent // requests for the same key which requires loading only once. StoreAwareCacheValue cacheValue = getValueFromTieredCache(false).apply(key); if (cacheValue == null) { @@ -117,8 +117,8 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Except } return value; } - listener.onHit(key, cacheValue.getValue(), cacheValue.getSource()); - if (cacheValue.getSource().equals(CacheStoreType.DISK)) { + listener.onHit(key, cacheValue.getValue(), cacheValue.getCacheStoreType()); + if (cacheValue.getCacheStoreType().equals(CacheStoreType.DISK)) { listener.onMiss(key, CacheStoreType.ON_HEAP); } return cacheValue.getValue(); @@ -145,6 +145,10 @@ public void invalidateAll() { } } + /** + * Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache. + * @return An iterable over (onHeap + disk) keys + */ @Override public Iterable keys() { Iterable onDiskKeysIterable; @@ -167,11 +171,19 @@ public long count() { @Override public void refresh() { - for (StoreAwareCache storeAwareCache : cacheList) { - storeAwareCache.refresh(); + try (ReleasableLock ignore = writeLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.refresh(); + } } } + /** + * Provides an iteration over keys based on desired on cacheStoreType. This is not protected from any mutations + * to the cache. + * @param type Type of cacheStoreType + * @return An iterable over desired CacheStoreType keys + */ @Override public Iterable cacheKeys(CacheStoreType type) { switch (type) { diff --git a/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java index 0dbec1db302f4..d412da2cd497c 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java @@ -222,9 +222,9 @@ public void testWithDiskTierNull() throws Exception { int onHeapCacheSize = randomIntBetween(10, 30); MockCacheEventListener eventListener = new MockCacheEventListener(); - StoreAwareCacheBuilder onHeapCacheBuilder = new MockOnHeapCache.Builder().setMaxSize( - onHeapCacheSize - ); + StoreAwareCacheBuilder onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder() + .setMaximumWeightInBytes(onHeapCacheSize * 20) + .setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() .setOnHeapCacheBuilder(onHeapCacheBuilder) .setListener(eventListener) @@ -259,6 +259,69 @@ public void testPut() { assertEquals(1, tieredSpilloverCache.count()); } + public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(200, 400); + int diskCacheSize = randomIntBetween(450, 800); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + for (int i = 0; i < onHeapCacheSize; i++) { + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) throws Exception { + return UUID.randomUUID().toString(); + } + }); + } + + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(0, tieredSpilloverCache.getOnDiskCache().get().count()); + + // Again try to put OnHeap cache capacity amount of new items. + List newKeyList = new ArrayList<>(); + for (int i = 0; i < onHeapCacheSize; i++) { + newKeyList.add(UUID.randomUUID().toString()); + } + + for (int i = 0; i < newKeyList.size(); i++) { + tieredSpilloverCache.computeIfAbsent(newKeyList.get(i), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) { + return UUID.randomUUID().toString(); + } + }); + } + + // Verify that new items are part of onHeap cache. + List actualOnHeapCacheKeys = new ArrayList<>(); + tieredSpilloverCache.cacheKeys(CacheStoreType.ON_HEAP).forEach(actualOnHeapCacheKeys::add); + + assertEquals(newKeyList.size(), actualOnHeapCacheKeys.size()); + for (int i = 0; i < actualOnHeapCacheKeys.size(); i++) { + assertTrue(newKeyList.contains(actualOnHeapCacheKeys.get(i))); + } + + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnDiskCache().get().count()); + } + public void testInvalidate() { int onHeapCacheSize = 1; int diskCacheSize = 10; @@ -288,7 +351,7 @@ public void testInvalidate() { tieredSpilloverCache.put(key2, UUID.randomUUID().toString()); assertEquals(2, tieredSpilloverCache.count()); // Again invalidate older key - tieredSpilloverCache.invalidate(key2); + tieredSpilloverCache.invalidate(key); assertEquals(1, eventListener.enumMap.get(CacheStoreType.DISK).invalidationMetric.count()); assertEquals(1, tieredSpilloverCache.count()); } @@ -305,21 +368,22 @@ public void testCacheKeys() throws Exception { eventListener, 0 ); - // Put values in cache more than it's size and cause evictions from onHeap. - int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); List onHeapKeys = new ArrayList<>(); List diskTierKeys = new ArrayList<>(); - for (int iter = 0; iter < numOfItems1; iter++) { + // During first round add onHeapCacheSize entries. Will go to onHeap cache initially. + for (int i = 0; i < onHeapCacheSize; i++) { String key = UUID.randomUUID().toString(); - if (iter > (onHeapCacheSize - 1)) { - // All these are bound to go to disk based cache. - diskTierKeys.add(key); - } else { - onHeapKeys.add(key); - } - LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); - tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader); + diskTierKeys.add(key); + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader()); + } + // In another round, add another onHeapCacheSize entries. These will go to onHeap and above ones will be + // evicted to onDisk cache. + for (int i = 0; i < onHeapCacheSize; i++) { + String key = UUID.randomUUID().toString(); + onHeapKeys.add(key); + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader()); } + List actualOnHeapKeys = new ArrayList<>(); List actualOnDiskKeys = new ArrayList<>(); Iterable onHeapiterable = tieredSpilloverCache.cacheKeys(CacheStoreType.ON_HEAP); @@ -607,9 +671,9 @@ private TieredSpilloverCache intializeTieredSpilloverCache( ) { StoreAwareCacheBuilder diskCacheBuilder = new MockOnDiskCache.Builder().setMaxSize(diksCacheSize) .setDeliberateDelay(diskDeliberateDelay); - StoreAwareCacheBuilder onHeapCacheBuilder = new MockOnHeapCache.Builder().setMaxSize( - onHeapCacheSize - ); + StoreAwareCacheBuilder onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder() + .setMaximumWeightInBytes(onHeapCacheSize * 20) + .setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries return new TieredSpilloverCache.Builder().setOnHeapCacheBuilder(onHeapCacheBuilder) .setOnDiskCacheBuilder(diskCacheBuilder) .setListener(eventListener) @@ -728,110 +792,3 @@ public Builder setDeliberateDelay(long millis) { } } } - -class MockOnHeapCache implements StoreAwareCache { - - Map cache; - int maxSize; - StoreAwareCacheEventListener eventListener; - - MockOnHeapCache(int size, StoreAwareCacheEventListener eventListener) { - maxSize = size; - this.cache = new ConcurrentHashMap(); - this.eventListener = eventListener; - } - - @Override - public V get(K key) { - V value = cache.get(key); - if (value != null) { - eventListener.onHit(key, value, CacheStoreType.ON_HEAP); - } else { - eventListener.onMiss(key, CacheStoreType.ON_HEAP); - } - return value; - } - - @Override - public void put(K key, V value) { - if (this.cache.size() >= maxSize) { - eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, value, RemovalReason.EVICTED, CacheStoreType.ON_HEAP)); - return; - } - this.cache.put(key, value); - eventListener.onCached(key, value, CacheStoreType.ON_HEAP); - } - - @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { - if (this.cache.size() >= maxSize) { // If it exceeds, just notify for evict. - eventListener.onRemoval( - new StoreAwareCacheRemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, CacheStoreType.ON_HEAP) - ); - return loader.load(key); - } - V value = cache.computeIfAbsent(key, key1 -> { - try { - return loader.load(key); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - if (!loader.isLoaded()) { - eventListener.onHit(key, value, CacheStoreType.ON_HEAP); - } else { - eventListener.onMiss(key, CacheStoreType.ON_HEAP); - eventListener.onCached(key, value, CacheStoreType.ON_HEAP); - } - return value; - } - - @Override - public void invalidate(K key) { - if (this.cache.containsKey(key)) { - eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, null, RemovalReason.INVALIDATED, CacheStoreType.ON_HEAP)); - } - this.cache.remove(key); - - } - - @Override - public void invalidateAll() { - this.cache.clear(); - } - - @Override - public Iterable keys() { - return this.cache.keySet(); - } - - @Override - public long count() { - return this.cache.size(); - } - - @Override - public void refresh() { - - } - - @Override - public CacheStoreType getTierType() { - return CacheStoreType.ON_HEAP; - } - - public static class Builder extends StoreAwareCacheBuilder { - - int maxSize; - - @Override - public StoreAwareCache build() { - return new MockOnHeapCache<>(maxSize, this.getEventListener()); - } - - public Builder setMaxSize(int maxSize) { - this.maxSize = maxSize; - return this; - } - } -}