Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered caching] Integrating IndicesRequestCache with CacheService controlled by a feature flag #12533

Merged
merged 9 commits into from
Mar 11, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- [Tiered caching] Integrating IndicesRequestCache with CacheService controlled by a feature flag ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))

### Dependencies
Expand Down
2 changes: 2 additions & 0 deletions modules/cache-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* compatible open source license.
*/

apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Module for caches which are optional and do not require additional security permission'
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.greaterThan;

public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.TIERED_CACHING, "true").build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.build();
}

public void testPluginsAreInstalled() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
.stream()
.flatMap(
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
)
.collect(Collectors.toList());
Assert.assertTrue(
pluginInfos.stream()
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin"))
);
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("f", "type=date")
.setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build())
.get()
);
indexRandom(
true,
client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"),
client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z")
);
ensureSearchable("index");

// This is not a random example: serialization with time zones writes shared strings
// which used to not work well with the query cache because of the handles stream output
// see #9500
final SearchResponse r1 = client.prepareSearch("index")
.setSize(0)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(
dateHistogram("histo").field("f")
.timeZone(ZoneId.of("+01:00"))
.minDocCount(0)
.dateHistogramInterval(DateHistogramInterval.MONTH)
)
.get();
assertSearchResponse(r1);

// The cached is actually used
assertThat(
client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(),
greaterThan(0L)
);
}

public static class MockDiskCachePlugin extends Plugin implements CachePlugin {

public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000));
}

@Override
public String getName() {
return "mock_disk_plugin";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MockDiskCache<K, V> implements ICache<K, V> {

Map<K, V> cache;
int maxSize;
long delay;

public MockDiskCache(int maxSize, long delay) {
this.maxSize = maxSize;
this.delay = delay;
this.cache = new ConcurrentHashMap<K, V>();
}

@Override
public V get(K key) {
V value = cache.get(key);
return value;
}

@Override
public void put(K key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
return;
}
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.cache.put(key, value);
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return value;
}

@Override
public void invalidate(K key) {
this.cache.remove(key);
}

@Override
public void invalidateAll() {
this.cache.clear();
}

@Override
public Iterable<K> keys() {
return this.cache.keySet();
}

@Override
public long count() {
return this.cache.size();
}

@Override
public void refresh() {}

@Override
public void close() {

}

public static class MockDiskCacheFactory implements Factory {

public static final String NAME = "mockDiskCache";
final long delay;
final int maxSize;

public MockDiskCacheFactory(long delay, int maxSize) {
this.delay = delay;
this.maxSize = maxSize;
}

@Override
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
return new Builder<K, V>().setMaxSize(maxSize).setDeliberateDelay(delay).build();
}

@Override
public String getCacheName() {
return NAME;
}
}

public static class Builder<K, V> extends ICacheBuilder<K, V> {

int maxSize;
long delay;

@Override
public ICache<K, V> build() {
return new MockDiskCache<K, V>(this.maxSize, this.delay);
}

public Builder<K, V> setMaxSize(int maxSize) {
this.maxSize = maxSize;
return this;
}

public Builder<K, V> setDeliberateDelay(long millis) {
this.delay = millis;
return this;
}
}
}
Loading
Loading