Skip to content

Commit

Permalink
[Bugfix] [Tiered Caching] Fixes issues when integrating tiered cache …
Browse files Browse the repository at this point in the history
…with disk cache (opensearch-project#13784)



---------

Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
  • Loading branch information
2 people authored and parv0201 committed Jun 10, 2024
1 parent d1d7c08 commit d353d97
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setKeySerializer(builder.cacheConfig.getKeySerializer())
.setValueSerializer(builder.cacheConfig.getValueSerializer())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setStatsTrackingEnabled(false)
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnable
@Override
@SuppressWarnings({ "unchecked" })
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
// As we can't directly IT with the tiered cache and ehcache, check that we receive non-null serializers, as an ehcache disk
// cache would require.
assert config.getKeySerializer() != null;
assert config.getValueSerializer() != null;
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
.setMaxSize(maxSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
Expand All @@ -32,6 +33,8 @@
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -166,6 +169,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
Expand Down Expand Up @@ -318,6 +323,8 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setDimensionNames(dimensionNames)
.setSettings(
Settings.builder()
Expand Down Expand Up @@ -830,6 +837,8 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio
.setKeyType(String.class)
.setWeigher((k, v) -> 150)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -1014,6 +1023,8 @@ public void testTookTimePolicyFromFactory() throws Exception {
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setMaxSizeInBytes(onHeapCacheSize * keyValueSize)
.setDimensionNames(dimensionNames)
Expand Down Expand Up @@ -1415,6 +1426,8 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -1479,4 +1492,26 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache<?, ?> t
}
return snapshot;
}

// Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin
static class StringSerializer implements Serializer<String, byte[]> {
private final Charset charset = StandardCharsets.UTF_8;

@Override
public byte[] serialize(String object) {
return object.getBytes(charset);
}

@Override
public String deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new String(bytes, charset);
}

public boolean equals(String object, byte[] bytes) {
return object.equals(deserialize(bytes));
}
}
}
18 changes: 4 additions & 14 deletions plugins/cache-ehcache/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ versions << [

dependencies {
api "org.ehcache:ehcache:${versions.ehcache}"
api "org.slf4j:slf4j-api:${versions.slf4j}"
}

thirdPartyAudit {
Expand Down Expand Up @@ -78,10 +79,9 @@ thirdPartyAudit {
'org.osgi.framework.BundleActivator',
'org.osgi.framework.BundleContext',
'org.osgi.framework.ServiceReference',
'org.slf4j.Logger',
'org.slf4j.LoggerFactory',
'org.slf4j.Marker',
'org.slf4j.event.Level'
'org.slf4j.impl.StaticLoggerBinder',
'org.slf4j.impl.StaticMDCBinder',
'org.slf4j.impl.StaticMarkerBinder'
)
}

Expand All @@ -90,13 +90,3 @@ tasks.named("bundlePlugin").configure {
into 'config'
}
}

test {
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}

internalClusterTest {
// TODO: Remove this later once we have a way.
systemProperty 'tests.security.manager', 'false'
}
1 change: 1 addition & 0 deletions plugins/cache-ehcache/licenses/slf4j-api-1.7.36.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
6c62681a2f655b49963a5983b8b0950a6120ae14
21 changes: 21 additions & 0 deletions plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Copyright (c) 2004-2022 QOS.ch
All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
Expand Down Expand Up @@ -175,57 +177,60 @@ private EhcacheDiskCache(Builder<K, V> builder) {

@SuppressWarnings({ "rawtypes" })
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
try {
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
// Creating the cache requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
try {
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
)
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
// before V hits ehcache.
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;
}
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;
}
});
}

private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
Expand All @@ -252,25 +257,28 @@ Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> getCompletableFutur
@SuppressForbidden(reason = "Ehcache uses File.io")
private PersistentCacheManager buildCacheManager() {
// In case we use multiple ehCaches, we can define this cache manager at a global level.
return CacheManagerBuilder.newCacheManagerBuilder()
.with(CacheManagerBuilder.persistence(new File(storagePath)))

.using(
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
// like event listeners
.pool(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MIN_THREADS_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
.get(settings)
)
.build()
)
.build(true);
// Creating the cache manager also requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<PersistentCacheManager>) () -> {
return CacheManagerBuilder.newCacheManagerBuilder()
.with(CacheManagerBuilder.persistence(new File(storagePath)))

.using(
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
// like event listeners
.pool(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MIN_THREADS_KEY)
.get(settings),
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
.get(settings)
)
.build()
)
.build(true);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
grant {
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
permission java.lang.RuntimePermission "createClassLoader";
permission java.lang.RuntimePermission "accessDeclaredMembers";
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
permission java.lang.RuntimePermission "getClassLoader";
};

0 comments on commit d353d97

Please sign in to comment.