Skip to content

Commit

Permalink
[Tiered Caching] Expose a dynamic setting to disable/enable disk cache (
Browse files Browse the repository at this point in the history
#13373)

* [Tiered Caching] Expose a dynamic setting to disable/enable disk cache

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Putting tiered cache settings behind feature flag

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Adding a changelog

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Addressing Sorabh's comments

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Putting new setting behind feature flag

Signed-off-by: Sagar Upadhyaya <[email protected]>

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: Sagar <[email protected]>
  • Loading branch information
sgup432 authored Apr 26, 2024
1 parent a69cd08 commit 81707dc
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,121 @@ public void testWithExplicitCacheClear() throws Exception {
}, 1, TimeUnit.SECONDS);
}

public void testWithDynamicDiskCacheSetting() throws Exception {
int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
int numberOfIndexedItems = randomIntBetween(5, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
// Step 1: Hit some queries. We will see misses and queries will be cached(onto disk cache) for subsequent
// requests.
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
if (perQuerySizeInCacheInBytes == -1) {
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
}
assertSearchResponse(resp);
}

RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(0, requestCacheStats.getHitCount());
assertEquals(0, requestCacheStats.getEvictions());

// Step 2: Hit same queries again. We will see hits now.
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
assertEquals(0, requestCacheStats.getEvictions());
long lastKnownHitCount = requestCacheStats.getHitCount();
long lastKnownMissCount = requestCacheStats.getMissCount();

// Step 3: Turn off disk cache now. And hit same queries again. We should not see hits now as all queries
// were cached onto disk cache.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), false)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());

for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); //
// Still shows disk cache entries as explicit clear or invalidation is required to clean up disk cache.
assertEquals(lastKnownMissCount + numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount()); // No new hits being seen.
lastKnownMissCount = requestCacheStats.getMissCount();
lastKnownHitCount = requestCacheStats.getHitCount();

// Step 4: Invalidate entries via refresh.
// Explicit refresh would invalidate cache entries.
refreshAndWaitForReplication();
assertBusy(() -> {
// Explicit refresh should clear up cache entries
assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
}, 1, TimeUnit.SECONDS);
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(0, lastKnownMissCount - requestCacheStats.getMissCount());
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount());
}

private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand All @@ -38,6 +39,8 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
Expand Down Expand Up @@ -67,20 +70,23 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
/**
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final Map<ICache<K, V>, Boolean> caches;
private final List<Predicate<V>> policies;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Objects.requireNonNull(builder.cacheConfig, "cache config can't be null");
Objects.requireNonNull(builder.cacheConfig.getClusterSettings(), "cluster settings can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
if (caches.get(diskCache)
&& SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
} else {
Expand All @@ -103,9 +109,15 @@ && evaluatePolicies(notification.getValue())) {

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);
Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings());
LinkedHashMap<ICache<K, V>, Boolean> cacheListMap = new LinkedHashMap<>();
cacheListMap.put(onHeapCache, true);
cacheListMap.put(diskCache, isDiskCacheEnabled);
this.caches = Collections.synchronizedMap(cacheListMap);
this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
builder.cacheConfig.getClusterSettings()
.addSettingsUpdateConsumer(DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType), this::enableDisableDiskCache);
}

// Package private for testing
Expand All @@ -118,6 +130,13 @@ ICache<K, V> getDiskCache() {
return diskCache;
}

// Package private for testing.
void enableDisableDiskCache(Boolean isDiskCacheEnabled) {
// When disk cache is disabled, we are not clearing up the disk cache entries yet as that should be part of
// separate cache/clear API.
this.caches.put(diskCache, isDiskCacheEnabled);
}

@Override
public V get(ICacheKey<K> key) {
return getValueFromTieredCache().apply(key);
Expand All @@ -132,7 +151,6 @@ public void put(ICacheKey<K> key, V value) {

@Override
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
Expand All @@ -151,19 +169,19 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
// also count hits/misses stats, so ignoring it for now.
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidate(key);
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().invalidate(key);
}
}
}

@Override
public void invalidateAll() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidateAll();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().invalidateAll();
}
}
}
Expand All @@ -175,32 +193,39 @@ public void invalidateAll() {
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<ICacheKey<K>> keys() {
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<ICacheKey<K>>(iterables);
List<Iterable<ICacheKey<K>>> iterableList = new ArrayList<>();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
iterableList.add(cacheEntry.getKey().keys());
}
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) iterableList.toArray(new Iterable<?>[0]);
return new ConcatenatedIterables<>(iterables);
}

@Override
public long count() {
long count = 0;
for (ICache<K, V> cache : cacheList) {
count += cache.count();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
// Count for all the tiers irrespective of whether they are enabled or not. As eventually
// this will turn to zero once cache is cleared up either via invalidation or manually.
count += cacheEntry.getKey().count();
}
return count;
}

@Override
public void refresh() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.refresh();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().refresh();
}
}
}

@Override
public void close() throws IOException {
for (ICache<K, V> cache : cacheList) {
cache.close();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
// Close all the caches here irrespective of whether they are enabled or not.
cacheEntry.getKey().close();
}
}

Expand All @@ -212,13 +237,12 @@ public ImmutableCacheStatsHolder stats() {
private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
V value = cache.get(key);
if (value != null) {
// update hit stats
return value;
} else {
// update miss stats
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
if (cacheEntry.getValue()) {
V value = cacheEntry.getKey().get(key);
if (value != null) {
return value;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
Expand All @@ -30,10 +33,15 @@ public class TieredSpilloverCachePlugin extends Plugin implements CachePlugin {
*/
public static final String TIERED_CACHE_SPILLOVER_PLUGIN_NAME = "tieredSpilloverCachePlugin";

private final Settings settings;

/**
* Default constructor
* @param settings settings
*/
public TieredSpilloverCachePlugin() {}
public TieredSpilloverCachePlugin(Settings settings) {
this.settings = settings;
}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
Expand All @@ -54,6 +62,9 @@ public List<Setting<?>> getSettings() {
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) {
settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType));
}
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public class TieredSpilloverCacheSettings {
(key) -> Setting.simpleString(key, "", NodeScope)
);

/**
* Setting to disable/enable disk cache dynamically.
*/
public static final Setting.AffixSetting<Boolean> TIERED_SPILLOVER_DISK_CACHE_SETTING = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.enabled",
(key) -> Setting.boolSetting(key, true, NodeScope, Setting.Property.Dynamic)
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
Expand All @@ -63,17 +71,29 @@ public class TieredSpilloverCacheSettings {
public static final Map<CacheType, Setting<TimeValue>> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Fetches concrete took time policy settings.
* Stores disk cache enabled settings for various cache types as these are dynamic so that can be registered and
* retrieved accordingly.
*/
public static final Map<CacheType, Setting<Boolean>> DISK_CACHE_ENABLED_SETTING_MAP;

/**
* Fetches concrete took time policy and disk cache settings.
*/
static {
Map<CacheType, Setting<TimeValue>> concreteTookTimePolicySettingMap = new HashMap<>();
Map<CacheType, Setting<Boolean>> diskCacheSettingMap = new HashMap<>();
for (CacheType cacheType : CacheType.values()) {
concreteTookTimePolicySettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
diskCacheSettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_CACHE_SETTING.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
}
TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap;
DISK_CACHE_ENABLED_SETTING_MAP = diskCacheSettingMap;
}

/**
Expand Down
Loading

0 comments on commit 81707dc

Please sign in to comment.