Skip to content

Commit

Permalink
Add CacheMaintainer class to perform pending cache maintenance every …
Browse files Browse the repository at this point in the history
…minute

Signed-off-by: owenhalpert <[email protected]>

# Conflicts:
#	CHANGELOG.md
  • Loading branch information
owenhalpert committed Dec 3, 2024
1 parent 9276c77 commit 06852a4
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Documentation
### Maintenance
* Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236)
* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2239](https://github.com/opensearch-project/k-NN/issues/2239)
* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278]
### Refactoring
42 changes: 42 additions & 0 deletions src/main/java/org/opensearch/knn/index/CacheMaintainer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import com.google.common.cache.Cache;

import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Performs periodic maintenance for a Guava cache. The Guava cache is implemented in a way that maintenance operations (such as evicting expired
* entries) will only occur when the cache is accessed. See {@see <a href="https://github.com/google/guava/wiki/cachesexplained#timed-eviction"> Guava Cache Guide</a>}
* for more details. Thus, to perform any pending maintenance, the cleanUp method will be called periodically from a CacheMaintainer instance.
*/
public class CacheMaintainer<K, V> implements Closeable {
private final Cache<K, V> cache;
private final ScheduledExecutorService executor;
private static final int DEFAULT_INTERVAL_SECONDS = 60;

public CacheMaintainer(Cache<K, V> cache) {
this.cache = cache;
this.executor = Executors.newSingleThreadScheduledExecutor();
}

public void startMaintenance() {
executor.scheduleAtFixedRate(this::cleanCache, DEFAULT_INTERVAL_SECONDS, DEFAULT_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

public void cleanCache() {
cache.cleanUp();
}

@Override
public void close() {
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.CacheMaintainer;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.plugin.stats.StatNames;

Expand Down Expand Up @@ -51,6 +52,7 @@ public class NativeMemoryCacheManager implements Closeable {
private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ExecutorService executor;
private CacheMaintainer<String, NativeMemoryAllocation> cacheMaintainer;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;

Expand Down Expand Up @@ -87,6 +89,10 @@ private void initialize() {
}

private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

CacheBuilder<String, NativeMemoryAllocation> cacheBuilder = CacheBuilder.newBuilder()
.recordStats()
.concurrencyLevel(1)
Expand All @@ -104,6 +110,9 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
cacheCapacityReached = new AtomicBoolean(false);
accessRecencyQueue = new ConcurrentLinkedDeque<>();
cache = cacheBuilder.build();

this.cacheMaintainer = new CacheMaintainer<>(cache);
this.cacheMaintainer.startMaintenance();
}

/**
Expand Down Expand Up @@ -142,6 +151,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa
@Override
public void close() {
executor.shutdown();
if (cacheMaintainer != null) {
cacheMaintainer.close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import lombok.extern.log4j.Log4j2;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.knn.index.CacheMaintainer;
import org.opensearch.knn.index.KNNSettings;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,10 +29,11 @@
* A thread-safe singleton cache that contains quantization states.
*/
@Log4j2
public class QuantizationStateCache {
public class QuantizationStateCache implements Closeable {

private static volatile QuantizationStateCache instance;
private Cache<String, QuantizationState> cache;
private CacheMaintainer<String, QuantizationState> cacheMaintainer;
@Getter
private long maxCacheSizeInKB;
@Getter
Expand Down Expand Up @@ -58,6 +61,10 @@ static QuantizationStateCache getInstance() {
}

private void buildCache() {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> {
try {
return ((QuantizationState) v).toByteArray().length;
Expand All @@ -71,6 +78,9 @@ private void buildCache() {
)
.removalListener(this::onRemoval)
.build();

this.cacheMaintainer = new CacheMaintainer<>(cache);
this.cacheMaintainer.startMaintenance();
}

synchronized void rebuildCache() {
Expand Down Expand Up @@ -129,4 +139,9 @@ private void updateEvictedDueToSizeAt() {
public void clear() {
cache.invalidateAll();
}

@Override
public void close() throws IOException {
cacheMaintainer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) {
public void clear() {
QuantizationStateCache.getInstance().clear();
}

public void close() throws IOException {
QuantizationStateCache.getInstance().close();
}
}
2 changes: 2 additions & 0 deletions src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.index.IndexService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.plugins.Plugin;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
Expand Down Expand Up @@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() {
public void tearDown() throws Exception {
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close();
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/org/opensearch/knn/KNNTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() {
return false;
}

public void resetState() {
public void resetState() throws IOException {
// Reset all of the counters
for (KNNCounter knnCounter : KNNCounter.values()) {
knnCounter.set(0L);
Expand All @@ -83,6 +85,7 @@ public void resetState() {
// Clean up the cache
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
}

private void initKNNSettings() {
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/org/opensearch/knn/index/CacheMaintainerTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class CacheMaintainerTests {
@Test
public void testCacheEviction() throws InterruptedException {
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();

CacheMaintainer<String, String> cleaner = new CacheMaintainer<>(testCache);

testCache.put("key1", "value1");
assertEquals(testCache.size(), 1);

Thread.sleep(1500);

cleaner.cleanCache();
assertEquals(testCache.size(), 0);

cleaner.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void tearDown() throws Exception {
Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build();
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get();
NativeMemoryCacheManager.getInstance().close();
super.tearDown();
}

Expand Down Expand Up @@ -378,6 +379,7 @@ public void testCacheCapacity() {

nativeMemoryCacheManager.setCacheCapacityReached(false);
assertFalse(nativeMemoryCacheManager.isCacheCapacityReached());
nativeMemoryCacheManager.close();
}

public void testGetIndicesCacheStats() throws IOException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -417,7 +418,7 @@ public void testRebuildOnTimeExpirySettingsChange() {
assertNull("State should be null", retrievedState);
}

public void testCacheEvictionDueToSize() {
public void testCacheEvictionDueToSize() throws IOException {
String fieldName = "evictionField";
// States have size of slightly over 500 bytes so that adding two will reach the max size of 1 kb for the cache
int arrayLength = 112;
Expand Down Expand Up @@ -445,6 +446,7 @@ public void testCacheEvictionDueToSize() {
cache.addQuantizationState(fieldName, state);
cache.addQuantizationState(fieldName, state2);
cache.clear();
cache.close();
assertNotNull(cache.getEvictedDueToSizeAt());
}
}

0 comments on commit 06852a4

Please sign in to comment.