diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java index 2b6a5b4ee6867..dc157681be6fa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java @@ -54,7 +54,7 @@ public static Map prepareRequestMap(String[] indices, ); for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) { final ShardId shardId = new ShardId(index, shardIdNum); - shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + shardIdShardAttributesMap.put(shardId, new ShardAttributes(customDataPath)); } } return shardIdShardAttributesMap; diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java new file mode 100644 index 0000000000000..4f39a39cea678 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -0,0 +1,243 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.logging.Loggers; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.ShardAttributes; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +import reactor.util.annotation.NonNull; + +/** + * Implementation of AsyncShardFetch with batching support. This class is responsible for executing the fetch + * part using the base class {@link AsyncShardFetch}. Other functionalities needed for a batch are only written here. + * This separation also takes care of the extra generic type V which is only needed for batch + * transport actions like {@link TransportNodesListGatewayStartedShardsBatch} and + * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}. + * + * @param Response type of the transport action. + * @param Data type of shard level response. + * + * @opensearch.internal + */ +public abstract class AsyncShardBatchFetch extends AsyncShardFetch { + + @SuppressWarnings("unchecked") + AsyncShardBatchFetch( + Logger logger, + String type, + Map shardAttributesMap, + AsyncShardFetch.Lister, T> action, + String batchId, + Class clazz, + V emptyShardResponse, + Predicate emptyShardResponsePredicate, + ShardBatchResponseFactory responseFactory + ) { + super( + logger, + type, + shardAttributesMap, + action, + batchId, + new ShardBatchCache<>( + logger, + type, + shardAttributesMap, + "BatchID=[" + batchId + "]", + clazz, + emptyShardResponse, + emptyShardResponsePredicate, + responseFactory + ) + ); + } + + /** + * Remove a shard from the cache maintaining a full batch of shards. This is needed to clear the shard once it's + * assigned or failed. + * + * @param shardId shardId to be removed from the batch. + */ + public synchronized void clearShard(ShardId shardId) { + this.shardAttributesMap.remove(shardId); + this.cache.deleteShard(shardId); + } + + /** + * Cache implementation of transport actions returning batch of shards related data in the response. + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or + * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} with memory efficient caching + * approach. This cache class is not thread safe, all of its methods are being called from + * {@link AsyncShardFetch} class which has synchronized blocks present to handle multiple threads. + * + * @param Response type of transport action. + * @param Data type of shard level response. + */ + static class ShardBatchCache extends AsyncShardFetchCache { + private final Map> cache; + private final Map shardIdToArray; + private final int batchSize; + private final Class shardResponseClass; + private final ShardBatchResponseFactory responseFactory; + private final V emptyResponse; + private final Predicate emptyShardResponsePredicate; + private final Logger logger; + + public ShardBatchCache( + Logger logger, + String type, + Map shardAttributesMap, + String logKey, + Class clazz, + V emptyResponse, + Predicate emptyShardResponsePredicate, + ShardBatchResponseFactory responseFactory + ) { + super(Loggers.getLogger(logger, "_" + logKey), type); + this.batchSize = shardAttributesMap.size(); + this.emptyShardResponsePredicate = emptyShardResponsePredicate; + cache = new HashMap<>(); + shardIdToArray = new HashMap<>(); + fillShardIdKeys(shardAttributesMap.keySet()); + this.shardResponseClass = clazz; + this.emptyResponse = emptyResponse; + this.logger = logger; + this.responseFactory = responseFactory; + } + + @Override + @NonNull + public Map getCache() { + return cache; + } + + @Override + public void deleteShard(ShardId shardId) { + if (shardIdToArray.containsKey(shardId)) { + Integer shardIdIndex = shardIdToArray.remove(shardId); + for (String nodeId : cache.keySet()) { + cache.get(nodeId).clearShard(shardIdIndex); + } + } + } + + @Override + public void initData(DiscoveryNode node) { + cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate)); + } + + /** + * Put the response received from data nodes into the cache. + * Get shard level data from batch, then filter out if any shards received failures. + * After that complete storing the data at node level and mark fetching as done. + * + * @param node node from which we got the response. + * @param response shard metadata coming from node. + */ + @Override + public void putData(DiscoveryNode node, T response) { + NodeEntry nodeEntry = cache.get(node.getId()); + Map batchResponse = responseFactory.getShardBatchData(response); + nodeEntry.doneFetching(batchResponse, shardIdToArray); + } + + @Override + public T getData(DiscoveryNode node) { + return this.responseFactory.getNewResponse(node, getBatchData(cache.get(node.getId()))); + } + + private HashMap getBatchData(NodeEntry nodeEntry) { + V[] nodeShardEntries = nodeEntry.getData(); + boolean[] emptyResponses = nodeEntry.getEmptyShardResponse(); + HashMap shardData = new HashMap<>(); + for (Map.Entry shardIdEntry : shardIdToArray.entrySet()) { + ShardId shardId = shardIdEntry.getKey(); + Integer arrIndex = shardIdEntry.getValue(); + if (emptyResponses[arrIndex]) { + shardData.put(shardId, emptyResponse); + } else if (nodeShardEntries[arrIndex] != null) { + // ignore null responses here + shardData.put(shardId, nodeShardEntries[arrIndex]); + } + } + return shardData; + } + + private void fillShardIdKeys(Set shardIds) { + int shardIdIndex = 0; + for (ShardId shardId : shardIds) { + this.shardIdToArray.putIfAbsent(shardId, shardIdIndex++); + } + } + + /** + * A node entry, holding the state of the fetched data for a specific shard + * for a giving node. + */ + static class NodeEntry extends BaseNodeEntry { + private final V[] shardData; + private final boolean[] emptyShardResponse; // we can not rely on null entries of the shardData array, + // those null entries means that we need to ignore those entries. Empty responses on the other hand are + // actually needed in allocation/explain API response. So instead of storing full empty response object + // in cache, it's better to just store a boolean and create that object on the fly just before + // decision-making. + private final Predicate emptyShardResponsePredicate; + + NodeEntry(String nodeId, Class clazz, int batchSize, Predicate emptyShardResponsePredicate) { + super(nodeId); + this.shardData = (V[]) Array.newInstance(clazz, batchSize); + this.emptyShardResponse = new boolean[batchSize]; + this.emptyShardResponsePredicate = emptyShardResponsePredicate; + } + + void doneFetching(Map shardDataFromNode, Map shardIdKey) { + fillShardData(shardDataFromNode, shardIdKey); + super.doneFetching(); + } + + void clearShard(Integer shardIdIndex) { + this.shardData[shardIdIndex] = null; + emptyShardResponse[shardIdIndex] = false; + } + + V[] getData() { + return this.shardData; + } + + boolean[] getEmptyShardResponse() { + return emptyShardResponse; + } + + private void fillShardData(Map shardDataFromNode, Map shardIdKey) { + for (Map.Entry shardData : shardDataFromNode.entrySet()) { + if (shardData.getValue() != null) { + ShardId shardId = shardData.getKey(); + if (emptyShardResponsePredicate.test(shardData.getValue())) { + this.emptyShardResponse[shardIdKey.get(shardId)] = true; + this.shardData[shardIdKey.get(shardId)] = null; + } else { + this.shardData[shardIdKey.get(shardId)] = shardData.getValue(); + } + } + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 3d129d4794a10..b664dd573ce67 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -82,10 +82,10 @@ public interface Lister, N protected final String type; protected final Map shardAttributesMap; private final Lister, T> action; - private final AsyncShardFetchCache cache; + protected final AsyncShardFetchCache cache; private final AtomicLong round = new AtomicLong(); private boolean closed; - private final String reroutingKey; + final String reroutingKey; private final Map> shardToIgnoreNodes = new HashMap<>(); @SuppressWarnings("unchecked") @@ -99,7 +99,7 @@ protected AsyncShardFetch( this.logger = logger; this.type = type; shardAttributesMap = new HashMap<>(); - shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + shardAttributesMap.put(shardId, new ShardAttributes(customDataPath)); this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; cache = new ShardCache<>(logger, reroutingKey, type); @@ -120,14 +120,15 @@ protected AsyncShardFetch( String type, Map shardAttributesMap, Lister, T> action, - String batchId + String batchId, + AsyncShardFetchCache cache ) { this.logger = logger; this.type = type; this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; this.reroutingKey = "BatchID=[" + batchId + "]"; - cache = new ShardCache<>(logger, reroutingKey, type); + this.cache = cache; } @Override diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index 3140ceef4f3ee..2a4e6181467b0 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -48,6 +48,7 @@ * @opensearch.internal */ public abstract class AsyncShardFetchCache { + private final Logger logger; private final String type; diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index 8d222903b6f29..1979f33484d49 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.gateway.AsyncShardFetch.FetchResult; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; @@ -132,9 +133,7 @@ private static List adaptToNodeShardStates( // build data for a shard from all the nodes nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> { - TransportNodesGatewayStartedShardHelper.GatewayStartedShard shardData = nodeGatewayStartedShardsBatch - .getNodeGatewayStartedShardsBatch() - .get(unassignedShard.shardId()); + GatewayStartedShard shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId()); nodeShardStates.add( new NodeGatewayStartedShard( shardData.allocationId(), diff --git a/server/src/main/java/org/opensearch/gateway/ShardBatchResponseFactory.java b/server/src/main/java/org/opensearch/gateway/ShardBatchResponseFactory.java new file mode 100644 index 0000000000000..4b85ef995f1e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/ShardBatchResponseFactory.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; + +import java.util.Map; + +/** + * A factory class to create new responses of batch transport actions like + * {@link TransportNodesListGatewayStartedShardsBatch} or {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} + * + * @param Node level response returned by batch transport actions. + * @param Shard level metadata returned by batch transport actions. + */ +public class ShardBatchResponseFactory { + private final boolean primary; + + public ShardBatchResponseFactory(boolean primary) { + this.primary = primary; + } + + public T getNewResponse(DiscoveryNode node, Map shardData) { + if (primary) { + return (T) new NodeGatewayStartedShardsBatch(node, (Map) shardData); + } else { + return (T) new NodeStoreFilesMetadataBatch(node, (Map) shardData); + } + } + + public Map getShardBatchData(T response) { + if (primary) { + return (Map) ((NodeGatewayStartedShardsBatch) response).getNodeGatewayStartedShardsBatch(); + } else { + return (Map) ((NodeStoreFilesMetadataBatch) response).getNodeStoreFilesMetadataBatch(); + } + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java index 27cce76b1b694..2ddae1d8410c9 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -42,6 +42,8 @@ * @opensearch.internal */ public class TransportNodesGatewayStartedShardHelper { + public static final String INDEX_NOT_FOUND = "node doesn't have meta data for index"; + public static GatewayStartedShard getShardInfoOnLocalNode( Logger logger, final ShardId shardId, @@ -72,7 +74,7 @@ public static GatewayStartedShard getShardInfoOnLocalNode( customDataPath = new IndexSettings(metadata, settings).customDataPath(); } else { logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); + throw new OpenSearchException(INDEX_NOT_FOUND + " " + shardId.getIndex()); } } // we don't have an open shard on the store, validate the files on disk are openable @@ -230,6 +232,13 @@ public String toString() { buf.append("]"); return buf.toString(); } + + public static boolean isEmpty(GatewayStartedShard gatewayStartedShard) { + return gatewayStartedShard.allocationId() == null + && gatewayStartedShard.primary() == false + && gatewayStartedShard.storeException() == null + && gatewayStartedShard.replicationCheckpoint() == null; + } } /** diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java index dc5d85b17bc32..89362988b4d85 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java @@ -8,7 +8,6 @@ package org.opensearch.gateway; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionType; import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.ActionFilters; @@ -27,7 +26,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; -import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.store.ShardAttributes; import org.opensearch.threadpool.ThreadPool; @@ -40,6 +38,8 @@ import java.util.Map; import java.util.Objects; +import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; +import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.INDEX_NOT_FOUND; import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode; /** @@ -136,8 +136,10 @@ protected NodesGatewayStartedShardsBatch newResponse( @Override protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { Map shardsOnNode = new HashMap<>(); - for (ShardAttributes shardAttr : request.shardAttributes.values()) { - final ShardId shardId = shardAttr.getShardId(); + // NOTE : If we ever change this for loop to run in parallel threads, we should re-visit the exception + // handling in AsyncShardBatchFetch class. + for (Map.Entry shardAttr : request.shardAttributes.entrySet()) { + final ShardId shardId = shardAttr.getKey(); try { shardsOnNode.put( shardId, @@ -147,16 +149,19 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { namedXContentRegistry, nodeEnv, indicesService, - shardAttr.getCustomDataPath(), + shardAttr.getValue().getCustomDataPath(), settings, clusterService ) ); } catch (Exception e) { - shardsOnNode.put( - shardId, - new GatewayStartedShard(null, false, null, new OpenSearchException("failed to load started shards", e)) - ); + // should return null in case of known exceptions being returned from getShardInfoOnLocalNode method. + if (e instanceof IllegalStateException || e.getMessage().contains(INDEX_NOT_FOUND) || e instanceof IOException) { + shardsOnNode.put(shardId, null); + } else { + // return actual exception as it is for unknown exceptions + shardsOnNode.put(shardId, new GatewayStartedShard(null, false, null, e)); + } } } return new NodeGatewayStartedShardsBatch(clusterService.localNode(), shardsOnNode); @@ -264,13 +269,26 @@ public Map getNodeGatewayStartedShardsBatch() { public NodeGatewayStartedShardsBatch(StreamInput in) throws IOException { super(in); - this.nodeGatewayStartedShardsBatch = in.readMap(ShardId::new, GatewayStartedShard::new); + this.nodeGatewayStartedShardsBatch = in.readMap(ShardId::new, i -> { + if (i.readBoolean()) { + return new GatewayStartedShard(i); + } else { + return null; + } + }); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeMap(nodeGatewayStartedShardsBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + out.writeMap(nodeGatewayStartedShardsBatch, (o, k) -> k.writeTo(o), (o, v) -> { + if (v != null) { + o.writeBoolean(true); + v.writeTo(o); + } else { + o.writeBoolean(false); + } + }); } public NodeGatewayStartedShardsBatch(DiscoveryNode node, Map nodeGatewayStartedShardsBatch) { diff --git a/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java index 4ef4e91f7af8c..999acd02030ab 100644 --- a/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java +++ b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java @@ -12,36 +12,27 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.gateway.AsyncShardFetch; import java.io.IOException; /** - * This class contains information about the shard that needs to be sent as part of request in Transport Action implementing - * {@link AsyncShardFetch.Lister} to fetch shard information in async manner + * This class contains Attributes related to Shards that are necessary for making the + * {@link org.opensearch.gateway.TransportNodesListGatewayStartedShards} transport requests * * @opensearch.internal */ public class ShardAttributes implements Writeable { - private final ShardId shardId; @Nullable private final String customDataPath; - public ShardAttributes(ShardId shardId, String customDataPath) { - this.shardId = shardId; + public ShardAttributes(String customDataPath) { this.customDataPath = customDataPath; } public ShardAttributes(StreamInput in) throws IOException { - shardId = new ShardId(in); customDataPath = in.readString(); } - public ShardId getShardId() { - return shardId; - } - /** * Returns the custom data path that is used to look up information for this shard. * Returns an empty string if no custom data path is used for this index. @@ -53,7 +44,11 @@ public String getCustomDataPath() { } public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); out.writeString(customDataPath); } + + @Override + public String toString() { + return "ShardAttributes{" + ", customDataPath='" + customDataPath + '\'' + '}'; + } } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java index 3f151fe1c5ca0..85d5bff4677ef 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java @@ -155,7 +155,7 @@ private Map listStoreMetadata(NodeRequest reque shardStoreMetadataMap.put(shardId, new NodeStoreFilesMetadata(storeFilesMetadata, null)); } catch (Exception e) { // should return null in case of known exceptions being returned from listShardMetadataInternal method. - if (e.getMessage().contains(INDEX_NOT_FOUND)) { + if (e.getMessage().contains(INDEX_NOT_FOUND) || e instanceof IOException) { shardStoreMetadataMap.put(shardId, null); } else { // return actual exception as it is for unknown exceptions @@ -276,6 +276,11 @@ public void writeTo(StreamOutput out) throws IOException { } } + boolean isEmpty(NodeStoreFilesMetadata response) { + return response.storeFilesMetadata() == null + || response.storeFilesMetadata().isEmpty() && response.getStoreFileFetchException() == null; + } + public Exception getStoreFileFetchException() { return storeFileFetchException; } diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 4e5e9c71e1fe4..db97c3ece94ba 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -39,7 +39,6 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.indices.store.ShardAttributes; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -47,7 +46,6 @@ import org.junit.Before; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -86,16 +84,7 @@ public class AsyncShardFetchTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); this.threadPool = new TestThreadPool(getTestName()); - if (randomBoolean()) { - this.test = new TestFetch(threadPool); - } else { - HashMap shardToCustomDataPath = new HashMap<>(); - ShardId shardId0 = new ShardId("index1", "index_uuid1", 0); - ShardId shardId1 = new ShardId("index2", "index_uuid2", 0); - shardToCustomDataPath.put(shardId0, new ShardAttributes(shardId0, "")); - shardToCustomDataPath.put(shardId1, new ShardAttributes(shardId1, "")); - this.test = new TestFetch(threadPool, shardToCustomDataPath); - } + this.test = new TestFetch(threadPool); } @After @@ -414,11 +403,6 @@ static class Entry { this.threadPool = threadPool; } - TestFetch(ThreadPool threadPool, Map shardAttributesMap) { - super(LogManager.getLogger(TestFetch.class), "test", shardAttributesMap, null, "test-batch"); - this.threadPool = threadPool; - } - public void addSimulation(String nodeId, Response response) { simulations.put(nodeId, new Entry(response, null)); } diff --git a/server/src/test/java/org/opensearch/gateway/BatchTestUtil.java b/server/src/test/java/org/opensearch/gateway/BatchTestUtil.java new file mode 100644 index 0000000000000..69f0cfeeb2c7d --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/BatchTestUtil.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.core.index.shard.ShardId; + +import java.util.ArrayList; +import java.util.List; + +public class BatchTestUtil { + public static List setUpShards(int numberOfShards) { + List shards = new ArrayList<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + ShardId shardId = new ShardId("test", "_na_", shardNumber); + shards.add(shardId); + } + return shards; + } +} diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index 4796def2b8902..522ad2a64ea5d 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -29,6 +29,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.Environment; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -289,14 +290,9 @@ public TestBatchAllocator addData( if (data == null) { data = new HashMap<>(); } - Map shardData = Map.of( + Map shardData = Map.of( shardId, - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( - allocationId, - primary, - replicationCheckpoint, - storeException - ) + new GatewayStartedShard(allocationId, primary, replicationCheckpoint, storeException) ); data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData)); return this; @@ -313,16 +309,8 @@ public TestBatchAllocator addShardData( if (data == null) { data = new HashMap<>(); } - Map shardData = new HashMap<>(); - shardData.put( - shardId, - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( - allocationId, - primary, - replicationCheckpoint, - storeException - ) - ); + Map shardData = new HashMap<>(); + shardData.put(shardId, new GatewayStartedShard(allocationId, primary, replicationCheckpoint, storeException)); if (data.get(node) != null) shardData.putAll(data.get(node).getNodeGatewayStartedShardsBatch()); data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData)); return this; diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java new file mode 100644 index 0000000000000..1b42a31a4fd84 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; +import org.opensearch.indices.store.ShardAttributes; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ShardBatchCacheTests extends OpenSearchAllocationTestCase { + private static final String BATCH_ID = "b1"; + private final DiscoveryNode node1 = newNode("node1"); + private final DiscoveryNode node2 = newNode("node2"); + // Needs to be enabled once ShardsBatchGatewayAllocator is pushed + // private final Map batchInfo = new HashMap<>(); + private AsyncShardBatchFetch.ShardBatchCache shardCache; + private List shardsInBatch = new ArrayList<>(); + private static final int NUMBER_OF_SHARDS_DEFAULT = 10; + + private enum ResponseType { + NULL, + EMPTY, + FAILURE, + VALID + } + + public void setupShardBatchCache(String batchId, int numberOfShards) { + Map shardAttributesMap = new HashMap<>(); + fillShards(shardAttributesMap, numberOfShards); + this.shardCache = new AsyncShardBatchFetch.ShardBatchCache<>( + logger, + "batch_shards_started", + shardAttributesMap, + "BatchID=[" + batchId + "]", + GatewayStartedShard.class, + new GatewayStartedShard(null, false, null, null), + GatewayStartedShard::isEmpty, + new ShardBatchResponseFactory<>(true) + ); + } + + public void testClearShardCache() { + setupShardBatchCache(BATCH_ID, NUMBER_OF_SHARDS_DEFAULT); + ShardId shard = shardsInBatch.iterator().next(); + this.shardCache.initData(node1); + this.shardCache.markAsFetching(List.of(node1.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.EMPTY))); + assertTrue( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node1).build(), null) + .get(node1) + .getNodeGatewayStartedShardsBatch() + .containsKey(shard) + ); + this.shardCache.deleteShard(shard); + assertFalse( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node1).build(), null) + .get(node1) + .getNodeGatewayStartedShardsBatch() + .containsKey(shard) + ); + } + + public void testGetCacheData() { + setupShardBatchCache(BATCH_ID, NUMBER_OF_SHARDS_DEFAULT); + ShardId shard = shardsInBatch.iterator().next(); + this.shardCache.initData(node1); + this.shardCache.initData(node2); + this.shardCache.markAsFetching(List.of(node1.getId(), node2.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.EMPTY))); + assertTrue( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node1).build(), null) + .get(node1) + .getNodeGatewayStartedShardsBatch() + .containsKey(shard) + ); + assertTrue( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node2).build(), null) + .get(node2) + .getNodeGatewayStartedShardsBatch() + .isEmpty() + ); + } + + public void testInitCacheData() { + setupShardBatchCache(BATCH_ID, NUMBER_OF_SHARDS_DEFAULT); + this.shardCache.initData(node1); + this.shardCache.initData(node2); + assertEquals(2, shardCache.getCache().size()); + + // test getData without fetch + assertTrue(shardCache.getData(node1).getNodeGatewayStartedShardsBatch().isEmpty()); + } + + public void testPutData() { + // test empty and non-empty responses + setupShardBatchCache(BATCH_ID, NUMBER_OF_SHARDS_DEFAULT); + ShardId shard = shardsInBatch.iterator().next(); + this.shardCache.initData(node1); + this.shardCache.initData(node2); + this.shardCache.markAsFetching(List.of(node1.getId(), node2.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.VALID))); + this.shardCache.putData(node2, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.EMPTY))); + + // assert that fetching is done as both node's responses are stored in cache + assertFalse(this.shardCache.getCache().get(node1.getId()).isFetching()); + assertFalse(this.shardCache.getCache().get(node2.getId()).isFetching()); + + Map fetchData = shardCache.getCacheData( + DiscoveryNodes.builder().add(node1).add(node2).build(), + null + ); + assertEquals(2, fetchData.size()); + assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); + assertEquals("alloc-1", fetchData.get(node1).getNodeGatewayStartedShardsBatch().get(shard).allocationId()); + + assertEquals(10, fetchData.get(node2).getNodeGatewayStartedShardsBatch().size()); + assertTrue(GatewayStartedShard.isEmpty(fetchData.get(node2).getNodeGatewayStartedShardsBatch().get(shard))); + + // test GetData after fetch + assertEquals(10, shardCache.getData(node1).getNodeGatewayStartedShardsBatch().size()); + } + + public void testNullResponses() { + setupShardBatchCache(BATCH_ID, NUMBER_OF_SHARDS_DEFAULT); + this.shardCache.initData(node1); + this.shardCache.markAsFetching(List.of(node1.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getPrimaryResponse(shardsInBatch, ResponseType.NULL))); + + Map fetchData = shardCache.getCacheData( + DiscoveryNodes.builder().add(node1).build(), + null + ); + assertTrue(fetchData.get(node1).getNodeGatewayStartedShardsBatch().isEmpty()); + } + + public void testShardsDataWithException() { + setupShardBatchCache(BATCH_ID, NUMBER_OF_SHARDS_DEFAULT); + this.shardCache.initData(node1); + this.shardCache.initData(node2); + this.shardCache.markAsFetching(List.of(node1.getId(), node2.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getFailedPrimaryResponse(shardsInBatch, 5))); + Map fetchData = shardCache.getCacheData( + DiscoveryNodes.builder().add(node1).add(node2).build(), + null + ); + + // assertEquals(5, batchInfo.size()); + assertEquals(2, fetchData.size()); + assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); + assertTrue(fetchData.get(node2).getNodeGatewayStartedShardsBatch().isEmpty()); + } + + private Map getPrimaryResponse(List shards, ResponseType responseType) { + int allocationId = 1; + Map shardData = new HashMap<>(); + for (ShardId shard : shards) { + switch (responseType) { + case NULL: + shardData.put(shard, null); + break; + case EMPTY: + shardData.put(shard, new GatewayStartedShard(null, false, null, null)); + break; + case VALID: + shardData.put(shard, new GatewayStartedShard("alloc-" + allocationId++, false, null, null)); + break; + default: + throw new AssertionError("unknown response type"); + } + } + return shardData; + } + + private Map getFailedPrimaryResponse(List shards, int failedShardsCount) { + int allocationId = 1; + Map shardData = new HashMap<>(); + for (ShardId shard : shards) { + if (failedShardsCount-- > 0) { + shardData.put( + shard, + new GatewayStartedShard("alloc-" + allocationId++, false, null, new OpenSearchRejectedExecutionException()) + ); + } else { + shardData.put(shard, new GatewayStartedShard("alloc-" + allocationId++, false, null, null)); + } + } + return shardData; + } + + private void fillShards(Map shardAttributesMap, int numberOfShards) { + shardsInBatch = BatchTestUtil.setUpShards(numberOfShards); + for (ShardId shardId : shardsInBatch) { + ShardAttributes attr = new ShardAttributes(""); + shardAttributesMap.put(shardId, attr); + // batchInfo.put( + // shardId, + // new ShardsBatchGatewayAllocator.ShardEntry(attr, randomShardRouting(shardId.getIndexName(), shardId.id())) + // ); + } + } + + private ShardRouting randomShardRouting(String index, int shard) { + ShardRoutingState state = randomFrom(ShardRoutingState.values()); + return TestShardRouting.newShardRouting( + index, + shard, + state == ShardRoutingState.UNASSIGNED ? null : "1", + state == ShardRoutingState.RELOCATING ? "2" : null, + state != ShardRoutingState.UNASSIGNED && randomBoolean(), + state + ); + } +} diff --git a/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java index 7fa95fefe72fd..94834bab1d98b 100644 --- a/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java +++ b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java @@ -28,13 +28,12 @@ public class ShardAttributesTests extends OpenSearchTestCase { String customDataPath = "/path/to/data"; public void testShardAttributesConstructor() { - ShardAttributes attributes = new ShardAttributes(shardId, customDataPath); - assertEquals(attributes.getShardId(), shardId); + ShardAttributes attributes = new ShardAttributes(customDataPath); assertEquals(attributes.getCustomDataPath(), customDataPath); } public void testSerialization() throws IOException { - ShardAttributes attributes1 = new ShardAttributes(shardId, customDataPath); + ShardAttributes attributes1 = new ShardAttributes(customDataPath); ByteArrayOutputStream bytes = new ByteArrayOutputStream(); StreamOutput output = new DataOutputStreamOutput(new DataOutputStream(bytes)); attributes1.writeTo(output); @@ -42,7 +41,6 @@ public void testSerialization() throws IOException { StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(bytes.toByteArray())); ShardAttributes attributes2 = new ShardAttributes(input); input.close(); - assertEquals(attributes1.getShardId(), attributes2.getShardId()); assertEquals(attributes1.getCustomDataPath(), attributes2.getCustomDataPath()); }