From 32f83c32f993103959310f9cba6d581fa9c9c0fa Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 4 Mar 2024 20:15:19 -0800 Subject: [PATCH 1/8] Integrating IndicesRequestCache with CacheService controlled by a feature flag Signed-off-by: Sagar Upadhyaya --- modules/cache-common/build.gradle | 2 + .../TieredSpilloverCacheIT.java | 150 ++++++++++++++++++ .../opensearch/cache/EhcacheDiskCacheIT.java | 48 ++++++ .../indices/IndicesRequestCacheIT.java | 6 + .../common/cache/service/CacheService.java | 2 + .../common/cache/settings/CacheSettings.java | 3 +- .../common/settings/ClusterSettings.java | 6 +- .../common/settings/FeatureFlagSettings.java | 3 +- .../opensearch/common/util/FeatureFlags.java | 7 + .../indices/IndicesRequestCache.java | 49 ++++-- .../opensearch/indices/IndicesService.java | 6 +- .../main/java/org/opensearch/node/Node.java | 3 +- .../indices/IndicesRequestCacheTests.java | 16 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- 14 files changed, 276 insertions(+), 28 deletions(-) create mode 100644 modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java create mode 100644 plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java diff --git a/modules/cache-common/build.gradle b/modules/cache-common/build.gradle index c7052896e609b..98cdec83b9ad1 100644 --- a/modules/cache-common/build.gradle +++ b/modules/cache-common/build.gradle @@ -6,6 +6,8 @@ * compatible open source license. */ +apply plugin: 'opensearch.internal-cluster-test' + opensearchplugin { description 'Module for caches which are optional and do not require additional security permission' classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin' diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java new file mode 100644 index 0000000000000..1004dd073da56 --- /dev/null +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchType; +import org.opensearch.client.Client; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.plugins.CachePlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.greaterThan; + +public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put( + CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + MockOnDiskCache.MockDiskCacheFactory.NAME + ) + .build(); + } + + public void testPluginsAreInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream() + .anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin")) + ); + } + + public void testSanityChecksWithIndicesRequestCache() throws InterruptedException { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("f", "type=date") + .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build()) + .get() + ); + indexRandom( + true, + client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"), + client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z") + ); + ensureSearchable("index"); + + // This is not a random example: serialization with time zones writes shared strings + // which used to not work well with the query cache because of the handles stream output + // see #9500 + final SearchResponse r1 = client.prepareSearch("index") + .setSize(0) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .addAggregation( + dateHistogram("histo").field("f") + .timeZone(ZoneId.of("+01:00")) + .minDocCount(0) + .dateHistogramInterval(DateHistogramInterval.MONTH) + ) + .get(); + assertSearchResponse(r1); + + // The cached is actually used + assertThat( + client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), + greaterThan(0L) + ); + } + + public static class MockDiskCachePlugin extends Plugin implements CachePlugin { + + public MockDiskCachePlugin() {} + + @Override + public Map getCacheFactoryMap() { + return Map.of(MockOnDiskCache.MockDiskCacheFactory.NAME, new MockOnDiskCache.MockDiskCacheFactory(0, 1000)); + } + + @Override + public String getName() { + return "mock_disk_plugin"; + } + } +} diff --git a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java new file mode 100644 index 0000000000000..c68455463ee3d --- /dev/null +++ b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache; + +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(EhcacheCachePlugin.class); + } + + public void testPluginsAreInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.EhcacheCachePlugin")) + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 82577eb1501f3..6c6646b03792a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -81,6 +82,11 @@ public static Collection parameters() { ); } + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build(); + } + @Override protected boolean useRandomReplicationStrategy() { return true; diff --git a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java index c6e970b58ea08..9de6b77877698 100644 --- a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java +++ b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java @@ -8,6 +8,7 @@ package org.opensearch.common.cache.service; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.settings.CacheSettings; @@ -21,6 +22,7 @@ /** * Service responsible to create caches. */ +@ExperimentalApi public class CacheService { private final Map cacheStoreTypeFactories; diff --git a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java index eb4563fda2275..fbc5c9b2e74e4 100644 --- a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java +++ b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.settings.Setting; /** @@ -25,7 +26,7 @@ public class CacheSettings { */ public static final Setting.AffixSetting CACHE_TYPE_STORE_NAME = Setting.suffixKeySetting( "store.name", - (key) -> Setting.simpleString(key, "", Setting.Property.NodeScope) + (key) -> Setting.simpleString(key, OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, Setting.Property.NodeScope) ); public static Setting getConcreteSettingForCacheType(CacheType cacheType) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 896a234c115b6..e3f3f2d46d2ca 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -81,6 +81,8 @@ import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -729,6 +731,8 @@ public void apply(Settings value, Settings current, Settings previous) { TelemetrySettings.METRICS_PUBLISH_INTERVAL_SETTING, TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING, TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING - ) + ), + List.of(FeatureFlags.TIERED_CACHING), + List.of(CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE)) ); } diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 47da53b52c325..c217a4cfe4ac1 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -36,6 +36,7 @@ protected FeatureFlagSettings( FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING, FeatureFlags.DOC_ID_FUZZY_SET_SETTING, - FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING + FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, + FeatureFlags.TIERED_CACHING_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index b51efeab21254..3ab95af0da61d 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -64,6 +64,11 @@ public class FeatureFlags { */ public static final String DOC_ID_FUZZY_SET = "opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled"; + /** + * Gates the functionality of tiered caching + */ + public static final String TIERED_CACHING = "opensearch.experimental.feature.tiered.caching.enabled"; + /** * Should store the settings from opensearch.yml. */ @@ -128,4 +133,6 @@ public static boolean isEnabled(Setting featureFlag) { ); public static final Setting DOC_ID_FUZZY_SET_SETTING = Setting.boolSetting(DOC_ID_FUZZY_SET, false, Property.NodeScope); + + public static final Setting TIERED_CACHING_SETTING = Setting.boolSetting(TIERED_CACHING, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 6d5c23274dbd6..a0545a52ac394 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -39,16 +39,21 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.cache.Cache; -import org.opensearch.common.cache.CacheBuilder; -import org.opensearch.common.cache.CacheLoader; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.service.CacheService; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.cache.store.builders.ICacheBuilder; +import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; @@ -69,6 +74,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.function.ToLongBiFunction; /** * The indices request cache allows to cache a shard level request stage responses, helping with improving @@ -116,22 +122,35 @@ public final class IndicesRequestCache implements RemovalListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; - private final Cache cache; + private final ICache cache; private final Function> cacheEntityLookup; - IndicesRequestCache(Settings settings, Function> cacheEntityFunction) { + IndicesRequestCache(Settings settings, Function> cacheEntityFunction, CacheService cacheService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); - CacheBuilder cacheBuilder = CacheBuilder.builder() - .setMaximumWeight(sizeInBytes) - .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) - .removalListener(this); - if (expire != null) { - cacheBuilder.setExpireAfterAccess(expire); - } - cache = cacheBuilder.build(); + ToLongBiFunction weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); this.cacheEntityLookup = cacheEntityFunction; + if (FeatureFlags.TIERED_CACHING_SETTING.get(settings)) { + this.cache = cacheService.createCache( + new CacheConfig.Builder().setSettings(settings) + .setWeigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) + .setValueType(BytesReference.class) + .setKeyType(Key.class) + .setRemovalListener(this) + .build(), + CacheType.INDICES_REQUEST_CACHE + ); + } else { + ICacheBuilder builder = new OpenSearchOnHeapCache.Builder().setSettings(settings) + .setMaximumWeightInBytes(sizeInBytes) + .setWeigher(weigher) + .setRemovalListener(this); + if (expire != null) { + builder.setExpireAfterAccess(expire); + } + this.cache = builder.build(); + } } @Override @@ -204,7 +223,7 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade * * @opensearch.internal */ - private static class Loader implements CacheLoader { + private static class Loader implements LoadAwareCacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -403,7 +422,7 @@ synchronized void cleanCache() { /** * Returns the current size of the cache */ - int count() { + long count() { return cache.count(); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index c83f2a4c5cd5d..8151c151e3968 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -62,6 +62,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -395,7 +396,8 @@ public IndicesService( Supplier repositoriesServiceSupplier, SearchRequestStats searchRequestStats, @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - RecoverySettings recoverySettings + RecoverySettings recoverySettings, + CacheService cacheService ) { this.settings = settings; this.threadPool = threadPool; @@ -412,7 +414,7 @@ public IndicesService( return Optional.empty(); } return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), cacheService); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1b8e4de3abe47..67d6d924288eb 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -823,7 +823,8 @@ protected Node( repositoriesServiceReference::get, searchRequestStats, remoteStoreStatsTrackerFactory, - recoverySettings + recoverySettings, + cacheService ); final IngestService ingestService = new IngestService( diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 73728aec12e51..9e31bfcaa4a80 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -46,6 +46,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; @@ -80,7 +81,8 @@ public void testBasicOperationsCache() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + mock(CacheService.class) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -146,7 +148,7 @@ public void testCacheDifferentReaders() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), mock(CacheService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -247,7 +249,8 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + mock(CacheService.class) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -274,7 +277,8 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + mock(CacheService.class) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -319,7 +323,7 @@ public void testClearAllEntityIdentity() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), mock(CacheService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -414,7 +418,7 @@ public void testInvalidate() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), mock(CacheService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 7c50e961853b5..5f62ea113c903 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2072,7 +2072,8 @@ public void onFailure(final Exception e) { repositoriesServiceReference::get, null, new RemoteStoreStatsTrackerFactory(clusterService, settings), - DefaultRecoverySettings.INSTANCE + DefaultRecoverySettings.INSTANCE, + null ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( From f1b7094b50dc7f07aee027dd2a0bd6bad808bf89 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 5 Mar 2024 10:29:40 -0800 Subject: [PATCH 2/8] Adding changelog Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c45188ba07453..243ebeec00a65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -105,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163)) - Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626)) - Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063)) +- [Tiered caching] Integrating IndicesRequestCache with CacheService controlled by a feature flag ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) From 1cafda424074fe701f83f3d546043615edfcc26f Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 5 Mar 2024 11:55:59 -0800 Subject: [PATCH 3/8] Fixing gradle build issue Signed-off-by: Sagar Upadhyaya --- .../TieredSpilloverCacheIT.java | 4 +- .../cache/common/tier/MockDiskCache.java | 133 +++++++++++++++++ .../tier/TieredSpilloverCacheTests.java | 139 ++---------------- 3 files changed, 146 insertions(+), 130 deletions(-) create mode 100644 modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java index 1004dd073da56..88f4490c9ed10 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -73,7 +73,7 @@ protected Settings nodeSettings(int nodeOrdinal) { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), - MockOnDiskCache.MockDiskCacheFactory.NAME + MockDiskCache.MockDiskCacheFactory.NAME ) .build(); } @@ -139,7 +139,7 @@ public MockDiskCachePlugin() {} @Override public Map getCacheFactoryMap() { - return Map.of(MockOnDiskCache.MockDiskCacheFactory.NAME, new MockOnDiskCache.MockDiskCacheFactory(0, 1000)); + return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000)); } @Override diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java new file mode 100644 index 0000000000000..79b57b80c3aa0 --- /dev/null +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.store.builders.ICacheBuilder; +import org.opensearch.common.cache.store.config.CacheConfig; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class MockDiskCache implements ICache { + + Map cache; + int maxSize; + long delay; + + public MockDiskCache(int maxSize, long delay) { + this.maxSize = maxSize; + this.delay = delay; + this.cache = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + V value = cache.get(key); + return value; + } + + @Override + public void put(K key, V value) { + if (this.cache.size() >= maxSize) { // For simplification + return; + } + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.cache.put(key, value); + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) { + V value = cache.computeIfAbsent(key, key1 -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return value; + } + + @Override + public void invalidate(K key) { + this.cache.remove(key); + } + + @Override + public void invalidateAll() { + this.cache.clear(); + } + + @Override + public Iterable keys() { + return this.cache.keySet(); + } + + @Override + public long count() { + return this.cache.size(); + } + + @Override + public void refresh() {} + + @Override + public void close() { + + } + + public static class MockDiskCacheFactory implements Factory { + + public static final String NAME = "mockDiskCache"; + final long delay; + final int maxSize; + + public MockDiskCacheFactory(long delay, int maxSize) { + this.delay = delay; + this.maxSize = maxSize; + } + + @Override + public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { + return new Builder().setMaxSize(maxSize).setDeliberateDelay(delay).build(); + } + + @Override + public String getCacheName() { + return NAME; + } + } + + public static class Builder extends ICacheBuilder { + + int maxSize; + long delay; + + @Override + public ICache build() { + return new MockDiskCache(this.maxSize, this.delay); + } + + public Builder setMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public Builder setDeliberateDelay(long millis) { + this.delay = millis; + return this; + } + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 7c9569f5defe2..e6d982601cf50 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -14,7 +14,6 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; -import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; @@ -25,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -105,7 +103,7 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), - MockOnDiskCache.MockDiskCacheFactory.NAME + MockDiskCache.MockDiskCacheFactory.NAME ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) @@ -126,8 +124,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ); @@ -164,7 +162,7 @@ public void testWithFactoryCreationWithOnHeapCacheNotPresent() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), - MockOnDiskCache.MockDiskCacheFactory.NAME + MockDiskCache.MockDiskCacheFactory.NAME ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) @@ -187,8 +185,8 @@ public void testWithFactoryCreationWithOnHeapCacheNotPresent() { Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ) ); @@ -232,8 +230,8 @@ public void testWithFactoryCreationWithDiskCacheNotPresent() { Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ) ); @@ -266,7 +264,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { ) .build(); - ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(0, diskCacheSize); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize); TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() .setOnHeapCacheFactory(onHeapCacheFactory) @@ -741,7 +739,7 @@ public void testConcurrencyForEvictionFlow() throws Exception { MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); - ICache.Factory diskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(500, diskCacheSize); + ICache.Factory diskCacheFactory = new MockDiskCache.MockDiskCacheFactory(500, diskCacheSize); CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) .setKeyType(String.class) .setWeigher((k, v) -> 150) @@ -861,7 +859,7 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .setSettings(settings) .build(); - ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); return new TieredSpilloverCache.Builder().setCacheType(CacheType.INDICES_REQUEST_CACHE) .setRemovalListener(removalListener) @@ -871,118 +869,3 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .build(); } } - -class MockOnDiskCache implements ICache { - - Map cache; - int maxSize; - long delay; - - MockOnDiskCache(int maxSize, long delay) { - this.maxSize = maxSize; - this.delay = delay; - this.cache = new ConcurrentHashMap(); - } - - @Override - public V get(K key) { - V value = cache.get(key); - return value; - } - - @Override - public void put(K key, V value) { - if (this.cache.size() >= maxSize) { // For simplification - return; - } - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - this.cache.put(key, value); - } - - @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) { - V value = cache.computeIfAbsent(key, key1 -> { - try { - return loader.load(key); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - return value; - } - - @Override - public void invalidate(K key) { - this.cache.remove(key); - } - - @Override - public void invalidateAll() { - this.cache.clear(); - } - - @Override - public Iterable keys() { - return this.cache.keySet(); - } - - @Override - public long count() { - return this.cache.size(); - } - - @Override - public void refresh() {} - - @Override - public void close() { - - } - - public static class MockDiskCacheFactory implements Factory { - - static final String NAME = "mockDiskCache"; - final long delay; - final int maxSize; - - MockDiskCacheFactory(long delay, int maxSize) { - this.delay = delay; - this.maxSize = maxSize; - } - - @Override - public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { - return new Builder().setMaxSize(maxSize).setDeliberateDelay(delay).build(); - } - - @Override - public String getCacheName() { - return NAME; - } - } - - public static class Builder extends ICacheBuilder { - - int maxSize; - long delay; - - @Override - public ICache build() { - return new MockOnDiskCache(this.maxSize, this.delay); - } - - public Builder setMaxSize(int maxSize) { - this.maxSize = maxSize; - return this; - } - - public Builder setDeliberateDelay(long millis) { - this.delay = millis; - return this; - } - } -} From 0eff4a578a27a22e93bf12780b79d8de463ae049 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 5 Mar 2024 13:01:58 -0800 Subject: [PATCH 4/8] Fixing CacheService test Signed-off-by: Sagar Upadhyaya --- .../cache/service/CacheServiceTests.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java index 9d39f8a43ea58..3824476091f77 100644 --- a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; @@ -61,7 +62,24 @@ public void testWithCreateCacheWithNoStoreNamePresentForCacheType() { IllegalArgumentException.class, () -> cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE) ); - assertEquals("No configuration exists for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + assertEquals("No store name: [opensearch_onheap] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + } + + public void testWithCreateCacheWithDefaultStoreNameForIRC() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + Map factoryMap = Map.of("cache1", factory1); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + CacheModule cacheModule = new CacheModule(List.of(mockPlugin1), Settings.EMPTY); + CacheConfig config = mock(CacheConfig.class); + when(config.getSettings()).thenReturn(Settings.EMPTY); + when(config.getWeigher()).thenReturn((k, v) -> 100); + when(config.getRemovalListener()).thenReturn(mock(RemovalListener.class)); + + CacheService cacheService = cacheModule.getCacheService(); + ICache iCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertTrue(iCache instanceof OpenSearchOnHeapCache); } public void testWithCreateCacheWithInvalidStoreNameAssociatedForCacheType() { From 6af885c0db1b6d2a76553e6ad352c9236e067714 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 5 Mar 2024 14:29:39 -0800 Subject: [PATCH 5/8] Adding UT in IndicesRequestCache with feature flag for more coverage Signed-off-by: Sagar Upadhyaya --- .../indices/IndicesRequestCacheTests.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 9e31bfcaa4a80..e4f455c4f2b47 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -46,10 +46,12 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; @@ -68,6 +70,7 @@ import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Optional; import java.util.UUID; @@ -137,6 +140,67 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.numRegisteredCloseListeners()); } + public void testBasicOperationsCacheWithFeatureFlag() throws Exception { + IndexShard indexShard = createIndex("test").getShard(0); + CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build(), + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + cacheService + ); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + + // initial cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); + assertEquals(0, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertFalse(loader.loadedFromCache); + assertEquals(1, cache.count()); + + // cache hit + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + value = cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(1, cache.count()); + assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); + assertEquals(1, cache.numRegisteredCloseListeners()); + + // Closing the cache doesn't modify an already returned CacheEntity + if (randomBoolean()) { + reader.close(); + } else { + indexShard.close("test", true, true); // closed shard but reader is still open + cache.clear(entity); + } + cache.cleanCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(0, cache.count()); + assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); + + IOUtils.close(reader, writer, dir, cache); + assertEquals(0, cache.numRegisteredCloseListeners()); + } + public void testCacheDifferentReaders() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); From 1f85b278bb6a1ab35317672d11522e3e8bd22e6e Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 6 Mar 2024 09:44:51 -0800 Subject: [PATCH 6/8] Updating changelog and renaming feature flag setting Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 2 +- distribution/src/config/opensearch.yml | 4 ++++ .../TieredSpilloverCacheIT.java | 2 +- .../java/org/opensearch/indices/IndicesRequestCacheIT.java | 2 +- .../org/opensearch/common/settings/ClusterSettings.java | 2 +- .../opensearch/common/settings/FeatureFlagSettings.java | 2 +- .../main/java/org/opensearch/common/util/FeatureFlags.java | 7 ++++--- .../java/org/opensearch/indices/IndicesRequestCache.java | 2 +- .../org/opensearch/indices/IndicesRequestCacheTests.java | 2 +- 9 files changed, 15 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 43b2771e23e2a..0a64c4ccf2545 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -105,7 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163)) - Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626)) - Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063)) -- [Tiered caching] Integrating IndicesRequestCache with CacheService controlled by a feature flag ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533)) +- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533)) - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) ### Dependencies diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index ebffdde0f3699..10bab9b3fce92 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -121,3 +121,7 @@ ${path.logs} # Once there is no observed impact on performance, this feature flag can be removed. # #opensearch.experimental.optimization.datetime_formatter_caching.enabled: false +# +# Gates the functionality of enabling Opensearch to use pluggable caches with respective store names via setting. +# +#opensearch.experimental.feature.pluggable.caching.enabled: false diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java index 88f4490c9ed10..7f6d838c8c230 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -52,7 +52,7 @@ protected Collection> nodePlugins() { @Override protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build(); + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); } @Override diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 6c6646b03792a..4146765c70a14 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -84,7 +84,7 @@ public static Collection parameters() { @Override protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build(); + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); } @Override diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index e3f3f2d46d2ca..12068869d5583 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -732,7 +732,7 @@ public void apply(Settings value, Settings current, Settings previous) { TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING, TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING ), - List.of(FeatureFlags.TIERED_CACHING), + List.of(FeatureFlags.PLUGGABLE_CACHE), List.of(CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE)) ); } diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index c217a4cfe4ac1..4cf7f22c014dd 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -37,6 +37,6 @@ protected FeatureFlagSettings( FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING, FeatureFlags.DOC_ID_FUZZY_SET_SETTING, FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, - FeatureFlags.TIERED_CACHING_SETTING + FeatureFlags.PLUGGABLE_CACHE_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 3ab95af0da61d..9e202a5bfd143 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -65,9 +65,10 @@ public class FeatureFlags { public static final String DOC_ID_FUZZY_SET = "opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled"; /** - * Gates the functionality of tiered caching + * Gates the functionality of pluggable cache. + * Enables OpenSearch to use pluggable caches with respective store names via setting. */ - public static final String TIERED_CACHING = "opensearch.experimental.feature.tiered.caching.enabled"; + public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled"; /** * Should store the settings from opensearch.yml. @@ -134,5 +135,5 @@ public static boolean isEnabled(Setting featureFlag) { public static final Setting DOC_ID_FUZZY_SET_SETTING = Setting.boolSetting(DOC_ID_FUZZY_SET, false, Property.NodeScope); - public static final Setting TIERED_CACHING_SETTING = Setting.boolSetting(TIERED_CACHING, false, Property.NodeScope); + public static final Setting PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index a0545a52ac394..fa766b3b4aa46 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -131,7 +131,7 @@ public final class IndicesRequestCache implements RemovalListener weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); this.cacheEntityLookup = cacheEntityFunction; - if (FeatureFlags.TIERED_CACHING_SETTING.get(settings)) { + if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) { this.cache = cacheService.createCache( new CacheConfig.Builder().setSettings(settings) .setWeigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index e4f455c4f2b47..41679dcbecccf 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -144,7 +144,7 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build(), + Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), cacheService ); From 4f58d5abae3297ef7dad53eb9ca7e2eb1f3e16d8 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 7 Mar 2024 12:57:58 -0800 Subject: [PATCH 7/8] Moving feature flag setting handling logic to CacheService by maintaining backward compatibility Signed-off-by: Sagar Upadhyaya --- .../TieredSpilloverCacheIT.java | 2 +- .../tier/TieredSpilloverCacheTests.java | 27 ++++++- .../indices/IndicesRequestCacheIT.java | 9 +-- .../common/cache/service/CacheService.java | 11 ++- .../common/cache/settings/CacheSettings.java | 5 +- .../cache/store/OpenSearchOnHeapCache.java | 21 +++++- .../cache/store/config/CacheConfig.java | 35 +++++++++ .../OpenSearchOnHeapCacheSettings.java | 19 ++++- .../common/settings/ClusterSettings.java | 2 +- .../indices/IndicesRequestCache.java | 32 +++----- .../cache/service/CacheServiceTests.java | 74 ++++++++++++++++--- .../indices/IndicesRequestCacheTests.java | 12 +-- 12 files changed, 192 insertions(+), 57 deletions(-) diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java index 7f6d838c8c230..568ac4d188c51 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -60,7 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put( - CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME ) .put( diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index e6d982601cf50..2f7938934300e 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -13,11 +13,13 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -254,6 +256,11 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { .setRemovalListener(removalListener) .setSettings( Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -442,6 +449,10 @@ public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception { diskCacheSize, removalListener, Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -746,6 +757,11 @@ public void testConcurrencyForEvictionFlow() throws Exception { .setRemovalListener(removalListener) .setSettings( Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -856,7 +872,16 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .setKeyType(String.class) .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) - .setSettings(settings) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 4146765c70a14..52b4dad553180 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -78,15 +78,12 @@ public IndicesRequestCacheIT(Settings settings) { public static Collection parameters() { return Arrays.asList( new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }, + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() } ); } - @Override - protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); - } - @Override protected boolean useRandomReplicationStrategy() { return true; diff --git a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java index 9de6b77877698..b6710e5e4b424 100644 --- a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java +++ b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java @@ -12,9 +12,11 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import java.util.HashMap; import java.util.Map; @@ -44,8 +46,13 @@ public ICache createCache(CacheConfig config, CacheType cache cacheType.getSettingPrefix() ); String storeName = cacheSettingForCacheType.get(settings); - if (storeName == null || storeName.isBlank()) { - throw new IllegalArgumentException("No configuration exists for cache type: " + cacheType); + if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) { + // Condition 1: In case feature flag is off, we default to onHeap. + // Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older + // settings, so we again fallback to onHeap to maintain backward compatibility. + // It is guaranteed that we will have this store name registered, so + // should be safe. + storeName = OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME; } if (!cacheStoreTypeFactories.containsKey(storeName)) { throw new IllegalArgumentException("No store name: [" + storeName + "] is registered for cache type: " + cacheType); diff --git a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java index fbc5c9b2e74e4..43a047f0f22c6 100644 --- a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java +++ b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java @@ -10,7 +10,6 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; -import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.settings.Setting; /** @@ -26,10 +25,10 @@ public class CacheSettings { */ public static final Setting.AffixSetting CACHE_TYPE_STORE_NAME = Setting.suffixKeySetting( "store.name", - (key) -> Setting.simpleString(key, OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, Setting.Property.NodeScope) + (key) -> Setting.simpleString(key, "", Setting.Property.NodeScope) ); - public static Setting getConcreteSettingForCacheType(CacheType cacheType) { + public static Setting getConcreteStoreNameSettingForCacheType(CacheType cacheType) { return CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()); } } diff --git a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java index d218903de5b6d..c9bec4ba47def 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java @@ -15,15 +15,19 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeValue; import java.util.Map; +import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_KEY; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; /** @@ -111,9 +115,22 @@ public static class OpenSearchOnHeapCacheFactory implements Factory { public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { Map> settingList = OpenSearchOnHeapCacheSettings.getSettingListForCacheType(cacheType); Settings settings = config.getSettings(); - return new Builder().setMaximumWeightInBytes( + ICacheBuilder builder = new Builder().setMaximumWeightInBytes( ((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes() - ).setWeigher(config.getWeigher()).setRemovalListener(config.getRemovalListener()).build(); + ) + .setExpireAfterAccess(((TimeValue) settingList.get(EXPIRE_AFTER_ACCESS_KEY).get(settings))) + .setWeigher(config.getWeigher()) + .setRemovalListener(config.getRemovalListener()); + Setting cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace( + cacheType.getSettingPrefix() + ); + String storeName = cacheSettingForCacheType.get(settings); + if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) { + // For backward compatibility as the user intent is to use older settings. + builder.setMaximumWeightInBytes(config.getMaxSizeInBytes()); + builder.setExpireAfterAccess(config.getExpireAfterAccess()); + } + return builder.build(); } @Override diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 6fefea6578fb9..fa82e9be72e6e 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -11,6 +11,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import java.util.function.ToLongBiFunction; @@ -41,12 +42,24 @@ public class CacheConfig { private final RemovalListener removalListener; + /** + * Max size in bytes for the cache. This is needed for backward compatibility. + */ + private final long maxSizeInBytes; + + /** + * Defines the expiration time for a cache entry. This is needed for backward compatibility. + */ + private final TimeValue expireAfterAccess; + private CacheConfig(Builder builder) { this.keyType = builder.keyType; this.valueType = builder.valueType; this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; + this.maxSizeInBytes = builder.maxSizeInBytes; + this.expireAfterAccess = builder.expireAfterAccess; } public Class getKeyType() { @@ -69,6 +82,14 @@ public ToLongBiFunction getWeigher() { return weigher; } + public Long getMaxSizeInBytes() { + return maxSizeInBytes; + } + + public TimeValue getExpireAfterAccess() { + return expireAfterAccess; + } + /** * Builder class to build Cache config related parameters. * @param Type of key. @@ -86,6 +107,10 @@ public static class Builder { private ToLongBiFunction weigher; + private long maxSizeInBytes; + + private TimeValue expireAfterAccess; + public Builder() {} public Builder setSettings(Settings settings) { @@ -113,6 +138,16 @@ public Builder setWeigher(ToLongBiFunction weigher) { return this; } + public Builder setMaxSizeInBytes(long sizeInBytes) { + this.maxSizeInBytes = sizeInBytes; + return this; + } + + public Builder setExpireAfterAccess(TimeValue expireAfterAccess) { + this.expireAfterAccess = expireAfterAccess; + return this; + } + public CacheConfig build() { return new CacheConfig<>(this); } diff --git a/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java b/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java index bfd2d937fb430..5a2964ad011bf 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java +++ b/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java @@ -11,6 +11,7 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeValue; import java.util.HashMap; @@ -33,9 +34,25 @@ public class OpenSearchOnHeapCacheSettings { (key) -> Setting.memorySizeSetting(key, "1%", NodeScope) ); + /** + * Setting to define expire after access. + * + * Setting pattern: {cache_type}.opensearch_onheap.expire + */ + public static final Setting.AffixSetting EXPIRE_AFTER_ACCESS_SETTING = Setting.suffixKeySetting( + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ".expire", + (key) -> Setting.positiveTimeSetting(key, TimeValue.MAX_VALUE, Setting.Property.NodeScope) + ); + public static final String MAXIMUM_SIZE_IN_BYTES_KEY = "maximum_size_in_bytes"; + public static final String EXPIRE_AFTER_ACCESS_KEY = "expire_after_access"; - private static final Map> KEY_SETTING_MAP = Map.of(MAXIMUM_SIZE_IN_BYTES_KEY, MAXIMUM_SIZE_IN_BYTES); + private static final Map> KEY_SETTING_MAP = Map.of( + MAXIMUM_SIZE_IN_BYTES_KEY, + MAXIMUM_SIZE_IN_BYTES, + EXPIRE_AFTER_ACCESS_KEY, + EXPIRE_AFTER_ACCESS_SETTING + ); public static final Map>> CACHE_TYPE_MAP = getCacheTypeMap(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 12068869d5583..8177caf12cfbc 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -733,6 +733,6 @@ public void apply(Settings value, Settings current, Settings previous) { TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING ), List.of(FeatureFlags.PLUGGABLE_CACHE), - List.of(CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE)) + List.of(CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE)) ); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index fa766b3b4aa46..92fb278c946f1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -45,15 +45,12 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.service.CacheService; -import org.opensearch.common.cache.store.OpenSearchOnHeapCache; -import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; @@ -131,26 +128,17 @@ public final class IndicesRequestCache implements RemovalListener weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); this.cacheEntityLookup = cacheEntityFunction; - if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) { - this.cache = cacheService.createCache( - new CacheConfig.Builder().setSettings(settings) - .setWeigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) - .setValueType(BytesReference.class) - .setKeyType(Key.class) - .setRemovalListener(this) - .build(), - CacheType.INDICES_REQUEST_CACHE - ); - } else { - ICacheBuilder builder = new OpenSearchOnHeapCache.Builder().setSettings(settings) - .setMaximumWeightInBytes(sizeInBytes) + this.cache = cacheService.createCache( + new CacheConfig.Builder().setSettings(settings) .setWeigher(weigher) - .setRemovalListener(this); - if (expire != null) { - builder.setExpireAfterAccess(expire); - } - this.cache = builder.build(); - } + .setValueType(BytesReference.class) + .setKeyType(Key.class) + .setRemovalListener(this) + .setMaxSizeInBytes(sizeInBytes) // for backward compatibility + .setExpireAfterAccess(expire) // for backward compatibility + .build(), + CacheType.INDICES_REQUEST_CACHE + ); } @Override diff --git a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java index 3824476091f77..b355161f6f310 100644 --- a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java @@ -17,13 +17,13 @@ import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.plugins.CachePlugin; import org.opensearch.test.OpenSearchTestCase; import java.util.List; import java.util.Map; -import static junit.framework.TestCase.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -34,22 +34,72 @@ public class CacheServiceTests extends OpenSearchTestCase { public void testWithCreateCacheForIndicesRequestCacheType() { CachePlugin mockPlugin1 = mock(CachePlugin.class); ICache.Factory factory1 = mock(ICache.Factory.class); - Map factoryMap = Map.of("cache1", factory1); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); - Setting indicesRequestCacheSetting = CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); - - CacheModule cacheModule = new CacheModule( - List.of(mockPlugin1), + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + CacheService cacheService = new CacheService( + factoryMap, Settings.builder().put(indicesRequestCacheSetting.getKey(), "cache1").build() ); CacheConfig config = mock(CacheConfig.class); - ICache onHeapCache = mock(OpenSearchOnHeapCache.class); - when(factory1.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(onHeapCache); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(onHeapCacheFactory.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); - CacheService cacheService = cacheModule.getCacheService(); ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); - assertEquals(onHeapCache, ircCache); + assertEquals(mockOnHeapCache, ircCache); + } + + public void testWithCreateCacheForIndicesRequestCacheTypeWithFeatureFlagTrue() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + CacheService cacheService = new CacheService( + factoryMap, + Settings.builder().put(indicesRequestCacheSetting.getKey(), "cache1").put(FeatureFlags.PLUGGABLE_CACHE, "true").build() + ); + CacheConfig config = mock(CacheConfig.class); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(factory1.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); + + ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertEquals(mockOnHeapCache, ircCache); + } + + public void testWithCreateCacheForIndicesRequestCacheTypeWithFeatureFlagTrueAndStoreNameIsNull() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + CacheService cacheService = new CacheService(factoryMap, Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build()); + CacheConfig config = mock(CacheConfig.class); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(onHeapCacheFactory.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); + + ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertEquals(mockOnHeapCache, ircCache); } public void testWithCreateCacheWithNoStoreNamePresentForCacheType() { @@ -84,7 +134,7 @@ public void testWithCreateCacheWithDefaultStoreNameForIRC() { public void testWithCreateCacheWithInvalidStoreNameAssociatedForCacheType() { ICache.Factory factory1 = mock(ICache.Factory.class); - Setting indicesRequestCacheSetting = CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); Map factoryMap = Map.of("cache1", factory1); CacheService cacheService = new CacheService( factoryMap, @@ -99,6 +149,6 @@ public void testWithCreateCacheWithInvalidStoreNameAssociatedForCacheType() { IllegalArgumentException.class, () -> cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE) ); - assertEquals("No store name: [cache] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + assertEquals("No store name: [opensearch_onheap] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 41679dcbecccf..b9cbbb2c65162 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -85,7 +85,7 @@ public void testBasicOperationsCache() throws Exception { IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - mock(CacheService.class) + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -212,7 +212,7 @@ public void testCacheDifferentReaders() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), mock(CacheService.class)); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -314,7 +314,7 @@ public void testEviction() throws Exception { IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - mock(CacheService.class) + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -342,7 +342,7 @@ public void testEviction() throws Exception { IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - mock(CacheService.class) + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -387,7 +387,7 @@ public void testClearAllEntityIdentity() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), mock(CacheService.class)); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -482,7 +482,7 @@ public void testInvalidate() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), mock(CacheService.class)); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); From b046c6f76ba476ad566fff91a80cd246753f9fc6 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 7 Mar 2024 14:08:45 -0800 Subject: [PATCH 8/8] Fixing broken UTs Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/snapshots/SnapshotResiliencyTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 5f62ea113c903..33f3577e15c52 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -157,6 +157,7 @@ import org.opensearch.common.CheckedConsumer; import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; +import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; @@ -239,6 +240,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -2073,7 +2075,7 @@ public void onFailure(final Exception e) { null, new RemoteStoreStatsTrackerFactory(clusterService, settings), DefaultRecoverySettings.INSTANCE, - null + new CacheModule(new ArrayList<>(), settings).getCacheService() ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService(