Skip to content

Commit

Permalink
[Tiered Caching] Serializers for ehcache (opensearch-project#12709)
Browse files Browse the repository at this point in the history
Adds serializers and integrates them into ehcache disk cache

---------

Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
  • Loading branch information
peteralfonsi and Peter Alfonsi authored Mar 18, 2024
1 parent 1f5df54 commit 21b28f2
Show file tree
Hide file tree
Showing 12 changed files with 607 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709))
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand Down Expand Up @@ -106,8 +107,11 @@ public MockDiskCacheFactory(long delay, int maxSize) {
}

@Override
@SuppressWarnings({ "unchecked" })
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
return new Builder<K, V>().setMaxSize(maxSize)
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
.setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.build();
Expand All @@ -123,6 +127,8 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

int maxSize;
long delay;
Serializer<K, byte[]> keySerializer;
Serializer<V, byte[]> valueSerializer;

@Override
public ICache<K, V> build() {
Expand All @@ -138,5 +144,16 @@ public Builder<K, V> setDeliberateDelay(long millis) {
this.delay = millis;
return this;
}

public Builder<K, V> setKeySerializer(Serializer<K, byte[]> keySerializer) {
this.keySerializer = keySerializer;
return this;
}

public Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}

}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,33 @@
package org.opensearch.cache.store.disk;

import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand All @@ -51,6 +60,8 @@ public void testBasicGetAndPut() throws IOException {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -89,6 +100,8 @@ public void testBasicGetAndPutUsingFactory() throws IOException {
new CacheConfig.Builder<String, String>().setValueType(String.class)
.setKeyType(String.class)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -149,6 +162,8 @@ public void testConcurrentPut() throws Exception {
.setIsEventListenerModeSync(true) // For accurate count
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -194,6 +209,8 @@ public void testEhcacheParallelGets() throws Exception {
.setIsEventListenerModeSync(true) // For accurate count
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -237,6 +254,8 @@ public void testEhcacheKeyIterator() throws Exception {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -274,6 +293,8 @@ public void testEvictions() throws Exception {
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -304,6 +325,8 @@ public void testComputeIfAbsentConcurrently() throws Exception {
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -373,6 +396,8 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -430,6 +455,8 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception {
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -491,6 +518,8 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -525,6 +554,50 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {

}

public void testBasicGetAndPutBytesReference() throws Exception {
Settings settings = Settings.builder().build();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, BytesReference> ehCacheDiskCachingTier = new EhcacheDiskCache.Builder<String, BytesReference>()
.setThreadPoolAlias("ehcacheTest")
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setKeySerializer(new StringSerializer())
.setValueSerializer(new BytesReferenceSerializer())
.setKeyType(String.class)
.setValueType(BytesReference.class)
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setRemovalListener(new MockRemovalListener<>())
.build();
int randomKeys = randomIntBetween(10, 100);
int valueLength = 100;
Random rand = Randomness.get();
Map<String, BytesReference> keyValueMap = new HashMap<>();
for (int i = 0; i < randomKeys; i++) {
byte[] valueBytes = new byte[valueLength];
rand.nextBytes(valueBytes);
keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes));

// Test a non-BytesArray implementation of BytesReference.
byte[] compositeBytes1 = new byte[valueLength];
byte[] compositeBytes2 = new byte[valueLength];
rand.nextBytes(compositeBytes1);
rand.nextBytes(compositeBytes2);
BytesReference composite = CompositeBytesReference.of(new BytesArray(compositeBytes1), new BytesArray(compositeBytes2));
keyValueMap.put(UUID.randomUUID().toString(), composite);
}
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
BytesReference value = ehCacheDiskCachingTier.get(entry.getKey());
assertEquals(entry.getValue(), value);
}
ehCacheDiskCachingTier.close();
}
}

private static String generateRandomString(int length) {
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder randomString = new StringBuilder(length);
Expand All @@ -546,4 +619,25 @@ public void onRemoval(RemovalNotification<K, V> notification) {
evictionMetric.inc();
}
}

static class StringSerializer implements Serializer<String, byte[]> {
private final Charset charset = StandardCharsets.UTF_8;

@Override
public byte[] serialize(String object) {
return object.getBytes(charset);
}

@Override
public String deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new String(bytes, charset);
}

public boolean equals(String object, byte[] bytes) {
return object.equals(deserialize(bytes));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.serializer;

import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;

import java.util.Arrays;

/**
* A serializer which transforms BytesReference to byte[].
* The type of BytesReference is NOT preserved after deserialization, but nothing in opensearch should care.
*/
public class BytesReferenceSerializer implements Serializer<BytesReference, byte[]> {
// This class does not get passed to ehcache itself, so it's not required that classes match after deserialization.

public BytesReferenceSerializer() {}

@Override
public byte[] serialize(BytesReference object) {
return BytesReference.toBytes(object);
}

@Override
public BytesReference deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new BytesArray(bytes);
}

@Override
public boolean equals(BytesReference object, byte[] bytes) {
return Arrays.equals(serialize(object), bytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.serializer;

/**
* Defines an interface for serializers, to be used by pluggable caches.
* T is the class of the original object, and U is the serialized class.
*/
public interface Serializer<T, U> {
/**
* Serializes an object.
* @param object A non-serialized object.
* @return The serialized representation of the object.
*/
U serialize(T object);

/**
* Deserializes bytes into an object.
* @param bytes The serialized representation.
* @return The original object.
*/
T deserialize(U bytes);

/**
* Compares an object to a serialized representation of an object.
* @param object A non-serialized objet
* @param bytes Serialized representation of an object
* @return true if representing the same object, false if not
*/
boolean equals(T object, U bytes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/** A package for serializers used in caches. */
package org.opensearch.common.cache.serializer;
Loading

0 comments on commit 21b28f2

Please sign in to comment.