From bebfc3ade7e8e0377435a046b4ce3521af0a7a67 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 8 Apr 2024 10:33:09 -0700 Subject: [PATCH] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 666 +++++------------- 1 file changed, 183 insertions(+), 483 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index fe7ee052e80e1..7796644ab7689 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -38,7 +38,6 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -49,7 +48,6 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.module.CacheModule; -import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; @@ -72,6 +70,8 @@ import org.opensearch.node.Node; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -87,31 +87,39 @@ import static org.mockito.Mockito.when; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { + private ThreadPool threadPool; + private IndexWriter writer; + private Directory dir; + private IndicesRequestCache cache; + private IndexShard indexShard; + private ThreadPool getThreadPool() { return new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default tracer tests").build()); } - public void testBasicOperationsCache() throws Exception { - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + @Before + public void setup() throws IOException { + dir = newDirectory(); + writer = new IndexWriter(dir, newIndexWriterConfig()); + indexShard = createIndex("test").getShard(0); + } + @After + public void cleanup() throws IOException { + IOUtils.close(writer, dir, cache); + terminate(threadPool); + } + + public void testBasicOperationsCache() throws Exception { + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -123,7 +131,7 @@ public void testBasicOperationsCache() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -149,33 +157,21 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testBasicOperationsCacheWithFeatureFlag() throws Exception { - IndexShard indexShard = createIndex("test").getShard(0); - CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - cacheService, - threadPool - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -187,7 +183,7 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -213,43 +209,28 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testCacheDifferentReaders() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + if (randomBoolean()) { writer.flush(); IOUtils.close(writer); writer = new IndexWriter(dir, newIndexWriterConfig()); } writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); ShardRequestCache requestCacheStats = entity.stats(); assertEquals("foo", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -264,7 +245,7 @@ public void testCacheDifferentReaders() throws Exception { // cache the second IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); - value = cache.getOrCompute(entity, loader, secondReader, termBytes); + value = cache.getOrCompute(entity, loader, secondReader, getTermBytes()); requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -277,7 +258,7 @@ public void testCacheDifferentReaders() throws Exception { secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); - value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + value = cache.getOrCompute(secondEntity, loader, secondReader, getTermBytes()); requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -288,7 +269,7 @@ public void testCacheDifferentReaders() throws Exception { entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(2, requestCacheStats.stats().getHitCount()); @@ -321,8 +302,7 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); assertEquals(0, cache.numRegisteredCloseListeners()); } @@ -350,38 +330,18 @@ public void testCacheCleanupThresholdSettingValidator_Invalid_Percentage() { } public void testCacheCleanupBasedOnZeroThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup @@ -393,43 +353,22 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { cache.cacheCleanupManager.cleanCache(); // cleanup should remove the stale-key assertEquals(1, cache.count()); - - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup @@ -445,44 +384,23 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); assertEquals(1, cache.count()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } // when a cache entry that is Stale is evicted for any reason, we have to deduct the count from our staleness count public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - ShardId shardId = new ShardId("foo", "bar", 1); - + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache from 2 different readers - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // assert no stale keys are accounted so far @@ -494,53 +412,29 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( // cache count should not be affected assertEquals(2, cache.count()); - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - ((IndexShard) secondEntity.getCacheIdentity()).shardId(), - termBytes, - getReaderCacheKeyId(reader) - ); + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); - cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); + cache.onRemoval(new RemovalNotification(key, getTermBytes(), RemovalReason.EVICTED)); // eviction of previous stale key from the cache should decrement staleKeysCount in iRC assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } // when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStaleCount() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup @@ -552,103 +446,59 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStal assertEquals(2, cache.count()); // evict entry from second reader (this reader is not closed) - String readerCacheKeyId = getReaderCacheKeyId(secondReader); - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - ((IndexShard) secondEntity.getCacheIdentity()).shardId(), - termBytes, - readerCacheKeyId - ); + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader)); - cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); + cache.onRemoval(new RemovalNotification(key, getTermBytes(), RemovalReason.EVICTED)); staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC assertEquals(1, staleKeysCount.get()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } // when a cache entry that is NOT Stale is evicted WITHOUT its reader closing, we should NOT deduct it from staleness count - public void testStaleCount_WithoutReaderClosing_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Exception { + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache from 2 different readers - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); + // no keys are stale assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); // create notification for removal of non-stale entry - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - ((IndexShard) secondEntity.getCacheIdentity()).shardId(), - termBytes, - getReaderCacheKeyId(reader) - ); - cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.EVICTED)); + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); + cache.onRemoval(new RemovalNotification(key, getTermBytes(), RemovalReason.EVICTED)); // stale keys count should stay zero assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); - IOUtils.close(reader, secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader, secondReader); } - public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + // test staleness count based on removal notifications + public void testStaleCount_OnRemovalNotifications() throws Exception { + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // no stale keys so far @@ -666,42 +516,23 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold( assertEquals(1, cache.count()); assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + cache = getIndicesRequestCache(settings, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup @@ -717,116 +548,31 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); assertEquals(2, cache.count()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); - } - - // test staleness count based on removal notifications - public void testStaleCount_OnRemovalNotifications() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - - // Get 5 entries into the cache - int totalKeys = 5; - IndicesService.IndexShardCacheEntity entity = null; - TermQueryBuilder termQuery = null; - BytesReference termBytes = null; - for (int i = 1; i <= totalKeys; i++) { - termQuery = new TermQueryBuilder("id", "" + i); - termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - assertEquals(i, cache.count()); - } - // no keys are stale yet - assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); - // closing the reader should make all keys stale - reader.close(); - assertEquals(totalKeys, cache.cacheCleanupManager.getStaleKeysCount().get()); - - String readerCacheKeyId = getReaderCacheKeyId(reader); - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - ((IndexShard) entity.getCacheIdentity()).shardId(), - termBytes, - readerCacheKeyId - ); - - int staleCount = cache.cacheCleanupManager.getStaleKeysCount().get(); - // Notification for Replaced should not deduct the staleCount - cache.onRemoval(new RemovalNotification(key, termBytes, RemovalReason.REPLACED)); - // stale keys count should stay the same - assertEquals(staleCount, cache.cacheCleanupManager.getStaleKeysCount().get()); - - // Notification for all but Replaced should deduct the staleCount - RemovalReason[] reasons = { RemovalReason.INVALIDATED, RemovalReason.EVICTED, RemovalReason.EXPLICIT, RemovalReason.CAPACITY }; - for (RemovalReason reason : reasons) { - cache.onRemoval(new RemovalNotification(key, termBytes, reason)); - assertEquals(--staleCount, cache.cacheCleanupManager.getStaleKeysCount().get()); - } - - IOUtils.close(writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } // test the cleanupKeyToCountMap are set appropriately when both readers are closed public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ShardId shardId = indexShard.shardId(); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (currentShardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - writer.addDocument(newDoc(0, "foo")); + cache = getIndicesRequestCache(settings, threadPool); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + writer.addDocument(newDoc(0, "foo")); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, shardId); + DirectoryReader secondReader = getReader(writer, shardId); // Get 2 entries into the cache from 2 different readers - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); // test the mappings - String readerCacheKeyId = getReaderCacheKeyId(reader); ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); - assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(readerCacheKeyId)); + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader))); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); // test the mapping - readerCacheKeyId = getReaderCacheKeyId(secondReader); assertEquals(2, cache.count()); - assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(readerCacheKeyId)); + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); // Close the reader, to create stale entries reader.close(); @@ -836,12 +582,10 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { assertEquals(2, cache.count()); // test the mapping - readerCacheKeyId = getReaderCacheKeyId(reader); - assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(readerCacheKeyId)); + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); // second reader's mapping should not be affected - readerCacheKeyId = getReaderCacheKeyId(secondReader); - assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(readerCacheKeyId)); + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); // Close the second reader secondReader.close(); @@ -851,11 +595,39 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { assertEquals(2, cache.count()); // test the mapping - readerCacheKeyId = getReaderCacheKeyId(secondReader); - assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(readerCacheKeyId)); + // since all the readers of this shard is closed, + // the cleanupKeyToCountMap should have no entries + assertEquals(0, cleanupKeyToCountMap.size()); + } - IOUtils.close(writer, dir, cache); - terminate(threadPool); + private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException { + return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + } + + private IndicesRequestCache getIndicesRequestCache(Settings settings, ThreadPool threadPool) { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + return new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); + } + + private Loader getLoader(DirectoryReader reader) { + return new Loader(reader, 0); + } + + private IndicesService.IndexShardCacheEntity getEntity(IndexShard indexShard) { + return new IndicesService.IndexShardCacheEntity(indexShard); + } + + private BytesReference getTermBytes() throws IOException { + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + return XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); } private String getReaderCacheKeyId(DirectoryReader reader) { @@ -867,118 +639,73 @@ private String getReaderCacheKeyId(DirectoryReader reader) { public void testEviction() throws Exception { final ByteSizeValue size; { - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - + DirectoryReader reader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader secondLoader = new Loader(secondReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); size = indexShard.requestCache().stats().getMemorySize(); IOUtils.close(reader, secondReader, writer, dir, cache); - terminate(threadPool); } - IndexShard indexShard = createIndex("test1").getShard(0); - ThreadPool threadPool = getThreadPool(); + indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + dir = newDirectory(); + writer = new IndexWriter(dir, newIndexWriterConfig()); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - + DirectoryReader reader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader secondLoader = new Loader(secondReader, 0); - + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader thirdLoader = new Loader(thirdReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(getEntity(indexShard), getLoader(thirdReader), thirdReader, getTermBytes()); assertEquals("baz", value3.streamInput().readString()); assertEquals(2, cache.count()); assertEquals(1, indexShard.requestCache().stats().getEvictions()); - IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader, secondReader, thirdReader); } public void testClearAllEntityIdentity() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); - - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); - DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader thirdReader = getReader(writer, indexShard.shardId()); + ; IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(createIndex("test1").getShard(0)); Loader thirdLoader = new Loader(thirdReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, getTermBytes()); assertEquals("baz", value3.streamInput().readString()); assertEquals(3, cache.count()); RequestCacheStats requestCacheStats = entity.stats().stats(); @@ -989,14 +716,13 @@ public void testClearAllEntityIdentity() throws Exception { cache.cacheCleanupManager.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity - value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, getTermBytes()); requestCacheStats = entity.stats().stats(); requestCacheStats.add(thirddEntity.stats().stats()); assertEquals(hitCount + 1, requestCacheStats.getHitCount()); assertEquals("baz", value3.streamInput().readString()); - IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader, secondReader, thirdReader); } public Iterable newDoc(int id, String value) { @@ -1032,34 +758,18 @@ public BytesReference get() { throw new RuntimeException(e); } } - } public void testInvalidate() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + IndicesRequestCache cache = getIndicesRequestCache(Settings.EMPTY, threadPool); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = entity.stats(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -1071,7 +781,7 @@ public void testInvalidate() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -1085,8 +795,8 @@ public void testInvalidate() throws Exception { // load again after invalidate entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - cache.invalidate(entity, reader, termBytes); - value = cache.getOrCompute(entity, loader, reader, termBytes); + cache.invalidate(entity, reader, getTermBytes()); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -1111,16 +821,11 @@ public void testInvalidate() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testEqualsKey() throws IOException { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - Directory dir = newDirectory(); - IndexWriterConfig config = newIndexWriterConfig(); - IndexWriter writer = new IndexWriter(dir, config); ShardId shardId = new ShardId("foo", "bar", 1); ShardId shardId1 = new ShardId("foo1", "bar1", 2); IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); @@ -1147,13 +852,9 @@ public void testEqualsKey() throws IOException { } public void testSerializationDeserializationOfCacheKey() throws Exception { - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndexService indexService = createIndex("test"); - IndexShard indexShard = indexService.getShard(0); IndicesService.IndexShardCacheEntity shardCacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); String readerCacheKeyId = UUID.randomUUID().toString(); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), termBytes, readerCacheKeyId); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), readerCacheKeyId); BytesReference bytesReference = null; try (BytesStreamOutput out = new BytesStreamOutput()) { key1.writeTo(out); @@ -1165,8 +866,7 @@ public void testSerializationDeserializationOfCacheKey() throws Exception { assertEquals(readerCacheKeyId, key2.readerCacheKeyId); assertEquals(((IndexShard) shardCacheEntity.getCacheIdentity()).shardId(), key2.shardId); - assertEquals(termBytes, key2.value); - + assertEquals(getTermBytes(), key2.value); } private class TestBytesReference extends AbstractBytesReference {