Skip to content

Commit

Permalink
Address review comments & integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Aug 9, 2022
1 parent ac47ffd commit de07dac
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -63,7 +62,7 @@ public abstract class BaseGatewayShardAllocator {

/**
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger, Settings)}
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
* to make decisions on assigning shards to nodes.
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
Expand All @@ -72,10 +71,9 @@ public abstract class BaseGatewayShardAllocator {
public void allocateUnassigned(
ShardRouting shardRouting,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler,
Settings settings
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger, settings);
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);

if (allocateUnassignedDecision.isDecisionTaken() == false) {
// no decision was taken by this allocator
Expand Down Expand Up @@ -108,7 +106,7 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation

/**
* Make a decision on the allocation of an unassigned shard. This method is used by
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler, Settings)} to make decisions
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
*
* @param unassignedShard the unassigned shard to allocate
Expand All @@ -119,8 +117,7 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation
public abstract AllocateUnassignedDecision makeAllocationDecision(
ShardRouting unassignedShard,
RoutingAllocation allocation,
Logger logger,
Settings settings
Logger logger
);

/**
Expand Down
28 changes: 7 additions & 21 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.shard.ShardId;
Expand Down Expand Up @@ -80,8 +79,6 @@ public class GatewayAllocator implements ExistingShardsAllocator {
private final PrimaryShardAllocator primaryShardAllocator;
private final ReplicaShardAllocator replicaShardAllocator;

protected final Settings settings;

private final ConcurrentMap<
ShardId,
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections
Expand All @@ -94,13 +91,11 @@ public class GatewayAllocator implements ExistingShardsAllocator {
public GatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetadata storeAction,
Settings settings
TransportNodesListShardStoreMetadata storeAction
) {
this.rerouteService = rerouteService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
this.settings = settings;
}

@Override
Expand All @@ -116,7 +111,6 @@ protected GatewayAllocator() {
this.rerouteService = null;
this.primaryShardAllocator = null;
this.replicaShardAllocator = null;
this.settings = null;
}

@Override
Expand Down Expand Up @@ -171,14 +165,7 @@ public void allocateUnassigned(
) {
assert primaryShardAllocator != null;
assert replicaShardAllocator != null;
innerAllocatedUnassigned(
allocation,
primaryShardAllocator,
replicaShardAllocator,
shardRouting,
unassignedAllocationHandler,
this.settings
);
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
}

// allow for testing infra to change shard allocators implementation
Expand All @@ -187,14 +174,13 @@ protected static void innerAllocatedUnassigned(
PrimaryShardAllocator primaryShardAllocator,
ReplicaShardAllocator replicaShardAllocator,
ShardRouting shardRouting,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler,
Settings settings
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
assert shardRouting.unassigned();
if (shardRouting.primary()) {
primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings);
primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
} else {
replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings);
replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
}
}

Expand All @@ -204,10 +190,10 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting
assert routingAllocation.debugDecision();
if (unassignedShard.primary()) {
assert primaryShardAllocator != null;
return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings);
return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
} else {
assert replicaShardAllocator != null;
return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings);
return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.Decision.Type;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.opensearch.index.IndexSettings;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -95,8 +93,7 @@ private static boolean isResponsibleFor(final ShardRouting shard) {
public AllocateUnassignedDecision makeAllocationDecision(
final ShardRouting unassignedShard,
final RoutingAllocation allocation,
final Logger logger,
final Settings settings
final Logger logger
) {
if (isResponsibleFor(unassignedShard) == false) {
// this allocator is not responsible for allocating this shard
Expand Down Expand Up @@ -127,7 +124,6 @@ public AllocateUnassignedDecision makeAllocationDecision(
// don't create a new IndexSetting object for every shard as this could cause a lot of garbage
// on cluster restart if we allocate a boat load of shards
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
final IndexSettings indexSettings = settings != null ? new IndexSettings(indexMetadata, settings) : null;
final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id());
final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;

Expand All @@ -139,7 +135,6 @@ public AllocateUnassignedDecision makeAllocationDecision(
allocation.getIgnoreNodes(unassignedShard.shardId()),
inSyncAllocationIds,
shardState,
indexSettings,
logger
);
final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
Expand Down Expand Up @@ -318,7 +313,7 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS
NodeGatewayStartedShards::primary
).reversed();

private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.nullsLast(
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.nullsLast(
Comparator.comparing(NodeGatewayStartedShards::replicationCheckpoint)
);

Expand All @@ -333,7 +328,6 @@ protected static NodeShardsResult buildNodeShardsResult(
Set<String> ignoreNodes,
Set<String> inSyncAllocationIds,
FetchResult<NodeGatewayStartedShards> shardState,
IndexSettings indexSettings,
Logger logger
) {
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
Expand Down Expand Up @@ -391,21 +385,24 @@ protected static NodeShardsResult buildNodeShardsResult(
}
}

Comparator<NodeGatewayStartedShards> comparator; // allocation preference
/**
* Orders the active shards copies based on below comparators
* 1. No store exception
* 2. Shard copies previously primary shard
* 3. Shard copies with highest replication checkpoint. This comparator is NO-OP for doc rep enabled indices.
*/
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
).reversed();
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
.thenComparing(PRIMARY_FIRST_COMPARATOR);
.thenComparing(PRIMARY_FIRST_COMPARATOR)
.thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
} else {
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
}

// If index has segrep enabled, then use replication checkpoint info to order the replicas
if (indexSettings != null && indexSettings.isSegRepEnabled()) {
comparator = comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR);
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR)
.thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
}

nodeShardStates.sort(comparator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -181,8 +180,7 @@ private static boolean isResponsibleFor(final ShardRouting shard) {
public AllocateUnassignedDecision makeAllocationDecision(
final ShardRouting unassignedShard,
final RoutingAllocation allocation,
final Logger logger,
final Settings settings
final Logger logger
) {
if (isResponsibleFor(unassignedShard) == false) {
// this allocator is not responsible for deciding on this shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ public String toString() {
if (storeException != null) {
buf.append(",storeException=").append(storeException);
}
buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString());
if (replicationCheckpoint != null) {
buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString());
}
buf.append("]");
return buf.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotId;
Expand Down Expand Up @@ -98,7 +97,7 @@ public void buildTestAllocator() {
private void allocateAllUnassigned(final RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, testAllocator.settings);
testAllocator.allocateUnassigned(iterator.next(), allocation, iterator);
}
}

Expand Down Expand Up @@ -208,7 +207,7 @@ public void testShardLockObtainFailedException() {
}

/**
* Tests that replica with highest primary ter version will be selected as target
* Tests that replica with the highest primary term version will be selected as target
*/
public void testPreferReplicaWithHighestPrimaryTerm() {
String allocId1 = randomAlphaOfLength(10);
Expand All @@ -221,7 +220,6 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
allocId2,
allocId3
);
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
Expand Down Expand Up @@ -255,7 +253,6 @@ public void testPreferReplicaWithNullReplicationCheckpoint() {
allocId2,
allocId3
);
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2));
Expand Down Expand Up @@ -289,7 +286,6 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
allocId2,
allocId3
);
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
Expand Down Expand Up @@ -322,7 +318,6 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocId1,
allocId3
);
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
Expand Down Expand Up @@ -356,7 +351,6 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocId2,
allocId3
);
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
Expand Down Expand Up @@ -767,8 +761,6 @@ class TestAllocator extends PrimaryShardAllocator {

private Map<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> data;

private Settings settings = Settings.EMPTY;

public TestAllocator clear() {
data = null;
return this;
Expand Down Expand Up @@ -821,9 +813,5 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
) {
return new AsyncShardFetch.FetchResult<>(shardId, data, Collections.<String>emptySet());
}

public void enableSegmentReplication() {
this.settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ public class ReplicaShardAllocatorTests extends OpenSearchAllocationTestCase {

private TestAllocator testAllocator;

private final Settings settings = Settings.EMPTY;

@Before
public void buildTestAllocator() {
this.testAllocator = new TestAllocator();
Expand All @@ -106,7 +104,7 @@ public void buildTestAllocator() {
private void allocateAllUnassigned(final RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, settings);
testAllocator.allocateUnassigned(iterator.next(), allocation, iterator);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@
public class TestGatewayAllocator extends GatewayAllocator {

Map<String /* node id */, Map<ShardId, ShardRouting>> knownAllocations = new HashMap<>();

Map<String, ReplicationCheckpoint> shardIdNodeToReplicationCheckPointMap = new HashMap<>();
DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES;
Map<String, ReplicationCheckpoint> shardIdNodeToReplicationCheckPointMap = new HashMap<>();

PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator() {
@Override
protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
Expand Down Expand Up @@ -103,10 +103,7 @@ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardR
};

private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String nodeName) {
return shardIdNodeToReplicationCheckPointMap.getOrDefault(
getReplicationCheckPointKey(shardId, nodeName),
ReplicationCheckpoint.empty(shardId)
);
return shardIdNodeToReplicationCheckPointMap.getOrDefault(getReplicationCheckPointKey(shardId, nodeName), null);
}

ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator() {
Expand Down Expand Up @@ -157,14 +154,7 @@ public void allocateUnassigned(
UnassignedAllocationHandler unassignedAllocationHandler
) {
currentNodes = allocation.nodes();
innerAllocatedUnassigned(
allocation,
primaryShardAllocator,
replicaShardAllocator,
shardRouting,
unassignedAllocationHandler,
this.settings
);
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
}

/**
Expand Down

0 comments on commit de07dac

Please sign in to comment.