From 6133ff04919826af3ff49ad3c2bf2b2b05b53adb Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 24 Apr 2024 18:29:43 -0700 Subject: [PATCH 1/5] [Tiered Caching] Expose a dynamic setting to disable/enable disk cache Signed-off-by: Sagar Upadhyaya --- .../TieredSpilloverCacheIT.java | 115 ++++++++++++++++++ .../common/tier/TieredSpilloverCache.java | 79 ++++++++---- .../tier/TieredSpilloverCachePlugin.java | 2 + .../tier/TieredSpilloverCacheSettings.java | 22 +++- .../tier/TieredSpilloverCacheTests.java | 115 ++++++++++++++++++ 5 files changed, 307 insertions(+), 26 deletions(-) diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java index 977a66c53b7e8..cbe16a690c104 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -425,6 +425,121 @@ public void testWithExplicitCacheClear() throws Exception { }, 1, TimeUnit.SECONDS); } + public void testWithDynamicDiskCacheSetting() throws Exception { + int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk. + internalCluster().startNode( + Settings.builder() + .put(defaultSettings(onHeapCacheSizeInBytes + "b")) + .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) + .build() + ); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + // Update took time policy to zero so that all entries are eligible to be cached on disk. + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.MILLISECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + int numberOfIndexedItems = randomIntBetween(5, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + long perQuerySizeInCacheInBytes = -1; + // Step 1: Hit some queries. We will see misses and queries will be cached(onto disk cache) for subsequent + // requests. + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + + // Step 2: Hit same queries again. We will see hits now. + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + long lastKnownHitCount = requestCacheStats.getHitCount(); + long lastKnownMissCount = requestCacheStats.getMissCount(); + + // Step 3: Turn off disk cache now. And hit same queries again. We should not see hits now as all queries + // were cached onto disk cache. + updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put(TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), false) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); // + // Still shows disk cache entries as explicit clear or invalidation is required to clean up disk cache. + assertEquals(lastKnownMissCount + numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount()); // No new hits being seen. + lastKnownMissCount = requestCacheStats.getMissCount(); + lastKnownHitCount = requestCacheStats.getHitCount(); + + // Step 4: Invalidate entries via refresh. + // Explicit refresh would invalidate cache entries. + refreshAndWaitForReplication(); + assertBusy(() -> { + // Explicit refresh should clear up cache entries + assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0); + }, 1, TimeUnit.SECONDS); + requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(0, lastKnownMissCount - requestCacheStats.getMissCount()); + assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount()); + } + private RequestCacheStats getRequestCacheStats(Client client, String indexName) { return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache(); } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index ae3d9f1dbcf62..c9cb869b79ff5 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -27,8 +27,9 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -38,6 +39,8 @@ import java.util.function.Function; import java.util.function.Predicate; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; + /** * This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap * and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full, @@ -67,12 +70,16 @@ public class TieredSpilloverCache implements ICache { /** * Maintains caching tiers in ascending order of cache latency. */ - private final List> cacheList; + private final Map, Boolean> cacheList; private final List> policies; + private boolean isDiskCacheEnabled; + TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null"); + Objects.requireNonNull(builder.cacheConfig, "cache config can't be null"); + Objects.requireNonNull(builder.cacheConfig.getClusterSettings(), "cluster settings can't be null"); this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null"); this.onHeapCache = builder.onHeapCacheFactory.create( @@ -80,7 +87,8 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification, V> notification) { try (ReleasableLock ignore = writeLock.acquire()) { - if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()) + if (isDiskCacheEnabled + && SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()) && evaluatePolicies(notification.getValue())) { diskCache.put(notification.getKey(), notification.getValue()); } else { @@ -103,9 +111,15 @@ && evaluatePolicies(notification.getValue())) { ); this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); - this.cacheList = Arrays.asList(onHeapCache, diskCache); + this.isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings()); + LinkedHashMap, Boolean> cacheListMap = new LinkedHashMap<>(); + cacheListMap.put(onHeapCache, true); + cacheListMap.put(diskCache, this.isDiskCacheEnabled); + this.cacheList = Collections.synchronizedMap(cacheListMap); this.dimensionNames = builder.cacheConfig.getDimensionNames(); this.policies = builder.policies; // Will never be null; builder initializes it to an empty list + builder.cacheConfig.getClusterSettings() + .addSettingsUpdateConsumer(DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType), this::enableDisableDiskCache); } // Package private for testing @@ -118,6 +132,14 @@ ICache getDiskCache() { return diskCache; } + // Package private for testing. + void enableDisableDiskCache(Boolean isDiskCacheEnabled) { + // When disk cache is disabled, we are not clearing up the disk cache entries yet as that should be part of + // separate cache/clear API. + this.cacheList.put(diskCache, isDiskCacheEnabled); + this.isDiskCacheEnabled = isDiskCacheEnabled; + } + @Override public V get(ICacheKey key) { return getValueFromTieredCache().apply(key); @@ -132,7 +154,6 @@ public void put(ICacheKey key, V value) { @Override public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { - V cacheValue = getValueFromTieredCache().apply(key); if (cacheValue == null) { // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. @@ -151,10 +172,10 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> public void invalidate(ICacheKey key) { // We are trying to invalidate the key from all caches though it would be present in only of them. // Doing this as we don't know where it is located. We could do a get from both and check that, but what will - // also trigger a hit/miss listener event, so ignoring it for now. + // also count hits/misses stats, so ignoring it for now. try (ReleasableLock ignore = writeLock.acquire()) { - for (ICache cache : cacheList) { - cache.invalidate(key); + for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + cacheEntry.getKey().invalidate(key); } } } @@ -162,8 +183,8 @@ public void invalidate(ICacheKey key) { @Override public void invalidateAll() { try (ReleasableLock ignore = writeLock.acquire()) { - for (ICache cache : cacheList) { - cache.invalidateAll(); + for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + cacheEntry.getKey().invalidateAll(); } } } @@ -175,15 +196,21 @@ public void invalidateAll() { @SuppressWarnings({ "unchecked" }) @Override public Iterable> keys() { - Iterable>[] iterables = (Iterable>[]) new Iterable[] { onHeapCache.keys(), diskCache.keys() }; - return new ConcatenatedIterables>(iterables); + List>> iterableList = new ArrayList<>(); + for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + iterableList.add(cacheEntry.getKey().keys()); + } + Iterable>[] iterables = (Iterable>[]) iterableList.toArray(new Iterable[0]); + return new ConcatenatedIterables<>(iterables); } @Override public long count() { long count = 0; - for (ICache cache : cacheList) { - count += cache.count(); + for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + // Count for all the tiers irrespective of whether they are enabled or not. As eventually + // this will turn to zero once cache is cleared up either via invalidation or manually. + count += cacheEntry.getKey().count(); } return count; } @@ -191,16 +218,19 @@ public long count() { @Override public void refresh() { try (ReleasableLock ignore = writeLock.acquire()) { - for (ICache cache : cacheList) { - cache.refresh(); + for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + if (cacheEntry.getValue()) { + cacheEntry.getKey().refresh(); + } } } } @Override public void close() throws IOException { - for (ICache cache : cacheList) { - cache.close(); + for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + // Close all the caches here irrespective of whether they are enabled or not. + cacheEntry.getKey().close(); } } @@ -212,13 +242,12 @@ public ImmutableCacheStatsHolder stats() { private Function, V> getValueFromTieredCache() { return key -> { try (ReleasableLock ignore = readLock.acquire()) { - for (ICache cache : cacheList) { - V value = cache.get(key); - if (value != null) { - // update hit stats - return value; - } else { - // update miss stats + for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + if (cacheEntry.getValue()) { + V value = cacheEntry.getKey().get(key); + if (value != null) { + return value; + } } } } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index dfd40199d859e..7105fddc9558b 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; /** @@ -54,6 +55,7 @@ public List> getSettings() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); + settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); } return settingList; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index b89e8c517a351..e8e441d6bd3a6 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -42,6 +42,14 @@ public class TieredSpilloverCacheSettings { (key) -> Setting.simpleString(key, "", NodeScope) ); + /** + * Setting to disable/enable disk cache dynamically. + */ + public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_CACHE_SETTING = Setting.suffixKeySetting( + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.enabled", + (key) -> Setting.boolSetting(key, true, NodeScope, Setting.Property.Dynamic) + ); + /** * Setting defining the minimum took time for a query to be allowed into the disk cache. */ @@ -63,17 +71,29 @@ public class TieredSpilloverCacheSettings { public static final Map> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; /** - * Fetches concrete took time policy settings. + * Stores disk cache enabled settings for various cache types as these are dynamic so that can be registered and + * retrieved accordingly. + */ + public static final Map> DISK_CACHE_ENABLED_SETTING_MAP; + + /** + * Fetches concrete took time policy and disk cache settings. */ static { Map> concreteTookTimePolicySettingMap = new HashMap<>(); + Map> diskCacheSettingMap = new HashMap<>(); for (CacheType cacheType : CacheType.values()) { concreteTookTimePolicySettingMap.put( cacheType, TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); + diskCacheSettingMap.put( + cacheType, + TIERED_SPILLOVER_DISK_CACHE_SETTING.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + ); } TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap; + DISK_CACHE_ENABLED_SETTING_MAP = diskCacheSettingMap; } /** diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index bf9f8fd22d793..1ecb63414dc68 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -42,6 +42,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; @@ -56,6 +57,7 @@ public void setup() { Settings settings = Settings.EMPTY; clusterSettings = new ClusterSettings(settings, new HashSet<>()); clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)); + clusterSettings.registerSetting(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE)); } public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception { @@ -302,6 +304,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { ) .build() ) + .setClusterSettings(clusterSettings) .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize); @@ -777,6 +780,7 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio ) .build() ) + .setClusterSettings(clusterSettings) .setDimensionNames(dimensionNames) .build(); TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() @@ -1008,6 +1012,116 @@ public void testMinimumThresholdSettingValue() throws Exception { assertEquals(validDuration, concreteSetting.get(validSettings)); } + public void testPutWithDiskCacheDisabledSetting() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int keyValueSize = 50; + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .put(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), false) + .build(), + 0 + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); // Create more items than onHeap cache. + for (int iter = 0; iter < numOfItems1; iter++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + LoadAwareCacheLoader, String> loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + ICache onHeapCache = tieredSpilloverCache.getOnHeapCache(); + ICache diskCache = tieredSpilloverCache.getDiskCache(); + assertEquals(onHeapCacheSize, onHeapCache.count()); + assertEquals(0, diskCache.count()); // Disk cache shouldn't have anything considering it is disabled. + assertEquals(numOfItems1 - onHeapCacheSize, removalListener.evictionsMetric.count()); + } + + public void testGetPutAndInvalidateWithDiskCacheDisabled() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int keyValueSize = 50; + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(), + 0 + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize - 1); // Create more items than onHeap + // cache to cause spillover. + for (int iter = 0; iter < numOfItems1; iter++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + LoadAwareCacheLoader, String> loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + ICache onHeapCache = tieredSpilloverCache.getOnHeapCache(); + ICache diskCache = tieredSpilloverCache.getDiskCache(); + List> diskCacheKeys = new ArrayList<>(); + tieredSpilloverCache.getDiskCache().keys().forEach(diskCacheKeys::add); + long actualDiskCacheCount = diskCache.count(); + long actualTieredCacheCount = tieredSpilloverCache.count(); + assertEquals(onHeapCacheSize, onHeapCache.count()); + assertEquals(numOfItems1 - onHeapCacheSize, actualDiskCacheCount); + assertEquals(0, removalListener.evictionsMetric.count()); + assertEquals(numOfItems1, actualTieredCacheCount); + for (ICacheKey diskKey : diskCacheKeys) { + assertNotNull(tieredSpilloverCache.get(diskKey)); + } + + tieredSpilloverCache.enableDisableDiskCache(false); // Disable disk cache now. + int numOfItems2 = totalSize - numOfItems1; + for (int iter = 0; iter < numOfItems2; iter++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + LoadAwareCacheLoader, String> loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + for (ICacheKey diskKey : diskCacheKeys) { + assertNull(tieredSpilloverCache.get(diskKey)); // Considering disk cache is disabled, we shouldn't find + // these keys. + } + assertEquals(onHeapCacheSize, onHeapCache.count()); // Should remain same. + assertEquals(0, diskCache.count() - actualDiskCacheCount); // Considering it is disabled now, shouldn't cache + // any more items. + assertEquals(numOfItems2, removalListener.evictionsMetric.count()); // Considering onHeap cache was already + // full, we should all existing onHeap entries being evicted. + assertEquals(0, tieredSpilloverCache.count() - actualTieredCacheCount); // Count still returns disk cache + // entries count as they haven't been cleared yet. + long lastKnownTieredCacheEntriesCount = tieredSpilloverCache.count(); + + // Clear up disk cache keys. + for (ICacheKey diskKey : diskCacheKeys) { + tieredSpilloverCache.invalidate(diskKey); + } + assertEquals(0, diskCache.count()); + assertEquals(lastKnownTieredCacheEntriesCount - diskCacheKeys.size(), tieredSpilloverCache.count()); + + tieredSpilloverCache.invalidateAll(); // Clear up all the keys. + assertEquals(0, tieredSpilloverCache.count()); + } + private List getMockDimensions() { List dims = new ArrayList<>(); for (String dimensionName : dimensionNames) { @@ -1121,6 +1235,7 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .put(settings) .build() ) + .setClusterSettings(clusterSettings) .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); From a9fc92e9f786758274034c87b86e944bda9105c5 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 24 Apr 2024 18:45:06 -0700 Subject: [PATCH 2/5] Putting tiered cache settings behind feature flag Signed-off-by: Sagar Upadhyaya --- .../tier/TieredSpilloverCachePlugin.java | 33 +++++++++++++------ .../tier/TieredSpilloverCachePluginTests.java | 11 ++++++- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 7105fddc9558b..0f92298c4dc2d 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -11,6 +11,8 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.Plugin; @@ -31,10 +33,15 @@ public class TieredSpilloverCachePlugin extends Plugin implements CachePlugin { */ public static final String TIERED_CACHE_SPILLOVER_PLUGIN_NAME = "tieredSpilloverCachePlugin"; + private final Settings settings; + /** * Default constructor + * @param settings settings */ - public TieredSpilloverCachePlugin() {} + public TieredSpilloverCachePlugin(Settings settings) { + this.settings = settings; + } @Override public Map getCacheFactoryMap() { @@ -47,15 +54,21 @@ public Map getCacheFactoryMap() { @Override public List> getSettings() { List> settingList = new ArrayList<>(); - for (CacheType cacheType : CacheType.values()) { - settingList.add( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) - ); - settingList.add( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) - ); - settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); - settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); + if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) { + for (CacheType cacheType : CacheType.values()) { + settingList.add( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( + cacheType.getSettingPrefix() + ) + ); + settingList.add( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( + cacheType.getSettingPrefix() + ) + ); + settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); + settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); + } } return settingList; } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java index 1172a48e97c6a..4a96ffe2069ec 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java @@ -9,6 +9,8 @@ package org.opensearch.cache.common.tier; import org.opensearch.common.cache.ICache; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.test.OpenSearchTestCase; import java.util.Map; @@ -16,9 +18,16 @@ public class TieredSpilloverCachePluginTests extends OpenSearchTestCase { public void testGetCacheFactoryMap() { - TieredSpilloverCachePlugin tieredSpilloverCachePlugin = new TieredSpilloverCachePlugin(); + TieredSpilloverCachePlugin tieredSpilloverCachePlugin = new TieredSpilloverCachePlugin(Settings.EMPTY); Map map = tieredSpilloverCachePlugin.getCacheFactoryMap(); assertNotNull(map.get(TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME)); assertEquals(TieredSpilloverCachePlugin.TIERED_CACHE_SPILLOVER_PLUGIN_NAME, tieredSpilloverCachePlugin.getName()); } + + public void testGetSettingsWithFeatureFlagOn() { + TieredSpilloverCachePlugin tieredSpilloverCachePlugin = new TieredSpilloverCachePlugin( + Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE_SETTING.getKey(), true).build() + ); + assertFalse(tieredSpilloverCachePlugin.getSettings().isEmpty()); + } } From 122cc22369134fe55fcdb595895837fad88768f6 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 24 Apr 2024 18:47:57 -0700 Subject: [PATCH 3/5] Adding a changelog Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22c3e0d787f58..5d5f2e7407b3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- [Tiered Caching] Expose a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) From 1a6ccf50ef0a30c06079260a97edded192c7a008 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 25 Apr 2024 16:53:52 -0700 Subject: [PATCH 4/5] Addressing Sorabh's comments Signed-off-by: Sagar Upadhyaya --- .../common/tier/TieredSpilloverCache.java | 33 ++++++++----------- .../tier/TieredSpilloverCachePlugin.java | 25 +++++--------- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index c9cb869b79ff5..34f17df751d7a 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -70,11 +70,9 @@ public class TieredSpilloverCache implements ICache { /** * Maintains caching tiers in ascending order of cache latency. */ - private final Map, Boolean> cacheList; + private final Map, Boolean> caches; private final List> policies; - private boolean isDiskCacheEnabled; - TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null"); @@ -87,7 +85,7 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification, V> notification) { try (ReleasableLock ignore = writeLock.acquire()) { - if (isDiskCacheEnabled + if (caches.get(diskCache) && SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()) && evaluatePolicies(notification.getValue())) { diskCache.put(notification.getKey(), notification.getValue()); @@ -111,11 +109,11 @@ && evaluatePolicies(notification.getValue())) { ); this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); - this.isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings()); + Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings()); LinkedHashMap, Boolean> cacheListMap = new LinkedHashMap<>(); cacheListMap.put(onHeapCache, true); - cacheListMap.put(diskCache, this.isDiskCacheEnabled); - this.cacheList = Collections.synchronizedMap(cacheListMap); + cacheListMap.put(diskCache, isDiskCacheEnabled); + this.caches = Collections.synchronizedMap(cacheListMap); this.dimensionNames = builder.cacheConfig.getDimensionNames(); this.policies = builder.policies; // Will never be null; builder initializes it to an empty list builder.cacheConfig.getClusterSettings() @@ -136,8 +134,7 @@ ICache getDiskCache() { void enableDisableDiskCache(Boolean isDiskCacheEnabled) { // When disk cache is disabled, we are not clearing up the disk cache entries yet as that should be part of // separate cache/clear API. - this.cacheList.put(diskCache, isDiskCacheEnabled); - this.isDiskCacheEnabled = isDiskCacheEnabled; + this.caches.put(diskCache, isDiskCacheEnabled); } @Override @@ -174,7 +171,7 @@ public void invalidate(ICacheKey key) { // Doing this as we don't know where it is located. We could do a get from both and check that, but what will // also count hits/misses stats, so ignoring it for now. try (ReleasableLock ignore = writeLock.acquire()) { - for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { cacheEntry.getKey().invalidate(key); } } @@ -183,7 +180,7 @@ public void invalidate(ICacheKey key) { @Override public void invalidateAll() { try (ReleasableLock ignore = writeLock.acquire()) { - for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { cacheEntry.getKey().invalidateAll(); } } @@ -197,7 +194,7 @@ public void invalidateAll() { @Override public Iterable> keys() { List>> iterableList = new ArrayList<>(); - for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { iterableList.add(cacheEntry.getKey().keys()); } Iterable>[] iterables = (Iterable>[]) iterableList.toArray(new Iterable[0]); @@ -207,7 +204,7 @@ public Iterable> keys() { @Override public long count() { long count = 0; - for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { // Count for all the tiers irrespective of whether they are enabled or not. As eventually // this will turn to zero once cache is cleared up either via invalidation or manually. count += cacheEntry.getKey().count(); @@ -218,17 +215,15 @@ public long count() { @Override public void refresh() { try (ReleasableLock ignore = writeLock.acquire()) { - for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { - if (cacheEntry.getValue()) { - cacheEntry.getKey().refresh(); - } + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + cacheEntry.getKey().refresh(); } } } @Override public void close() throws IOException { - for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { // Close all the caches here irrespective of whether they are enabled or not. cacheEntry.getKey().close(); } @@ -242,7 +237,7 @@ public ImmutableCacheStatsHolder stats() { private Function, V> getValueFromTieredCache() { return key -> { try (ReleasableLock ignore = readLock.acquire()) { - for (Map.Entry, Boolean> cacheEntry : cacheList.entrySet()) { + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { if (cacheEntry.getValue()) { V value = cacheEntry.getKey().get(key); if (value != null) { diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 0f92298c4dc2d..5d6aa6b2632e7 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -12,7 +12,6 @@ import org.opensearch.common.cache.ICache; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.Plugin; @@ -54,21 +53,15 @@ public Map getCacheFactoryMap() { @Override public List> getSettings() { List> settingList = new ArrayList<>(); - if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) { - for (CacheType cacheType : CacheType.values()) { - settingList.add( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( - cacheType.getSettingPrefix() - ) - ); - settingList.add( - TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( - cacheType.getSettingPrefix() - ) - ); - settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); - settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); - } + for (CacheType cacheType : CacheType.values()) { + settingList.add( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + ); + settingList.add( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + ); + settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); + settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); } return settingList; } From 3b296d4b3c57642acaf0b67741414f37be84216a Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 25 Apr 2024 18:00:23 -0700 Subject: [PATCH 5/5] Putting new setting behind feature flag Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 2 +- .../cache/common/tier/TieredSpilloverCachePlugin.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f1e37639acec..69f557cc3a872 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) -- [Tiered Caching] Expose a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373)) +- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373)) - [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) ### Dependencies diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 5d6aa6b2632e7..1c10e51630460 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -12,6 +12,7 @@ import org.opensearch.common.cache.ICache; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.Plugin; @@ -61,7 +62,9 @@ public List> getSettings() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); - settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); + if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) { + settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); + } } return settingList; }