Skip to content

Commit

Permalink
Make balanced shards allocator timebound
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Aug 14, 2024
1 parent 625dd5a commit 0e9151c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -87,6 +88,7 @@
public class BalancedShardsAllocator implements ShardsAllocator {

private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class);
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);

public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.index",
Expand Down Expand Up @@ -169,6 +171,23 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<TimeValue> ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.routing.allocation.balanced_shards_allocator.allocator_timeout",
TimeValue.timeValueSeconds(20),
TimeValue.MINUS_ONE,
timeValue -> {
if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout"
+ "] should be more than 20s or -1ms to disable timeout"
);
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

Expand All @@ -181,6 +200,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float threshold;

private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private long startTime;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand All @@ -197,6 +218,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
Expand All @@ -206,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}

/**
Expand Down Expand Up @@ -284,6 +307,20 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

private void setAllocatorTimeout(TimeValue allocatorTimeout) {
this.allocatorTimeout = allocatorTimeout;
}

private boolean allocatorTimedOut(long currentTime) {
if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) {
if (logger.isTraceEnabled()) {
logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks");
}
return false;
}
return currentTime - this.startTime > allocatorTimeout.nanos();
}

@Override
public void allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
Expand All @@ -298,8 +335,10 @@ public void allocate(RoutingAllocation allocation) {
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance,
ignoreThrottleInRestore
ignoreThrottleInRestore,
this::allocatorTimedOut
);
this.startTime = System.nanoTime();
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
Expand All @@ -321,7 +360,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance,
ignoreThrottleInRestore
ignoreThrottleInRestore,
this::allocatorTimedOut
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -585,7 +625,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -71,8 +72,10 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
private final Function<Long, Boolean> timedOutFunc;
private int totalShardCount = 0;


public LocalShardsBalancer(
Logger logger,
RoutingAllocation allocation,
Expand All @@ -81,7 +84,8 @@ public LocalShardsBalancer(
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance,
boolean ignoreThrottleInRestore
boolean ignoreThrottleInRestore,
Function<Long, Boolean> timedOutFunc
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -99,6 +103,7 @@ public LocalShardsBalancer(
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.ignoreThrottleInRestore = ignoreThrottleInRestore;
this.timedOutFunc = timedOutFunc;
}

/**
Expand Down Expand Up @@ -572,6 +577,15 @@ void moveShards() {
return;
}

// Terminate if the time allocated to the balanced shards allocator has elapsed
if (timedOutFunc.apply(System.nanoTime())) {
logger.info(
"Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed"
+ ". Skipping shard iteration"
);
return;
}

ShardRouting shardRouting = it.next();

if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
Expand Down Expand Up @@ -799,8 +813,21 @@ void allocateUnassigned() {
int secondaryLength = 0;
int primaryLength = primary.length;
ArrayUtil.timSort(primary, comparator);
if (logger.isTraceEnabled()) {
logger.trace("Staring allocation of [{}] unassigned shards", primaryLength);
}
do {
for (int i = 0; i < primaryLength; i++) {
if (timedOutFunc.apply(System.nanoTime())) {
// TODO - maybe check if we can allow wait for active shards thingy bypass this condition
logger.info("Ignoring [{}] unassigned shards for allocation as time allocated to " +
"balanced shards allocator has elapsed", (primaryLength - i));
while (i < primaryLength - 1) {
unassigned.ignoreShard(primary[i], UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
i++;
}
return;
}
ShardRouting shard = primary[i];
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
final String assignedNodeId = allocationDecision.getTargetNode() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down

0 comments on commit 0e9151c

Please sign in to comment.