From ff721acd084c6f7cf62a32facd83fa4f24fa1ef8 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Wed, 17 May 2023 11:08:25 +0530 Subject: [PATCH] Fail open stats in case the only healthy shard copy resided in weighed away az data node --- .../search/SearchWeightedRoutingIT.java | 132 ++++++++++++++++++ ...TransportFieldCapabilitiesIndexAction.java | 6 + .../search/AbstractSearchAsyncAction.java | 4 + .../broadcast/TransportBroadcastAction.java | 4 + .../shard/TransportSingleShardAction.java | 4 + .../routing/FailAwareWeightedRouting.java | 39 ++++++ 6 files changed, 189 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index a0bb2989b8328..a6fdc06a7f379 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -1429,4 +1429,136 @@ public void testWeightedRoutingFailOpenStats() throws Exception { WeightedRoutingStats.getInstance().resetFailOpenCount(); } + public void testFailOpenStatsWithOneHealthyStandbyShardCopy() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + WeightedRoutingStats.getInstance().resetFailOpenCount(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Set hitNodes = new HashSet<>(); + + // Make Search Requests + Future[] responses = new Future[20]; + for (int i = 0; i < 20; i++) { + responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute(); + } + int failedCount = 0; + for (int i = 0; i < 20; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(0, searchResponse.getFailedShards()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + failedCount++; + } + } + + Assert.assertTrue(failedCount == 0); + assertSearchInAZ("c"); + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + + Map nodeIDMap = new HashMap<>(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().addMetric("weighted_routing").execute().actionGet(); + Map stats = nodeStats.getNodesMap(); + NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0))); + assertEquals(20 * numShards, nodeStatsC.getWeightedRoutingStats().getFailOpenCount()); + WeightedRoutingStats.getInstance().resetFailOpenCount(); + } + + public void testFailOpenStatsForMultiGetOneHealthyStandbyShardCopy() throws Exception { + + WeightedRoutingStats.getInstance().resetFailOpenCount(); + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = 
setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 1; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Future[] responses = new Future[20]; + logger.info("--> making search requests"); + int index1, index2; + int docId = 0; + for (int i = 0; i < 20; i++) { + + index1 = docId++; + index2 = docId++; + responses[i] = internalCluster().smartClient() + .prepareMultiGet() + .add(new MultiGetRequest.Item("test", "" + index1)) + .add(new MultiGetRequest.Item("test", "" + index2)) + .execute(); + } + + int failedCount = 0; + for (int i = 0; i < 20; i++) { + try { + MultiGetResponse multiGetResponse = responses[i].get(); + assertThat(multiGetResponse.getResponses().length, equalTo(2)); + if (multiGetResponse.getResponses()[0].isFailed() || multiGetResponse.getResponses()[1].isFailed()) { + failedCount++; + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + Assert.assertTrue(failedCount == 0); + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + + Map nodeIDMap = new HashMap<>(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().addMetric("weighted_routing").execute().actionGet(); + Map stats = nodeStats.getNodesMap(); + NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0))); + assertEquals(20 * 2, nodeStatsC.getWeightedRoutingStats().getFailOpenCount()); + WeightedRoutingStats.getInstance().resetFailOpenCount(); + 
+ } + } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index fc23ce97517f9..3896c1b092524 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -266,6 +266,7 @@ private ShardRouting nextRoutingOrNull(Exception failure) { if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { return null; } + ShardRouting next = FailAwareWeightedRouting.getInstance() .findNext(shardsIt.get(shardIndex), clusterService.state(), failure, this::moveToNextShard); @@ -305,6 +306,10 @@ private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShar if (node == null) { onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId())); } else { + // Update weighted routing fail open stats in case the only healthy shard copy is present + // in weighed away az data nodes + FailAwareWeightedRouting.getInstance() + .updateFailOpenStatsForOneHealthyCopy(shardsIt.get(shardIndex), clusterService.state(), shardRouting.currentNodeId()); request.shardId(shardRouting.shardId()); if (logger.isTraceEnabled()) { logger.trace("sending request [{}] on node [{}]", request, node); @@ -329,6 +334,7 @@ public String executor() { public void handleResponse(final FieldCapabilitiesIndexResponse response) { if (response.canMatch()) { listener.onResponse(response); + } else { moveToNextShard(); tryNext(null, false); diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 9a94737c84385..debae7c8b297f 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ 
b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -283,6 +283,10 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator public void innerOnResponse(Result result) { try { onShardResult(result, shardIt); + // Update weighted routing fail open stats in case the only healthy shard copy is present + // in weighed away az data nodes + FailAwareWeightedRouting.getInstance() + .updateFailOpenStatsForOneHealthyCopy(shardIt, clusterState, shard.getNodeId()); } finally { executeNext(pendingExecutions, thread); } diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index 5abf97b7ef979..f7fa458610287 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -205,6 +205,10 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting // no node connected, act as failure onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); } else { + // Update weighted routing fail open stats in case the only healthy shard copy is present + // in weighed away az data nodes + FailAwareWeightedRouting.getInstance() + .updateFailOpenStatsForOneHealthyCopy(shardIt, clusterService.state(), shard.currentNodeId()); transportService.sendRequest( node, transportShardAction, diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index 927d3946a3643..4368850d49367 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ 
b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -266,6 +266,10 @@ private void perform(@Nullable final Exception currentFailure) { if (node == null) { onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId())); } else { + // Update weighted routing fail open stats in case the only healthy shard copy is present + // in weighed away az data nodes + FailAwareWeightedRouting.getInstance() + .updateFailOpenStatsForOneHealthyCopy(shardIt, clusterService.state(), shardRouting.currentNodeId()); internalRequest.request().internalShardId = shardRouting.shardId(); if (logger.isTraceEnabled()) { logger.trace( diff --git a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java index 72c189f20eaf6..9339f0bb06b73 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java @@ -21,6 +21,7 @@ import java.util.List; import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING; +import static org.opensearch.cluster.routing.OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED; /** * This class contains logic to find next shard to retry search request in case of failure from other shard copy. @@ -121,6 +122,40 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster return next; } + /** + * This function updates fail open stats in case the only healthy shard copy resides in weighed away az data node. + * The stats are updated in case weighed shard routing and fail open is enabled for search requests. 
+     *
+     * @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
+     * @param clusterState The current cluster state
+     * @param nodeID the id of the node containing current shard copy
+     */
+    public void updateFailOpenStatsForOneHealthyCopy(final SearchShardIterator shardsIt, ClusterState clusterState, String nodeID) {
+        if (ignoreWeightedRouting(clusterState) || isFailOpenDisabled(clusterState)) {
+            return;
+        }
+        if (shardsIt.size() == 1 && WeightedRoutingUtils.isWeighedAway(nodeID, clusterState)) {
+            getWeightedRoutingStats().updateFailOpenCount();
+        }
+    }
+
+    /**
+     * This function updates fail open stats in case the only healthy shard copy resides in weighed away az data node.
+     * Nothing is recorded when weighted shard routing is ignored or when fail open is disabled in the cluster settings.
+     *
+     * @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
+     * @param clusterState The current cluster state
+     * @param nodeID the id of the node containing current shard copy
+     */
+    public void updateFailOpenStatsForOneHealthyCopy(final ShardsIterator shardsIt, ClusterState clusterState, String nodeID) {
+        if (ignoreWeightedRouting(clusterState) || isFailOpenDisabled(clusterState)) {
+            return;
+        }
+        if (shardsIt.size() == 1 && WeightedRoutingUtils.isWeighedAway(nodeID, clusterState)) {
+            getWeightedRoutingStats().updateFailOpenCount();
+        }
+    }
+
     /**
      * *
      * @return true if can fail open ie request shard copies present in nodes with weighted shard
@@ -144,6 +179,10 @@ private boolean ignoreWeightedRouting(ClusterState clusterState) {
         return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings());
     }
 
+    private boolean isFailOpenDisabled(ClusterState clusterState) {
+        return WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(clusterState.getMetadata().settings()) == false;
+    }
+
     public WeightedRoutingStats getWeightedRoutingStats() {
         return WeightedRoutingStats.getInstance();
     }