Skip to content

Commit

Permalink
Add cluster primary balance contraint for rebalancing with buffer (#1…
Browse files Browse the repository at this point in the history
…2656) (#13014)

(cherry picked from commit 3491bcb)

Signed-off-by: Arpit-Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya authored Apr 2, 2024
1 parent f69da6c commit 831f233
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- Add a counter to node stat (and _cat/shards) api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() {
);
}

public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance)
.put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance)
.put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer)
)
);
}

/**
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
* balance per index and across all indices is maintained.
Expand Down Expand Up @@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance();
verifyPrimaryBalance(0.0f);
}

/**
Expand Down Expand Up @@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception {
verifyPerIndexPrimaryBalance();
}

/**
* Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting
* removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes
* when the PREFER_PRIMARY_SHARD_REBALANCE is set to true
*/
public void testAllocationAndRebalanceWithDisruption() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int maxReplicaCount = 2;
final int maxShardCount = 2;
// Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in
// and preventing primary relocations
final int nodeCount = randomIntBetween(5, 10);
final int numberOfIndices = randomIntBetween(1, 10);
final float buffer = randomIntBetween(1, 4) * 0.10f;

logger.info("--> Creating {} nodes", nodeCount);
final List<String> nodeNames = new ArrayList<>();
for (int i = 0; i < nodeCount; i++) {
nodeNames.add(internalCluster().startNode());
}
setAllocationRelocationStrategy(true, true, buffer);

int shardCount, replicaCount;
ClusterState state;
for (int i = 0; i < numberOfIndices; i++) {
shardCount = randomIntBetween(1, maxShardCount);
replicaCount = randomIntBetween(1, maxReplicaCount);
logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
ensureGreen(TimeValue.timeValueSeconds(60));
if (logger.isTraceEnabled()) {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
}
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

final int additionalNodeCount = randomIntBetween(1, 5);
logger.info("--> Adding {} nodes", additionalNodeCount);

internalCluster().startNodes(additionalNodeCount);
ensureGreen(TimeValue.timeValueSeconds(60));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

int nodeCountToStop = additionalNodeCount;
while (nodeCountToStop > 0) {
internalCluster().stopRandomDataNode();
// give replica a chance to promote as primary before terminating node containing the replica
ensureGreen(TimeValue.timeValueSeconds(60));
nodeCountToStop--;
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info("--> Cluster state post nodes stop {}", state);
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);
}

/**
* Utility method which ensures cluster has balanced primary shard distribution across a single index.
* @throws Exception exception
Expand Down Expand Up @@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
}, 60, TimeUnit.SECONDS);
}

private void verifyPrimaryBalance() throws Exception {
private void verifyPrimaryBalance(float buffer) throws Exception {
assertBusy(() -> {
final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
RoutingNodes nodes = currentState.getRoutingNodes();
Expand All @@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception {
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.size();
assertTrue(primaryCount <= avgPrimaryShardsPerNode);
assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer)));
}
}, 60, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class AllocationConstraints {

public AllocationConstraints() {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached()));
this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f)));
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class ConstraintTypes {
*/
public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint";

/**
* Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices
*/
public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.rebalance.constraint";

/**
* Defines an index constraint which is breached when a node contains more than avg number of shards for an index
*/
Expand Down Expand Up @@ -70,14 +75,14 @@ public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerN
}

/**
* Defines a predicate which returns true when a node contains more than average number of primary shards. This
* constraint is used in weight calculation during allocation only. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT}
* is assigned to node resulting in lesser chances of node being selected as allocation target
* Defines a predicate which returns true when a node contains more than average number of primary shards with added buffer. This
* constraint is used in weight calculation during allocation/rebalance both. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT}
* is assigned to node resulting in lesser chances of node being selected as allocation/rebalance target
*/
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached(float buffer) {
return (params) -> {
int primaryShardCount = params.getNode().numPrimaryShards();
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode());
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer));
return primaryShardCount >= allowedPrimaryShardCount;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached;

/**
* Constraints applied during rebalancing round; specify conditions which, if breached, reduce the
Expand All @@ -27,9 +29,13 @@ public class RebalanceConstraints {

private Map<String, Constraint> constraints;

public RebalanceConstraints() {
public RebalanceConstraints(RebalanceParameter rebalanceParameter) {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(
CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer()))
);
}

public void updateRebalanceConstraint(String constraint, boolean enable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

/**
* RebalanceConstraint Params
*/
public class RebalanceParameter {
private float preferPrimaryBalanceBuffer;

public RebalanceParameter(float preferPrimaryBalanceBuffer) {
this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer;
}

public float getPreferPrimaryBalanceBuffer() {
return preferPrimaryBalanceBuffer;
}
}
Loading

0 comments on commit 831f233

Please sign in to comment.