Skip to content

Commit

Permalink
Changes for initializing tiered cache as per setting
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Jan 2, 2024
1 parent 4eed280 commit a18536d
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.FeatureFlags;
Expand All @@ -60,6 +61,7 @@
import java.util.Collection;
import java.util.List;

import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_TYPE;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.search.aggregations.AggregationBuilders.dateRange;
Expand Down Expand Up @@ -636,6 +638,72 @@ public void testProfileDisableCache() throws Exception {
}
}

public void testWithTieredCache() throws Exception {
Settings.Builder settings = Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(),
true).put(INDICES_REQUEST_CACHE_TYPE.getKey(), CacheType.TIERED.getCacheTypeValue());
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("f", "type=date")
.setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true))
.get()
);
indexRandom(
true,
client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"),
client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z")
);
ensureSearchable("index");

// This is not a random example: serialization with time zones writes shared strings
// which used to not work well with the query cache because of the handles stream output
// see #9500
final SearchResponse r1 = client.prepareSearch("index")
.setSize(0)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(
dateHistogram("histo").field("f")
.timeZone(ZoneId.of("+01:00"))
.minDocCount(0)
.dateHistogramInterval(DateHistogramInterval.MONTH)
)
.get();
assertSearchResponse(r1);

// The cached is actually used
assertThat(
client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(),
greaterThan(0L)
);

for (int i = 0; i < 10; ++i) {
final SearchResponse r2 = client.prepareSearch("index")
.setSize(0)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(
dateHistogram("histo").field("f")
.timeZone(ZoneId.of("+01:00"))
.minDocCount(0)
.dateHistogramInterval(DateHistogramInterval.MONTH)
)
.get();
assertSearchResponse(r2);
Histogram h1 = r1.getAggregations().get("histo");
Histogram h2 = r2.getAggregations().get("histo");
final List<? extends Bucket> buckets1 = h1.getBuckets();
final List<? extends Bucket> buckets2 = h2.getBuckets();
assertEquals(buckets1.size(), buckets2.size());
for (int j = 0; j < buckets1.size(); ++j) {
final Bucket b1 = buckets1.get(j);
final Bucket b2 = buckets2.get(j);
assertEquals(b1.getKey(), b2.getKey());
assertEquals(b1.getDocCount(), b2.getDocCount());
}
}

}
private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ public enum CacheType {
CacheType(String cacheType) {
this.cacheType = cacheType;
}

public String getCacheTypeValue() {
return cacheType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ private EhCacheDiskCache(Builder<K, V> builder) {
}
this.settings = Objects.requireNonNull(builder.settings, "Settings objects shouldn't be null");
Objects.requireNonNull(builder.settingPrefix, "Setting prefix shouldn't be null");
this.DISK_WRITE_MINIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.min_threads", 2, 1, 5);
this.DISK_WRITE_MAXIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.max_threads", 2, 1, 20);
this.DISK_WRITE_MINIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tier.disk.ehcache.min_threads", 2, 1, 5);
this.DISK_WRITE_MAXIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tier.disk.ehcache.max_threads", 2, 1, 20);
// Default value is 1 within EhCache.
this.DISK_WRITE_CONCURRENCY = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.concurrency", 2, 1, 3);
this.DISK_WRITE_CONCURRENCY = Setting.intSetting(builder.settingPrefix + ".tier.disk.ehcache.concurrency", 2, 1, 3);
// Default value is 16 within Ehcache.
this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + ".ehcache.disk.segments", 16, 1, 32);
this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + "tier.disk.ehcache.segments", 16, 1, 32);
this.cacheManager = buildCacheManager();
Objects.requireNonNull(builder.getEventListener(), "Listener can't be null");
this.eventListener = builder.getEventListener();
Expand Down Expand Up @@ -267,11 +267,7 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
switch (event.getType()) {
case CREATED:
count.inc();
this.eventListener.onCached(
event.getKey(),
event.getNewValue(),
CacheStoreType.DISK
);
this.eventListener.onCached(event.getKey(), event.getNewValue(), CacheStoreType.DISK);
assert event.getOldValue() == null;
break;
case EVICTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
import org.opensearch.common.cache.store.enums.CacheStoreType;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

Expand All @@ -29,6 +30,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

import static org.opensearch.common.util.FeatureFlags.INDICES_REQUEST_TIERED_CACHE_ENABLE_SETTING;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
Expand All @@ -45,6 +48,8 @@ public class TieredSpilloverCache<K, V> implements TieredCache<K, V>, StoreAware
private final Optional<StoreAwareCache<K, V>> onDiskCache;
private final StoreAwareCache<K, V> onHeapCache;
private final StoreAwareCacheEventListener<K, V> listener;

private final Settings settings;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
Expand All @@ -63,6 +68,7 @@ public class TieredSpilloverCache<K, V> implements TieredCache<K, V>, StoreAware
this.onDiskCache = Optional.empty();
}
this.listener = builder.listener;
this.settings = Objects.requireNonNull(builder.settings, "Settings object shouldn't be null");
this.cacheList = this.onDiskCache.map(diskTier -> Arrays.asList(this.onHeapCache, diskTier)).orElse(List.of(this.onHeapCache));
}

Expand Down Expand Up @@ -214,7 +220,11 @@ public void onRemoval(StoreAwareCacheRemovalNotification<K, V> notification) {
switch (notification.getCacheStoreType()) {
case ON_HEAP:
try (ReleasableLock ignore = writeLock.acquire()) {
onDiskCache.ifPresent(diskTier -> { diskTier.put(notification.getKey(), notification.getValue()); });
onDiskCache.ifPresent(diskTier -> {
if (INDICES_REQUEST_TIERED_CACHE_ENABLE_SETTING.get(settings)) {
diskTier.put(notification.getKey(), notification.getValue());
}
});
}
onDiskCache.ifPresent(
diskTier -> listener.onCached(notification.getKey(), notification.getValue(), CacheStoreType.DISK)
Expand Down Expand Up @@ -267,6 +277,7 @@ public static class Builder<K, V> {
private StoreAwareCacheBuilder<K, V> onHeapCacheBuilder;
private StoreAwareCacheBuilder<K, V> onDiskCacheBuilder;
private StoreAwareCacheEventListener<K, V> listener;
private Settings settings;

public Builder() {}

Expand All @@ -285,6 +296,11 @@ public Builder<K, V> setListener(StoreAwareCacheEventListener<K, V> listener) {
return this;
}

public Builder<K, V> setSettings(Settings settings) {
this.settings = settings;
return this;
}

public TieredSpilloverCache<K, V> build() {
return new TieredSpilloverCache<>(this);
}
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/common/util/FeatureFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) {
Property.NodeScope
);

public static final Setting<String> INDICES_REQUEST_CACHE_TYPE = Setting.simpleString(
"indices.requests.cache" + ".type",
Property.NodeScope
);

public static final Setting<Boolean> INDICES_REQUEST_TIERED_CACHE_ENABLE_SETTING = Setting.boolSetting(
"indices.requests.cache.tiered.enabled",
false,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Boolean> DATETIME_FORMATTER_CACHING_SETTING = Setting.boolSetting(
DATETIME_FORMATTER_CACHING,
true,
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public static class NodePath {
/** Cached FileStore from path */
public final FileStore fileStore;
public final Path fileCachePath;
public final Path indicesDiskCachePath;
/*
Cache reserved size can default to a different value depending on configuration
*/
Expand All @@ -139,6 +140,7 @@ public NodePath(Path path) throws IOException {
this.indicesPath = path.resolve(INDICES_FOLDER);
this.fileCachePath = path.resolve(CACHE_FOLDER);
this.fileStore = Environment.getFileStore(path);
this.indicesDiskCachePath = this.indicesPath.resolve(CACHE_FOLDER);
this.fileCacheReservedSize = ByteSizeValue.ZERO;
if (fileStore.supportsFileAttributeView("lucene")) {
this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number");
Expand Down Expand Up @@ -1299,6 +1301,11 @@ private static Path resolveIndexCustomLocation(String customDataPath, String ind
return resolveBaseCustomLocation(customDataPath, sharedDataPath, nodeLockId).resolve(indexUUID);
}

public Path resolveIndicesDiskCachePath(String indicesCache) {
return this.nodePaths[0].indicesDiskCachePath.resolve(indicesCache);
}


/**
* Resolve the file cache path for remote shards.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalNotification;
Expand All @@ -47,13 +48,16 @@
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
import org.opensearch.common.cache.store.enums.CacheStoreType;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;
import org.opensearch.common.cache.tier.EhCacheDiskCache;
import org.opensearch.common.cache.tier.TieredSpilloverCache;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

import java.io.Closeable;
Expand Down Expand Up @@ -111,13 +115,25 @@ public final class IndicesRequestCache implements StoreAwareCacheEventListener<I
Property.NodeScope
);

public static final Setting<TimeValue> INDICES_REQUEST_CACHE_DISK_TIER_EXPIRE = Setting.positiveTimeSetting(
"indices.requests.cache.tier.disk.expire",
new TimeValue(0),
Property.NodeScope
);

public static final Setting<ByteSizeValue> INDICES_REQUEST_CACHE_DISK_SIZE = Setting.byteSizeSetting(
"indices.requests.cache.tier.disk.size",
new ByteSizeValue(500, ByteSizeUnit.MB)
);


private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
private final ByteSizeValue size;
private final TimeValue expire;
private final ICache<Key, BytesReference> cache;

IndicesRequestCache(Settings settings) {
IndicesRequestCache(Settings settings, IndicesService indicesService) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
Expand All @@ -128,36 +144,43 @@ public final class IndicesRequestCache implements StoreAwareCacheEventListener<I
if (expire != null) {
openSearchOnHeapCacheBuilder.setExpireAfterAccess(expire);
}
openSearchOnHeapCacheBuilder.setEventListener(this);
cache = openSearchOnHeapCacheBuilder.build();
// if (cacheTypeInString == null || cacheTypeInString.isBlank()) {
// openSearchOnHeapCacheBuilder.setEventListener(this);
// cache = openSearchOnHeapCacheBuilder.build();
// } else {
// CacheType cacheType;
// try {
// cacheType = CacheType.valueOf(cacheTypeInString);
// } catch (IllegalArgumentException ex) {
// openSearchOnHeapCacheBuilder.setEventListener(this);
// cache = openSearchOnHeapCacheBuilder.build();
// return;
// }
// switch (cacheType) {
// case ON_HEAP:
// openSearchOnHeapCacheBuilder.setEventListener(this);
// cache = openSearchOnHeapCacheBuilder.build();
// break;
// case TIERED:
// cache =
// new TieredSpilloverCache.Builder<Key, BytesReference>()
// .setOnHeapCacheBuilder(openSearchOnHeapCacheBuilder)
// .setListener(this)
// .build();
// default:
// throw new IllegalArgumentException("Unknown cache type");
//
// }
// }
if (cacheTypeInString == null || cacheTypeInString.isBlank()) {
openSearchOnHeapCacheBuilder.setEventListener(this);
cache = openSearchOnHeapCacheBuilder.build();
} else {
CacheType cacheType;
try {
cacheType = CacheType.valueOf(cacheTypeInString);
} catch (IllegalArgumentException ex) {
openSearchOnHeapCacheBuilder.setEventListener(this);
cache = openSearchOnHeapCacheBuilder.build();
return;
}
switch (cacheType) {
case ON_HEAP:
openSearchOnHeapCacheBuilder.setEventListener(this);
cache = openSearchOnHeapCacheBuilder.build();
break;
case TIERED:
StoreAwareCacheBuilder<Key, BytesReference> ehCacheDiskBuilder =
new EhCacheDiskCache.Builder<Key, BytesReference>()
.setDiskCacheAlias("ehCache")
.setKeyType(Key.class)
.setValueType(BytesReference.class)
.setStoragePath(indicesService.getIndicesDiskCachePath("request_cache"))
.setSettings(settings)
.setSettingPrefix("indices.requests.cache.tier")
.setThreadPoolAlias("test")
.setMaximumWeightInBytes(INDICES_REQUEST_CACHE_DISK_SIZE.get(settings).getBytes());
cache = new TieredSpilloverCache.Builder<Key, BytesReference>().setOnHeapCacheBuilder(openSearchOnHeapCacheBuilder)
.setOnDiskCacheBuilder(ehCacheDiskBuilder)
.setListener(this)
.build();
default:
throw new IllegalArgumentException("Unknown cache type");

}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public IndicesService(
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
this.analysisRegistry = analysisRegistry;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indicesRequestCache = new IndicesRequestCache(settings);
this.indicesRequestCache = new IndicesRequestCache(settings, this);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -1717,6 +1717,10 @@ public ByteSizeValue getTotalIndexingBufferBytes() {
return indexingMemoryController.indexingBufferSize();
}

public String getIndicesDiskCachePath(String indicesCache) {
return nodeEnv.resolveIndicesDiskCachePath(indicesCache).toString();
}

/**
* Cache something calculated at the shard level.
* @param shard the shard this item is part of
Expand Down
Loading

0 comments on commit a18536d

Please sign in to comment.