Skip to content

Commit

Permalink
[Tiered Caching] Fixing flaky tiered cache test (opensearch-project#1…
Browse files Browse the repository at this point in the history
…2650)

* Fixing flaky tiered cache test

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Removing unnecessary comment

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Removing unused variable

Signed-off-by: Sagar Upadhyaya <[email protected]>

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 authored and Peter Alfonsi committed Aug 30, 2024
1 parent bb66699 commit 45fce15
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
removalListener.onRemoval(notification);
}
})
.setKeyType(builder.cacheConfig.getKeyType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand All @@ -23,9 +26,12 @@ public class MockDiskCache<K, V> implements ICache<K, V> {
int maxSize;
long delay;

public MockDiskCache(int maxSize, long delay) {
private final RemovalListener<K, V> removalListener;

public MockDiskCache(int maxSize, long delay, RemovalListener<K, V> removalListener) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<K, V>();
}

Expand All @@ -38,7 +44,7 @@ public V get(K key) {
@Override
public void put(K key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
return;
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
}
try {
Thread.sleep(delay);
Expand Down Expand Up @@ -101,7 +107,10 @@ public MockDiskCacheFactory(long delay, int maxSize) {

@Override
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
return new Builder<K, V>().setMaxSize(maxSize).setDeliberateDelay(delay).build();
return new Builder<K, V>().setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.build();
}

@Override
Expand All @@ -117,7 +126,7 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

@Override
public ICache<K, V> build() {
return new MockDiskCache<K, V>(this.maxSize, this.delay);
return new MockDiskCache<K, V>(this.maxSize, this.delay, this.getRemovalListener());
}

public Builder<K, V> setMaxSize(int maxSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = intializeTieredSpilloverCache(
onHeapCacheSize,
keyValueSize,
randomIntBetween(1, 4),
removalListener,
Settings.builder()
Expand Down Expand Up @@ -142,10 +142,6 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
LoadAwareCacheLoader<String, String> tieredCacheLoader = getLoadAwareCacheLoader();
tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader);
}
long actualDiskCacheSize = tieredSpilloverCache.getDiskCache().count();
assertEquals(actualDiskCacheSize, removalListener.evictionsMetric.count()); // Evictions from onHeap equal to
// disk cache size.

tieredSpilloverCache.getOnHeapCache().keys().forEach(onHeapKeys::add);
tieredSpilloverCache.getDiskCache().keys().forEach(diskTierKeys::add);

Expand Down Expand Up @@ -290,9 +286,6 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
LoadAwareCacheLoader<String, String> tieredCacheLoader = getLoadAwareCacheLoader();
tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader);
}
long actualDiskCacheSize = tieredSpilloverCache.getDiskCache().count();
assertEquals(actualDiskCacheSize, removalListener.evictionsMetric.count()); // Evictions from onHeap equal to
// disk cache size.

tieredSpilloverCache.getOnHeapCache().keys().forEach(onHeapKeys::add);
tieredSpilloverCache.getDiskCache().keys().forEach(diskTierKeys::add);
Expand Down Expand Up @@ -328,15 +321,15 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
}
}

public void testComputeIfAbsentWithEvictionsFromBothTier() throws Exception {
public void testComputeIfAbsentWithEvictionsFromTieredCache() throws Exception {
int onHeapCacheSize = randomIntBetween(10, 30);
int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100);
int totalSize = onHeapCacheSize + diskCacheSize;
int keyValueSize = 50;

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = intializeTieredSpilloverCache(
onHeapCacheSize,
keyValueSize,
diskCacheSize,
removalListener,
Settings.builder()
Expand All @@ -349,13 +342,13 @@ public void testComputeIfAbsentWithEvictionsFromBothTier() throws Exception {
.build(),
0
);

int numOfItems = randomIntBetween(totalSize + 1, totalSize * 3);
for (int iter = 0; iter < numOfItems; iter++) {
LoadAwareCacheLoader<String, String> tieredCacheLoader = getLoadAwareCacheLoader();
tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader);
}
assertTrue(removalListener.evictionsMetric.count() > 0);
int evictions = numOfItems - (totalSize);
assertEquals(evictions, removalListener.evictionsMetric.count());
}

public void testGetAndCount() throws Exception {
Expand All @@ -366,7 +359,7 @@ public void testGetAndCount() throws Exception {

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = intializeTieredSpilloverCache(
onHeapCacheSize,
keyValueSize,
diskCacheSize,
removalListener,
Settings.builder()
Expand Down Expand Up @@ -418,7 +411,7 @@ public void testPut() {

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = intializeTieredSpilloverCache(
onHeapCacheSize,
keyValueSize,
diskCacheSize,
removalListener,
Settings.builder()
Expand Down Expand Up @@ -519,7 +512,7 @@ public void testInvalidate() {

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = intializeTieredSpilloverCache(
onHeapCacheSize,
keyValueSize,
diskCacheSize,
removalListener,
Settings.builder()
Expand Down Expand Up @@ -744,7 +737,7 @@ public String load(String key) {
assertEquals(1, numberOfTimesKeyLoaded); // It should be loaded only once.
}

public void testConcurrencyForEvictionFlow() throws Exception {
public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exception {
int diskCacheSize = randomIntBetween(450, 800);

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Expand Down Expand Up @@ -828,7 +821,6 @@ public String load(String key) {
countDownLatch.await();
assertNotNull(actualValue.get());
countDownLatch1.await();
assertEquals(1, removalListener.evictionsMetric.count());
assertEquals(1, tieredSpilloverCache.getOnHeapCache().count());
assertEquals(1, onDiskCache.count());
assertNotNull(onDiskCache.get(keyToBeEvicted));
Expand Down Expand Up @@ -883,7 +875,6 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
.build()
)
.build();

ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize);

return new TieredSpilloverCache.Builder<String, String>().setCacheType(CacheType.INDICES_REQUEST_CACHE)
Expand Down

0 comments on commit 45fce15

Please sign in to comment.