Skip to content

Commit

Permalink
Fix responsibility check for existing shards allocator when timed out
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Aug 13, 2024
1 parent b6c80b1 commit f75154d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -90,12 +91,33 @@ protected void allocateUnassignedBatchOnTimeout(Set<ShardId> shardIds, RoutingAl
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() == primary && shardIds.contains(unassignedShard.shardId())) {
if (isResponsibleFor(unassignedShard, primary) == false) {
continue;
}
allocationDecision = AllocateUnassignedDecision.throttle(null);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected static boolean isResponsibleFor(final ShardRouting shard, boolean primary) {
if (primary) {
return shard.primary() // must be primary
&& shard.unassigned() // must be unassigned
// only handle either an existing store or a snapshot recovery
&& (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
|| shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
} else {
return shard.primary() == false // must be a replica
&& shard.unassigned() // must be unassigned
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
&& shard.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED;
}
}

protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@
* @opensearch.internal
*/
public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected static boolean isResponsibleFor(final ShardRouting shard) {
return shard.primary() // must be primary
&& shard.unassigned() // must be unassigned
// only handle either an existing store or a snapshot recovery
&& (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
|| shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
}

/**
* Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is
Expand All @@ -99,7 +89,7 @@ protected static boolean isResponsibleFor(final ShardRouting shard) {
* @return allocation decision taken for this shard
*/
protected AllocateUnassignedDecision getInEligibleShardDecision(ShardRouting unassignedShard, RoutingAllocation allocation) {
if (isResponsibleFor(unassignedShard) == false) {
if (isResponsibleFor(unassignedShard, true) == false) {
// this allocator is not responsible for allocating this shard
return AllocateUnassignedDecision.NOT_TAKEN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,23 +188,13 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
}
}

/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected static boolean isResponsibleFor(final ShardRouting shard) {
return shard.primary() == false // must be a replica
&& shard.unassigned() // must be unassigned
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
&& shard.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED;
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(
final ShardRouting unassignedShard,
final RoutingAllocation allocation,
final Logger logger
) {
if (isResponsibleFor(unassignedShard) == false) {
if (isResponsibleFor(unassignedShard, false) == false) {
// this allocator is not responsible for deciding on this shard
return AllocateUnassignedDecision.NOT_TAKEN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
RoutingAllocation allocation,
Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
) {
if (!isResponsibleFor(shardRouting)) {
if (!isResponsibleFor(shardRouting, false)) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
Expand Down

0 comments on commit f75154d

Please sign in to comment.