Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Cache tier policies #12542

Merged
merged 24 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642))
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.cache.common.policy;

import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
* The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed
* to decide whether to admit the value.
* @param <V> The type of data consumed by test().
*/
public class TookTimePolicy<V> implements Predicate<V> {
/**
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
*/
private final TimeValue threshold;

/**
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
*/
private final Function<V, CachedQueryResult.PolicyValues> cachedResultParser;

/**
* Constructs a took time policy.
* @param threshold the threshold
* @param cachedResultParser the function providing policy values
*/
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
if (threshold.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
}
sohami marked this conversation as resolved.
Show resolved Hide resolved
this.threshold = threshold;
this.cachedResultParser = cachedResultParser;
}

/**
* Check whether to admit data.
* @param data the input argument
* @return whether to admit the data
*/
public boolean test(V data) {
long tookTimeNanos;
try {
tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos();
} catch (Exception e) {
// If we can't read a CachedQueryResult.PolicyValues from the BytesReference, reject the data
return false;
}

TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
return tookTime.compareTo(threshold) >= 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** A package for policies controlling what can enter caches. */
package org.opensearch.cache.common.policy;
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,31 @@

package org.opensearch.cache.common.tier;

import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
Expand All @@ -52,6 +57,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final List<Predicate<V>> policies;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Expand All @@ -63,21 +69,27 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
if (evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
}
}
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix
// coming soon
.build(),
builder.cacheType,
builder.cacheFactories

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

// Package private for testing
Expand Down Expand Up @@ -192,6 +204,15 @@ private Function<K, V> getValueFromTieredCache() {
};
}

boolean evaluatePolicies(V value) {
for (Predicate<V> policy : policies) {
if (!policy.test(value)) {
return false;
}
}
return true;
}

/**
* Factory to create TieredSpilloverCache objects.
*/
Expand Down Expand Up @@ -231,11 +252,21 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
);
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);

TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
.get(settings);
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
config.getCachedResultParser(),
"Cached result parser fn can't be null"
);

return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
.build();
}

Expand All @@ -257,6 +288,7 @@ public static class Builder<K, V> {
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
private final ArrayList<Predicate<V>> policies = new ArrayList<>();

/**
* Default constructor
Expand Down Expand Up @@ -323,6 +355,26 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
return this;
}

/**
* Set a cache policy to be used to limit access to this cache's disk tier.
* @param policy the policy
* @return builder
*/
public Builder<K, V> addPolicy(Predicate<V> policy) {
this.policies.add(policy);
return this;
}

/**
* Set multiple policies to be used to limit access to this cache's disk tier.
* @param policies the policies
* @return builder
*/
public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
this.policies.addAll(policies);
return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()

Check warning on line 56 in modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java

View check run for this annotation

Codecov / codecov/patch

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java#L54-L56

Added lines #L54 - L56 were not covered by tests
)
);
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.opensearch.cache.common.tier;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.concurrent.TimeUnit;

import static org.opensearch.common.settings.Setting.Property.NodeScope;

Expand Down Expand Up @@ -36,6 +39,21 @@ public class TieredSpilloverCacheSettings {
(key) -> Setting.simpleString(key, "", NodeScope)
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
TimeValue.ZERO, // Minimum value for this setting
NodeScope
)
);
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
// Will be tuned further with future benchmarks.

/**
* Default constructor
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.policy;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Random;
import java.util.function.Function;

public class TookTimePolicyTests extends OpenSearchTestCase {
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
try {
return CachedQueryResult.getPolicyValues(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
};

private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
return new TookTimePolicy<>(threshold, transformationFunction);
}

public void testTookTimePolicy() throws Exception {
double threshMillis = 10;
long shortMillis = (long) (0.9 * threshMillis);
long longMillis = (long) (1.5 * threshMillis);
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(new TimeValue((long) threshMillis));
BytesReference shortTime = getValidPolicyInput(shortMillis * 1000000);
BytesReference longTime = getValidPolicyInput(longMillis * 1000000);

boolean shortResult = tookTimePolicy.test(shortTime);
assertFalse(shortResult);
boolean longResult = tookTimePolicy.test(longTime);
assertTrue(longResult);

TookTimePolicy<BytesReference> disabledPolicy = getTookTimePolicy(TimeValue.ZERO);
shortResult = disabledPolicy.test(shortTime);
assertTrue(shortResult);
longResult = disabledPolicy.test(longTime);
assertTrue(longResult);
}

public void testNegativeOneInput() throws Exception {
// PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(TimeValue.ZERO);
BytesReference minusOne = getValidPolicyInput(-1L);
assertFalse(tookTimePolicy.test(minusOne));
}

public void testInvalidThreshold() throws Exception {
assertThrows(IllegalArgumentException.class, () -> getTookTimePolicy(TimeValue.MINUS_ONE));
}

private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException {
// When it's used in the cache, the policy will receive BytesReferences which come from
// serializing a CachedQueryResult.
CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos);
BytesStreamOutput out = new BytesStreamOutput();
cachedQueryResult.writeToNoId(out);
return out.bytes();
}

private QuerySearchResult getQSR() {
// We can't mock the QSR with mockito because the class is final. Construct a real one
QuerySearchResult mockQSR = new QuerySearchResult();

// duplicated from DfsQueryPhaseTests.java
mockQSR.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }),
2.0F
),
new DocValueFormat[0]
);
return mockQSR;
}

private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException {
Random rand = Randomness.get();
byte[] bytes = new byte[numBytes];
rand.nextBytes(bytes);
out.writeBytes(bytes);
}
}
Loading
Loading