Skip to content

Commit

Permalink
Fix remote shards balancer and remove unused variables (#11167) (#11416)
Browse files Browse the repository at this point in the history
* Fix RemoteShardsBalancer



* remove unused variables



* run spotless



* add change log



---------


(cherry picked from commit edf7861)

Signed-off-by: panguixin <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 2212625 commit 9d648fd
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BUG] Fix the thread context that is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873))
- Handle canMatchSearchAfter for frozen context scenario ([#11249](https://github.com/opensearch-project/OpenSearch/pull/11249))
- Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369))
- Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public class LocalShardsBalancer extends ShardsBalancer {

private final float threshold;
private final Metadata metadata;
private final float avgShardsPerNode;

private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
Expand All @@ -85,7 +84,6 @@ public LocalShardsBalancer(
this.threshold = threshold;
this.routingNodes = allocation.routingNodes();
this.metadata = allocation.metadata();
avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
avgPrimaryShardsPerNode = (float) (StreamSupport.stream(metadata.spliterator(), false)
.mapToInt(IndexMetadata::getNumberOfShards)
.sum()) / routingNodes.size();
Expand Down Expand Up @@ -663,7 +661,6 @@ MoveDecision decideMove(final ShardRouting shardRouting) {
RoutingNode targetNode = null;
final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
int weightRanking = 0;
int targetNodeProcessed = 0;
for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) {
if (currentNode != sourceNode) {
RoutingNode target = currentNode.getRoutingNode();
Expand All @@ -677,7 +674,6 @@ MoveDecision decideMove(final ShardRouting shardRouting) {
continue;
}
}
targetNodeProcessed++;
// don't use canRebalance as we want hard filtering rules to apply. See #17698
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
if (explain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
allocation.metadata(),
allocation.routingTable()
);
ShardRouting initShard = routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes());
routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes());
nodeQueue.offer(node);
allocated = true;
break;
Expand Down Expand Up @@ -444,7 +444,6 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti

// Break out if all nodes in the queue have been checked for this shard
if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
throttled = true;
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation al
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider decider : allocations) {
Decision decision = decider.canAllocateAnyShardToNode(node, allocation);
if (decision.type().canPremptivelyReturn()) {
if (decision.type().canPreemptivelyReturn()) {
if (logger.isTraceEnabled()) {
logger.trace("Shard can not be allocated on node [{}] due to [{}]", node.nodeId(), decider.getClass().getSimpleName());
}
Expand All @@ -279,7 +279,7 @@ public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocat
for (AllocationDecider decider : allocations) {
Decision decision = decider.canMoveAway(shardRouting, allocation);
// short track if a NO is returned.
if (decision.type().canPremptivelyReturn()) {
if (decision.type().canPreemptivelyReturn()) {
if (logger.isTraceEnabled()) {
logger.trace("Shard [{}] can not be moved away due to [{}]", shardRouting, decider.getClass().getSimpleName());
}
Expand All @@ -301,7 +301,7 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
for (AllocationDecider decider : allocations) {
Decision decision = decider.canMoveAnyShard(allocation);
// short track if a NO is returned.
if (decision.type().canPremptivelyReturn()) {
if (decision.type().canPreemptivelyReturn()) {
if (allocation.debugDecision() == false) {
return decision;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public boolean higherThan(Type other) {
return false;
}

public boolean canPremptivelyReturn() {
public boolean canPreemptivelyReturn() {
return this == THROTTLE || this == NO;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.allocator.RemoteShardsBalancer;

import java.util.HashMap;
import java.util.Map;

import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.DECIDERS_NO;
import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED;
import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.NO_ATTEMPT;

public class RemoteShardsAllocateUnassignedTests extends RemoteShardsBalancerBaseTestCase {

/**
Expand Down Expand Up @@ -89,6 +94,38 @@ public void testPrimaryAllocation() {
}
}

/**
* Test remote unassigned shard allocation when deciders make NO or THROTTLED decision.
*/
public void testNoRemoteAllocation() {
final int localOnlyNodes = 10;
final int remoteCapableNodes = 5;
final int localIndices = 2;
final int remoteIndices = 1;
final ClusterState oldState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
final boolean throttle = randomBoolean();
final AllocationService service = this.createRejectRemoteAllocationService(throttle);
final ClusterState newState = allocateShardsAndBalance(oldState, service);
final RoutingNodes routingNodes = newState.getRoutingNodes();
final RoutingAllocation allocation = getRoutingAllocation(newState, routingNodes);

assertEquals(totalShards(remoteIndices), routingNodes.unassigned().size());

for (ShardRouting shard : newState.getRoutingTable().allShards()) {
if (RoutingPool.getShardPool(shard, allocation) == RoutingPool.REMOTE_CAPABLE) {
assertTrue(shard.unassigned());
if (shard.primary()) {
final UnassignedInfo.AllocationStatus expect = throttle ? DECIDERS_THROTTLED : DECIDERS_NO;
assertEquals(expect, shard.unassignedInfo().getLastAllocationStatus());
} else {
assertEquals(NO_ATTEMPT, shard.unassignedInfo().getLastAllocationStatus());
}
} else {
assertFalse(shard.unassigned());
}
}
}

/**
* Test remote unassigned shard allocation when remote capable nodes fail to come up.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -201,6 +204,36 @@ public AllocationService createRemoteCapableAllocationService(String excludeNode
);
}

public AllocationService createRejectRemoteAllocationService(boolean throttle) {
Settings settings = Settings.Builder.EMPTY_SETTINGS;
return new OpenSearchAllocationTestCase.MockAllocationService(
createRejectRemoteAllocationDeciders(throttle),
new TestGatewayAllocator(),
createShardAllocator(settings),
EmptyClusterInfoService.INSTANCE,
SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES
);
}

public AllocationDeciders createRejectRemoteAllocationDeciders(boolean throttle) {
Settings settings = Settings.Builder.EMPTY_SETTINGS;
List<AllocationDecider> deciders = new ArrayList<>(
ClusterModule.createAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, Collections.emptyList())
);
deciders.add(new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
return throttle ? Decision.THROTTLE : Decision.NO;
} else {
return Decision.ALWAYS;
}
}
});
Collections.shuffle(deciders, random());
return new AllocationDeciders(deciders);
}

public AllocationDeciders createAllocationDeciders() {
Settings settings = Settings.Builder.EMPTY_SETTINGS;
return randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random());
Expand Down

0 comments on commit 9d648fd

Please sign in to comment.