Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [BUG] Fix remote shards balancer when filtering throttled nodes #12024

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix typo in API annotation check message ([11836](https://github.com/opensearch-project/OpenSearch/pull/11836))
- Fix memory leak issue in ReorganizingLongHash ([#11953](https://github.com/opensearch-project/OpenSearch/issues/11953))
- Prevent setting remote_snapshot store type on index creation ([#11867](https://github.com/opensearch-project/OpenSearch/pull/11867))
- [BUG] Fix remote shards balancer when filtering throttled nodes ([#11724](https://github.com/opensearch-project/OpenSearch/pull/11724))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public final class RemoteShardsBalancer extends ShardsBalancer {
private final Logger logger;
private final RoutingAllocation allocation;
private final RoutingNodes routingNodes;
// indicates if there are any nodes being throttled for allocating any unassigned shards
private boolean anyNodesThrottled = false;

public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
this.logger = logger;
Expand Down Expand Up @@ -358,12 +360,16 @@ private void allocateUnassignedReplicas(Queue<RoutingNode> nodeQueue, Map<String
}

private void ignoreRemainingShards(Map<String, UnassignedIndexShards> unassignedShardMap) {
// If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED
final UnassignedInfo.AllocationStatus status = anyNodesThrottled
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
for (UnassignedIndexShards indexShards : unassignedShardMap.values()) {
for (ShardRouting shard : indexShards.getPrimaries()) {
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
}
for (ShardRouting shard : indexShards.getReplicas()) {
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
}
}
}
Expand Down Expand Up @@ -424,11 +430,11 @@ private void allocateUnassignedShards(
private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouting shard) {
boolean allocated = false;
boolean throttled = false;
Set<String> nodesCheckedForShard = new HashSet<>();
int numNodesToCheck = nodeQueue.size();
while (nodeQueue.isEmpty() == false) {
RoutingNode node = nodeQueue.poll();
--numNodesToCheck;
Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation);
nodesCheckedForShard.add(node.nodeId());
if (allocateDecision.type() == Decision.Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId());
Expand Down Expand Up @@ -467,6 +473,10 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
}
nodeQueue.offer(node);
} else {
if (nodeLevelDecision.type() == Decision.Type.THROTTLE) {
anyNodesThrottled = true;
}

if (logger.isTraceEnabled()) {
logger.trace(
"Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]",
Expand All @@ -478,14 +488,14 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
}

// Break out if all nodes in the queue have been checked for this shard
if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
if (numNodesToCheck == 0) {
break;
}
}
}

if (allocated == false) {
UnassignedInfo.AllocationStatus status = throttled
UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled)
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return Decision.ALWAYS;
}
}

@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
return throttle ? Decision.THROTTLE : Decision.YES;
}
});
Collections.shuffle(deciders, random());
return new AllocationDeciders(deciders);
Expand Down
Loading