Skip to content

Commit

Permalink
Add threadpoolexecutor thread count setting
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Nov 20, 2024
1 parent 347530e commit 5522949
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,28 @@ public class CaffeineHeapCacheSettings {
(key) -> Setting.positiveTimeSetting(key, TimeValue.MAX_VALUE, Setting.Property.NodeScope)
);

/**
* Setting defining number of threads in caffeine cleanup threadpool.
*
* Setting pattern: {cache_type}.caffeine_heap.cleanup_threads
*/
public static final Setting.AffixSetting<Integer> CLEANUP_THREADS_SETTING = Setting.suffixKeySetting(
CaffeineHeapCache.CaffeineHeapCacheFactory.NAME + ".cleanup_threads",
(key) -> Setting.intSetting(key, 3, 1, Setting.Property.NodeScope)
);

public static final String MAXIMUM_SIZE_IN_BYTES_KEY = "maximum_size_in_bytes";
public static final String EXPIRE_AFTER_ACCESS_KEY = "expire_after_access";

public static final String CLEANUP_THREADS_KEY = "cleanup_threads";

private static final Map<String, Setting.AffixSetting<?>> KEY_SETTING_MAP = Map.of(
MAXIMUM_SIZE_IN_BYTES_KEY,
MAXIMUM_SIZE_IN_BYTES_SETTING,
EXPIRE_AFTER_ACCESS_KEY,
EXPIRE_AFTER_ACCESS_SETTING
EXPIRE_AFTER_ACCESS_SETTING,
CLEANUP_THREADS_KEY,
CLEANUP_THREADS_SETTING
);

public static final Map<CacheType, Map<String, Setting<?>>> CACHE_TYPE_MAP = getCacheTypeMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
Expand All @@ -42,10 +41,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import java.util.function.ToLongBiFunction;
Expand All @@ -72,8 +69,7 @@ private CaffeineHeapCache(Builder<K, V> builder) {
this.caffeineRemovalListener = new CaffeineRemovalListener(
Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null")
);

this.executor = Executors.newFixedThreadPool(3);
this.executor = Executors.newFixedThreadPool(builder.numCleanupThreads);

cache = AccessController.doPrivileged(
(PrivilegedAction<Cache<ICacheKey<K>, V>>) () -> Caffeine.newBuilder()
Expand Down Expand Up @@ -234,9 +230,14 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
Map<String, Setting<?>> settingList = CaffeineHeapCacheSettings.getSettingListForCacheType(cacheType);
Settings settings = config.getSettings();
boolean statsTrackingEnabled = statsTrackingEnabled(config.getSettings(), config.getStatsTrackingEnabled());
ICacheBuilder<K, V> builder = new CaffeineHeapCache.Builder<K, V>().setDimensionNames(config.getDimensionNames())
ICacheBuilder<K, V> builder = new CaffeineHeapCache.Builder<K, V>().setNumCleanupThreads(
(int) settingList.get(CaffeineHeapCacheSettings.CLEANUP_THREADS_KEY).get(settings)
)
.setDimensionNames(config.getDimensionNames())
.setStatsTrackingEnabled(statsTrackingEnabled)
.setMaximumWeightInBytes(((ByteSizeValue) settingList.get(CaffeineHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes())
.setMaximumWeightInBytes(
((ByteSizeValue) settingList.get(CaffeineHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes()
)
.setExpireAfterAccess(((TimeValue) settingList.get(CaffeineHeapCacheSettings.EXPIRE_AFTER_ACCESS_KEY).get(settings)))
.setWeigher(config.getWeigher())
.setRemovalListener(config.getRemovalListener());
Expand All @@ -262,7 +263,7 @@ private boolean statsTrackingEnabled(Settings settings, boolean statsTrackingEna

public static class Builder<K, V> extends ICacheBuilder<K, V> {
private List<String> dimensionNames;
private Executor executor = ForkJoinPool.commonPool();
private int numCleanupThreads;

public Builder() {}

Expand All @@ -271,15 +272,11 @@ public Builder<K, V> setDimensionNames(List<String> dimensionNames) {
return this;
}

public Builder<K, V> setExecutor(Executor executor) {
this.executor = executor;
public Builder<K, V> setNumCleanupThreads(int numCleanupThreads) {
this.numCleanupThreads = numCleanupThreads;
return this;
}

public Executor getExecutor() {
return executor;
}

public CaffeineHeapCache<K, V> build() {
return new CaffeineHeapCache<>(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public void testReplace() throws Exception {

// Replace old values with new, differently-sized values. Ensure that size changes accordingly.
for (Map.Entry<ICacheKey<String>, String> entry : keyValueMap.entrySet()) {
//waitForStats(caffeineTest);
// waitForStats(caffeineTest);
long current_size = caffeineTest.stats().getTotalItems();
long current_size_in_bytes = caffeineTest.stats().getTotalSizeInBytes();
String old_value = entry.getValue();
Expand All @@ -425,7 +425,10 @@ public void testReplace() throws Exception {
keyValueMap.put(key, new_value);
waitForStats(caffeineTest); // TODO: this can be very slow
assertEquals(current_size, caffeineTest.stats().getTotalItems());
assertEquals(current_size_in_bytes - Integer.parseInt(old_value) + Integer.parseInt(new_value), caffeineTest.stats().getTotalSizeInBytes());
assertEquals(
current_size_in_bytes - Integer.parseInt(old_value) + Integer.parseInt(new_value),
caffeineTest.stats().getTotalSizeInBytes()
);
}
caffeineTest.close();
}
Expand Down Expand Up @@ -458,7 +461,7 @@ public void testIteratorRemove() throws Exception {
assertNull(caffeineTest.get(next));
}
((CaffeineHeapCache<String, String>) caffeineTest).cleanUp();
//Thread.sleep(1000);
// Thread.sleep(1000);
waitForStats(caffeineTest);
assertEquals(0, caffeineTest.count());
assertEquals(0, caffeineTest.stats().getTotalSizeInBytes());
Expand Down Expand Up @@ -497,11 +500,10 @@ void waitForStats(ICache<String, String> caffeineCache) {
((CaffeineHeapCache<String, String>) caffeineCache).cleanUp();
ThreadPoolExecutor executor = ((CaffeineHeapCache<String, String>) caffeineCache).getExecutor();
// TODO: Gross hack - not good!
while (executor.getActiveCount() != 0 || !executor.getQueue().isEmpty()){
while (executor.getActiveCount() != 0 || !executor.getQueue().isEmpty()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
} catch (InterruptedException e) {}
}
}

Expand Down

0 comments on commit 5522949

Please sign in to comment.