From 61d26ccb84e36448d60956788b0f797274372146 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 18 Mar 2024 17:17:05 -0700 Subject: [PATCH 1/6] Ehcache IT tests Signed-off-by: Sagar Upadhyaya --- .../opensearch/cache/EhcacheDiskCacheIT.java | 159 ++++++++++++++++++ .../cache/store/disk/EhcacheDiskCache.java | 1 + .../indices/IndicesRequestCacheIT.java | 85 +++++++++- .../cache/request/ShardRequestCache.java | 2 + .../indices/IndicesRequestCache.java | 3 +- 5 files changed, 243 insertions(+), 7 deletions(-) diff --git a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java index c68455463ee3d..416ffe4d39e99 100644 --- a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java +++ b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java @@ -12,11 +12,30 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchType; +import org.opensearch.cache.store.disk.EhcacheDiskCache; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.IndicesRequestCache; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginInfo; +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Assert; +import java.io.IOException; +import java.time.ZoneId; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -24,6 +43,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.hamcrest.Matchers.greaterThan; +import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY; +import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase { @Override @@ -31,6 +56,30 @@ protected Collection> nodePlugins() { return Arrays.asList(EhcacheCachePlugin.class); } + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + try (NodeEnvironment env = newNodeEnvironment(Settings.EMPTY)) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(DISK_STORAGE_PATH_KEY) + .getKey(), env.nodePaths()[0].indicesPath.toString() + + "/request_cache") + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public void testPluginsAreInstalled() { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); @@ -45,4 +94,114 @@ public void testPluginsAreInstalled() { pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.EhcacheCachePlugin")) ); } + + public void testSanityChecksWithIndicesRequestCache() throws InterruptedException { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("f", "type=date") + .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build()) + .get() + ); + indexRandom( + true, + client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"), + client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z") + ); + ensureSearchable("index"); + + // This is not a random example: serialization with time zones writes shared strings + // which used to not work well with the query cache because of the handles stream output + // see #9500 + final SearchResponse r1 = client.prepareSearch("index") + .setSize(0) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .addAggregation( + dateHistogram("histo").field("f") + .timeZone(ZoneId.of("+01:00")) + .minDocCount(0) + .dateHistogramInterval(DateHistogramInterval.MONTH) + ) + .get(); + assertSearchResponse(r1); + + // The cached is actually used + assertThat( + client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), + greaterThan(0L) + ); + } + + + public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + int numberOfIndexedItems = 2; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = + client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, + "hello" + iterator)).get(); + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() + + " evictions = " + requestCacheStats.getEvictions()); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = + client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, + "hello" + iterator)).get(); + assertSearchResponse(resp); + } + //System.out.println(resp.toString()); + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() + + " evictions = " + requestCacheStats.getEvictions()); // Explicit refresh would invalidate cache + refreshAndWaitForReplication(); + ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index"); + ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get(); + System.out.println("status of clear indices = " + response.getStatus().getStatus()); + Thread.sleep(5000); + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() + + " evictions = " + requestCacheStats.getEvictions()); + assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + } + + } diff --git a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java index d83a55e60fd2b..fba09dbd533bb 100644 --- a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java +++ b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java @@ -284,6 +284,7 @@ public void put(K key, V value) { */ @Override public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + System.out.println("SAGARX here!!!!!"); // Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is // not performant in case there are multiple concurrent request for same key. Below is our own custom // implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation. diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 52b4dad553180..b32664727ec87 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -35,6 +35,8 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.opensearch.action.admin.indices.alias.Alias; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; @@ -58,6 +60,7 @@ import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; @@ -76,12 +79,14 @@ public IndicesRequestCacheIT(Settings settings) { @ParametersFactory public static Collection parameters() { - return Arrays.asList( - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, - new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }, - new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() } - ); + return Collections.singleton(new Object[]{Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build()}); + +// return Arrays.asList( +// new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, +// new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, +// new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }, +// new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() } +// ); } @Override @@ -694,4 +699,72 @@ private static void assertCacheState(Client client, String index, long expectedH } + public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + int numberOfIndexedItems = 2; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = + client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, + "hello" + iterator)).get(); + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() + + " evictions = " + requestCacheStats.getEvictions()); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = + client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, + "hello" + iterator)).get(); + assertSearchResponse(resp); + } + //System.out.println(resp.toString()); + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() + + " evictions = " + requestCacheStats.getEvictions()); // Explicit refresh would invalidate cache + refreshAndWaitForReplication(); + ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index"); + ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get(); + System.out.println("status of clear indices = " + response.getStatus().getStatus()); + Thread.sleep(2000); + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() + + " evictions = " + requestCacheStats.getEvictions()); + assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + } + } diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index bb35a09ccab46..ece14b130c56a 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -64,6 +64,7 @@ public void onMiss() { public void onCached(Accountable key, BytesReference value) { totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + System.out.println("Cached !!! = " + totalMetric.count() + " size = " + (key.ramBytesUsed() + value.ramBytesUsed())); } public void onRemoval(Accountable key, BytesReference value, boolean evicted) { @@ -78,5 +79,6 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) { dec += value.ramBytesUsed(); } totalMetric.dec(dec); + System.out.println("Removed !!! = " + totalMetric.count() + " size = " + dec); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 6f1181167b6b2..8789cbb414ab6 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -185,9 +185,10 @@ public final class IndicesRequestCache implements RemovalListener Date: Mon, 18 Mar 2024 17:48:36 -0700 Subject: [PATCH 2/6] Adding some logs to print key/value size Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/index/cache/request/ShardRequestCache.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index ece14b130c56a..e1cfbd74a0241 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -64,6 +64,8 @@ public void onMiss() { public void onCached(Accountable key, BytesReference value) { totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + System.out.println("key size = " + key.ramBytesUsed()); + System.out.println("value size = " + value.ramBytesUsed()); System.out.println("Cached !!! = " + totalMetric.count() + " size = " + (key.ramBytesUsed() + value.ramBytesUsed())); } @@ -79,6 +81,8 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) { dec += value.ramBytesUsed(); } totalMetric.dec(dec); + System.out.println("key size = " + key.ramBytesUsed()); + System.out.println("value size = " + value.ramBytesUsed()); System.out.println("Removed !!! = " + totalMetric.count() + " size = " + dec); } } From 1869b353ae455c38719b700ac347790740e8cddd Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 25 Mar 2024 10:36:14 -0700 Subject: [PATCH 3/6] Add ehcache related invalidation IT Signed-off-by: Sagar Upadhyaya --- .../opensearch/cache/EhcacheDiskCacheIT.java | 77 +++++++++---------- .../cache/store/disk/EhcacheDiskCache.java | 1 - .../cache/request/ShardRequestCache.java | 6 -- .../indices/IndicesRequestCache.java | 1 - 4 files changed, 38 insertions(+), 47 deletions(-) diff --git a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java index 416ffe4d39e99..bac69552cd31a 100644 --- a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java +++ b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java @@ -8,6 +8,8 @@ package org.opensearch.cache; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; @@ -17,11 +19,11 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; import org.opensearch.cache.store.disk.EhcacheDiskCache; +import org.opensearch.cache.store.disk.EhcacheThreadLeakFilter; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.settings.CacheSettings; -import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.NodeEnvironment; @@ -43,12 +45,14 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.hamcrest.Matchers.greaterThan; +import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_LISTENER_MODE_SYNC_KEY; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY; import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.greaterThan; +@ThreadLeakFilters(filters = { EhcacheThreadLeakFilter.class }) public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase { @Override @@ -66,14 +70,22 @@ protected Settings nodeSettings(int nodeOrdinal) { try (NodeEnvironment env = newNodeEnvironment(Settings.EMPTY)) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) - .get(DISK_STORAGE_PATH_KEY) - .getKey(), env.nodePaths()[0].indicesPath.toString() + - "/request_cache") + .put( + EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(DISK_STORAGE_PATH_KEY) + .getKey(), + env.nodePaths()[0].indicesPath.toString() + "/request_cache" + ) .put( CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME ) + .put( + EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(DISK_LISTENER_MODE_SYNC_KEY) + .getKey(), + true + ) .build(); } catch (IOException e) { throw new RuntimeException(e); @@ -134,7 +146,6 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio ); } - public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exception { Client client = client(); assertAcked( @@ -150,15 +161,16 @@ public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exce ) .get() ); - int numberOfIndexedItems = 2; + int numberOfIndexedItems = randomIntBetween(2, 10); for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); } ensureSearchable("index"); for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { - SearchResponse resp = - client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, - "hello" + iterator)).get(); + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); assertSearchResponse(resp); } RequestCacheStats requestCacheStats = client.admin() @@ -168,40 +180,27 @@ public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exce .get() .getTotal() .getRequestCache(); - System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() - + " evictions = " + requestCacheStats.getEvictions()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + assertTrue(requestCacheStats.getMemorySizeInBytes() > 0); for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { - SearchResponse resp = - client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, - "hello" + iterator)).get(); + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); assertSearchResponse(resp); } - //System.out.println(resp.toString()); - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() - + " evictions = " + requestCacheStats.getEvictions()); // Explicit refresh would invalidate cache + requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache(); + assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + // Explicit refresh would invalidate cache entries. refreshAndWaitForReplication(); ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index"); ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get(); - System.out.println("status of clear indices = " + response.getStatus().getStatus()); - Thread.sleep(5000); - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() - + " evictions = " + requestCacheStats.getEvictions()); + requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache(); assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); } - - } diff --git a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java index fb52ef9246e20..edb2c900be46c 100644 --- a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java +++ b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java @@ -290,7 +290,6 @@ public void put(K key, V value) { */ @Override public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { - System.out.println("SAGARX here!!!!!"); // Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is // not performant in case there are multiple concurrent request for same key. Below is our own custom // implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation. diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index e1cfbd74a0241..bb35a09ccab46 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -64,9 +64,6 @@ public void onMiss() { public void onCached(Accountable key, BytesReference value) { totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); - System.out.println("key size = " + key.ramBytesUsed()); - System.out.println("value size = " + value.ramBytesUsed()); - System.out.println("Cached !!! = " + totalMetric.count() + " size = " + (key.ramBytesUsed() + value.ramBytesUsed())); } public void onRemoval(Accountable key, BytesReference value, boolean evicted) { @@ -81,8 +78,5 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) { dec += value.ramBytesUsed(); } totalMetric.dec(dec); - System.out.println("key size = " + key.ramBytesUsed()); - System.out.println("value size = " + value.ramBytesUsed()); - System.out.println("Removed !!! = " + totalMetric.count() + " size = " + dec); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 4a937779376eb..34c8d6cf5e840 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -189,7 +189,6 @@ public void close() throws IOException { cache.invalidateAll(); cache.close(); cacheCleanupManager.close(); - cache.close(); } private double getStalenessThreshold(Settings settings) { From c387340f6c6986d8fe1307eb707a4ab630358184 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 25 Mar 2024 10:41:23 -0700 Subject: [PATCH 4/6] Remvoing unnecessary IndicesRequestCache IT Signed-off-by: Sagar Upadhyaya --- .../indices/IndicesRequestCacheIT.java | 87 ++----------------- 1 file changed, 6 insertions(+), 81 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index b32664727ec87..96961f9b40fe5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -35,8 +35,6 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.opensearch.action.admin.indices.alias.Alias; -import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; -import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; @@ -60,7 +58,6 @@ import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; @@ -79,14 +76,12 @@ public IndicesRequestCacheIT(Settings settings) { @ParametersFactory public static Collection parameters() { - return Collections.singleton(new Object[]{Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build()}); - -// return Arrays.asList( -// new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, -// new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, -// new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }, -// new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() } -// ); + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }, + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() } + ); } @Override @@ -696,75 +691,5 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); - - } - - public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exception { - Client client = client(); - assertAcked( - client.admin() - .indices() - .prepareCreate("index") - .setMapping("k", "type=keyword") - .setSettings( - Settings.builder() - .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - ) - .get() - ); - int numberOfIndexedItems = 2; - for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { - indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); - } - ensureSearchable("index"); - for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { - SearchResponse resp = - client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, - "hello" + iterator)).get(); - assertSearchResponse(resp); - } - RequestCacheStats requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() - + " evictions = " + requestCacheStats.getEvictions()); - for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { - SearchResponse resp = - client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator, - "hello" + iterator)).get(); - assertSearchResponse(resp); - } - //System.out.println(resp.toString()); - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() - + " evictions = " + requestCacheStats.getEvictions()); // Explicit refresh would invalidate cache - refreshAndWaitForReplication(); - ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index"); - ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get(); - System.out.println("status of clear indices = " + response.getStatus().getStatus()); - Thread.sleep(2000); - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes() - + " evictions = " + requestCacheStats.getEvictions()); - assertEquals(0, requestCacheStats.getMemorySizeInBytes()); } - } From ca047ff27f19cd18ba5fd0563f755107d8a15afd Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 25 Mar 2024 10:49:44 -0700 Subject: [PATCH 5/6] Indentation fix Signed-off-by: Sagar Upadhyaya --- .../java/org/opensearch/indices/IndicesRequestCacheIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 96961f9b40fe5..52b4dad553180 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -691,5 +691,7 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); + } + } From 87a49a4c890d58e12162c3fd35400793d094d3dc Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 12 Apr 2024 10:26:41 -0700 Subject: [PATCH 6/6] Added tests around expiration time and invalidation Signed-off-by: Sagar Upadhyaya --- plugins/cache-ehcache/build.gradle | 5 + .../opensearch/cache/EhcacheDiskCacheIT.java | 218 ++++++++++++++++-- .../cache/EhcacheDiskCacheSettings.java | 7 +- 3 files changed, 211 insertions(+), 19 deletions(-) diff --git a/plugins/cache-ehcache/build.gradle b/plugins/cache-ehcache/build.gradle index 65e7daaaacf26..4fc5e44f58c3a 100644 --- a/plugins/cache-ehcache/build.gradle +++ b/plugins/cache-ehcache/build.gradle @@ -95,3 +95,8 @@ test { // TODO: Adding permission in plugin-security.policy doesn't seem to work. systemProperty 'tests.security.manager', 'false' } + +internalClusterTest { + // TODO: Remove this later once we have a way. + systemProperty 'tests.security.manager', 'false' +} diff --git a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java index bac69552cd31a..909a493c0734f 100644 --- a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java +++ b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java @@ -16,6 +16,7 @@ import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; import org.opensearch.cache.store.disk.EhcacheDiskCache; @@ -25,6 +26,7 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.cache.request.RequestCacheStats; @@ -34,6 +36,7 @@ import org.opensearch.plugins.PluginInfo; import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.junit.Assert; import java.io.IOException; @@ -41,17 +44,25 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.cache.EhcacheDiskCacheSettings.DEFAULT_CACHE_SIZE_IN_BYTES; +import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_LISTENER_MODE_SYNC_KEY; +import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_MAX_SIZE_IN_BYTES_KEY; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY; +import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.greaterThan; +@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST) @ThreadLeakFilters(filters = { EhcacheThreadLeakFilter.class }) public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase { @@ -65,16 +76,17 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); } - @Override - protected Settings nodeSettings(int nodeOrdinal) { + private Settings defaultSettings(long sizeInBytes, TimeValue expirationTime) { + if (expirationTime == null) { + expirationTime = TimeValue.MAX_VALUE; + } try (NodeEnvironment env = newNodeEnvironment(Settings.EMPTY)) { return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) .put( EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(DISK_STORAGE_PATH_KEY) .getKey(), - env.nodePaths()[0].indicesPath.toString() + "/request_cache" + env.nodePaths()[0].indicesPath.toString() + "/" + UUID.randomUUID() + "/request_cache/" ) .put( CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), @@ -86,6 +98,18 @@ protected Settings nodeSettings(int nodeOrdinal) { .getKey(), true ) + .put( + EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(DISK_MAX_SIZE_IN_BYTES_KEY) + .getKey(), + sizeInBytes + ) + .put( + EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY) + .getKey(), + expirationTime + ) .build(); } catch (IOException e) { throw new RuntimeException(e); @@ -93,6 +117,7 @@ protected Settings nodeSettings(int nodeOrdinal) { } public void testPluginsAreInstalled() { + internalCluster().startNode(Settings.builder().put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)).build()); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); @@ -108,13 +133,20 @@ public void testPluginsAreInstalled() { } public void testSanityChecksWithIndicesRequestCache() throws InterruptedException { + internalCluster().startNode(Settings.builder().put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)).build()); Client client = client(); assertAcked( client.admin() .indices() .prepareCreate("index") .setMapping("f", "type=date") - .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build()) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .build() + ) .get() ); indexRandom( @@ -146,7 +178,13 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio ); } - public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exception { + public void testInvalidationWithIndicesRequestCache() throws Exception { + internalCluster().startNode( + Settings.builder() + .put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)) + .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) + .build() + ); Client client = client(); assertAcked( client.admin() @@ -158,32 +196,36 @@ public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exce .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) ) .get() ); - int numberOfIndexedItems = randomIntBetween(2, 10); + int numberOfIndexedItems = randomIntBetween(5, 10); for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); } ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + long perQuerySizeInCacheInBytes = -1; for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { SearchResponse resp = client.prepareSearch("index") .setRequestCache(true) .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } assertSearchResponse(resp); } - RequestCacheStats requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); assertEquals(0, requestCacheStats.getHitCount()); assertEquals(0, requestCacheStats.getEvictions()); - assertTrue(requestCacheStats.getMemorySizeInBytes() > 0); + assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes()); for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { SearchResponse resp = client.prepareSearch("index") .setRequestCache(true) @@ -191,16 +233,156 @@ public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exce .get(); assertSearchResponse(resp); } - requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache(); + requestCacheStats = getRequestCacheStats(client, "index"); assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes()); + assertEquals(0, requestCacheStats.getEvictions()); // Explicit refresh would invalidate cache entries. refreshAndWaitForReplication(); + assertBusy(() -> { + // Explicit refresh should clear up cache entries + assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0); + }, 1, TimeUnit.SECONDS); + requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + // Hits and misses stats shouldn't get cleared up. + assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + } + + public void testExplicitCacheClearWithIndicesRequestCache() throws Exception { + internalCluster().startNode( + Settings.builder() + .put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)) + .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) + .build() + ); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + int numberOfIndexedItems = randomIntBetween(5, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + + long perQuerySizeInCacheInBytes = -1; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes()); + + // Explicit clear the cache. ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index"); ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get(); - requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache(); - assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + assertNoFailures(response); + + assertBusy(() -> { + // All entries should get cleared up. + assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0); + }, 1, TimeUnit.SECONDS); + } + + public void testEvictionsFlowWithExpirationTime() throws Exception { + internalCluster().startNode( + Settings.builder() + .put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, new TimeValue(0))) // Immediately evict items after + // access + .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) + .build() + ); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + int numberOfIndexedItems = 2;// randomIntBetween(5, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + + long perQuerySizeInCacheInBytes = -1; + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(0, requestCacheStats.getHitCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes()); + assertEquals(0, requestCacheStats.getEvictions()); + + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + // Now that we have access the entries, they should expire after 1ms. So lets wait and verify that cache gets + // cleared up. + assertBusy(() -> { + // Explicit refresh should clear up cache entries + assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0); + }, 10, TimeUnit.MILLISECONDS); + // Validate hit and miss count. assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); } + + private RequestCacheStats getRequestCacheStats(Client client, String indexName) { + return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache(); + } } diff --git a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/EhcacheDiskCacheSettings.java b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/EhcacheDiskCacheSettings.java index 837fd6b268ce6..d173155080f6a 100644 --- a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/EhcacheDiskCacheSettings.java +++ b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/EhcacheDiskCacheSettings.java @@ -23,6 +23,11 @@ */ public class EhcacheDiskCacheSettings { + /** + * Default cache size in bytes ie 1gb. + */ + public static final long DEFAULT_CACHE_SIZE_IN_BYTES = 1073741824L; + /** * Ehcache disk write minimum threads for its pool * @@ -99,7 +104,7 @@ public class EhcacheDiskCacheSettings { */ public static final Setting.AffixSetting DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING = Setting.suffixKeySetting( EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_size_in_bytes", - (key) -> Setting.longSetting(key, 1073741824L, NodeScope) + (key) -> Setting.longSetting(key, DEFAULT_CACHE_SIZE_IN_BYTES, NodeScope) ); /**