Skip to content

Commit

Permalink
Moving feature flag setting handling logic to CacheService by maintai…
Browse files Browse the repository at this point in the history
…ning backward compatibility

Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Mar 7, 2024
1 parent 1f85b27 commit 4f58d5a
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
Expand Down Expand Up @@ -254,6 +256,11 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
.setRemovalListener(removalListener)
.setSettings(
Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
Expand Down Expand Up @@ -442,6 +449,10 @@ public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception {
diskCacheSize,
removalListener,
Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
Expand Down Expand Up @@ -746,6 +757,11 @@ public void testConcurrencyForEvictionFlow() throws Exception {
.setRemovalListener(removalListener)
.setSettings(
Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
Expand Down Expand Up @@ -856,7 +872,16 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setSettings(settings)
.setSettings(
Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(settings)
.build()
)
.build();

ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,12 @@ public IndicesRequestCacheIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() },
new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() }
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build();
}

@Override
protected boolean useRandomReplicationStrategy() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -44,8 +46,13 @@ public <K, V> ICache<K, V> createCache(CacheConfig<K, V> config, CacheType cache
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
if (storeName == null || storeName.isBlank()) {
throw new IllegalArgumentException("No configuration exists for cache type: " + cacheType);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
// Condition 1: In case feature flag is off, we default to onHeap.
// Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older
// settings, so we again fallback to onHeap to maintain backward compatibility.
// It is guaranteed that we will have this store name registered, so
// should be safe.
storeName = OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME;
}
if (!cacheStoreTypeFactories.containsKey(storeName)) {
throw new IllegalArgumentException("No store name: [" + storeName + "] is registered for cache type: " + cacheType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Setting;

/**
Expand All @@ -26,10 +25,10 @@ public class CacheSettings {
*/
public static final Setting.AffixSetting<String> CACHE_TYPE_STORE_NAME = Setting.suffixKeySetting(
"store.name",
(key) -> Setting.simpleString(key, OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, Setting.Property.NodeScope)
(key) -> Setting.simpleString(key, "", Setting.Property.NodeScope)
);

public static Setting<String> getConcreteSettingForCacheType(CacheType cacheType) {
public static Setting<String> getConcreteStoreNameSettingForCacheType(CacheType cacheType) {
return CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;

import java.util.Map;

import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_KEY;
import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;

/**
Expand Down Expand Up @@ -111,9 +115,22 @@ public static class OpenSearchOnHeapCacheFactory implements Factory {
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
Map<String, Setting<?>> settingList = OpenSearchOnHeapCacheSettings.getSettingListForCacheType(cacheType);
Settings settings = config.getSettings();
return new Builder<K, V>().setMaximumWeightInBytes(
ICacheBuilder<K, V> builder = new Builder<K, V>().setMaximumWeightInBytes(
((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes()
).setWeigher(config.getWeigher()).setRemovalListener(config.getRemovalListener()).build();
)
.setExpireAfterAccess(((TimeValue) settingList.get(EXPIRE_AFTER_ACCESS_KEY).get(settings)))
.setWeigher(config.getWeigher())
.setRemovalListener(config.getRemovalListener());
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
// For backward compatibility as the user intent is to use older settings.
builder.setMaximumWeightInBytes(config.getMaxSizeInBytes());
builder.setExpireAfterAccess(config.getExpireAfterAccess());
}
return builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.function.ToLongBiFunction;

Expand Down Expand Up @@ -41,12 +42,24 @@ public class CacheConfig<K, V> {

private final RemovalListener<K, V> removalListener;

/**
* Max size in bytes for the cache. This is needed for backward compatibility.
*/
private final long maxSizeInBytes;

/**
* Defines the expiration time for a cache entry. This is needed for backward compatibility.
*/
private final TimeValue expireAfterAccess;

private CacheConfig(Builder<K, V> builder) {
this.keyType = builder.keyType;
this.valueType = builder.valueType;
this.settings = builder.settings;
this.removalListener = builder.removalListener;
this.weigher = builder.weigher;
this.maxSizeInBytes = builder.maxSizeInBytes;
this.expireAfterAccess = builder.expireAfterAccess;
}

public Class<K> getKeyType() {
Expand All @@ -69,6 +82,14 @@ public ToLongBiFunction<K, V> getWeigher() {
return weigher;
}

public Long getMaxSizeInBytes() {
return maxSizeInBytes;
}

public TimeValue getExpireAfterAccess() {
return expireAfterAccess;
}

/**
* Builder class to build Cache config related parameters.
* @param <K> Type of key.
Expand All @@ -86,6 +107,10 @@ public static class Builder<K, V> {

private ToLongBiFunction<K, V> weigher;

private long maxSizeInBytes;

private TimeValue expireAfterAccess;

public Builder() {}

public Builder<K, V> setSettings(Settings settings) {
Expand Down Expand Up @@ -113,6 +138,16 @@ public Builder<K, V> setWeigher(ToLongBiFunction<K, V> weigher) {
return this;
}

public Builder<K, V> setMaxSizeInBytes(long sizeInBytes) {
this.maxSizeInBytes = sizeInBytes;
return this;
}

public Builder<K, V> setExpireAfterAccess(TimeValue expireAfterAccess) {
this.expireAfterAccess = expireAfterAccess;
return this;
}

public CacheConfig<K, V> build() {
return new CacheConfig<>(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;

import java.util.HashMap;
Expand All @@ -33,9 +34,25 @@ public class OpenSearchOnHeapCacheSettings {
(key) -> Setting.memorySizeSetting(key, "1%", NodeScope)
);

/**
* Setting to define expire after access.
*
* Setting pattern: {cache_type}.opensearch_onheap.expire
*/
public static final Setting.AffixSetting<TimeValue> EXPIRE_AFTER_ACCESS_SETTING = Setting.suffixKeySetting(
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ".expire",
(key) -> Setting.positiveTimeSetting(key, TimeValue.MAX_VALUE, Setting.Property.NodeScope)
);

public static final String MAXIMUM_SIZE_IN_BYTES_KEY = "maximum_size_in_bytes";
public static final String EXPIRE_AFTER_ACCESS_KEY = "expire_after_access";

private static final Map<String, Setting.AffixSetting<?>> KEY_SETTING_MAP = Map.of(MAXIMUM_SIZE_IN_BYTES_KEY, MAXIMUM_SIZE_IN_BYTES);
private static final Map<String, Setting.AffixSetting<?>> KEY_SETTING_MAP = Map.of(
MAXIMUM_SIZE_IN_BYTES_KEY,
MAXIMUM_SIZE_IN_BYTES,
EXPIRE_AFTER_ACCESS_KEY,
EXPIRE_AFTER_ACCESS_SETTING
);

public static final Map<CacheType, Map<String, Setting<?>>> CACHE_TYPE_MAP = getCacheTypeMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,6 @@ public void apply(Settings value, Settings current, Settings previous) {
TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING
),
List.of(FeatureFlags.PLUGGABLE_CACHE),
List.of(CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE))
List.of(CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE))
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -131,26 +128,17 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
long sizeInBytes = size.getBytes();
ToLongBiFunction<Key, BytesReference> weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed();
this.cacheEntityLookup = cacheEntityFunction;
if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) {
this.cache = cacheService.createCache(
new CacheConfig.Builder<Key, BytesReference>().setSettings(settings)
.setWeigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed())
.setValueType(BytesReference.class)
.setKeyType(Key.class)
.setRemovalListener(this)
.build(),
CacheType.INDICES_REQUEST_CACHE
);
} else {
ICacheBuilder<Key, BytesReference> builder = new OpenSearchOnHeapCache.Builder<Key, BytesReference>().setSettings(settings)
.setMaximumWeightInBytes(sizeInBytes)
this.cache = cacheService.createCache(
new CacheConfig.Builder<Key, BytesReference>().setSettings(settings)
.setWeigher(weigher)
.setRemovalListener(this);
if (expire != null) {
builder.setExpireAfterAccess(expire);
}
this.cache = builder.build();
}
.setValueType(BytesReference.class)
.setKeyType(Key.class)
.setRemovalListener(this)
.setMaxSizeInBytes(sizeInBytes) // for backward compatibility
.setExpireAfterAccess(expire) // for backward compatibility
.build(),
CacheType.INDICES_REQUEST_CACHE
);
}

@Override
Expand Down
Loading

0 comments on commit 4f58d5a

Please sign in to comment.