Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CacheMaintainer class to perform pending cache maintenance every minute #2308

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Documentation
### Maintenance
* Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236)
* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2239](https://github.com/opensearch-project/k-NN/issues/2239)
* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278]
### Refactoring
42 changes: 42 additions & 0 deletions src/main/java/org/opensearch/knn/index/CacheMaintainer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import com.google.common.cache.Cache;

import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Performs periodic maintenance for a Guava cache. The Guava cache is implemented in a way that maintenance operations (such as evicting expired
* entries) will only occur when the cache is accessed. See {@see <a href="https://github.com/google/guava/wiki/cachesexplained#timed-eviction"> Guava Cache Guide</a>}
* for more details. Thus, to perform any pending maintenance, the cleanUp method will be called periodically from a CacheMaintainer instance.
*/
public class CacheMaintainer<K, V> implements Closeable {
private final Cache<K, V> cache;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also avoid maintaining the Cache object reference here by using a functional interface. That would also get rid of the generification of this class.

Simply pass and store the runnable reference instead of the cache as new CacheMaintainer(() -> cache.cleanUp());

Possibly also move this class to the util package and call it a ScheduledExecutor with Runnable ref and interval as parameters.

public class ScheduledExecutor implements Closeable {

...

  public ScheduledExecutor(Runnable reference, long scheduleMillis) {
  ...
  }

  ...
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kotwanikunal what do you think about creating the executor and calling scheduleAtFixedRate within each cache class instead of creating a new ScheduledExecutor class?

private final ScheduledExecutorService executor;
private static final int DEFAULT_INTERVAL_SECONDS = 60;

public CacheMaintainer(Cache<K, V> cache) {
this.cache = cache;
this.executor = Executors.newSingleThreadScheduledExecutor();
}

public void startMaintenance() {
executor.scheduleAtFixedRate(this::cleanCache, DEFAULT_INTERVAL_SECONDS, DEFAULT_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

public void cleanCache() {
cache.cleanUp();
}

@Override
public void close() {
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.CacheMaintainer;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.plugin.stats.StatNames;

Expand Down Expand Up @@ -51,6 +52,7 @@ public class NativeMemoryCacheManager implements Closeable {
private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ExecutorService executor;
private CacheMaintainer<String, NativeMemoryAllocation> cacheMaintainer;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;

Expand Down Expand Up @@ -87,6 +89,10 @@ private void initialize() {
}

private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

CacheBuilder<String, NativeMemoryAllocation> cacheBuilder = CacheBuilder.newBuilder()
.recordStats()
.concurrencyLevel(1)
Expand All @@ -104,6 +110,9 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
cacheCapacityReached = new AtomicBoolean(false);
accessRecencyQueue = new ConcurrentLinkedDeque<>();
cache = cacheBuilder.build();

this.cacheMaintainer = new CacheMaintainer<>(cache);
this.cacheMaintainer.startMaintenance();
}

/**
Expand Down Expand Up @@ -142,6 +151,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa
@Override
public void close() {
executor.shutdown();
if (cacheMaintainer != null) {
cacheMaintainer.close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import lombok.extern.log4j.Log4j2;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.knn.index.CacheMaintainer;
import org.opensearch.knn.index.KNNSettings;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,10 +29,11 @@
* A thread-safe singleton cache that contains quantization states.
*/
@Log4j2
public class QuantizationStateCache {
public class QuantizationStateCache implements Closeable {

private static volatile QuantizationStateCache instance;
private Cache<String, QuantizationState> cache;
private CacheMaintainer<String, QuantizationState> cacheMaintainer;
@Getter
private long maxCacheSizeInKB;
@Getter
Expand Down Expand Up @@ -58,6 +61,10 @@ static QuantizationStateCache getInstance() {
}

private void buildCache() {
if (cacheMaintainer != null) {
cacheMaintainer.close();
}

this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> {
try {
return ((QuantizationState) v).toByteArray().length;
Expand All @@ -71,6 +78,9 @@ private void buildCache() {
)
.removalListener(this::onRemoval)
.build();

this.cacheMaintainer = new CacheMaintainer<>(cache);
this.cacheMaintainer.startMaintenance();
}

synchronized void rebuildCache() {
Expand Down Expand Up @@ -129,4 +139,9 @@ private void updateEvictedDueToSizeAt() {
public void clear() {
cache.invalidateAll();
}

@Override
public void close() throws IOException {
cacheMaintainer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) {
public void clear() {
QuantizationStateCache.getInstance().clear();
}

public void close() throws IOException {
QuantizationStateCache.getInstance().close();
}
}
2 changes: 2 additions & 0 deletions src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.index.IndexService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.plugins.Plugin;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
Expand Down Expand Up @@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() {
public void tearDown() throws Exception {
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close();
NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close();
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/org/opensearch/knn/KNNTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() {
return false;
}

public void resetState() {
public void resetState() throws IOException {
// Reset all of the counters
for (KNNCounter knnCounter : KNNCounter.values()) {
knnCounter.set(0L);
Expand All @@ -83,6 +85,7 @@ public void resetState() {
// Clean up the cache
NativeMemoryCacheManager.getInstance().invalidateAll();
NativeMemoryCacheManager.getInstance().close();
QuantizationStateCacheManager.getInstance().close();
}

private void initKNNSettings() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class CacheMaintainerTests {
@Test
public void testCacheEviction() throws InterruptedException {
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();

CacheMaintainer<String, String> cleaner = new CacheMaintainer<>(testCache);

testCache.put("key1", "value1");
assertEquals(testCache.size(), 1);

Thread.sleep(1500);

cleaner.cleanCache();
assertEquals(testCache.size(), 0);

cleaner.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void tearDown() throws Exception {
Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build();
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get();
NativeMemoryCacheManager.getInstance().close();
super.tearDown();
}

Expand Down Expand Up @@ -378,6 +379,7 @@ public void testCacheCapacity() {

nativeMemoryCacheManager.setCacheCapacityReached(false);
assertFalse(nativeMemoryCacheManager.isCacheCapacityReached());
nativeMemoryCacheManager.close();
}

public void testGetIndicesCacheStats() throws IOException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -417,7 +418,7 @@ public void testRebuildOnTimeExpirySettingsChange() {
assertNull("State should be null", retrievedState);
}

public void testCacheEvictionDueToSize() {
public void testCacheEvictionDueToSize() throws IOException {
String fieldName = "evictionField";
// States have size of slightly over 500 bytes so that adding two will reach the max size of 1 kb for the cache
int arrayLength = 112;
Expand Down Expand Up @@ -445,6 +446,7 @@ public void testCacheEvictionDueToSize() {
cache.addQuantizationState(fieldName, state);
cache.addQuantizationState(fieldName, state2);
cache.clear();
cache.close();
assertNotNull(cache.getEvictedDueToSizeAt());
}
}
Loading