From 352fb5545178782d7a605a062149249bde0cabaa Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Tue, 18 Jul 2023 12:44:38 +0530 Subject: [PATCH 1/9] Async Fetcher class changes Signed-off-by: Gaurav Chandani --- .../gateway/AsyncBatchShardFetch.java | 488 ++++++++++++++++++ 1 file changed, 488 insertions(+) create mode 100644 server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java diff --git a/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java new file mode 100644 index 0000000000000..0d9272d0d5689 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java @@ -0,0 +1,488 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.Nullable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.ShardId; +import org.opensearch.transport.ReceiveTimeoutTransportException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.unmodifiableSet; + +public abstract class AsyncBatchShardFetch implements Releasable { + + /** + * An action that lists the relevant shard data that needs to be fetched. + */ + public interface Lister, NodeResponse extends BaseNodeResponse> { + void list(DiscoveryNode[] nodes,Map shardIdsWithCustomDataPath, ActionListener listener); + } + + protected final Logger logger; + protected final String type; + private final String batchUUID; + protected Map shardsToCustomDataPathMap; + private Map> ignoredShardToNodes = new HashMap<>(); + private final AsyncBatchShardFetch.Lister, T> action; + private final Map> cache = new HashMap<>(); + private final Set nodesToIgnore = new HashSet<>(); + private final AtomicLong round = new AtomicLong(); + private boolean closed; + + @SuppressWarnings("unchecked") + protected AsyncBatchShardFetch( + Logger logger, + String type, + Map shardsToCustomDataPathMap, + AsyncBatchShardFetch.Lister, T> action, + String batchUUID + ) { + this.logger = logger; + this.type = type; + this.shardsToCustomDataPathMap = shardsToCustomDataPathMap; + this.action = (AsyncBatchShardFetch.Lister, T>) action; + this.batchUUID = batchUUID; + } + + @Override + public synchronized void close() { + this.closed = true; + } + + /** + * Returns the number of async fetches that are currently ongoing. + */ + public synchronized int getNumberOfInFlightFetches() { + int count = 0; + for (AsyncBatchShardFetch.NodeEntry nodeEntry : cache.values()) { + if (nodeEntry.isFetching()) { + count++; + } + } + return count; + } + + /** + * Fetches the data for the relevant shard. If there any ongoing async fetches going on, or new ones have + * been initiated by this call, the result will have no data. + *

+ * The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need + * to keep them around and make sure we add them back when all the responses are fetched and returned. + */ + public synchronized AsyncBatchShardFetch.FetchResult fetchData(DiscoveryNodes nodes, Map> ignoreNodes) { + if (closed) { + throw new IllegalStateException(batchUUID + ": can't fetch data on closed async fetch"); + } + + // create a flat map for all ignore nodes in a batch. + // This information is only used for retrying. Fetching is still a broadcast to all available nodes + nodesToIgnore.addAll(ignoreNodes.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())); + fillShardCacheWithDataNodes(cache, nodes); + List> nodesToFetch = findNodesToFetch(cache); + if (nodesToFetch.isEmpty() == false) { + // mark all node as fetching and go ahead and async fetch them + // use a unique round id to detect stale responses in processAsyncFetch + final long fetchingRound = round.incrementAndGet(); + for (AsyncBatchShardFetch.NodeEntry nodeEntry : nodesToFetch) { + nodeEntry.markAsFetching(fetchingRound); + } + DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream() + .map(AsyncBatchShardFetch.NodeEntry::getNodeId) + .map(nodes::get) + .toArray(DiscoveryNode[]::new); + asyncFetch(discoNodesToFetch, fetchingRound); + } + + // if we are still fetching, return null to indicate it + if (hasAnyNodeFetching(cache)) { + return new AsyncBatchShardFetch.FetchResult<>(null, emptyMap()); + } else { + // nothing to fetch, yay, build the return value + Map fetchData = new HashMap<>(); + Set failedNodes = new HashSet<>(); + for (Iterator>> it = cache.entrySet().iterator(); it.hasNext();) { + Map.Entry> entry = it.next(); + String nodeId = entry.getKey(); + AsyncBatchShardFetch.NodeEntry nodeEntry = entry.getValue(); + + DiscoveryNode node = nodes.get(nodeId); + if (node != null) { + if (nodeEntry.isFailed()) { + // if its failed, remove it from the list of nodes, so if this run doesn't work + // we try again next round to fetch it again + it.remove(); + failedNodes.add(nodeEntry.getNodeId()); + } else { + if (nodeEntry.getValue() != null) { + fetchData.put(node, nodeEntry.getValue()); + } + } + } + } + Set allIgnoreNodes = unmodifiableSet(new HashSet<>(nodesToIgnore)); + // clear the nodes to ignore, we had a successful run in fetching everything we can + // we need to try them if another full run is needed + nodesToIgnore.clear(); + + // if at least one node failed, make sure to have a protective reroute + // here, just case this round won't find anything, and we need to retry fetching data + if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) { + reroute(batchUUID, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]"); + } + + return new AsyncBatchShardFetch.FetchResult<>(fetchData, ignoredShardToNodes); + } + } + + /** + * Called by the response handler of the async action to fetch data. Verifies that its still working + * on the same cache generation, otherwise the results are discarded. It then goes and fills the relevant data for + * the shard (response + failures), issuing a reroute at the end of it to make sure there will be another round + * of allocations taking this new data into account. + */ + protected synchronized void processAsyncFetch(List responses, List failures, long fetchingRound) { + if (closed) { + // we are closed, no need to process this async fetch at all + logger.trace("{} ignoring fetched [{}] results, already closed", batchUUID, type); + return; + } + logger.trace("{} processing fetched [{}] results", batchUUID, type); + + if (responses != null) { + for (T response : responses) { + AsyncBatchShardFetch.NodeEntry nodeEntry = cache.get(response.getNode().getId()); + if (nodeEntry != null) { + if (nodeEntry.getFetchingRound() != fetchingRound) { + assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; + logger.trace( + "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", + batchUUID, + nodeEntry.getNodeId(), + type, + nodeEntry.getFetchingRound(), + fetchingRound + ); + } else if (nodeEntry.isFailed()) { + logger.trace( + "{} node {} has failed for [{}] (failure [{}])", + batchUUID, + nodeEntry.getNodeId(), + type, + nodeEntry.getFailure() + ); + } else { + // if the entry is there, for the right fetching round and not marked as failed already, process it + logger.trace("{} marking {} as done for [{}], result is [{}]", batchUUID, nodeEntry.getNodeId(), type, response); + nodeEntry.doneFetching(response); + } + } + } + } + if (failures != null) { + for (FailedNodeException failure : failures) { + logger.trace("{} processing failure {} for [{}]", batchUUID, failure, type); + AsyncBatchShardFetch.NodeEntry nodeEntry = cache.get(failure.nodeId()); + if (nodeEntry != null) { + if (nodeEntry.getFetchingRound() != fetchingRound) { + assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; + logger.trace( + "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", + batchUUID, + nodeEntry.getNodeId(), + type, + nodeEntry.getFetchingRound(), + fetchingRound + ); + } else if (nodeEntry.isFailed() == false) { + // if the entry is there, for the right fetching round and not marked as failed already, process it + Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); + // if the request got rejected or timed out, we need to try it again next time... + if (unwrappedCause instanceof OpenSearchRejectedExecutionException + || unwrappedCause instanceof ReceiveTimeoutTransportException + || unwrappedCause instanceof OpenSearchTimeoutException) { + nodeEntry.restartFetching(); + } else { + logger.warn( + () -> new ParameterizedMessage( + "{}: failed to list shard for {} on node [{}]", + batchUUID, + type, + failure.nodeId() + ), + failure + ); + nodeEntry.doneFetching(failure.getCause()); + } + } + } + } + } + reroute(batchUUID, "post_response"); + } + + /** + * Implement this in order to scheduled another round that causes a call to fetch data. + */ + protected abstract void reroute(String batchUUID, String reason); + + /** + * Clear cache for node, ensuring next fetch will fetch a fresh copy. + */ + synchronized void clearCacheForNode(String nodeId) { + cache.remove(nodeId); + } + + /** + * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from + * it nodes that are no longer part of the state. + */ + private void fillShardCacheWithDataNodes(Map> shardCache, DiscoveryNodes nodes) { + // verify that all current data nodes are there + for (final DiscoveryNode node : nodes.getDataNodes().values()) { + if (shardCache.containsKey(node.getId()) == false) { + shardCache.put(node.getId(), new AsyncBatchShardFetch.NodeEntry(node.getId())); + } + } + // remove nodes that are not longer part of the data nodes set + shardCache.keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId)); + } + + /** + * Finds all the nodes that need to be fetched. Those are nodes that have no + * data, and are not in fetch mode. + */ + private List> findNodesToFetch(Map> shardCache) { + List> nodesToFetch = new ArrayList<>(); + for (AsyncBatchShardFetch.NodeEntry nodeEntry : shardCache.values()) { + if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) { + nodesToFetch.add(nodeEntry); + } + } + return nodesToFetch; + } + + /** + * Are there any nodes that are fetching data? + */ + private boolean hasAnyNodeFetching(Map> shardCache) { + for (AsyncBatchShardFetch.NodeEntry nodeEntry : shardCache.values()) { + if (nodeEntry.isFetching()) { + return true; + } + } + return false; + } + + /** + * Async fetches data for the provided shard with the set of nodes that need to be fetched from. + */ + // visible for testing + void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) { + logger.trace("{} fetching [{}] from {}", batchUUID, type, nodes); + action.list(nodes, shardsToCustomDataPathMap, new ActionListener>() { + @Override + public void onResponse(BaseNodesResponse response) { + processAsyncFetch(response.getNodes(), response.failures(), fetchingRound); + } + + @Override + public void onFailure(Exception e) { + List failures = new ArrayList<>(nodes.length); + for (final DiscoveryNode node : nodes) { + failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e)); + } + processAsyncFetch(null, failures, fetchingRound); + } + }); + } + + /** + * The result of a fetch operation. Make sure to first check {@link #hasData()} before + * fetching the actual data. + */ + public static class FetchResult { + + private final Map data; + private Map> ignoredShardToNodes = new HashMap<>(); + + public FetchResult(Map data, Map> ignoreNodes) { + this.data = data; + this.ignoredShardToNodes = ignoreNodes; + } + + /** + * Does the result actually contain data? If not, then there are on going fetch + * operations happening, and it should wait for it. + */ + public boolean hasData() { + return data != null; + } + + /** + * Returns the actual data, note, make sure to check {@link #hasData()} first and + * only use this when there is an actual data. + */ + public Map getData() { + assert data != null : "getData should only be called if there is data to be fetched, please check hasData first"; + return this.data; + } + + /** + * Process any changes needed to the allocation based on this fetch result. + */ + public void processAllocation(RoutingAllocation allocation) { + for(Map.Entry> entry : ignoredShardToNodes.entrySet()) { + ShardId shardId = entry.getKey(); + Set ignoreNodes = entry.getValue(); + if (ignoreNodes.isEmpty() == false) { + ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); + } + } + + } + } + + /** + * The result of a fetch operation. Make sure to first check {@link #hasData()} before + * fetching the actual data. + */ + public static class AdaptedResultsForShard { + + private final ShardId shardId; + private final Map data; + private final Set ignoreNodes; + + public AdaptedResultsForShard(ShardId shardId, Map data, Set ignoreNodes) { + this.shardId = shardId; + this.data = data; + this.ignoreNodes = ignoreNodes; + } + + /** + * Does the result actually contain data? If not, then there are on going fetch + * operations happening, and it should wait for it. + */ + public boolean hasData() { + return data != null; + } + + /** + * Returns the actual data, note, make sure to check {@link #hasData()} first and + * only use this when there is an actual data. + */ + public Map getData() { + assert data != null : "getData should only be called if there is data to be fetched, please check hasData first"; + return this.data; + } + } + + /** + * A node entry, holding the state of the fetched data for a specific shard + * for a giving node. + */ + static class NodeEntry { + private final String nodeId; + private boolean fetching; + @Nullable + private T value; + private boolean valueSet; + private Throwable failure; + private long fetchingRound; + + NodeEntry(String nodeId) { + this.nodeId = nodeId; + } + + String getNodeId() { + return this.nodeId; + } + + boolean isFetching() { + return fetching; + } + + void markAsFetching(long fetchingRound) { + assert fetching == false : "double marking a node as fetching"; + this.fetching = true; + this.fetchingRound = fetchingRound; + } + + void doneFetching(T value) { + assert fetching : "setting value but not in fetching mode"; + assert failure == null : "setting value when failure already set"; + this.valueSet = true; + this.value = value; + this.fetching = false; + } + + void doneFetching(Throwable failure) { + assert fetching : "setting value but not in fetching mode"; + assert valueSet == false : "setting failure when already set value"; + assert failure != null : "setting failure can't be null"; + this.failure = failure; + this.fetching = false; + } + + void restartFetching() { + assert fetching : "restarting fetching, but not in fetching mode"; + assert valueSet == false : "value can't be set when restarting fetching"; + assert failure == null : "failure can't be set when restarting fetching"; + this.fetching = false; + } + + boolean isFailed() { + return failure != null; + } + + boolean hasData() { + return valueSet || failure != null; + } + + Throwable getFailure() { + assert hasData() : "getting failure when data has not been fetched"; + return failure; + } + + @Nullable + T getValue() { + assert failure == null : "trying to fetch value, but its marked as failed, check isFailed"; + assert valueSet : "value is not set, hasn't been fetched yet"; + return value; + } + + long getFetchingRound() { + return fetchingRound; + } + } +} From 43beda5066a7d0ffb98899aaa3eda2af36896635 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 23 Aug 2023 19:35:07 +0530 Subject: [PATCH 2/9] Removed AsyncShardBatchFetch class Signed-off-by: Gaurav Chandani --- .../TransportIndicesShardStoresAction.java | 6 +- .../opensearch/gateway/AsyncShardFetch.java | 131 ++++++++++++------ .../opensearch/gateway/GatewayAllocator.java | 22 +-- ...ransportNodesListGatewayStartedShards.java | 6 +- .../TransportNodesListShardStoreMetadata.java | 6 +- .../gateway/AsyncShardFetchTests.java | 62 ++++----- .../gateway/PrimaryShardAllocatorTests.java | 4 +- .../gateway/ReplicaShardAllocatorTests.java | 4 +- .../test/gateway/TestGatewayAllocator.java | 8 +- 9 files changed, 157 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index e0fb4fd922ef6..fe6dae378058b 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -196,7 +196,7 @@ void start() { } else { for (Tuple shard : shards) { InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), listShardStoresInfo); - fetch.fetchData(nodes, Collections.emptySet()); + fetch.fetchData(nodes, Collections.emptyMap()); } } } @@ -224,7 +224,7 @@ protected synchronized void processAsyncFetch( List failures, long fetchingRound ) { - fetchResponses.add(new Response(shardId, responses, failures)); + fetchResponses.add(new Response(shardToCustomDataPath.keySet().iterator().next(), responses, failures)); if (expectedOps.countDown()) { finish(); } @@ -314,7 +314,7 @@ private boolean shardExistsInNode(final NodeGatewayStartedShards response) { } @Override - protected void reroute(ShardId shardId, String reason) { + protected void reroute(String shardId, String reason) { // no-op } diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index e4df2e604c320..6dc7011a31f3b 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -54,12 +54,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import static java.util.Collections.emptySet; -import static java.util.Collections.unmodifiableSet; +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; /** * Allows to asynchronously fetch shard related data from other nodes for allocation, without blocking @@ -77,18 +76,22 @@ public abstract class AsyncShardFetch implements Rel * An action that lists the relevant shard data that needs to be fetched. */ public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(ShardId shardId, @Nullable String customDataPath, DiscoveryNode[] nodes, ActionListener listener); + void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener); + } protected final Logger logger; protected final String type; - protected final ShardId shardId; - protected final String customDataPath; + + protected final Map shardToCustomDataPath; private final Lister, T> action; private final Map> cache = new HashMap<>(); - private final Set nodesToIgnore = new HashSet<>(); private final AtomicLong round = new AtomicLong(); private boolean closed; + private final String logKey; + private final Map> shardToIgnoreNodes = new HashMap<>(); + + private final boolean enableBatchMode; @SuppressWarnings("unchecked") protected AsyncShardFetch( @@ -100,11 +103,30 @@ protected AsyncShardFetch( ) { this.logger = logger; this.type = type; - this.shardId = Objects.requireNonNull(shardId); - this.customDataPath = Objects.requireNonNull(customDataPath); + shardToCustomDataPath =new HashMap<>(); + shardToCustomDataPath.put(shardId, customDataPath); this.action = (Lister, T>) action; + this.logKey = "ShardId=[" + shardId.toString() + "]"; + enableBatchMode = false; } + @SuppressWarnings("unchecked") + protected AsyncShardFetch( + Logger logger, + String type, + Map shardToCustomDataPath, + Lister, T> action, + String batchId + ) { + this.logger = logger; + this.type = type; + this.shardToCustomDataPath = shardToCustomDataPath; + this.action = (Lister, T>) action; + this.logKey = "BatchID=[" + batchId+ "]"; + enableBatchMode = true; + } + + @Override public synchronized void close() { this.closed = true; @@ -130,11 +152,26 @@ public synchronized int getNumberOfInFlightFetches() { * The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need * to keep them around and make sure we add them back when all the responses are fetched and returned. */ - public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set ignoreNodes) { + public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map> ignoreNodes) { if (closed) { - throw new IllegalStateException(shardId + ": can't fetch data on closed async fetch"); + throw new IllegalStateException(logKey + ": can't fetch data on closed async fetch"); + } + + if(enableBatchMode == false){ + // we will do assertions here on ignoreNodes + assert ignoreNodes.size() <=1 : "Can only have at-most one shard"; + if(ignoreNodes.size() == 1) { + assert shardToCustomDataPath.containsKey(ignoreNodes.keySet().iterator().next()) : "ShardId should be same as initialised in fetcher"; + } + } + + // add the nodes to ignore to the list of nodes to ignore for each shard + for (Map.Entry> ignoreNodesEntry : ignoreNodes.entrySet()) { + Set ignoreNodesSet = shardToIgnoreNodes.getOrDefault(ignoreNodesEntry.getKey(), new HashSet<>()); + ignoreNodesSet.addAll(ignoreNodesEntry.getValue()); + shardToIgnoreNodes.put(ignoreNodesEntry.getKey(), ignoreNodesSet); } - nodesToIgnore.addAll(ignoreNodes); + fillShardCacheWithDataNodes(cache, nodes); List> nodesToFetch = findNodesToFetch(cache); if (nodesToFetch.isEmpty() == false) { @@ -153,7 +190,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set i // if we are still fetching, return null to indicate it if (hasAnyNodeFetching(cache)) { - return new FetchResult<>(shardId, null, emptySet()); + return new FetchResult<>(null, emptyMap()); } else { // nothing to fetch, yay, build the return value Map fetchData = new HashMap<>(); @@ -177,16 +214,19 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set i } } } - Set allIgnoreNodes = unmodifiableSet(new HashSet<>(nodesToIgnore)); + + Map> allIgnoreNodesMap = unmodifiableMap(new HashMap<>(shardToIgnoreNodes)); // clear the nodes to ignore, we had a successful run in fetching everything we can // we need to try them if another full run is needed - nodesToIgnore.clear(); + shardToIgnoreNodes.clear(); // if at least one node failed, make sure to have a protective reroute // here, just case this round won't find anything, and we need to retry fetching data - if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) { - reroute(shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]"); + if (failedNodes.isEmpty() == false || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) { + reroute(logKey, "nodes failed [" + failedNodes.size() + "], ignored [" + + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + "]"); } - return new FetchResult<>(shardId, fetchData, allIgnoreNodes); + + return new FetchResult<>(fetchData, allIgnoreNodesMap); } } @@ -199,10 +239,10 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set i protected synchronized void processAsyncFetch(List responses, List failures, long fetchingRound) { if (closed) { // we are closed, no need to process this async fetch at all - logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type); + logger.trace("{} ignoring fetched [{}] results, already closed", logKey, type); return; } - logger.trace("{} processing fetched [{}] results", shardId, type); + logger.trace("{} processing fetched [{}] results", logKey, type); if (responses != null) { for (T response : responses) { @@ -212,7 +252,7 @@ protected synchronized void processAsyncFetch(List responses, List fetchingRound : "node entries only replaced by newer rounds"; logger.trace( "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", - shardId, + logKey, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -221,14 +261,14 @@ protected synchronized void processAsyncFetch(List responses, List responses, List nodeEntry = cache.get(failure.nodeId()); if (nodeEntry != null) { if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", - shardId, + logKey, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -261,7 +301,7 @@ protected synchronized void processAsyncFetch(List responses, List new ParameterizedMessage( "{}: failed to list shard for {} on node [{}]", - shardId, + logKey, type, failure.nodeId() ), @@ -273,13 +313,13 @@ protected synchronized void processAsyncFetch(List responses, List> shardCache) { */ // visible for testing void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) { - logger.trace("{} fetching [{}] from {}", shardId, type, nodes); - action.list(shardId, customDataPath, nodes, new ActionListener>() { + logger.trace("{} fetching [{}] from {}", logKey, type, nodes); + action.list(shardToCustomDataPath, nodes, new ActionListener>() { @Override public void onResponse(BaseNodesResponse response) { processAsyncFetch(response.getNodes(), response.failures(), fetchingRound); @@ -358,15 +398,13 @@ public void onFailure(Exception e) { */ public static class FetchResult { - private final ShardId shardId; - private final Map data; - private final Set ignoreNodes; + private final Map data; + private final Map> ignoredShardToNodes; - public FetchResult(ShardId shardId, Map data, Set ignoreNodes) { - this.shardId = shardId; - this.data = data; - this.ignoreNodes = ignoreNodes; - } + public FetchResult(Map data, Map> ignoreNodes) { + this.data = data; + this.ignoredShardToNodes = ignoreNodes; + } /** * Does the result actually contain data? If not, then there are on going fetch @@ -385,15 +423,20 @@ public Map getData() { return this.data; } - /** - * Process any changes needed to the allocation based on this fetch result. - */ - public void processAllocation(RoutingAllocation allocation) { - for (String ignoreNode : ignoreNodes) { - allocation.addIgnoreShardForNode(shardId, ignoreNode); + /** + * Process any changes needed to the allocation based on this fetch result. + */ + public void processAllocation(RoutingAllocation allocation) { + for(Map.Entry> entry : ignoredShardToNodes.entrySet()) { + ShardId shardId = entry.getKey(); + Set ignoreNodes = entry.getValue(); + if (ignoreNodes.isEmpty() == false) { + ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); } } + } +} /** * A node entry, holding the state of the fetched data for a specific shard diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index 1a4681766e489..81a205f5a9235 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -56,6 +56,7 @@ import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.Spliterators; @@ -226,7 +227,9 @@ private static void clearCacheForPrimary( AsyncShardFetch fetch, RoutingAllocation allocation ) { - ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId); + assert fetch.shardToCustomDataPath.size() == 1 : "expected only one shard"; + ShardId shardId = fetch.shardToCustomDataPath.keySet().iterator().next(); + ShardRouting primary = allocation.routingNodes().activePrimary(shardId); if (primary != null) { fetch.clearCacheForNode(primary.currentNodeId()); } @@ -254,20 +257,19 @@ class InternalAsyncFetch extends AsyncShardFetch } @Override - protected void reroute(ShardId shardId, String reason) { - logger.trace("{} scheduling reroute for {}", shardId, reason); + protected void reroute(String logKey, String reason) { + logger.trace("{} scheduling reroute for {}", logKey, reason); assert rerouteService != null; rerouteService.reroute( "async_shard_fetch", Priority.HIGH, ActionListener.wrap( - r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason), - e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e) + r -> logger.trace("{} scheduled reroute completed for {}", logKey, reason), + e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", logKey, reason), e) ) ); } } - class InternalPrimaryShardAllocator extends PrimaryShardAllocator { private final TransportNodesListGatewayStartedShards startedAction; @@ -293,7 +295,9 @@ protected AsyncShardFetch.FetchResult shardState = fetch.fetchData( allocation.nodes(), - allocation.getIgnoreNodes(shard.shardId()) + new HashMap<>() {{ + put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); + }} ); if (shardState.hasData()) { @@ -328,7 +332,9 @@ protected AsyncShardFetch.FetchResult shardStores = fetch.fetchData( allocation.nodes(), - allocation.getIgnoreNodes(shard.shardId()) + new HashMap<>() {{ + put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); + }} ); if (shardStores.hasData()) { shardStores.processAllocation(allocation); diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index b529557aa9815..61ae5e62b8a83 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -68,6 +68,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -124,7 +125,10 @@ public TransportNodesListGatewayStartedShards( } @Override - public void list(ShardId shardId, String customDataPath, DiscoveryNode[] nodes, ActionListener listener) { + public void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener) { + assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; + final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); + final String customDataPath = shardIdsWithCustomDataPath.get(shardId); execute(new Request(shardId, customDataPath, nodes), listener); } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index bdb0d99fa93b0..e2447a1c5612c 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -73,6 +73,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -125,7 +126,10 @@ public TransportNodesListShardStoreMetadata( } @Override - public void list(ShardId shardId, String customDataPath, DiscoveryNode[] nodes, ActionListener listener) { + public void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener) { + assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; + final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); + final String customDataPath = shardIdsWithCustomDataPath.get(shardId); execute(new Request(shardId, customDataPath, nodes), listener); } diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 982c21a9e57ec..5462d42dbe03e 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -51,7 +51,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import static java.util.Collections.emptySet; +import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; @@ -97,7 +97,7 @@ public void testClose() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -107,7 +107,7 @@ public void testClose() throws Exception { assertThat(test.reroute.get(), equalTo(1)); test.close(); try { - test.fetchData(nodes, emptySet()); + test.fetchData(nodes, emptyMap()); fail("fetch data should fail when closed"); } catch (IllegalStateException e) { // all is well @@ -119,7 +119,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -127,7 +127,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { test.fireSimulationAndWait(node1.getId()); // verify we get back the data node assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -139,7 +139,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { test.addSimulation(node1.getId(), failure1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -147,19 +147,19 @@ public void testFullCircleSingleNodeFailure() throws Exception { test.fireSimulationAndWait(node1.getId()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); // on failure, we reset the failure on a successive call to fetchData, and try again afterwards test.addSimulation(node1.getId(), response1); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); test.fireSimulationAndWait(node1.getId()); // 2 reroutes, cause we have a failure that we clear assertThat(test.reroute.get(), equalTo(3)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -170,7 +170,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -183,7 +183,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { test.fireSimulationAndWait(node1.getId()); // verify we get back the data node assertThat(test.reroute.get(), equalTo(2)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -195,7 +195,7 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { test.addSimulation(node1.getId(), failure1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -212,7 +212,7 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { test.fireSimulationAndWait(node1.getId()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(2)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); } @@ -223,7 +223,7 @@ public void testTwoNodesOnSetup() throws Exception { test.addSimulation(node2.getId(), response2); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -231,14 +231,14 @@ public void testTwoNodesOnSetup() throws Exception { test.fireSimulationAndWait(node1.getId()); // there is still another on going request, so no data assertThat(test.getNumberOfInFlightFetches(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // no more ongoing requests, we should fetch the data assertThat(test.reroute.get(), equalTo(2)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -251,21 +251,21 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { test.addSimulation(node2.getId(), failure2); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); assertThat(test.reroute.get(), equalTo(2)); // since one of those failed, we should only have one entry - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -276,7 +276,7 @@ public void testTwoNodesAddedInBetween() throws Exception { test.addSimulation(node1.getId(), response1); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -287,14 +287,14 @@ public void testTwoNodesAddedInBetween() throws Exception { nodes = DiscoveryNodes.builder(nodes).add(node2).build(); test.addSimulation(node2.getId(), response2); // no fetch data, has a new node introduced - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // since one of those failed, we should only have one entry - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -309,7 +309,7 @@ public void testClearCache() throws Exception { test.clearCacheForNode(node1.getId()); // no fetched data, request still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -317,13 +317,13 @@ public void testClearCache() throws Exception { assertThat(test.reroute.get(), equalTo(1)); // verify we get back right data from node - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // second fetch gets same data - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -334,14 +334,14 @@ public void testClearCache() throws Exception { test.addSimulation(node1.getId(), response1_2); // no fetched data, new request on going - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // verify we get new data back - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); @@ -352,7 +352,7 @@ public void testConcurrentRequestAndClearCache() throws Exception { test.addSimulation(node1.getId(), response1); // no fetched data, request still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -366,14 +366,14 @@ public void testConcurrentRequestAndClearCache() throws Exception { test.addSimulation(node1.getId(), response1_2); // verify still no fetched data, request still on going - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // verify we get new data back - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); @@ -418,7 +418,7 @@ public void fireSimulationAndWait(String nodeId) throws InterruptedException { } @Override - protected void reroute(ShardId shardId, String reason) { + protected void reroute(String shardId, String reason) { reroute.incrementAndGet(); } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index d15bb49f5342a..c1da9575d4a83 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -850,7 +850,9 @@ protected AsyncShardFetch.FetchResult(shardId, data, Collections.emptySet()); + return new AsyncShardFetch.FetchResult<>( data, new HashMap<>(){{ + put(shardId, Collections.emptySet()); + }}); } } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index 36ac93524d6aa..f32f541f5ba4e 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -727,7 +727,9 @@ protected AsyncShardFetch.FetchResult(shardId, tData, Collections.emptySet()); + return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>(){{ + put(shardId, Collections.emptySet()); + }}); } @Override diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index a36dc26685eb4..824c85dfb1f84 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -98,7 +98,9 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR ) ); - return new AsyncShardFetch.FetchResult<>(shardId, foundShards, ignoreNodes); + return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>(){{ + put(shardId, ignoreNodes); + }}); } }; @@ -111,7 +113,9 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // for now, just pretend no node has data final ShardId shardId = shard.shardId(); - return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), allocation.getIgnoreNodes(shardId)); + return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>(){{ + put(shardId, allocation.getIgnoreNodes(shardId)); + }}); } @Override From 6c82091b59f988e3b5531518af071807cf7546ee Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 30 Aug 2023 16:31:19 +0530 Subject: [PATCH 3/9] Applied spotlesscheck Signed-off-by: Gaurav Chandani --- .../gateway/AsyncBatchShardFetch.java | 7 +-- .../opensearch/gateway/AsyncShardFetch.java | 63 ++++++++++--------- .../opensearch/gateway/GatewayAllocator.java | 17 +++-- ...ransportNodesListGatewayStartedShards.java | 6 +- .../TransportNodesListShardStoreMetadata.java | 8 ++- .../gateway/PrimaryShardAllocatorTests.java | 8 ++- .../gateway/ReplicaShardAllocatorTests.java | 8 ++- 7 files changed, 70 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java index 0d9272d0d5689..994631d4bf026 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java @@ -37,7 +37,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableSet; public abstract class AsyncBatchShardFetch implements Releasable { @@ -46,13 +45,13 @@ public abstract class AsyncBatchShardFetch implement * An action that lists the relevant shard data that needs to be fetched. */ public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(DiscoveryNode[] nodes,Map shardIdsWithCustomDataPath, ActionListener listener); + void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener); } protected final Logger logger; protected final String type; private final String batchUUID; - protected Map shardsToCustomDataPathMap; + protected Map shardsToCustomDataPathMap; private Map> ignoredShardToNodes = new HashMap<>(); private final AsyncBatchShardFetch.Lister, T> action; private final Map> cache = new HashMap<>(); @@ -362,7 +361,7 @@ public Map getData() { * Process any changes needed to the allocation based on this fetch result. */ public void processAllocation(RoutingAllocation allocation) { - for(Map.Entry> entry : ignoredShardToNodes.entrySet()) { + for (Map.Entry> entry : ignoredShardToNodes.entrySet()) { ShardId shardId = entry.getKey(); Set ignoreNodes = entry.getValue(); if (ignoreNodes.isEmpty() == false) { diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 6dc7011a31f3b..794aacb0c2fd1 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -83,7 +83,7 @@ public interface Lister, N protected final Logger logger; protected final String type; - protected final Map shardToCustomDataPath; + protected final Map shardToCustomDataPath; private final Lister, T> action; private final Map> cache = new HashMap<>(); private final AtomicLong round = new AtomicLong(); @@ -103,7 +103,7 @@ protected AsyncShardFetch( ) { this.logger = logger; this.type = type; - shardToCustomDataPath =new HashMap<>(); + shardToCustomDataPath = new HashMap<>(); shardToCustomDataPath.put(shardId, customDataPath); this.action = (Lister, T>) action; this.logKey = "ShardId=[" + shardId.toString() + "]"; @@ -122,11 +122,10 @@ protected AsyncShardFetch( this.type = type; this.shardToCustomDataPath = shardToCustomDataPath; this.action = (Lister, T>) action; - this.logKey = "BatchID=[" + batchId+ "]"; + this.logKey = "BatchID=[" + batchId + "]"; enableBatchMode = true; } - @Override public synchronized void close() { this.closed = true; @@ -157,11 +156,12 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map fetchData(DiscoveryNodes nodes, Map ignoreNodeSet.isEmpty() == false)) { - reroute(logKey, "nodes failed [" + failedNodes.size() + "], ignored [" - + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + "]"); + if (failedNodes.isEmpty() == false + || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) { + reroute( + logKey, + "nodes failed [" + + failedNodes.size() + + "], ignored [" + + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + + "]" + ); } return new FetchResult<>(fetchData, allIgnoreNodesMap); @@ -398,13 +405,13 @@ public void onFailure(Exception e) { */ public static class FetchResult { - private final Map data; - private final Map> ignoredShardToNodes; + private final Map data; + private final Map> ignoredShardToNodes; - public FetchResult(Map data, Map> ignoreNodes) { - this.data = data; - this.ignoredShardToNodes = ignoreNodes; - } + public FetchResult(Map data, Map> ignoreNodes) { + this.data = data; + this.ignoredShardToNodes = ignoreNodes; + } /** * Does the result actually contain data? If not, then there are on going fetch @@ -423,20 +430,20 @@ public Map getData() { return this.data; } - /** - * Process any changes needed to the allocation based on this fetch result. - */ - public void processAllocation(RoutingAllocation allocation) { - for(Map.Entry> entry : ignoredShardToNodes.entrySet()) { - ShardId shardId = entry.getKey(); - Set ignoreNodes = entry.getValue(); - if (ignoreNodes.isEmpty() == false) { - ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); + /** + * Process any changes needed to the allocation based on this fetch result. + */ + public void processAllocation(RoutingAllocation allocation) { + for (Map.Entry> entry : ignoredShardToNodes.entrySet()) { + ShardId shardId = entry.getKey(); + Set ignoreNodes = entry.getValue(); + if (ignoreNodes.isEmpty() == false) { + ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); + } } - } + } } -} /** * A node entry, holding the state of the fetched data for a specific shard diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index 81a205f5a9235..e74cb375a3bc4 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -270,6 +270,7 @@ protected void reroute(String logKey, String reason) { ); } } + class InternalPrimaryShardAllocator extends PrimaryShardAllocator { private final TransportNodesListGatewayStartedShards startedAction; @@ -295,9 +296,11 @@ protected AsyncShardFetch.FetchResult shardState = fetch.fetchData( allocation.nodes(), - new HashMap<>() {{ - put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); - }} + new HashMap<>() { + { + put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); + } + } ); if (shardState.hasData()) { @@ -332,9 +335,11 @@ protected AsyncShardFetch.FetchResult shardStores = fetch.fetchData( allocation.nodes(), - new HashMap<>() {{ - put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); - }} + new HashMap<>() { + { + put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); + } + } ); if (shardStores.hasData()) { shardStores.processAllocation(allocation); diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 61ae5e62b8a83..0abbb497039d9 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -125,7 +125,11 @@ public TransportNodesListGatewayStartedShards( } @Override - public void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener) { + public void list( + Map shardIdsWithCustomDataPath, + DiscoveryNode[] nodes, + ActionListener listener + ) { assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); final String customDataPath = shardIdsWithCustomDataPath.get(shardId); diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index e2447a1c5612c..32e792169a811 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -126,8 +126,12 @@ public TransportNodesListShardStoreMetadata( } @Override - public void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener) { - assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; + public void list( + Map shardIdsWithCustomDataPath, + DiscoveryNode[] nodes, + ActionListener listener + ) { + assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); final String customDataPath = shardIdsWithCustomDataPath.get(shardId); execute(new Request(shardId, customDataPath, nodes), listener); diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index c1da9575d4a83..21c16eb3cbc6d 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -850,9 +850,11 @@ protected AsyncShardFetch.FetchResult( data, new HashMap<>(){{ - put(shardId, Collections.emptySet()); - }}); + return new AsyncShardFetch.FetchResult<>(data, new HashMap<>() { + { + put(shardId, Collections.emptySet()); + } + }); } } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index f32f541f5ba4e..dc28315465c02 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -727,9 +727,11 @@ protected AsyncShardFetch.FetchResult(tData, new HashMap<>(){{ - put(shardId, Collections.emptySet()); - }}); + return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>() { + { + put(shardId, Collections.emptySet()); + } + }); } @Override From ea6e081c0db5cf9595db39432456ff83ffdf76e6 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 30 Aug 2023 16:40:23 +0530 Subject: [PATCH 4/9] Missed deleting the older file Signed-off-by: Gaurav Chandani --- .../gateway/AsyncBatchShardFetch.java | 487 ------------------ 1 file changed, 487 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java diff --git a/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java deleted file mode 100644 index 994631d4bf026..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java +++ /dev/null @@ -1,487 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchTimeoutException; -import org.opensearch.action.ActionListener; -import org.opensearch.action.FailedNodeException; -import org.opensearch.action.support.nodes.BaseNodeResponse; -import org.opensearch.action.support.nodes.BaseNodesResponse; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.common.Nullable; -import org.opensearch.common.lease.Releasable; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.index.shard.ShardId; -import org.opensearch.transport.ReceiveTimeoutTransportException; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableSet; - -public abstract class AsyncBatchShardFetch implements Releasable { - - /** - * An action that lists the relevant shard data that needs to be fetched. - */ - public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener); - } - - protected final Logger logger; - protected final String type; - private final String batchUUID; - protected Map shardsToCustomDataPathMap; - private Map> ignoredShardToNodes = new HashMap<>(); - private final AsyncBatchShardFetch.Lister, T> action; - private final Map> cache = new HashMap<>(); - private final Set nodesToIgnore = new HashSet<>(); - private final AtomicLong round = new AtomicLong(); - private boolean closed; - - @SuppressWarnings("unchecked") - protected AsyncBatchShardFetch( - Logger logger, - String type, - Map shardsToCustomDataPathMap, - AsyncBatchShardFetch.Lister, T> action, - String batchUUID - ) { - this.logger = logger; - this.type = type; - this.shardsToCustomDataPathMap = shardsToCustomDataPathMap; - this.action = (AsyncBatchShardFetch.Lister, T>) action; - this.batchUUID = batchUUID; - } - - @Override - public synchronized void close() { - this.closed = true; - } - - /** - * Returns the number of async fetches that are currently ongoing. - */ - public synchronized int getNumberOfInFlightFetches() { - int count = 0; - for (AsyncBatchShardFetch.NodeEntry nodeEntry : cache.values()) { - if (nodeEntry.isFetching()) { - count++; - } - } - return count; - } - - /** - * Fetches the data for the relevant shard. If there any ongoing async fetches going on, or new ones have - * been initiated by this call, the result will have no data. - *

- * The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need - * to keep them around and make sure we add them back when all the responses are fetched and returned. - */ - public synchronized AsyncBatchShardFetch.FetchResult fetchData(DiscoveryNodes nodes, Map> ignoreNodes) { - if (closed) { - throw new IllegalStateException(batchUUID + ": can't fetch data on closed async fetch"); - } - - // create a flat map for all ignore nodes in a batch. - // This information is only used for retrying. Fetching is still a broadcast to all available nodes - nodesToIgnore.addAll(ignoreNodes.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())); - fillShardCacheWithDataNodes(cache, nodes); - List> nodesToFetch = findNodesToFetch(cache); - if (nodesToFetch.isEmpty() == false) { - // mark all node as fetching and go ahead and async fetch them - // use a unique round id to detect stale responses in processAsyncFetch - final long fetchingRound = round.incrementAndGet(); - for (AsyncBatchShardFetch.NodeEntry nodeEntry : nodesToFetch) { - nodeEntry.markAsFetching(fetchingRound); - } - DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream() - .map(AsyncBatchShardFetch.NodeEntry::getNodeId) - .map(nodes::get) - .toArray(DiscoveryNode[]::new); - asyncFetch(discoNodesToFetch, fetchingRound); - } - - // if we are still fetching, return null to indicate it - if (hasAnyNodeFetching(cache)) { - return new AsyncBatchShardFetch.FetchResult<>(null, emptyMap()); - } else { - // nothing to fetch, yay, build the return value - Map fetchData = new HashMap<>(); - Set failedNodes = new HashSet<>(); - for (Iterator>> it = cache.entrySet().iterator(); it.hasNext();) { - Map.Entry> entry = it.next(); - String nodeId = entry.getKey(); - AsyncBatchShardFetch.NodeEntry nodeEntry = entry.getValue(); - - DiscoveryNode node = nodes.get(nodeId); - if (node != null) { - if (nodeEntry.isFailed()) { - // if its failed, remove it from the list of nodes, so if this run doesn't work - // we try again next round to fetch it again - it.remove(); - failedNodes.add(nodeEntry.getNodeId()); - } else { - if (nodeEntry.getValue() != null) { - fetchData.put(node, nodeEntry.getValue()); - } - } - } - } - Set allIgnoreNodes = unmodifiableSet(new HashSet<>(nodesToIgnore)); - // clear the nodes to ignore, we had a successful run in fetching everything we can - // we need to try them if another full run is needed - nodesToIgnore.clear(); - - // if at least one node failed, make sure to have a protective reroute - // here, just case this round won't find anything, and we need to retry fetching data - if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) { - reroute(batchUUID, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]"); - } - - return new AsyncBatchShardFetch.FetchResult<>(fetchData, ignoredShardToNodes); - } - } - - /** - * Called by the response handler of the async action to fetch data. Verifies that its still working - * on the same cache generation, otherwise the results are discarded. It then goes and fills the relevant data for - * the shard (response + failures), issuing a reroute at the end of it to make sure there will be another round - * of allocations taking this new data into account. - */ - protected synchronized void processAsyncFetch(List responses, List failures, long fetchingRound) { - if (closed) { - // we are closed, no need to process this async fetch at all - logger.trace("{} ignoring fetched [{}] results, already closed", batchUUID, type); - return; - } - logger.trace("{} processing fetched [{}] results", batchUUID, type); - - if (responses != null) { - for (T response : responses) { - AsyncBatchShardFetch.NodeEntry nodeEntry = cache.get(response.getNode().getId()); - if (nodeEntry != null) { - if (nodeEntry.getFetchingRound() != fetchingRound) { - assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; - logger.trace( - "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", - batchUUID, - nodeEntry.getNodeId(), - type, - nodeEntry.getFetchingRound(), - fetchingRound - ); - } else if (nodeEntry.isFailed()) { - logger.trace( - "{} node {} has failed for [{}] (failure [{}])", - batchUUID, - nodeEntry.getNodeId(), - type, - nodeEntry.getFailure() - ); - } else { - // if the entry is there, for the right fetching round and not marked as failed already, process it - logger.trace("{} marking {} as done for [{}], result is [{}]", batchUUID, nodeEntry.getNodeId(), type, response); - nodeEntry.doneFetching(response); - } - } - } - } - if (failures != null) { - for (FailedNodeException failure : failures) { - logger.trace("{} processing failure {} for [{}]", batchUUID, failure, type); - AsyncBatchShardFetch.NodeEntry nodeEntry = cache.get(failure.nodeId()); - if (nodeEntry != null) { - if (nodeEntry.getFetchingRound() != fetchingRound) { - assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; - logger.trace( - "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", - batchUUID, - nodeEntry.getNodeId(), - type, - nodeEntry.getFetchingRound(), - fetchingRound - ); - } else if (nodeEntry.isFailed() == false) { - // if the entry is there, for the right fetching round and not marked as failed already, process it - Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); - // if the request got rejected or timed out, we need to try it again next time... - if (unwrappedCause instanceof OpenSearchRejectedExecutionException - || unwrappedCause instanceof ReceiveTimeoutTransportException - || unwrappedCause instanceof OpenSearchTimeoutException) { - nodeEntry.restartFetching(); - } else { - logger.warn( - () -> new ParameterizedMessage( - "{}: failed to list shard for {} on node [{}]", - batchUUID, - type, - failure.nodeId() - ), - failure - ); - nodeEntry.doneFetching(failure.getCause()); - } - } - } - } - } - reroute(batchUUID, "post_response"); - } - - /** - * Implement this in order to scheduled another round that causes a call to fetch data. - */ - protected abstract void reroute(String batchUUID, String reason); - - /** - * Clear cache for node, ensuring next fetch will fetch a fresh copy. - */ - synchronized void clearCacheForNode(String nodeId) { - cache.remove(nodeId); - } - - /** - * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from - * it nodes that are no longer part of the state. - */ - private void fillShardCacheWithDataNodes(Map> shardCache, DiscoveryNodes nodes) { - // verify that all current data nodes are there - for (final DiscoveryNode node : nodes.getDataNodes().values()) { - if (shardCache.containsKey(node.getId()) == false) { - shardCache.put(node.getId(), new AsyncBatchShardFetch.NodeEntry(node.getId())); - } - } - // remove nodes that are not longer part of the data nodes set - shardCache.keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId)); - } - - /** - * Finds all the nodes that need to be fetched. Those are nodes that have no - * data, and are not in fetch mode. - */ - private List> findNodesToFetch(Map> shardCache) { - List> nodesToFetch = new ArrayList<>(); - for (AsyncBatchShardFetch.NodeEntry nodeEntry : shardCache.values()) { - if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) { - nodesToFetch.add(nodeEntry); - } - } - return nodesToFetch; - } - - /** - * Are there any nodes that are fetching data? - */ - private boolean hasAnyNodeFetching(Map> shardCache) { - for (AsyncBatchShardFetch.NodeEntry nodeEntry : shardCache.values()) { - if (nodeEntry.isFetching()) { - return true; - } - } - return false; - } - - /** - * Async fetches data for the provided shard with the set of nodes that need to be fetched from. - */ - // visible for testing - void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) { - logger.trace("{} fetching [{}] from {}", batchUUID, type, nodes); - action.list(nodes, shardsToCustomDataPathMap, new ActionListener>() { - @Override - public void onResponse(BaseNodesResponse response) { - processAsyncFetch(response.getNodes(), response.failures(), fetchingRound); - } - - @Override - public void onFailure(Exception e) { - List failures = new ArrayList<>(nodes.length); - for (final DiscoveryNode node : nodes) { - failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e)); - } - processAsyncFetch(null, failures, fetchingRound); - } - }); - } - - /** - * The result of a fetch operation. Make sure to first check {@link #hasData()} before - * fetching the actual data. - */ - public static class FetchResult { - - private final Map data; - private Map> ignoredShardToNodes = new HashMap<>(); - - public FetchResult(Map data, Map> ignoreNodes) { - this.data = data; - this.ignoredShardToNodes = ignoreNodes; - } - - /** - * Does the result actually contain data? If not, then there are on going fetch - * operations happening, and it should wait for it. - */ - public boolean hasData() { - return data != null; - } - - /** - * Returns the actual data, note, make sure to check {@link #hasData()} first and - * only use this when there is an actual data. - */ - public Map getData() { - assert data != null : "getData should only be called if there is data to be fetched, please check hasData first"; - return this.data; - } - - /** - * Process any changes needed to the allocation based on this fetch result. - */ - public void processAllocation(RoutingAllocation allocation) { - for (Map.Entry> entry : ignoredShardToNodes.entrySet()) { - ShardId shardId = entry.getKey(); - Set ignoreNodes = entry.getValue(); - if (ignoreNodes.isEmpty() == false) { - ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); - } - } - - } - } - - /** - * The result of a fetch operation. Make sure to first check {@link #hasData()} before - * fetching the actual data. - */ - public static class AdaptedResultsForShard { - - private final ShardId shardId; - private final Map data; - private final Set ignoreNodes; - - public AdaptedResultsForShard(ShardId shardId, Map data, Set ignoreNodes) { - this.shardId = shardId; - this.data = data; - this.ignoreNodes = ignoreNodes; - } - - /** - * Does the result actually contain data? If not, then there are on going fetch - * operations happening, and it should wait for it. - */ - public boolean hasData() { - return data != null; - } - - /** - * Returns the actual data, note, make sure to check {@link #hasData()} first and - * only use this when there is an actual data. - */ - public Map getData() { - assert data != null : "getData should only be called if there is data to be fetched, please check hasData first"; - return this.data; - } - } - - /** - * A node entry, holding the state of the fetched data for a specific shard - * for a giving node. - */ - static class NodeEntry { - private final String nodeId; - private boolean fetching; - @Nullable - private T value; - private boolean valueSet; - private Throwable failure; - private long fetchingRound; - - NodeEntry(String nodeId) { - this.nodeId = nodeId; - } - - String getNodeId() { - return this.nodeId; - } - - boolean isFetching() { - return fetching; - } - - void markAsFetching(long fetchingRound) { - assert fetching == false : "double marking a node as fetching"; - this.fetching = true; - this.fetchingRound = fetchingRound; - } - - void doneFetching(T value) { - assert fetching : "setting value but not in fetching mode"; - assert failure == null : "setting value when failure already set"; - this.valueSet = true; - this.value = value; - this.fetching = false; - } - - void doneFetching(Throwable failure) { - assert fetching : "setting value but not in fetching mode"; - assert valueSet == false : "setting failure when already set value"; - assert failure != null : "setting failure can't be null"; - this.failure = failure; - this.fetching = false; - } - - void restartFetching() { - assert fetching : "restarting fetching, but not in fetching mode"; - assert valueSet == false : "value can't be set when restarting fetching"; - assert failure == null : "failure can't be set when restarting fetching"; - this.fetching = false; - } - - boolean isFailed() { - return failure != null; - } - - boolean hasData() { - return valueSet || failure != null; - } - - Throwable getFailure() { - assert hasData() : "getting failure when data has not been fetched"; - return failure; - } - - @Nullable - T getValue() { - assert failure == null : "trying to fetch value, but its marked as failed, check isFailed"; - assert valueSet : "value is not set, hasn't been fetched yet"; - return value; - } - - long getFetchingRound() { - return fetchingRound; - } - } -} From 4def9746759acf845eae3fc3a44f1fd3b105804d Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 30 Aug 2023 16:47:10 +0530 Subject: [PATCH 5/9] Applied spotlesscheck on tests Signed-off-by: Gaurav Chandani --- .../test/gateway/TestGatewayAllocator.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index 824c85dfb1f84..51d7bfd5e8445 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -98,9 +98,11 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR ) ); - return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>(){{ - put(shardId, ignoreNodes); - }}); + return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>() { + { + put(shardId, ignoreNodes); + } + }); } }; @@ -113,9 +115,11 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // for now, just pretend no node has data final ShardId shardId = shard.shardId(); - return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>(){{ - put(shardId, allocation.getIgnoreNodes(shardId)); - }}); + return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>() { + { + put(shardId, allocation.getIgnoreNodes(shardId)); + } + }); } @Override From f4e15e1a44776f1763ed2263da61941b27cd381c Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Fri, 22 Sep 2023 17:42:44 +0530 Subject: [PATCH 6/9] Added non null checks and test changes for new constructor Signed-off-by: Gaurav Chandani --- .../org/opensearch/gateway/AsyncShardFetch.java | 8 +++++--- .../opensearch/gateway/AsyncShardFetchTests.java | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index eed46c2940f44..bc3fb93774ed1 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -54,6 +54,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -104,7 +105,7 @@ protected AsyncShardFetch( this.logger = logger; this.type = type; shardToCustomDataPath = new HashMap<>(); - shardToCustomDataPath.put(shardId, customDataPath); + shardToCustomDataPath.put(Objects.requireNonNull(shardId), Objects.requireNonNull(customDataPath)); this.action = (Lister, T>) action; this.logKey = "ShardId=[" + shardId.toString() + "]"; enableBatchMode = false; @@ -120,7 +121,7 @@ protected AsyncShardFetch( ) { this.logger = logger; this.type = type; - this.shardToCustomDataPath = shardToCustomDataPath; + this.shardToCustomDataPath = Objects.requireNonNull(shardToCustomDataPath); this.action = (Lister, T>) action; this.logKey = "BatchID=[" + batchId + "]"; enableBatchMode = true; @@ -221,6 +222,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map ignoreNodeSet.isEmpty() == false)) { reroute( @@ -326,7 +328,7 @@ protected synchronized void processAsyncFetch(List responses, List shardToCustomDataPath = new HashMap<>(); + shardToCustomDataPath.put(new ShardId("index1", "index_uuid1", 0), ""); + shardToCustomDataPath.put(new ShardId("index2", "index_uuid2", 0), ""); + this.test = new TestFetch(threadPool, shardToCustomDataPath); + } } @After @@ -403,6 +411,11 @@ static class Entry { this.threadPool = threadPool; } + TestFetch(ThreadPool threadPool, Map shardToCustomDataPath) { + super(LogManager.getLogger(TestFetch.class), "test", shardToCustomDataPath, null, "test-batch"); + this.threadPool = threadPool; + } + public void addSimulation(String nodeId, Response response) { simulations.put(nodeId, new Entry(response, null)); } From 6242d2eda920d8a988b0aba35a9d4901ba538820 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Tue, 5 Dec 2023 12:41:48 +0530 Subject: [PATCH 7/9] Addressed PR comments(#8742) 1. Instead of using Map in Fetcher class now using Map for making code more extensible 2. Added UT for newly added constructor in fetcher class 3. Renamed logKey to RerouteKey 4. Add IllegalStateException for non-batched fetch Signed-off-by: Gaurav Chandani --- .../TransportIndicesShardStoresAction.java | 2 +- .../opensearch/gateway/AsyncShardFetch.java | 84 ++++++++++--------- .../opensearch/gateway/GatewayAllocator.java | 13 +-- ...ransportNodesListGatewayStartedShards.java | 13 ++- .../indices/store/ShardAttributes.java | 59 +++++++++++++ .../TransportNodesListShardStoreMetadata.java | 12 +-- .../gateway/AsyncShardFetchTests.java | 15 ++-- 7 files changed, 129 insertions(+), 69 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/store/ShardAttributes.java diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index d7859fb17bcb6..04166c88a00ad 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -223,7 +223,7 @@ protected synchronized void processAsyncFetch( List failures, long fetchingRound ) { - fetchResponses.add(new Response(shardToCustomDataPath.keySet().iterator().next(), responses, failures)); + fetchResponses.add(new Response(shardAttributesMap.keySet().iterator().next(), responses, failures)); if (expectedOps.countDown()) { finish(); } diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index bc3fb93774ed1..306888fb0f2ee 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -46,6 +46,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; @@ -54,7 +55,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -69,6 +69,7 @@ * and once the results are back, it makes sure to schedule a reroute to make sure those results will * be taken into account. * + * It comes in two modes, to single fetch a shard or fetch a batch of shards. * @opensearch.internal */ public abstract class AsyncShardFetch implements Releasable { @@ -77,19 +78,18 @@ public abstract class AsyncShardFetch implements Rel * An action that lists the relevant shard data that needs to be fetched. */ public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener); + void list(Map shardAttributesMap, DiscoveryNode[] nodes, ActionListener listener); } protected final Logger logger; protected final String type; - - protected final Map shardToCustomDataPath; + protected final Map shardAttributesMap; private final Lister, T> action; private final Map> cache = new HashMap<>(); private final AtomicLong round = new AtomicLong(); private boolean closed; - private final String logKey; + private final String reroutingKey; private final Map> shardToIgnoreNodes = new HashMap<>(); private final boolean enableBatchMode; @@ -104,26 +104,35 @@ protected AsyncShardFetch( ) { this.logger = logger; this.type = type; - shardToCustomDataPath = new HashMap<>(); - shardToCustomDataPath.put(Objects.requireNonNull(shardId), Objects.requireNonNull(customDataPath)); + shardAttributesMap =new HashMap<>(); + shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); this.action = (Lister, T>) action; - this.logKey = "ShardId=[" + shardId.toString() + "]"; + this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; enableBatchMode = false; } + /** + * Added to fetch a batch of shards from nodes + * + * @param logger Logger + * @param type type of action + * @param shardAttributesMap Map of {@link ShardId} to {@link ShardAttributes} to perform fetching on them a + * @param action Transport Action + * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification + */ @SuppressWarnings("unchecked") protected AsyncShardFetch( Logger logger, String type, - Map shardToCustomDataPath, + Map shardAttributesMap, Lister, T> action, String batchId ) { this.logger = logger; this.type = type; - this.shardToCustomDataPath = Objects.requireNonNull(shardToCustomDataPath); + this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; - this.logKey = "BatchID=[" + batchId + "]"; + this.reroutingKey = "BatchID=[" + batchId+ "]"; enableBatchMode = true; } @@ -154,15 +163,19 @@ public synchronized int getNumberOfInFlightFetches() { */ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map> ignoreNodes) { if (closed) { - throw new IllegalStateException(logKey + ": can't fetch data on closed async fetch"); + throw new IllegalStateException(reroutingKey + ": can't fetch data on closed async fetch"); } if (enableBatchMode == false) { // we will do assertions here on ignoreNodes - assert ignoreNodes.size() <= 1 : "Can only have at-most one shard"; - if (ignoreNodes.size() == 1) { - assert shardToCustomDataPath.containsKey(ignoreNodes.keySet().iterator().next()) - : "ShardId should be same as initialised in fetcher"; + if (ignoreNodes.size() > 1) { + throw new IllegalStateException("Fetching Shard Data, " + reroutingKey + "Can only have atmost one shard" + + "for non-batch mode" ); + } + if(ignoreNodes.size() == 1) { + if (shardAttributesMap.containsKey(ignoreNodes.keySet().iterator().next()) == false) { + throw new IllegalStateException("Shard Id must be same as initialized in AsyncShardFetch. Expecting = " + reroutingKey); + } } } @@ -222,17 +235,10 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map ignoreNodeSet.isEmpty() == false)) { - reroute( - logKey, - "nodes failed [" - + failedNodes.size() - + "], ignored [" - + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() - + "]" - ); + + if (failedNodes.isEmpty() == false || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) { + reroute(reroutingKey, "nodes failed [" + failedNodes.size() + "], ignored [" + + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + "]"); } return new FetchResult<>(fetchData, allIgnoreNodesMap); @@ -248,10 +254,10 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map responses, List failures, long fetchingRound) { if (closed) { // we are closed, no need to process this async fetch at all - logger.trace("{} ignoring fetched [{}] results, already closed", logKey, type); + logger.trace("{} ignoring fetched [{}] results, already closed", reroutingKey, type); return; } - logger.trace("{} processing fetched [{}] results", logKey, type); + logger.trace("{} processing fetched [{}] results", reroutingKey, type); if (responses != null) { for (T response : responses) { @@ -261,7 +267,7 @@ protected synchronized void processAsyncFetch(List responses, List fetchingRound : "node entries only replaced by newer rounds"; logger.trace( "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", - logKey, + reroutingKey, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -270,14 +276,14 @@ protected synchronized void processAsyncFetch(List responses, List responses, List nodeEntry = cache.get(failure.nodeId()); if (nodeEntry != null) { if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", - logKey, + reroutingKey, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -310,7 +316,7 @@ protected synchronized void processAsyncFetch(List responses, List new ParameterizedMessage( "{}: failed to list shard for {} on node [{}]", - logKey, + reroutingKey, type, failure.nodeId() ), @@ -322,13 +328,13 @@ protected synchronized void processAsyncFetch(List responses, List> shardCache) { */ // visible for testing void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) { - logger.trace("{} fetching [{}] from {}", logKey, type, nodes); - action.list(shardToCustomDataPath, nodes, new ActionListener>() { + logger.trace("{} fetching [{}] from {}", reroutingKey, type, nodes); + action.list(shardAttributesMap, nodes, new ActionListener>() { @Override public void onResponse(BaseNodesResponse response) { processAsyncFetch(response.getNodes(), response.failures(), fetchingRound); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index f46050714f024..5a72986a5c8e4 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -53,6 +53,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import java.util.Collections; @@ -227,8 +228,8 @@ private static void clearCacheForPrimary( AsyncShardFetch fetch, RoutingAllocation allocation ) { - assert fetch.shardToCustomDataPath.size() == 1 : "expected only one shard"; - ShardId shardId = fetch.shardToCustomDataPath.keySet().iterator().next(); + assert fetch.shardAttributesMap.size() == 1 : "expected only one shard"; + ShardId shardId = fetch.shardAttributesMap.keySet().iterator().next(); ShardRouting primary = allocation.routingNodes().activePrimary(shardId); if (primary != null) { fetch.clearCacheForNode(primary.currentNodeId()); @@ -257,15 +258,15 @@ class InternalAsyncFetch extends AsyncShardFetch } @Override - protected void reroute(String logKey, String reason) { - logger.trace("{} scheduling reroute for {}", logKey, reason); + protected void reroute(String reroutingKey, String reason) { + logger.trace("{} scheduling reroute for {}", reroutingKey, reason); assert rerouteService != null; rerouteService.reroute( "async_shard_fetch", Priority.HIGH, ActionListener.wrap( - r -> logger.trace("{} scheduled reroute completed for {}", logKey, reason), - e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", logKey, reason), e) + r -> logger.trace("{} scheduled reroute completed for {}", reroutingKey, reason), + e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", reroutingKey, reason), e) ) ); } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 03ba5fd62eb6a..d90be0449470d 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -62,6 +62,7 @@ import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; @@ -125,14 +126,10 @@ public TransportNodesListGatewayStartedShards( } @Override - public void list( - Map shardIdsWithCustomDataPath, - DiscoveryNode[] nodes, - ActionListener listener - ) { - assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; - final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); - final String customDataPath = shardIdsWithCustomDataPath.get(shardId); + public void list(Map shardAttributesMap, DiscoveryNode[] nodes, ActionListener listener) { + assert shardAttributesMap.size() == 1 : "only one shard should be specified"; + final ShardId shardId = shardAttributesMap.keySet().iterator().next(); + final String customDataPath = shardAttributesMap.get(shardId).getCustomDataPath(); execute(new Request(shardId, customDataPath, nodes), listener); } diff --git a/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java new file mode 100644 index 0000000000000..4ef4e91f7af8c --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.opensearch.common.Nullable; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.AsyncShardFetch; + +import java.io.IOException; + +/** + * This class contains information about the shard that needs to be sent as part of request in Transport Action implementing + * {@link AsyncShardFetch.Lister} to fetch shard information in async manner + * + * @opensearch.internal + */ +public class ShardAttributes implements Writeable { + private final ShardId shardId; + @Nullable + private final String customDataPath; + + public ShardAttributes(ShardId shardId, String customDataPath) { + this.shardId = shardId; + this.customDataPath = customDataPath; + } + + public ShardAttributes(StreamInput in) throws IOException { + shardId = new ShardId(in); + customDataPath = in.readString(); + } + + public ShardId getShardId() { + return shardId; + } + + /** + * Returns the custom data path that is used to look up information for this shard. + * Returns an empty string if no custom data path is used for this index. + * Returns null if custom data path information is not available (due to BWC). + */ + @Nullable + public String getCustomDataPath() { + return customDataPath; + } + + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeString(customDataPath); + } +} diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index 492663ef55738..730902c862b79 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -126,14 +126,10 @@ public TransportNodesListShardStoreMetadata( } @Override - public void list( - Map shardIdsWithCustomDataPath, - DiscoveryNode[] nodes, - ActionListener listener - ) { - assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; - final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); - final String customDataPath = shardIdsWithCustomDataPath.get(shardId); + public void list(Map shardAttributes, DiscoveryNode[] nodes, ActionListener listener) { + assert shardAttributes.size() == 1 : "only one shard should be specified"; + final ShardId shardId = shardAttributes.keySet().iterator().next(); + final String customDataPath = shardAttributes.get(shardId).getCustomDataPath(); execute(new Request(shardId, customDataPath, nodes), listener); } diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 94fbe8ce85430..e1d0034d3b06a 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -88,9 +89,11 @@ public void setUp() throws Exception { if (randomBoolean()) { this.test = new TestFetch(threadPool); } else { - HashMap shardToCustomDataPath = new HashMap<>(); - shardToCustomDataPath.put(new ShardId("index1", "index_uuid1", 0), ""); - shardToCustomDataPath.put(new ShardId("index2", "index_uuid2", 0), ""); + HashMap shardToCustomDataPath = new HashMap<>(); + ShardId shardId0 = new ShardId("index1", "index_uuid1", 0); + ShardId shardId1 = new ShardId("index2", "index_uuid2", 0); + shardToCustomDataPath.put(shardId0,new ShardAttributes(shardId0, "")); + shardToCustomDataPath.put(shardId1, new ShardAttributes(shardId1, "")); this.test = new TestFetch(threadPool, shardToCustomDataPath); } } @@ -410,12 +413,10 @@ static class Entry { super(LogManager.getLogger(TestFetch.class), "test", new ShardId("test", "_na_", 1), "", null); this.threadPool = threadPool; } - - TestFetch(ThreadPool threadPool, Map shardToCustomDataPath) { - super(LogManager.getLogger(TestFetch.class), "test", shardToCustomDataPath, null, "test-batch"); + TestFetch(ThreadPool threadPool, Map shardAttributesMap) { + super(LogManager.getLogger(TestFetch.class), "test", shardAttributesMap, null, "test-batch"); this.threadPool = threadPool; } - public void addSimulation(String nodeId, Response response) { simulations.put(nodeId, new Entry(response, null)); } From 7dc30a4fc8744146fb387971bbf6af959956f3b5 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Tue, 5 Dec 2023 16:45:06 +0530 Subject: [PATCH 8/9] Apply spotless checked in AsyncShardFetch PR Signed-off-by: Gaurav Chandani --- .../opensearch/gateway/AsyncShardFetch.java | 26 ++++++++++++------- .../opensearch/gateway/GatewayAllocator.java | 1 - ...ransportNodesListGatewayStartedShards.java | 6 ++++- .../TransportNodesListShardStoreMetadata.java | 8 ++++-- .../gateway/AsyncShardFetchTests.java | 4 ++- 5 files changed, 31 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 306888fb0f2ee..50774f7e0cb1c 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -104,7 +104,7 @@ protected AsyncShardFetch( ) { this.logger = logger; this.type = type; - shardAttributesMap =new HashMap<>(); + shardAttributesMap = new HashMap<>(); shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; @@ -132,7 +132,7 @@ protected AsyncShardFetch( this.type = type; this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; - this.reroutingKey = "BatchID=[" + batchId+ "]"; + this.reroutingKey = "BatchID=[" + batchId + "]"; enableBatchMode = true; } @@ -169,10 +169,11 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map 1) { - throw new IllegalStateException("Fetching Shard Data, " + reroutingKey + "Can only have atmost one shard" + - "for non-batch mode" ); + throw new IllegalStateException( + "Fetching Shard Data, " + reroutingKey + "Can only have atmost one shard" + "for non-batch mode" + ); } - if(ignoreNodes.size() == 1) { + if (ignoreNodes.size() == 1) { if (shardAttributesMap.containsKey(ignoreNodes.keySet().iterator().next()) == false) { throw new IllegalStateException("Shard Id must be same as initialized in AsyncShardFetch. Expecting = " + reroutingKey); } @@ -236,9 +237,16 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map ignoreNodeSet.isEmpty() == false)) { - reroute(reroutingKey, "nodes failed [" + failedNodes.size() + "], ignored [" - + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + "]"); + if (failedNodes.isEmpty() == false + || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) { + reroute( + reroutingKey, + "nodes failed [" + + failedNodes.size() + + "], ignored [" + + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + + "]" + ); } return new FetchResult<>(fetchData, allIgnoreNodesMap); @@ -334,7 +342,7 @@ protected synchronized void processAsyncFetch(List responses, List shardAttributesMap, DiscoveryNode[] nodes, ActionListener listener) { + public void list( + Map shardAttributesMap, + DiscoveryNode[] nodes, + ActionListener listener + ) { assert shardAttributesMap.size() == 1 : "only one shard should be specified"; final ShardId shardId = shardAttributesMap.keySet().iterator().next(); final String customDataPath = shardAttributesMap.get(shardId).getCustomDataPath(); diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index 730902c862b79..5a3c1038cd5f0 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -126,8 +126,12 @@ public TransportNodesListShardStoreMetadata( } @Override - public void list(Map shardAttributes, DiscoveryNode[] nodes, ActionListener listener) { - assert shardAttributes.size() == 1 : "only one shard should be specified"; + public void list( + Map shardAttributes, + DiscoveryNode[] nodes, + ActionListener listener + ) { + assert shardAttributes.size() == 1 : "only one shard should be specified"; final ShardId shardId = shardAttributes.keySet().iterator().next(); final String customDataPath = shardAttributes.get(shardId).getCustomDataPath(); execute(new Request(shardId, customDataPath, nodes), listener); diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index e1d0034d3b06a..4e5e9c71e1fe4 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -92,7 +92,7 @@ public void setUp() throws Exception { HashMap shardToCustomDataPath = new HashMap<>(); ShardId shardId0 = new ShardId("index1", "index_uuid1", 0); ShardId shardId1 = new ShardId("index2", "index_uuid2", 0); - shardToCustomDataPath.put(shardId0,new ShardAttributes(shardId0, "")); + shardToCustomDataPath.put(shardId0, new ShardAttributes(shardId0, "")); shardToCustomDataPath.put(shardId1, new ShardAttributes(shardId1, "")); this.test = new TestFetch(threadPool, shardToCustomDataPath); } @@ -413,10 +413,12 @@ static class Entry { super(LogManager.getLogger(TestFetch.class), "test", new ShardId("test", "_na_", 1), "", null); this.threadPool = threadPool; } + TestFetch(ThreadPool threadPool, Map shardAttributesMap) { super(LogManager.getLogger(TestFetch.class), "test", shardAttributesMap, null, "test-batch"); this.threadPool = threadPool; } + public void addSimulation(String nodeId, Response response) { simulations.put(nodeId, new Entry(response, null)); } From 8d56eca844e6ccd369877a61648d6a6e5823f48a Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Tue, 19 Dec 2023 14:49:06 +0530 Subject: [PATCH 9/9] Added ShardAttributes UTs Signed-off-by: Gaurav Chandani --- .../indices/store/ShardAttributesTests.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java diff --git a/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java new file mode 100644 index 0000000000000..7fa95fefe72fd --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.opensearch.core.common.io.stream.DataOutputStreamOutput; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class ShardAttributesTests extends OpenSearchTestCase { + + Index index = new Index("index", "test-uid"); + ShardId shardId = new ShardId(index, 0); + String customDataPath = "/path/to/data"; + + public void testShardAttributesConstructor() { + ShardAttributes attributes = new ShardAttributes(shardId, customDataPath); + assertEquals(attributes.getShardId(), shardId); + assertEquals(attributes.getCustomDataPath(), customDataPath); + } + + public void testSerialization() throws IOException { + ShardAttributes attributes1 = new ShardAttributes(shardId, customDataPath); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + StreamOutput output = new DataOutputStreamOutput(new DataOutputStream(bytes)); + attributes1.writeTo(output); + output.close(); + StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(bytes.toByteArray())); + ShardAttributes attributes2 = new ShardAttributes(input); + input.close(); + assertEquals(attributes1.getShardId(), attributes2.getShardId()); + assertEquals(attributes1.getCustomDataPath(), attributes2.getCustomDataPath()); + } + +}