Skip to content

Commit

Permalink
Fail open stats in case the only healthy shard copy resided in weighe…
Browse files Browse the repository at this point in the history
…d away az data node
  • Loading branch information
Anshu Agarwal committed May 17, 2023
1 parent c43d713 commit ff721ac
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1429,4 +1429,136 @@ public void testWeightedRoutingFailOpenStats() throws Exception {
WeightedRoutingStats.getInstance().resetFailOpenCount();
}

public void testFailOpenStatsWithOneHealthyStandbyShardCopy() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.build();
WeightedRoutingStats.getInstance().resetFailOpenCount();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 2;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);

logger.info("--> data nodes in zone a and b are stopped");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0)));
ensureStableCluster(2);

Set<String> hitNodes = new HashSet<>();

// Make Search Requests
Future<SearchResponse>[] responses = new Future[20];
for (int i = 0; i < 20; i++) {
responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute();
}
int failedCount = 0;
for (int i = 0; i < 20; i++) {
try {
SearchResponse searchResponse = responses[i].get();
assertEquals(0, searchResponse.getFailedShards());
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
} catch (Exception t) {
failedCount++;
}
}

Assert.assertTrue(failedCount == 0);
assertSearchInAZ("c");

DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();

Map<String, String> nodeIDMap = new HashMap<>();
for (DiscoveryNode node : dataNodes) {
nodeIDMap.put(node.getName(), node.getId());
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().addMetric("weighted_routing").execute().actionGet();
Map<String, NodeStats> stats = nodeStats.getNodesMap();
NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0)));
assertEquals(20 * numShards, nodeStatsC.getWeightedRoutingStats().getFailOpenCount());
WeightedRoutingStats.getInstance().resetFailOpenCount();
}

public void testFailOpenStatsForMultiGetOneHealthyStandbyShardCopy() throws Exception {

WeightedRoutingStats.getInstance().resetFailOpenCount();

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 1;
int numReplicas = 2;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);

logger.info("--> data nodes in zone a and b are stopped");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0)));
ensureStableCluster(2);

Future<MultiGetResponse>[] responses = new Future[20];
logger.info("--> making search requests");
int index1, index2;
int docId = 0;
for (int i = 0; i < 20; i++) {

index1 = docId++;
index2 = docId++;
responses[i] = internalCluster().smartClient()
.prepareMultiGet()
.add(new MultiGetRequest.Item("test", "" + index1))
.add(new MultiGetRequest.Item("test", "" + index2))
.execute();
}

int failedCount = 0;
for (int i = 0; i < 20; i++) {
try {
MultiGetResponse multiGetResponse = responses[i].get();
assertThat(multiGetResponse.getResponses().length, equalTo(2));
if (multiGetResponse.getResponses()[0].isFailed() || multiGetResponse.getResponses()[1].isFailed()) {
failedCount++;
}
} catch (Exception t) {
fail("search should not fail");
}
}

Assert.assertTrue(failedCount == 0);

DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();

Map<String, String> nodeIDMap = new HashMap<>();
for (DiscoveryNode node : dataNodes) {
nodeIDMap.put(node.getName(), node.getId());
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().addMetric("weighted_routing").execute().actionGet();
Map<String, NodeStats> stats = nodeStats.getNodesMap();
NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0)));
assertEquals(20 * 2, nodeStatsC.getWeightedRoutingStats().getFailOpenCount());
WeightedRoutingStats.getInstance().resetFailOpenCount();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}

ShardRouting next = FailAwareWeightedRouting.getInstance()
.findNext(shardsIt.get(shardIndex), clusterService.state(), failure, this::moveToNextShard);

Expand Down Expand Up @@ -305,6 +306,10 @@ private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShar
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardsIt.get(shardIndex), clusterService.state(), shardRouting.currentNodeId());
request.shardId(shardRouting.shardId());
if (logger.isTraceEnabled()) {
logger.trace("sending request [{}] on node [{}]", request, node);
Expand All @@ -329,6 +334,7 @@ public String executor() {
public void handleResponse(final FieldCapabilitiesIndexResponse response) {
if (response.canMatch()) {
listener.onResponse(response);

} else {
moveToNextShard();
tryNext(null, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
public void innerOnResponse(Result result) {
try {
onShardResult(result, shardIt);
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardIt, clusterState, shard.getNodeId());
} finally {
executeNext(pendingExecutions, thread);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardIt, clusterService.state(), shard.currentNodeId());
transportService.sendRequest(
node,
transportShardAction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ private void perform(@Nullable final Exception currentFailure) {
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardIt, clusterService.state(), shardRouting.currentNodeId());
internalRequest.request().internalShardId = shardRouting.shardId();
if (logger.isTraceEnabled()) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING;
import static org.opensearch.cluster.routing.OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
Expand Down Expand Up @@ -121,6 +122,40 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster
return next;
}

/**
* This function updates fail open stats in case the only healthy shard copy resides in weighed away az data node.
* The stats are updated in case weighed shard routing and fail open is enabled for search requests.
*
* @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @param clusterState The current cluster state
* @param nodeID the id of the node containing current shard copy
*/
public void updateFailOpenStatsForOneHealthyCopy(final SearchShardIterator shardsIt, ClusterState clusterState, String nodeID) {
if (ignoreWeightedRouting(clusterState) && isFailOpenDisabled(clusterState)) {
return;
}
if (shardsIt.size() == 1 && WeightedRoutingUtils.isWeighedAway(nodeID, clusterState)) {
getWeightedRoutingStats().updateFailOpenCount();
}
}

/**
* This function updates fail open stats in case the only healthy shard copy resides in weighed away az data node.
* The stats are updated in case weighed shard routing and fail open is enabled for search requests.
*
* @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @param clusterState The current cluster state
* @param nodeID the id of the node containing current shard copy
*/
public void updateFailOpenStatsForOneHealthyCopy(final ShardsIterator shardsIt, ClusterState clusterState, String nodeID) {
if (ignoreWeightedRouting(clusterState) && isFailOpenDisabled(clusterState)) {
return;
}
if (shardsIt.size() == 1 && WeightedRoutingUtils.isWeighedAway(nodeID, clusterState)) {
getWeightedRoutingStats().updateFailOpenCount();
}
}

/**
* *
* @return true if can fail open ie request shard copies present in nodes with weighted shard
Expand All @@ -144,6 +179,10 @@ private boolean ignoreWeightedRouting(ClusterState clusterState) {
return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings());
}

private boolean isFailOpenDisabled(ClusterState clusterState) {
return WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(clusterState.getMetadata().settings());
}

public WeightedRoutingStats getWeightedRoutingStats() {
return WeightedRoutingStats.getInstance();
}
Expand Down

0 comments on commit ff721ac

Please sign in to comment.