Skip to content

Commit

Permalink
Adding javadoc and more test for cache.put
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Jan 3, 2024
1 parent 7c10adf commit 1832df9
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public V getValue() {
return value;
}

public CacheStoreType getSource() {
public CacheStoreType getCacheStoreType() {
return source;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void put(K key, V value) {
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
// We are skipping calling event listeners at this step as we do another get inside below computeIfAbsent.
// Where we might end up calling onMiss twice for a key not present in onHeap cache.
// Similary we might end up calling both onMiss and onHit for a key, in case we are received concurrent
// Similary we might end up calling both onMiss and onHit for a key, in case we are receiving concurrent
// requests for the same key which requires loading only once.
StoreAwareCacheValue<V> cacheValue = getValueFromTieredCache(false).apply(key);
if (cacheValue == null) {
Expand All @@ -117,8 +117,8 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
}
return value;
}
listener.onHit(key, cacheValue.getValue(), cacheValue.getSource());
if (cacheValue.getSource().equals(CacheStoreType.DISK)) {
listener.onHit(key, cacheValue.getValue(), cacheValue.getCacheStoreType());
if (cacheValue.getCacheStoreType().equals(CacheStoreType.DISK)) {
listener.onMiss(key, CacheStoreType.ON_HEAP);
}
return cacheValue.getValue();
Expand All @@ -145,6 +145,10 @@ public void invalidateAll() {
}
}

/**
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
* @return An iterable over (onHeap + disk) keys
*/
@Override
public Iterable<K> keys() {
Iterable<K> onDiskKeysIterable;
Expand All @@ -167,11 +171,19 @@ public long count() {

@Override
public void refresh() {
for (StoreAwareCache<K, V> storeAwareCache : cacheList) {
storeAwareCache.refresh();
try (ReleasableLock ignore = writeLock.acquire()) {
for (StoreAwareCache<K, V> storeAwareCache : cacheList) {
storeAwareCache.refresh();
}
}
}

/**
* Provides an iteration over keys based on desired on cacheStoreType. This is not protected from any mutations
* to the cache.
* @param type Type of cacheStoreType
* @return An iterable over desired CacheStoreType keys
*/
@Override
public Iterable<K> cacheKeys(CacheStoreType type) {
switch (type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ public void testWithDiskTierNull() throws Exception {
int onHeapCacheSize = randomIntBetween(10, 30);
MockCacheEventListener<String, String> eventListener = new MockCacheEventListener<String, String>();

StoreAwareCacheBuilder<String, String> onHeapCacheBuilder = new MockOnHeapCache.Builder<String, String>().setMaxSize(
onHeapCacheSize
);
StoreAwareCacheBuilder<String, String> onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder<String, String>()
.setMaximumWeightInBytes(onHeapCacheSize * 20)
.setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries
TieredSpilloverCache<String, String> tieredSpilloverCache = new TieredSpilloverCache.Builder<String, String>()
.setOnHeapCacheBuilder(onHeapCacheBuilder)
.setListener(eventListener)
Expand Down Expand Up @@ -259,6 +259,69 @@ public void testPut() {
assertEquals(1, tieredSpilloverCache.count());
}

public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception {
int onHeapCacheSize = randomIntBetween(200, 400);
int diskCacheSize = randomIntBetween(450, 800);

MockCacheEventListener<String, String> eventListener = new MockCacheEventListener<>();

TieredSpilloverCache<String, String> tieredSpilloverCache = intializeTieredSpilloverCache(
onHeapCacheSize,
diskCacheSize,
eventListener,
0
);

for (int i = 0; i < onHeapCacheSize; i++) {
tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), new LoadAwareCacheLoader<>() {
@Override
public boolean isLoaded() {
return false;
}

@Override
public String load(String key) throws Exception {
return UUID.randomUUID().toString();
}
});
}

assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count());
assertEquals(0, tieredSpilloverCache.getOnDiskCache().get().count());

// Again try to put OnHeap cache capacity amount of new items.
List<String> newKeyList = new ArrayList<>();
for (int i = 0; i < onHeapCacheSize; i++) {
newKeyList.add(UUID.randomUUID().toString());
}

for (int i = 0; i < newKeyList.size(); i++) {
tieredSpilloverCache.computeIfAbsent(newKeyList.get(i), new LoadAwareCacheLoader<>() {
@Override
public boolean isLoaded() {
return false;
}

@Override
public String load(String key) {
return UUID.randomUUID().toString();
}
});
}

// Verify that new items are part of onHeap cache.
List<String> actualOnHeapCacheKeys = new ArrayList<>();
tieredSpilloverCache.cacheKeys(CacheStoreType.ON_HEAP).forEach(actualOnHeapCacheKeys::add);

assertEquals(newKeyList.size(), actualOnHeapCacheKeys.size());
for (int i = 0; i < actualOnHeapCacheKeys.size(); i++) {
assertTrue(newKeyList.contains(actualOnHeapCacheKeys.get(i)));
}

assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count());
assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnDiskCache().get().count());
}

public void testInvalidate() {
int onHeapCacheSize = 1;
int diskCacheSize = 10;
Expand Down Expand Up @@ -288,7 +351,7 @@ public void testInvalidate() {
tieredSpilloverCache.put(key2, UUID.randomUUID().toString());
assertEquals(2, tieredSpilloverCache.count());
// Again invalidate older key
tieredSpilloverCache.invalidate(key2);
tieredSpilloverCache.invalidate(key);
assertEquals(1, eventListener.enumMap.get(CacheStoreType.DISK).invalidationMetric.count());
assertEquals(1, tieredSpilloverCache.count());
}
Expand All @@ -305,21 +368,22 @@ public void testCacheKeys() throws Exception {
eventListener,
0
);
// Put values in cache more than it's size and cause evictions from onHeap.
int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize);
List<String> onHeapKeys = new ArrayList<>();
List<String> diskTierKeys = new ArrayList<>();
for (int iter = 0; iter < numOfItems1; iter++) {
// During first round add onHeapCacheSize entries. Will go to onHeap cache initially.
for (int i = 0; i < onHeapCacheSize; i++) {
String key = UUID.randomUUID().toString();
if (iter > (onHeapCacheSize - 1)) {
// All these are bound to go to disk based cache.
diskTierKeys.add(key);
} else {
onHeapKeys.add(key);
}
LoadAwareCacheLoader<String, String> tieredCacheLoader = getLoadAwareCacheLoader();
tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader);
diskTierKeys.add(key);
tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader());
}
// In another round, add another onHeapCacheSize entries. These will go to onHeap and above ones will be
// evicted to onDisk cache.
for (int i = 0; i < onHeapCacheSize; i++) {
String key = UUID.randomUUID().toString();
onHeapKeys.add(key);
tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader());
}

List<String> actualOnHeapKeys = new ArrayList<>();
List<String> actualOnDiskKeys = new ArrayList<>();
Iterable<String> onHeapiterable = tieredSpilloverCache.cacheKeys(CacheStoreType.ON_HEAP);
Expand Down Expand Up @@ -607,9 +671,9 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
) {
StoreAwareCacheBuilder<String, String> diskCacheBuilder = new MockOnDiskCache.Builder<String, String>().setMaxSize(diksCacheSize)
.setDeliberateDelay(diskDeliberateDelay);
StoreAwareCacheBuilder<String, String> onHeapCacheBuilder = new MockOnHeapCache.Builder<String, String>().setMaxSize(
onHeapCacheSize
);
StoreAwareCacheBuilder<String, String> onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder<String, String>()
.setMaximumWeightInBytes(onHeapCacheSize * 20)
.setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries
return new TieredSpilloverCache.Builder<String, String>().setOnHeapCacheBuilder(onHeapCacheBuilder)
.setOnDiskCacheBuilder(diskCacheBuilder)
.setListener(eventListener)
Expand Down Expand Up @@ -728,110 +792,3 @@ public Builder<K, V> setDeliberateDelay(long millis) {
}
}
}

class MockOnHeapCache<K, V> implements StoreAwareCache<K, V> {

Map<K, V> cache;
int maxSize;
StoreAwareCacheEventListener<K, V> eventListener;

MockOnHeapCache(int size, StoreAwareCacheEventListener<K, V> eventListener) {
maxSize = size;
this.cache = new ConcurrentHashMap<K, V>();
this.eventListener = eventListener;
}

@Override
public V get(K key) {
V value = cache.get(key);
if (value != null) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void put(K key, V value) {
if (this.cache.size() >= maxSize) {
eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, value, RemovalReason.EVICTED, CacheStoreType.ON_HEAP));
return;
}
this.cache.put(key, value);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
if (this.cache.size() >= maxSize) { // If it exceeds, just notify for evict.
eventListener.onRemoval(
new StoreAwareCacheRemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, CacheStoreType.ON_HEAP)
);
return loader.load(key);
}
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
if (!loader.isLoaded()) {
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void invalidate(K key) {
if (this.cache.containsKey(key)) {
eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, null, RemovalReason.INVALIDATED, CacheStoreType.ON_HEAP));
}
this.cache.remove(key);

}

@Override
public void invalidateAll() {
this.cache.clear();
}

@Override
public Iterable<K> keys() {
return this.cache.keySet();
}

@Override
public long count() {
return this.cache.size();
}

@Override
public void refresh() {

}

@Override
public CacheStoreType getTierType() {
return CacheStoreType.ON_HEAP;
}

public static class Builder<K, V> extends StoreAwareCacheBuilder<K, V> {

int maxSize;

@Override
public StoreAwareCache<K, V> build() {
return new MockOnHeapCache<>(maxSize, this.getEventListener());
}

public Builder<K, V> setMaxSize(int maxSize) {
this.maxSize = maxSize;
return this;
}
}
}

0 comments on commit 1832df9

Please sign in to comment.