Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fail open stats in case the only healthy shard copy reside in weighed away az #7600

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541))
- [Search Pipelines] Better exception handling in search pipelines ([#7735](https://github.com/opensearch-project/OpenSearch/pull/7735))
- Fix input validation in segments and delete pit request ([#6645](https://github.com/opensearch-project/OpenSearch/pull/6645))
- Fix fail open stats in case the only healthy shard copy reside in weighed away az([#7600](https://github.com/opensearch-project/OpenSearch/pull/7600))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1429,4 +1429,160 @@ public void testWeightedRoutingFailOpenStats() throws Exception {
WeightedRoutingStats.getInstance().resetFailOpenCount();
}

/**
* Shard routing request is served by data nodes in az with weight set as 0,
* in case shard copies are not available in other azs.(with fail open enabled)
* This is tested by setting up a 3 node cluster with one data node per az.
* Weighted shard routing weight is set as 0 for az-c.
* Indices are created with two replica copy and data nodes in az-a and az-b are stopped,
* Since two copies of a shard are unhealthy, (fail open is triggered) shard search requests are served by standby
* az data node.
* Assertions are put to make sure such shard search requests are served by data node in zone c.
* Asserts are put to make sure fail open count is correct
* @throws IOException throws exception
*/
public void testFailOpenStatsWithOneHealthyStandbyShardCopy() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.put("cluster.routing.ignore_weighted_routing", false)
.build();
WeightedRoutingStats.getInstance().resetFailOpenCount();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 2;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);

logger.info("--> data nodes in zone a and b are stopped");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0)));
ensureStableCluster(2);

Set<String> hitNodes = new HashSet<>();

// Make Search Requests
Future<SearchResponse>[] responses = new Future[20];
for (int i = 0; i < 20; i++) {
responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute();
}
int failedCount = 0;
for (int i = 0; i < 20; i++) {
try {
SearchResponse searchResponse = responses[i].get();
assertEquals(0, searchResponse.getFailedShards());
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
} catch (Exception t) {
failedCount++;
}
}

Assert.assertTrue(failedCount == 0);
assertSearchInAZ("c");

DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();

Map<String, String> nodeIDMap = new HashMap<>();
for (DiscoveryNode node : dataNodes) {
nodeIDMap.put(node.getName(), node.getId());
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().addMetric("weighted_routing").execute().actionGet();
Map<String, NodeStats> stats = nodeStats.getNodesMap();
NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0)));
assertEquals(20 * numShards, nodeStatsC.getWeightedRoutingStats().getFailOpenCount());
WeightedRoutingStats.getInstance().resetFailOpenCount();
}

/**
* Shard routing request is served by data nodes in az with weight set as 0,
* in case shard copies are not available in other azs.(with fail open enabled)
* This is tested by setting up a 3 node cluster with one data node per az.
* Weighted shard routing weight is set as 0 for az-c.
* Indices are created with two replica copy and data nodes in az-a and az-b are stopped,
* Since two copies of a shard are unhealthy, (fail open is triggered) shard search requests are served by standby
* az data node.
* Assertions are put to make sure such shard search requests are served by data node in zone c.
* Asserts are put to make sure fail open count is correct
* @throws IOException throws exception
*/
public void testFailOpenStatsForMultiGetOneHealthyStandbyShardCopy() throws Exception {

WeightedRoutingStats.getInstance().resetFailOpenCount();

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.put("cluster.routing.ignore_weighted_routing", false)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 1;
int numReplicas = 2;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);

logger.info("--> data nodes in zone a and b are stopped");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0)));
ensureStableCluster(2);

Future<MultiGetResponse>[] responses = new Future[20];
logger.info("--> making search requests");
int index1, index2;
int docId = 0;
for (int i = 0; i < 20; i++) {

index1 = docId++;
index2 = docId++;
responses[i] = internalCluster().smartClient()
.prepareMultiGet()
.add(new MultiGetRequest.Item("test", "" + index1))
.add(new MultiGetRequest.Item("test", "" + index2))
.execute();
}

int failedCount = 0;
for (int i = 0; i < 20; i++) {
try {
MultiGetResponse multiGetResponse = responses[i].get();
assertThat(multiGetResponse.getResponses().length, equalTo(2));
if (multiGetResponse.getResponses()[0].isFailed() || multiGetResponse.getResponses()[1].isFailed()) {
failedCount++;
}
} catch (Exception t) {
fail("search should not fail");
}
}

Assert.assertTrue(failedCount == 0);

DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();

Map<String, String> nodeIDMap = new HashMap<>();
for (DiscoveryNode node : dataNodes) {
nodeIDMap.put(node.getName(), node.getId());
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().addMetric("weighted_routing").execute().actionGet();
Map<String, NodeStats> stats = nodeStats.getNodesMap();
NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0)));
assertEquals(20 * 2, nodeStatsC.getWeightedRoutingStats().getFailOpenCount());
WeightedRoutingStats.getInstance().resetFailOpenCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}

ShardRouting next = FailAwareWeightedRouting.getInstance()
.findNext(shardsIt.get(shardIndex), clusterService.state(), failure, this::moveToNextShard);

Expand Down Expand Up @@ -305,6 +306,10 @@ private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShar
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardsIt.get(shardIndex), clusterService.state(), shardRouting.currentNodeId());
request.shardId(shardRouting.shardId());
if (logger.isTraceEnabled()) {
logger.trace("sending request [{}] on node [{}]", request, node);
Expand All @@ -329,6 +334,7 @@ public String executor() {
public void handleResponse(final FieldCapabilitiesIndexResponse response) {
if (response.canMatch()) {
listener.onResponse(response);

} else {
moveToNextShard();
tryNext(null, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
public void innerOnResponse(Result result) {
try {
onShardResult(result, shardIt);
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardIt, clusterState, shard.getNodeId());
} finally {
executeNext(pendingExecutions, thread);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardIt, clusterService.state(), shard.currentNodeId());
transportService.sendRequest(
node,
transportShardAction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ private void perform(@Nullable final Exception currentFailure) {
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
// Update weighted routing fail open stats in case the only healthy shard copy is present
// in weighed away az data nodes
FailAwareWeightedRouting.getInstance()
.updateFailOpenStatsForOneHealthyCopy(shardIt, clusterService.state(), shardRouting.currentNodeId());
internalRequest.request().internalShardId = shardRouting.shardId();
if (logger.isTraceEnabled()) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING;
import static org.opensearch.cluster.routing.OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
Expand Down Expand Up @@ -121,6 +122,49 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster
return next;
}

/**
* This function updates fail open stats in case the only healthy shard copy resides in weighed away az data node.
* The stats are updated in case weighed shard routing and fail open is enabled for search requests.
*
* @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @param clusterState The current cluster state
* @param nodeID the id of the node containing current shard copy
*/
public boolean updateFailOpenStatsForOneHealthyCopy(final SearchShardIterator shardsIt, ClusterState clusterState, String nodeID) {
if (clusterState != null) {
if (ignoreWeightedRouting(clusterState) == true || isFailOpenEnabled(clusterState) == false) {
return false;
}
if (shardsIt.size() == 1 && WeightedRoutingUtils.isWeighedAway(nodeID, clusterState)) {
getWeightedRoutingStats().updateFailOpenCount();
return true;
}
}
return false;

}

/**
* This function updates fail open stats in case the only healthy shard copy resides in weighed away az data node.
* The stats are updated in case weighed shard routing and fail open is enabled for search requests.
*
* @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested
* @param clusterState The current cluster state
* @param nodeID the id of the node containing current shard copy
*/
public boolean updateFailOpenStatsForOneHealthyCopy(final ShardsIterator shardsIt, ClusterState clusterState, String nodeID) {
if (clusterState != null) {
if (ignoreWeightedRouting(clusterState) == true || isFailOpenEnabled(clusterState) == false) {
return false;
}
if (shardsIt.size() == 1 && WeightedRoutingUtils.isWeighedAway(nodeID, clusterState)) {
getWeightedRoutingStats().updateFailOpenCount();
return true;
}
}
return false;
}

/**
* *
* @return true if can fail open ie request shard copies present in nodes with weighted shard
Expand All @@ -144,6 +188,10 @@ private boolean ignoreWeightedRouting(ClusterState clusterState) {
return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings());
}

private boolean isFailOpenEnabled(ClusterState clusterState) {
return WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(clusterState.getMetadata().settings());
}

public WeightedRoutingStats getWeightedRoutingStats() {
return WeightedRoutingStats.getInstance();
}
Expand Down
Loading