From 5522949ca9ed910ffac5d657d5e263deb6e01a62 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 20 Nov 2024 15:18:11 -0800 Subject: [PATCH] Add threadpoolexecutor thread count setting Signed-off-by: Peter Alfonsi --- .../cache/CaffeineHeapCacheSettings.java | 16 +++++++++++- .../cache/store/CaffeineHeapCache.java | 25 ++++++++----------- .../cache/store/CaffeineHeapCacheTests.java | 14 ++++++----- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/plugins/cache-caffeine/src/main/java/org/opensearch/cache/CaffeineHeapCacheSettings.java b/plugins/cache-caffeine/src/main/java/org/opensearch/cache/CaffeineHeapCacheSettings.java index 08bd7a918742d..3fcc086e9e402 100644 --- a/plugins/cache-caffeine/src/main/java/org/opensearch/cache/CaffeineHeapCacheSettings.java +++ b/plugins/cache-caffeine/src/main/java/org/opensearch/cache/CaffeineHeapCacheSettings.java @@ -41,14 +41,28 @@ public class CaffeineHeapCacheSettings { (key) -> Setting.positiveTimeSetting(key, TimeValue.MAX_VALUE, Setting.Property.NodeScope) ); + /** + * Setting defining number of threads in caffeine cleanup threadpool. + * + * Setting pattern: {cache_type}.caffeine_heap.cleanup_threads + */ + public static final Setting.AffixSetting CLEANUP_THREADS_SETTING = Setting.suffixKeySetting( + CaffeineHeapCache.CaffeineHeapCacheFactory.NAME + ".cleanup_threads", + (key) -> Setting.intSetting(key, 3, 1, Setting.Property.NodeScope) + ); + public static final String MAXIMUM_SIZE_IN_BYTES_KEY = "maximum_size_in_bytes"; public static final String EXPIRE_AFTER_ACCESS_KEY = "expire_after_access"; + public static final String CLEANUP_THREADS_KEY = "cleanup_threads"; + private static final Map> KEY_SETTING_MAP = Map.of( MAXIMUM_SIZE_IN_BYTES_KEY, MAXIMUM_SIZE_IN_BYTES_SETTING, EXPIRE_AFTER_ACCESS_KEY, - EXPIRE_AFTER_ACCESS_SETTING + EXPIRE_AFTER_ACCESS_SETTING, + CLEANUP_THREADS_KEY, + CLEANUP_THREADS_SETTING ); public static final Map>> CACHE_TYPE_MAP = getCacheTypeMap(); diff --git a/plugins/cache-caffeine/src/main/java/org/opensearch/cache/store/CaffeineHeapCache.java b/plugins/cache-caffeine/src/main/java/org/opensearch/cache/store/CaffeineHeapCache.java index 3b4de2b536f06..15ff83d1911a1 100644 --- a/plugins/cache-caffeine/src/main/java/org/opensearch/cache/store/CaffeineHeapCache.java +++ b/plugins/cache-caffeine/src/main/java/org/opensearch/cache/store/CaffeineHeapCache.java @@ -23,7 +23,6 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; -import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.stats.CacheStatsHolder; import org.opensearch.common.cache.stats.DefaultCacheStatsHolder; import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder; @@ -42,10 +41,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Function; import java.util.function.ToLongBiFunction; @@ -72,8 +69,7 @@ private CaffeineHeapCache(Builder builder) { this.caffeineRemovalListener = new CaffeineRemovalListener( Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null") ); - - this.executor = Executors.newFixedThreadPool(3); + this.executor = Executors.newFixedThreadPool(builder.numCleanupThreads); cache = AccessController.doPrivileged( (PrivilegedAction, V>>) () -> Caffeine.newBuilder() @@ -234,9 +230,14 @@ public ICache create(CacheConfig config, CacheType cacheType, Map> settingList = CaffeineHeapCacheSettings.getSettingListForCacheType(cacheType); Settings settings = config.getSettings(); boolean statsTrackingEnabled = statsTrackingEnabled(config.getSettings(), config.getStatsTrackingEnabled()); - ICacheBuilder builder = new CaffeineHeapCache.Builder().setDimensionNames(config.getDimensionNames()) + ICacheBuilder builder = new CaffeineHeapCache.Builder().setNumCleanupThreads( + (int) settingList.get(CaffeineHeapCacheSettings.CLEANUP_THREADS_KEY).get(settings) + ) + .setDimensionNames(config.getDimensionNames()) .setStatsTrackingEnabled(statsTrackingEnabled) - .setMaximumWeightInBytes(((ByteSizeValue) settingList.get(CaffeineHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes()) + .setMaximumWeightInBytes( + ((ByteSizeValue) settingList.get(CaffeineHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes() + ) .setExpireAfterAccess(((TimeValue) settingList.get(CaffeineHeapCacheSettings.EXPIRE_AFTER_ACCESS_KEY).get(settings))) .setWeigher(config.getWeigher()) .setRemovalListener(config.getRemovalListener()); @@ -262,7 +263,7 @@ private boolean statsTrackingEnabled(Settings settings, boolean statsTrackingEna public static class Builder extends ICacheBuilder { private List dimensionNames; - private Executor executor = ForkJoinPool.commonPool(); + private int numCleanupThreads; public Builder() {} @@ -271,15 +272,11 @@ public Builder setDimensionNames(List dimensionNames) { return this; } - public Builder setExecutor(Executor executor) { - this.executor = executor; + public Builder setNumCleanupThreads(int numCleanupThreads) { + this.numCleanupThreads = numCleanupThreads; return this; } - public Executor getExecutor() { - return executor; - } - public CaffeineHeapCache build() { return new CaffeineHeapCache<>(this); } diff --git a/plugins/cache-caffeine/src/test/java/org/opensearch/cache/store/CaffeineHeapCacheTests.java b/plugins/cache-caffeine/src/test/java/org/opensearch/cache/store/CaffeineHeapCacheTests.java index 966bed90a0238..429d38d82f800 100644 --- a/plugins/cache-caffeine/src/test/java/org/opensearch/cache/store/CaffeineHeapCacheTests.java +++ b/plugins/cache-caffeine/src/test/java/org/opensearch/cache/store/CaffeineHeapCacheTests.java @@ -415,7 +415,7 @@ public void testReplace() throws Exception { // Replace old values with new, differently-sized values. Ensure that size changes accordingly. for (Map.Entry, String> entry : keyValueMap.entrySet()) { - //waitForStats(caffeineTest); + // waitForStats(caffeineTest); long current_size = caffeineTest.stats().getTotalItems(); long current_size_in_bytes = caffeineTest.stats().getTotalSizeInBytes(); String old_value = entry.getValue(); @@ -425,7 +425,10 @@ public void testReplace() throws Exception { keyValueMap.put(key, new_value); waitForStats(caffeineTest); // TODO: this can be very slow assertEquals(current_size, caffeineTest.stats().getTotalItems()); - assertEquals(current_size_in_bytes - Integer.parseInt(old_value) + Integer.parseInt(new_value), caffeineTest.stats().getTotalSizeInBytes()); + assertEquals( + current_size_in_bytes - Integer.parseInt(old_value) + Integer.parseInt(new_value), + caffeineTest.stats().getTotalSizeInBytes() + ); } caffeineTest.close(); } @@ -458,7 +461,7 @@ public void testIteratorRemove() throws Exception { assertNull(caffeineTest.get(next)); } ((CaffeineHeapCache) caffeineTest).cleanUp(); - //Thread.sleep(1000); + // Thread.sleep(1000); waitForStats(caffeineTest); assertEquals(0, caffeineTest.count()); assertEquals(0, caffeineTest.stats().getTotalSizeInBytes()); @@ -497,11 +500,10 @@ void waitForStats(ICache caffeineCache) { ((CaffeineHeapCache) caffeineCache).cleanUp(); ThreadPoolExecutor executor = ((CaffeineHeapCache) caffeineCache).getExecutor(); // TODO: Gross hack - not good! - while (executor.getActiveCount() != 0 || !executor.getQueue().isEmpty()){ + while (executor.getActiveCount() != 0 || !executor.getQueue().isEmpty()) { try { Thread.sleep(500); - } catch (InterruptedException e) { - } + } catch (InterruptedException e) {} } }