Skip to content

Commit

Permalink
Fixed issue where multiple mock nodes on one JVM (in IT tests) would …
Browse files Browse the repository at this point in the history
…cause initialization errors in the cache manager
  • Loading branch information
Peter Alfonsi committed Oct 18, 2023
1 parent 7155afd commit 8510637
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,35 @@
import org.opensearch.common.cache.RemovalListener;
import org.ehcache.Cache;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;

public class EhcacheDiskCachingTier implements DiskCachingTier<IndicesRequestCache.Key, BytesReference>, RemovalListener<IndicesRequestCache.Key, BytesReference> {

public static PersistentCacheManager cacheManager;
public static HashMap<String, PersistentCacheManager> cacheManagers = new HashMap<>();
// Because of the way test cases are set up, each node may try to instantiate several disk caching tiers.
// Only one of them will be used, but there will be initialization errors when multiple cache managers try to
// use the same file path and create/get caches with the same alias. We resolve this with a static reference
// to a cache manager, which is populated if it is null and reused if it is not.
// (See https://stackoverflow.com/questions/53756412/ehcache-org-ehcache-statetransitionexception-persistence-directory-already-lo)
// To deal with IT cases, we need to create a manager per node, as otherwise nodes will try to reuse the same manager,
// so we get the correct cache manager by looking up the node ID in this map.
// I don't think any of this can happen in production, because nodes shouldn't share a JVM,
// and they should only instantiate their services once? But it's best to resolve it anyway.

private PersistentCacheManager cacheManager; // This is the manager this tier will actually use
private Cache<EhcacheKey, BytesReference> cache;
public final static String DISK_CACHE_FP = "disk_cache_tier"; // Placeholder. this should probably be defined somewhere else, since we need to change security.policy based on its value
public final static String BASE_DISK_CACHE_FP = "disk_cache_tier";
// Placeholder. this should probably be defined somewhere else, since we need to change security.policy based on its value
// To accomodate test setups, where multiple nodes may exist on the same filesystem, we add the node ID to the end of this
// These will be subfolders of BASE_DISK_CACHE_FP
private final String diskCacheFP; // the one to use for this node
private RemovalListener<IndicesRequestCache.Key, BytesReference> removalListener;
private ExponentiallyWeightedMovingAverage getTimeMillisEWMA;
private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in OpenSearch
Expand All @@ -42,18 +59,23 @@ public class EhcacheDiskCachingTier implements DiskCachingTier<IndicesRequestCac
private CounterMetric count; // number of entries in cache
private final EhcacheEventListener listener;
private final IndicesRequestCache indicesRequestCache; // only used to create new Keys
private final String nodeId;
// private RBMIntKeyLookupStore keystore;
// private CacheTierPolicy[] policies;
// private IndicesRequestCacheDiskTierPolicy policy;

public EhcacheDiskCachingTier(
long maxWeightInBytes,
long maxKeystoreWeightInBytes,
IndicesRequestCache indicesRequestCache
IndicesRequestCache indicesRequestCache,
String nodeId
) {
this.count = new CounterMetric();
this.listener = new EhcacheEventListener(this, this);
this.indicesRequestCache = indicesRequestCache;
this.nodeId = nodeId;
this.diskCacheFP = PathUtils.get(BASE_DISK_CACHE_FP, nodeId).toString();
// I know this function warns it shouldn't often be used, we can fix it to use the roots once we pick a final FP

getManager();
try {
Expand All @@ -71,19 +93,20 @@ public EhcacheDiskCachingTier(
// this.policy = new IndicesRequestCacheDiskTierPolicy(this.policies, true);
}

public static void getManager() {
public void getManager() {
// based on https://stackoverflow.com/questions/53756412/ehcache-org-ehcache-statetransitionexception-persistence-directory-already-lo
// resolving double-initialization issue when using OpenSearchSingleNodeTestCase
if (cacheManager != null) {
PersistentCacheManager oldCacheManager = cacheManagers.get(nodeId);
if (oldCacheManager != null) {
try {
try {
cacheManager.close();
oldCacheManager.close();
} catch (IllegalStateException e) {
System.out.println("Cache was uninitialized, skipping close() and moving to destroy()");
}
cacheManager.destroy();
oldCacheManager.destroy();
} catch (Exception e) {
System.out.println("Was unable to destroy cache manager");
System.out.println("Was unable to destroy existing cache manager");
e.printStackTrace();
// actual logging later
}
Expand All @@ -92,10 +115,13 @@ public static void getManager() {
.defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS)
.build();

cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
cacheManagers.put(nodeId,
CacheManagerBuilder.newCacheManagerBuilder()
.using(threadConfig)
.with(CacheManagerBuilder.persistence(DISK_CACHE_FP)
).build(true);
.with(CacheManagerBuilder.persistence(diskCacheFP)
).build(true)
);
this.cacheManager = cacheManagers.get(nodeId);
}

private void createCache(long maxWeightInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic

int diskTierWeight = 100 * 1048576; // 100 MB, for testing only
EhcacheDiskCachingTier diskCachingTier;
diskCachingTier = new EhcacheDiskCachingTier(diskTierWeight, 0, this);
diskCachingTier = new EhcacheDiskCachingTier(diskTierWeight, 0, this, indicesService.getNodeId());
tieredCacheHandler = new TieredCacheSpilloverStrategyHandler.Builder<Key, BytesReference>().setOnHeapCachingTier(
openSearchOnHeapCache
).setOnDiskCachingTier(diskCachingTier).setTieredCacheEventListener(this).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1939,6 +1939,10 @@ public boolean allPendingDanglingIndicesWritten() {
|| (danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0);
}

public String getNodeId() {
return nodeEnv.nodeId();
}

/**
* Validates the cluster default index refresh interval.
*
Expand Down

0 comments on commit 8510637

Please sign in to comment.