Skip to content

Commit

Permalink
Add ehcache related invalidation IT
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Mar 25, 2024
1 parent 02e9b84 commit 1869b35
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cache;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
Expand All @@ -17,11 +19,11 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.cache.store.disk.EhcacheDiskCache;
import org.opensearch.cache.store.disk.EhcacheThreadLeakFilter;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.NodeEnvironment;
Expand All @@ -43,12 +45,14 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_LISTENER_MODE_SYNC_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.greaterThan;

@ThreadLeakFilters(filters = { EhcacheThreadLeakFilter.class })
public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase {

@Override
Expand All @@ -66,14 +70,22 @@ protected Settings nodeSettings(int nodeOrdinal) {
try (NodeEnvironment env = newNodeEnvironment(Settings.EMPTY)) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_STORAGE_PATH_KEY)
.getKey(), env.nodePaths()[0].indicesPath.toString() +
"/request_cache")
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_STORAGE_PATH_KEY)
.getKey(),
env.nodePaths()[0].indicesPath.toString() + "/request_cache"
)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_LISTENER_MODE_SYNC_KEY)
.getKey(),
true
)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -134,7 +146,6 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
);
}


public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exception {
Client client = client();
assertAcked(
Expand All @@ -150,15 +161,16 @@ public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exce
)
.get()
);
int numberOfIndexedItems = 2;
int numberOfIndexedItems = randomIntBetween(2, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp =
client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator,
"hello" + iterator)).get();
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = client.admin()
Expand All @@ -168,40 +180,27 @@ public void testInvalidationAndCleanupLogicWithIndicesRequestCache() throws Exce
.get()
.getTotal()
.getRequestCache();
System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes()
+ " evictions = " + requestCacheStats.getEvictions());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(0, requestCacheStats.getHitCount());
assertEquals(0, requestCacheStats.getEvictions());
assertTrue(requestCacheStats.getMemorySizeInBytes() > 0);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp =
client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k" + iterator,
"hello" + iterator)).get();
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
//System.out.println(resp.toString());
requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes()
+ " evictions = " + requestCacheStats.getEvictions()); // Explicit refresh would invalidate cache
requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache();
assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
// Explicit refresh would invalidate cache entries.
refreshAndWaitForReplication();
ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index");
ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get();
System.out.println("status of clear indices = " + response.getStatus().getStatus());
Thread.sleep(5000);
requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
System.out.println("hits = " + requestCacheStats.getHitCount() + " misses = " + requestCacheStats.getMissCount() + " size = " + requestCacheStats.getMemorySizeInBytes()
+ " evictions = " + requestCacheStats.getEvictions());
requestCacheStats = client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache();
assertEquals(0, requestCacheStats.getMemorySizeInBytes());
assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ public void put(K key, V value) {
*/
@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
System.out.println("SAGARX here!!!!!");
// Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is
// not performant in case there are multiple concurrent request for same key. Below is our own custom
// implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ public void onMiss() {

public void onCached(Accountable key, BytesReference value) {
totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed());
System.out.println("key size = " + key.ramBytesUsed());
System.out.println("value size = " + value.ramBytesUsed());
System.out.println("Cached !!! = " + totalMetric.count() + " size = " + (key.ramBytesUsed() + value.ramBytesUsed()));
}

public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
Expand All @@ -81,8 +78,5 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
dec += value.ramBytesUsed();
}
totalMetric.dec(dec);
System.out.println("key size = " + key.ramBytesUsed());
System.out.println("value size = " + value.ramBytesUsed());
System.out.println("Removed !!! = " + totalMetric.count() + " size = " + dec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ public void close() throws IOException {
cache.invalidateAll();
cache.close();
cacheCleanupManager.close();
cache.close();
}

private double getStalenessThreshold(Settings settings) {
Expand Down

0 comments on commit 1869b35

Please sign in to comment.