From 8126c0c1473d691cc5daed7bf08036627497c734 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Tue, 2 Jan 2024 23:14:16 +0530 Subject: [PATCH] Batch Async Fetcher class changes (#8742) * Async Fetcher class changes Signed-off-by: Gaurav Chandani Signed-off-by: Shivansh Arora --- .../TransportIndicesShardStoresAction.java | 6 +- .../opensearch/gateway/AsyncShardFetch.java | 140 +++++++++++++----- ...ransportNodesListGatewayStartedShards.java | 11 +- .../indices/store/ShardAttributes.java | 59 ++++++++ .../TransportNodesListShardStoreMetadata.java | 10 +- .../gateway/AsyncShardFetchTests.java | 80 ++++++---- .../gateway/PrimaryShardAllocatorTests.java | 6 +- .../gateway/ReplicaShardAllocatorTests.java | 6 +- .../indices/store/ShardAttributesTests.java | 49 ++++++ 9 files changed, 291 insertions(+), 76 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/store/ShardAttributes.java create mode 100644 server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index 41225bc362235..04166c88a00ad 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -195,7 +195,7 @@ void start() { } else { for (Tuple shard : shards) { InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), listShardStoresInfo); - fetch.fetchData(nodes, Collections.emptySet()); + fetch.fetchData(nodes, Collections.emptyMap()); } } } @@ -223,7 +223,7 @@ protected synchronized void processAsyncFetch( List failures, long fetchingRound ) { - fetchResponses.add(new Response(shardId, responses, failures)); + fetchResponses.add(new Response(shardAttributesMap.keySet().iterator().next(), responses, failures)); if (expectedOps.countDown()) { finish(); } @@ -312,7 +312,7 @@ private boolean shardExistsInNode(final NodeGatewayStartedShards response) { } @Override - protected void reroute(ShardId shardId, String reason) { + protected void reroute(String shardId, String reason) { // no-op } diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index d0ade4eb25168..50774f7e0cb1c 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -46,6 +46,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; @@ -54,12 +55,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import static java.util.Collections.emptySet; -import static java.util.Collections.unmodifiableSet; +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; /** * Allows to asynchronously fetch shard related data from other nodes for allocation, without blocking @@ -69,6 +69,7 @@ * and once the results are back, it makes sure to schedule a reroute to make sure those results will * be taken into account. * + * It comes in two modes, to single fetch a shard or fetch a batch of shards. * @opensearch.internal */ public abstract class AsyncShardFetch implements Releasable { @@ -77,18 +78,21 @@ public abstract class AsyncShardFetch implements Rel * An action that lists the relevant shard data that needs to be fetched. */ public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(ShardId shardId, @Nullable String customDataPath, DiscoveryNode[] nodes, ActionListener listener); + void list(Map shardAttributesMap, DiscoveryNode[] nodes, ActionListener listener); + } protected final Logger logger; protected final String type; - protected final ShardId shardId; - protected final String customDataPath; + protected final Map shardAttributesMap; private final Lister, T> action; private final Map> cache = new HashMap<>(); - private final Set nodesToIgnore = new HashSet<>(); private final AtomicLong round = new AtomicLong(); private boolean closed; + private final String reroutingKey; + private final Map> shardToIgnoreNodes = new HashMap<>(); + + private final boolean enableBatchMode; @SuppressWarnings("unchecked") protected AsyncShardFetch( @@ -100,9 +104,36 @@ protected AsyncShardFetch( ) { this.logger = logger; this.type = type; - this.shardId = Objects.requireNonNull(shardId); - this.customDataPath = Objects.requireNonNull(customDataPath); + shardAttributesMap = new HashMap<>(); + shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + this.action = (Lister, T>) action; + this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; + enableBatchMode = false; + } + + /** + * Added to fetch a batch of shards from nodes + * + * @param logger Logger + * @param type type of action + * @param shardAttributesMap Map of {@link ShardId} to {@link ShardAttributes} to perform fetching on them a + * @param action Transport Action + * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification + */ + @SuppressWarnings("unchecked") + protected AsyncShardFetch( + Logger logger, + String type, + Map shardAttributesMap, + Lister, T> action, + String batchId + ) { + this.logger = logger; + this.type = type; + this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; + this.reroutingKey = "BatchID=[" + batchId + "]"; + enableBatchMode = true; } @Override @@ -130,11 +161,32 @@ public synchronized int getNumberOfInFlightFetches() { * The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need * to keep them around and make sure we add them back when all the responses are fetched and returned. */ - public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set ignoreNodes) { + public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map> ignoreNodes) { if (closed) { - throw new IllegalStateException(shardId + ": can't fetch data on closed async fetch"); + throw new IllegalStateException(reroutingKey + ": can't fetch data on closed async fetch"); } - nodesToIgnore.addAll(ignoreNodes); + + if (enableBatchMode == false) { + // we will do assertions here on ignoreNodes + if (ignoreNodes.size() > 1) { + throw new IllegalStateException( + "Fetching Shard Data, " + reroutingKey + "Can only have atmost one shard" + "for non-batch mode" + ); + } + if (ignoreNodes.size() == 1) { + if (shardAttributesMap.containsKey(ignoreNodes.keySet().iterator().next()) == false) { + throw new IllegalStateException("Shard Id must be same as initialized in AsyncShardFetch. Expecting = " + reroutingKey); + } + } + } + + // add the nodes to ignore to the list of nodes to ignore for each shard + for (Map.Entry> ignoreNodesEntry : ignoreNodes.entrySet()) { + Set ignoreNodesSet = shardToIgnoreNodes.getOrDefault(ignoreNodesEntry.getKey(), new HashSet<>()); + ignoreNodesSet.addAll(ignoreNodesEntry.getValue()); + shardToIgnoreNodes.put(ignoreNodesEntry.getKey(), ignoreNodesSet); + } + fillShardCacheWithDataNodes(cache, nodes); List> nodesToFetch = findNodesToFetch(cache); if (nodesToFetch.isEmpty() == false) { @@ -153,7 +205,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set i // if we are still fetching, return null to indicate it if (hasAnyNodeFetching(cache)) { - return new FetchResult<>(shardId, null, emptySet()); + return new FetchResult<>(null, emptyMap()); } else { // nothing to fetch, yay, build the return value Map fetchData = new HashMap<>(); @@ -177,16 +229,27 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set i } } } - Set allIgnoreNodes = unmodifiableSet(new HashSet<>(nodesToIgnore)); + + Map> allIgnoreNodesMap = unmodifiableMap(new HashMap<>(shardToIgnoreNodes)); // clear the nodes to ignore, we had a successful run in fetching everything we can // we need to try them if another full run is needed - nodesToIgnore.clear(); + shardToIgnoreNodes.clear(); // if at least one node failed, make sure to have a protective reroute // here, just case this round won't find anything, and we need to retry fetching data - if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) { - reroute(shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]"); + + if (failedNodes.isEmpty() == false + || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) { + reroute( + reroutingKey, + "nodes failed [" + + failedNodes.size() + + "], ignored [" + + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + + "]" + ); } - return new FetchResult<>(shardId, fetchData, allIgnoreNodes); + + return new FetchResult<>(fetchData, allIgnoreNodesMap); } } @@ -199,10 +262,10 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set i protected synchronized void processAsyncFetch(List responses, List failures, long fetchingRound) { if (closed) { // we are closed, no need to process this async fetch at all - logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type); + logger.trace("{} ignoring fetched [{}] results, already closed", reroutingKey, type); return; } - logger.trace("{} processing fetched [{}] results", shardId, type); + logger.trace("{} processing fetched [{}] results", reroutingKey, type); if (responses != null) { for (T response : responses) { @@ -212,7 +275,7 @@ protected synchronized void processAsyncFetch(List responses, List fetchingRound : "node entries only replaced by newer rounds"; logger.trace( "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", - shardId, + reroutingKey, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -221,14 +284,14 @@ protected synchronized void processAsyncFetch(List responses, List responses, List nodeEntry = cache.get(failure.nodeId()); if (nodeEntry != null) { if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", - shardId, + reroutingKey, nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -261,7 +324,7 @@ protected synchronized void processAsyncFetch(List responses, List new ParameterizedMessage( "{}: failed to list shard for {} on node [{}]", - shardId, + reroutingKey, type, failure.nodeId() ), @@ -273,13 +336,13 @@ protected synchronized void processAsyncFetch(List responses, List> shardCache) { */ // visible for testing void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) { - logger.trace("{} fetching [{}] from {}", shardId, type, nodes); - action.list(shardId, customDataPath, nodes, new ActionListener>() { + logger.trace("{} fetching [{}] from {}", reroutingKey, type, nodes); + action.list(shardAttributesMap, nodes, new ActionListener>() { @Override public void onResponse(BaseNodesResponse response) { processAsyncFetch(response.getNodes(), response.failures(), fetchingRound); @@ -358,14 +421,12 @@ public void onFailure(Exception e) { */ public static class FetchResult { - private final ShardId shardId; private final Map data; - private final Set ignoreNodes; + private final Map> ignoredShardToNodes; - public FetchResult(ShardId shardId, Map data, Set ignoreNodes) { - this.shardId = shardId; + public FetchResult(Map data, Map> ignoreNodes) { this.data = data; - this.ignoreNodes = ignoreNodes; + this.ignoredShardToNodes = ignoreNodes; } /** @@ -389,9 +450,14 @@ public Map getData() { * Process any changes needed to the allocation based on this fetch result. */ public void processAllocation(RoutingAllocation allocation) { - for (String ignoreNode : ignoreNodes) { - allocation.addIgnoreShardForNode(shardId, ignoreNode); + for (Map.Entry> entry : ignoredShardToNodes.entrySet()) { + ShardId shardId = entry.getKey(); + Set ignoreNodes = entry.getValue(); + if (ignoreNodes.isEmpty() == false) { + ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); + } } + } } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index a47893f37a97c..601a5c671d67c 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -62,12 +62,14 @@ import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -124,7 +126,14 @@ public TransportNodesListGatewayStartedShards( } @Override - public void list(ShardId shardId, String customDataPath, DiscoveryNode[] nodes, ActionListener listener) { + public void list( + Map shardAttributesMap, + DiscoveryNode[] nodes, + ActionListener listener + ) { + assert shardAttributesMap.size() == 1 : "only one shard should be specified"; + final ShardId shardId = shardAttributesMap.keySet().iterator().next(); + final String customDataPath = shardAttributesMap.get(shardId).getCustomDataPath(); execute(new Request(shardId, customDataPath, nodes), listener); } diff --git a/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java new file mode 100644 index 0000000000000..4ef4e91f7af8c --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.opensearch.common.Nullable; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.AsyncShardFetch; + +import java.io.IOException; + +/** + * This class contains information about the shard that needs to be sent as part of request in Transport Action implementing + * {@link AsyncShardFetch.Lister} to fetch shard information in async manner + * + * @opensearch.internal + */ +public class ShardAttributes implements Writeable { + private final ShardId shardId; + @Nullable + private final String customDataPath; + + public ShardAttributes(ShardId shardId, String customDataPath) { + this.shardId = shardId; + this.customDataPath = customDataPath; + } + + public ShardAttributes(StreamInput in) throws IOException { + shardId = new ShardId(in); + customDataPath = in.readString(); + } + + public ShardId getShardId() { + return shardId; + } + + /** + * Returns the custom data path that is used to look up information for this shard. + * Returns an empty string if no custom data path is used for this index. + * Returns null if custom data path information is not available (due to BWC). + */ + @Nullable + public String getCustomDataPath() { + return customDataPath; + } + + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeString(customDataPath); + } +} diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index 0afeae253ae14..5a3c1038cd5f0 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -73,6 +73,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -125,7 +126,14 @@ public TransportNodesListShardStoreMetadata( } @Override - public void list(ShardId shardId, String customDataPath, DiscoveryNode[] nodes, ActionListener listener) { + public void list( + Map shardAttributes, + DiscoveryNode[] nodes, + ActionListener listener + ) { + assert shardAttributes.size() == 1 : "only one shard should be specified"; + final ShardId shardId = shardAttributes.keySet().iterator().next(); + final String customDataPath = shardAttributes.get(shardId).getCustomDataPath(); execute(new Request(shardId, customDataPath, nodes), listener); } diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 31a27503069d7..4e5e9c71e1fe4 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -46,12 +47,13 @@ import org.junit.Before; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import static java.util.Collections.emptySet; +import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; @@ -84,7 +86,16 @@ public class AsyncShardFetchTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); this.threadPool = new TestThreadPool(getTestName()); - this.test = new TestFetch(threadPool); + if (randomBoolean()) { + this.test = new TestFetch(threadPool); + } else { + HashMap shardToCustomDataPath = new HashMap<>(); + ShardId shardId0 = new ShardId("index1", "index_uuid1", 0); + ShardId shardId1 = new ShardId("index2", "index_uuid2", 0); + shardToCustomDataPath.put(shardId0, new ShardAttributes(shardId0, "")); + shardToCustomDataPath.put(shardId1, new ShardAttributes(shardId1, "")); + this.test = new TestFetch(threadPool, shardToCustomDataPath); + } } @After @@ -97,7 +108,7 @@ public void testClose() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -107,7 +118,7 @@ public void testClose() throws Exception { assertThat(test.reroute.get(), equalTo(1)); test.close(); try { - test.fetchData(nodes, emptySet()); + test.fetchData(nodes, emptyMap()); fail("fetch data should fail when closed"); } catch (IllegalStateException e) { // all is well @@ -119,7 +130,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -127,7 +138,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception { test.fireSimulationAndWait(node1.getId()); // verify we get back the data node assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -139,7 +150,7 @@ public void testFullCircleSingleNodeFailure() throws Exception { test.addSimulation(node1.getId(), failure1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -147,19 +158,19 @@ public void testFullCircleSingleNodeFailure() throws Exception { test.fireSimulationAndWait(node1.getId()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); // on failure, we reset the failure on a successive call to fetchData, and try again afterwards test.addSimulation(node1.getId(), response1); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); test.fireSimulationAndWait(node1.getId()); // 2 reroutes, cause we have a failure that we clear assertThat(test.reroute.get(), equalTo(3)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -170,7 +181,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { test.addSimulation(node1.getId(), response1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -183,7 +194,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception { test.fireSimulationAndWait(node1.getId()); // verify we get back the data node assertThat(test.reroute.get(), equalTo(2)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -195,7 +206,7 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { test.addSimulation(node1.getId(), failure1); // first fetch, no data, still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -212,7 +223,7 @@ public void testIgnoreFailureFromDifferentRound() throws Exception { test.fireSimulationAndWait(node1.getId()); // failure, fetched data exists, but has no data assertThat(test.reroute.get(), equalTo(2)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(0)); } @@ -223,7 +234,7 @@ public void testTwoNodesOnSetup() throws Exception { test.addSimulation(node2.getId(), response2); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -231,14 +242,14 @@ public void testTwoNodesOnSetup() throws Exception { test.fireSimulationAndWait(node1.getId()); // there is still another on going request, so no data assertThat(test.getNumberOfInFlightFetches(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // no more ongoing requests, we should fetch the data assertThat(test.reroute.get(), equalTo(2)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -251,21 +262,21 @@ public void testTwoNodesOnSetupAndFailure() throws Exception { test.addSimulation(node2.getId(), failure2); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); // fire the first response, it should trigger a reroute test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(1)); - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); assertThat(test.reroute.get(), equalTo(2)); // since one of those failed, we should only have one entry - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -276,7 +287,7 @@ public void testTwoNodesAddedInBetween() throws Exception { test.addSimulation(node1.getId(), response1); // no fetched data, 2 requests still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -287,14 +298,14 @@ public void testTwoNodesAddedInBetween() throws Exception { nodes = DiscoveryNodes.builder(nodes).add(node2).build(); test.addSimulation(node2.getId(), response2); // no fetch data, has a new node introduced - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); // fire the second simulation, this should allow us to get the data test.fireSimulationAndWait(node2.getId()); // since one of those failed, we should only have one entry - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(2)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -309,7 +320,7 @@ public void testClearCache() throws Exception { test.clearCacheForNode(node1.getId()); // no fetched data, request still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -317,13 +328,13 @@ public void testClearCache() throws Exception { assertThat(test.reroute.get(), equalTo(1)); // verify we get back right data from node - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); // second fetch gets same data - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1)); @@ -334,14 +345,14 @@ public void testClearCache() throws Exception { test.addSimulation(node1.getId(), response1_2); // no fetched data, new request on going - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // verify we get new data back - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); @@ -352,7 +363,7 @@ public void testConcurrentRequestAndClearCache() throws Exception { test.addSimulation(node1.getId(), response1); // no fetched data, request still on going - AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); assertThat(test.reroute.get(), equalTo(0)); @@ -366,14 +377,14 @@ public void testConcurrentRequestAndClearCache() throws Exception { test.addSimulation(node1.getId(), response1_2); // verify still no fetched data, request still on going - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(false)); test.fireSimulationAndWait(node1.getId()); assertThat(test.reroute.get(), equalTo(2)); // verify we get new data back - fetchData = test.fetchData(nodes, emptySet()); + fetchData = test.fetchData(nodes, emptyMap()); assertThat(fetchData.hasData(), equalTo(true)); assertThat(fetchData.getData().size(), equalTo(1)); assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); @@ -403,6 +414,11 @@ static class Entry { this.threadPool = threadPool; } + TestFetch(ThreadPool threadPool, Map shardAttributesMap) { + super(LogManager.getLogger(TestFetch.class), "test", shardAttributesMap, null, "test-batch"); + this.threadPool = threadPool; + } + public void addSimulation(String nodeId, Response response) { simulations.put(nodeId, new Entry(response, null)); } @@ -418,7 +434,7 @@ public void fireSimulationAndWait(String nodeId) throws InterruptedException { } @Override - protected void reroute(ShardId shardId, String reason) { + protected void reroute(String shardId, String reason) { reroute.incrementAndGet(); } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index c31ce60cb96a1..dceda6433575c 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -857,7 +857,11 @@ protected AsyncShardFetch.FetchResult(shardId, data, Collections.emptySet()); + return new AsyncShardFetch.FetchResult<>(data, new HashMap<>() { + { + put(shardId, Collections.emptySet()); + } + }); } } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index 3eeebd8cab6e4..5833d9c4f187f 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -728,7 +728,11 @@ protected AsyncShardFetch.FetchResult(shardId, tData, Collections.emptySet()); + return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>() { + { + put(shardId, Collections.emptySet()); + } + }); } @Override diff --git a/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java new file mode 100644 index 0000000000000..7fa95fefe72fd --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.opensearch.core.common.io.stream.DataOutputStreamOutput; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class ShardAttributesTests extends OpenSearchTestCase { + + Index index = new Index("index", "test-uid"); + ShardId shardId = new ShardId(index, 0); + String customDataPath = "/path/to/data"; + + public void testShardAttributesConstructor() { + ShardAttributes attributes = new ShardAttributes(shardId, customDataPath); + assertEquals(attributes.getShardId(), shardId); + assertEquals(attributes.getCustomDataPath(), customDataPath); + } + + public void testSerialization() throws IOException { + ShardAttributes attributes1 = new ShardAttributes(shardId, customDataPath); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + StreamOutput output = new DataOutputStreamOutput(new DataOutputStream(bytes)); + attributes1.writeTo(output); + output.close(); + StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(bytes.toByteArray())); + ShardAttributes attributes2 = new ShardAttributes(input); + input.close(); + assertEquals(attributes1.getShardId(), attributes2.getShardId()); + assertEquals(attributes1.getCustomDataPath(), attributes2.getCustomDataPath()); + } + +}