From a083c2d04d5858aebf02439d04f3cd864d899cd6 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 28 Jul 2022 16:58:33 -0700 Subject: [PATCH 01/11] [Segment Replication] Update PrimaryShardAllocator to prefer replicas having higher replication checkpoint Signed-off-by: Suraj Singh --- .../gateway/PrimaryShardAllocator.java | 33 +++-- ...ransportNodesListGatewayStartedShards.java | 51 ++++++- .../checkpoint/ReplicationCheckpoint.java | 7 +- .../gateway/PrimaryShardAllocatorTests.java | 128 +++++++++++++++++- .../test/gateway/TestGatewayAllocator.java | 14 +- 5 files changed, 213 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index b9cfebaa98521..1419106decf59 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.Decision.Type; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; @@ -313,6 +314,10 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS NodeGatewayStartedShards::primary ).reversed(); + private static final Comparator HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.comparing( + NodeGatewayStartedShards::replicationCheckpoint + ); + /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching * inSyncAllocationIds are added to the list. 
Otherwise, any node that has a shard is added to the list, but @@ -382,15 +387,27 @@ protected static NodeShardsResult buildNodeShardsResult( } final Comparator comparator; // allocation preference - if (matchAnyShard) { - // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) - ).reversed(); - comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR); + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) + ).reversed(); + if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { + if (matchAnyShard) { + // prefer shards with matching allocation ids + comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) + .thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); + } else { + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); + } } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); + if (matchAnyShard) { + comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) + .thenComparing(PRIMARY_FIRST_COMPARATOR); + } else { + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); + } + } nodeShardStates.sort(comparator); diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 78b4fa287ef59..16aeb62e92a92 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ 
b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -55,12 +55,14 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.shard.ShardStateMetadata; import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -149,6 +151,13 @@ protected NodesGatewayStartedShards newResponse( protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { try { final ShardId shardId = request.getShardId(); + IndexService indexService = indicesService.indexService(request.getShardId().getIndex()); + final ReplicationCheckpoint replicationCheckpoint; + if (indexService != null) { + replicationCheckpoint = indexService.getShard(shardId.getId()).getLatestReplicationCheckpoint(); + } else { + replicationCheckpoint = null; + } logger.trace("{} loading local shard state info", shardId); ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( logger, @@ -191,10 +200,12 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { exception ); String allocationId = shardStateMetadata.allocationId != null ? 
shardStateMetadata.allocationId.getId() : null; + return new NodeGatewayStartedShards( clusterService.localNode(), allocationId, shardStateMetadata.primary, + replicationCheckpoint, exception ); } @@ -202,10 +213,15 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary); + return new NodeGatewayStartedShards( + clusterService.localNode(), + allocationId, + shardStateMetadata.primary, + replicationCheckpoint + ); } logger.trace("{} no local shard info found", shardId); - return new NodeGatewayStartedShards(clusterService.localNode(), null, false); + return new NodeGatewayStartedShards(clusterService.localNode(), null, false, replicationCheckpoint); } catch (Exception e) { throw new OpenSearchException("failed to load started shards", e); } @@ -352,12 +368,15 @@ public static class NodeGatewayStartedShards extends BaseNodeResponse { private final String allocationId; private final boolean primary; + + private final ReplicationCheckpoint replicationCheckpoint; private final Exception storeException; public NodeGatewayStartedShards(StreamInput in) throws IOException { super(in); allocationId = in.readOptionalString(); primary = in.readBoolean(); + replicationCheckpoint = new ReplicationCheckpoint(in); if (in.readBoolean()) { storeException = in.readException(); } else { @@ -365,14 +384,26 @@ public NodeGatewayStartedShards(StreamInput in) throws IOException { } } - public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary) { - this(node, allocationId, primary, null); + public NodeGatewayStartedShards( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint + ) { + 
this(node, allocationId, primary, replicationCheckpoint, null); } - public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary, Exception storeException) { + public NodeGatewayStartedShards( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException + ) { super(node); this.allocationId = allocationId; this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; this.storeException = storeException; } @@ -384,6 +415,10 @@ public boolean primary() { return this.primary; } + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + public Exception storeException() { return this.storeException; } @@ -393,6 +428,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalString(allocationId); out.writeBoolean(primary); + replicationCheckpoint.writeTo(out); if (storeException != null) { out.writeBoolean(true); out.writeException(storeException); @@ -414,7 +450,8 @@ public boolean equals(Object o) { return primary == that.primary && Objects.equals(allocationId, that.allocationId) - && Objects.equals(storeException, that.storeException); + && Objects.equals(storeException, that.storeException) + && Objects.equals(replicationCheckpoint, that.replicationCheckpoint); } @Override @@ -422,6 +459,7 @@ public int hashCode() { int result = (allocationId != null ? allocationId.hashCode() : 0); result = 31 * result + (primary ? 1 : 0); result = 31 * result + (storeException != null ? storeException.hashCode() : 0); + result = 31 * result + (replicationCheckpoint != null ? 
replicationCheckpoint.hashCode() : 0); return result; } @@ -429,6 +467,7 @@ public int hashCode() { public String toString() { StringBuilder buf = new StringBuilder(); buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary); + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); if (storeException != null) { buf.append(",storeException=").append(storeException); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 8afb5bd055636..6a4e5e449f178 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -23,7 +23,7 @@ * * @opensearch.internal */ -public class ReplicationCheckpoint implements Writeable { +public class ReplicationCheckpoint implements Writeable, Comparable { private final ShardId shardId; private final long primaryTerm; @@ -107,6 +107,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(segmentInfosVersion); } + @Override + public int compareTo(ReplicationCheckpoint other) { + return this.isAheadOf(other) ? 
-1 : 1; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index 4a1ecb9661687..5ab7812430a24 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -62,6 +62,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -205,6 +206,102 @@ public void testShardLockObtainFailedException() { assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } + /** + * Tests that replica with highest segment info version will be selected as target + */ + public void testPreferReplicaWithHighestSegmentInfoVersion() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2 + ); + testAllocator.addData(node1, allocId1, randomBoolean(), new ReplicationCheckpoint(shardId, 1, 10, 101, 1)); + testAllocator.addData(node2, allocId2, randomBoolean(), new ReplicationCheckpoint(shardId, 1, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + 
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node2.getId()) + ); + // Assert node2's allocation id is used + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId2) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that replica with equal segment info version but higher primary term will be selected as target + */ + public void testPreferReplicaWithHigherPrimaryTermOnSameSegmentInfoVersion() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 1000, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node2.getId()) + ); + // Assert node2's allocation id is used with highest replication checkpoint + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId2) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that prefer allocation of older primary even 
though having lower replication checkpoint + */ + public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 1000, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node1.getId()) + ); + // Assert node2's allocation id is used with highest replication checkpoint + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId1) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + /** * Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will * select the second node as target @@ -219,7 +316,7 @@ public void testShardLockObtainFailedExceptionPreferOtherValidCopies() { allocId2 ); testAllocator.addData(node1, allocId1, randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); - testAllocator.addData(node2, allocId2, randomBoolean(), null); + testAllocator.addData(node2, allocId2, randomBoolean()); allocateAllUnassigned(allocation); 
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -601,17 +698,42 @@ public TestAllocator clear() { return this; } + public TestAllocator addData( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint + ) { + return addData(node, allocationId, primary, replicationCheckpoint, null); + } + public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) { - return addData(node, allocationId, primary, null); + return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), null); } public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) { + return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), storeException); + } + + public TestAllocator addData( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + @Nullable Exception storeException + ) { if (data == null) { data = new HashMap<>(); } data.put( node, - new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, allocationId, primary, storeException) + new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards( + node, + allocationId, + primary, + replicationCheckpoint, + storeException + ) ); return this; } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index 54c92f4d519aa..f7e1d9bb09c72 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -42,7 +42,10 @@ import org.opensearch.gateway.PrimaryShardAllocator; import org.opensearch.gateway.ReplicaShardAllocator; import 
org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; +import org.opensearch.index.IndexService; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; import java.util.Collections; @@ -52,6 +55,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.test.OpenSearchIntegTestCase.internalCluster; + /** * A gateway allocator implementation that keeps an in memory list of started shard allocation * that are used as replies to the, normally async, fetch data requests. The in memory list @@ -71,7 +76,6 @@ public class TestGatewayAllocator extends GatewayAllocator { Map> knownAllocations = new HashMap<>(); DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; - PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator() { @Override protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { @@ -90,7 +94,8 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR routing -> new NodeGatewayStartedShards( currentNodes.get(routing.currentNodeId()), routing.allocationId().getId(), - routing.primary() + routing.primary(), + getReplicationCheckpoint(shardId, currentNodes.get(routing.currentNodeId()).getName()) ) ) ); @@ -99,6 +104,11 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR } }; + private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String nodeName) { + final IndexService indexService = internalCluster().getInstance(IndicesService.class, nodeName).indexService(shardId.getIndex()); + return indexService != null ? 
indexService.getShard(shardId.getId()).getLatestReplicationCheckpoint() : null; + } + ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator() { @Override protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { From dbc8295ee018ff44e5c075d502fb2b4f5f9e6c3f Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 1 Aug 2022 17:51:50 -0700 Subject: [PATCH 02/11] Use empty replication checkpoint to avoid NPE Signed-off-by: Suraj Singh --- ...ransportNodesListGatewayStartedShards.java | 2 +- .../test/gateway/TestGatewayAllocator.java | 22 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 16aeb62e92a92..38bb9786024c7 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -156,7 +156,7 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { if (indexService != null) { replicationCheckpoint = indexService.getShard(shardId.getId()).getLatestReplicationCheckpoint(); } else { - replicationCheckpoint = null; + replicationCheckpoint = ReplicationCheckpoint.empty(shardId); } logger.trace("{} loading local shard state info", shardId); ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index f7e1d9bb09c72..8499f3d77ca9b 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -42,9 +42,7 @@ import 
org.opensearch.gateway.PrimaryShardAllocator; import org.opensearch.gateway.ReplicaShardAllocator; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; -import org.opensearch.index.IndexService; import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; @@ -55,8 +53,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.opensearch.test.OpenSearchIntegTestCase.internalCluster; - /** * A gateway allocator implementation that keeps an in memory list of started shard allocation * that are used as replies to the, normally async, fetch data requests. The in memory list @@ -75,6 +71,8 @@ public class TestGatewayAllocator extends GatewayAllocator { Map> knownAllocations = new HashMap<>(); + + Map shardIdNodeToReplicationCheckPointMap = new HashMap<>(); DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator() { @Override @@ -95,7 +93,7 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR currentNodes.get(routing.currentNodeId()), routing.allocationId().getId(), routing.primary(), - getReplicationCheckpoint(shardId, currentNodes.get(routing.currentNodeId()).getName()) + getReplicationCheckpoint(shardId, routing.currentNodeId()) ) ) ); @@ -105,8 +103,10 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR }; private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String nodeName) { - final IndexService indexService = internalCluster().getInstance(IndicesService.class, nodeName).indexService(shardId.getIndex()); - return indexService != null ? 
indexService.getShard(shardId.getId()).getLatestReplicationCheckpoint() : null; + return shardIdNodeToReplicationCheckPointMap.getOrDefault( + getReplicationCheckPointKey(shardId, nodeName), + ReplicationCheckpoint.empty(shardId) + ); } ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator() { @@ -166,4 +166,12 @@ public void allocateUnassigned( public void addKnownAllocation(ShardRouting shard) { knownAllocations.computeIfAbsent(shard.currentNodeId(), id -> new HashMap<>()).put(shard.shardId(), shard); } + + public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { + return shardId.toString() + "_" + nodeName; + } + + public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) { + shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint); + } } From 2d6096e3ef80376ec0e4cfcd3552108efe073225 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 1 Aug 2022 18:53:25 -0700 Subject: [PATCH 03/11] Update NodeGatewayStartedShards to optionally wire in/out ReplicationCheckpoint field Signed-off-by: Suraj Singh --- .../gateway/PrimaryShardAllocator.java | 36 ++++++++----------- ...ransportNodesListGatewayStartedShards.java | 32 ++++++++++++----- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 1419106decf59..744c5aad583f5 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -48,7 +48,6 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.Decision.Type; -import org.opensearch.common.util.FeatureFlags; import 
org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; @@ -386,30 +385,23 @@ protected static NodeShardsResult buildNodeShardsResult( } } - final Comparator comparator; // allocation preference - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) - ).reversed(); - if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { - if (matchAnyShard) { - // prefer shards with matching allocation ids - comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR) - .thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); - } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR) - .thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); - } + Comparator comparator; // allocation preference + if (matchAnyShard) { + // prefer shards with matching allocation ids + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) + ).reversed(); + comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) + .thenComparing(PRIMARY_FIRST_COMPARATOR); } else { - if (matchAnyShard) { - comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR); - } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); - } - + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); } + // If index has segrep enabled, then use replication checkpoint info to order the replicas + if (true) { + comparator = 
comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); + } + nodeShardStates.sort(comparator); if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 38bb9786024c7..4c869497644b9 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionType; import org.opensearch.action.FailedNodeException; @@ -57,6 +58,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.shard.ShardStateMetadata; @@ -152,12 +154,18 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { try { final ShardId shardId = request.getShardId(); IndexService indexService = indicesService.indexService(request.getShardId().getIndex()); - final ReplicationCheckpoint replicationCheckpoint; - if (indexService != null) { - replicationCheckpoint = indexService.getShard(shardId.getId()).getLatestReplicationCheckpoint(); - } else { - replicationCheckpoint = ReplicationCheckpoint.empty(shardId); +// IndexSettings settings = indicesService.indexService(request.getShardId().getIndex()).getIndexSettings(); + ReplicationCheckpoint replicationCheckpoint = null; + try { + IndexShard shard = indexService.getShard(shardId.getId()); + if (shard != null) { + 
replicationCheckpoint = shard.getLatestReplicationCheckpoint(); + } + } catch (Exception e) { + logger.warn("Error fetching replication checkpoint {}", replicationCheckpoint); } + if (replicationCheckpoint == null) + replicationCheckpoint = ReplicationCheckpoint.empty(shardId); logger.trace("{} loading local shard state info", shardId); ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( logger, @@ -369,19 +377,25 @@ public static class NodeGatewayStartedShards extends BaseNodeResponse { private final String allocationId; private final boolean primary; - private final ReplicationCheckpoint replicationCheckpoint; private final Exception storeException; + private final ReplicationCheckpoint replicationCheckpoint; + + public NodeGatewayStartedShards(StreamInput in) throws IOException { super(in); allocationId = in.readOptionalString(); primary = in.readBoolean(); - replicationCheckpoint = new ReplicationCheckpoint(in); if (in.readBoolean()) { storeException = in.readException(); } else { storeException = null; } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + replicationCheckpoint = new ReplicationCheckpoint(in); + } else { + replicationCheckpoint = null; + } } public NodeGatewayStartedShards( @@ -428,13 +442,15 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalString(allocationId); out.writeBoolean(primary); - replicationCheckpoint.writeTo(out); if (storeException != null) { out.writeBoolean(true); out.writeException(storeException); } else { out.writeBoolean(false); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + replicationCheckpoint.writeTo(out); + } } @Override From 2fc031086e6ecbf87474d117346b5529e05f71d7 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 2 Aug 2022 15:46:24 -0700 Subject: [PATCH 04/11] Use default replication checkpoint causing EOF errors on empty checkpoint --- .../gateway/TransportNodesListGatewayStartedShards.java | 6 ++---- 1 file changed, 2 
insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 4c869497644b9..94b22e74dddbb 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -164,8 +164,7 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { } catch (Exception e) { logger.warn("Error fetching replication checkpoint {}", replicationCheckpoint); } - if (replicationCheckpoint == null) - replicationCheckpoint = ReplicationCheckpoint.empty(shardId); + if (replicationCheckpoint == null) replicationCheckpoint = ReplicationCheckpoint.empty(shardId); logger.trace("{} loading local shard state info", shardId); ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( logger, @@ -381,7 +380,6 @@ public static class NodeGatewayStartedShards extends BaseNodeResponse { private final ReplicationCheckpoint replicationCheckpoint; - public NodeGatewayStartedShards(StreamInput in) throws IOException { super(in); allocationId = in.readOptionalString(); @@ -483,10 +481,10 @@ public int hashCode() { public String toString() { StringBuilder buf = new StringBuilder(); buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary); - buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); if (storeException != null) { buf.append(",storeException=").append(storeException); } + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); buf.append("]"); return buf.toString(); } From 94cbf50fb5409110bd28106f2e0ce52780455ddf Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 3 Aug 2022 09:28:38 -0700 Subject: [PATCH 05/11] Add indexSettings to GatewayAllocator to allow 
ReplicationCheckpoint comparator only for segrep enabled indices --- .../routing/allocation/Allocators.java | 3 +- .../org/opensearch/cluster/ClusterModule.java | 8 +++- .../routing/allocation/AllocationService.java | 13 ++++-- .../gateway/BaseGatewayShardAllocator.java | 13 +++--- .../opensearch/gateway/GatewayAllocator.java | 28 +++++++++---- .../gateway/PrimaryShardAllocator.java | 19 ++++++--- .../gateway/ReplicaShardAllocator.java | 4 +- ...ransportNodesListGatewayStartedShards.java | 7 ++-- .../ClusterAllocationExplainActionTests.java | 2 +- .../cluster/reroute/ClusterRerouteTests.java | 3 +- .../shrink/TransportResizeActionTests.java | 12 ++++-- .../health/ClusterStateHealthTests.java | 2 +- .../MetadataCreateIndexServiceTests.java | 9 ++-- .../allocation/AllocationServiceTests.java | 5 ++- .../allocation/BalanceConfigurationTests.java | 3 +- .../DecisionsImpactOnClusterHealthTests.java | 3 +- .../MaxRetryAllocationDeciderTests.java | 3 +- .../NodeVersionAllocationDeciderTests.java | 6 ++- .../RandomAllocationDeciderTests.java | 3 +- .../ResizeAllocationDeciderTests.java | 3 +- .../decider/DiskThresholdDeciderTests.java | 42 ++++++++++++------- .../EnableAllocationShortCircuitTests.java | 3 +- .../decider/FilterAllocationDeciderTests.java | 9 ++-- .../gateway/GatewayServiceTests.java | 3 +- .../gateway/PrimaryShardAllocatorTests.java | 15 +++++-- .../gateway/ReplicaShardAllocatorTests.java | 4 +- .../indices/cluster/ClusterStateChanges.java | 3 +- .../cluster/OpenSearchAllocationTestCase.java | 14 ++++--- .../test/gateway/TestGatewayAllocator.java | 9 +++- 29 files changed, 176 insertions(+), 75 deletions(-) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java index d700b9dab2cf3..4a12a6f342f42 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java +++ 
b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java @@ -94,7 +94,8 @@ public static AllocationService createAllocationService(Settings settings, Clust NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); } diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index f8ba520e465e2..e8976ac48de26 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -147,7 +147,13 @@ public ClusterModule( this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext); - this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); + this.allocationService = new AllocationService( + allocationDeciders, + shardsAllocator, + clusterInfoService, + snapshotsInfoService, + settings + ); } public static List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 1c6e4732a2ab7..dedd8e538bcaa 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -55,6 +55,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.Settings; import 
org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; import org.opensearch.snapshots.SnapshotsInfoService; @@ -92,15 +93,18 @@ public class AllocationService { private final ClusterInfoService clusterInfoService; private SnapshotsInfoService snapshotsInfoService; + private final Settings settings; + // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator public AllocationService( AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService + SnapshotsInfoService snapshotsInfoService, + Settings settings ) { - this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); + this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, settings); setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator)); } @@ -108,12 +112,14 @@ public AllocationService( AllocationDeciders allocationDeciders, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService + SnapshotsInfoService snapshotsInfoService, + Settings settings ) { this.allocationDeciders = allocationDeciders; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; + this.settings = settings; } /** @@ -219,6 +225,7 @@ public ClusterState applyFailedShards( return clusterState; } ClusterState tmpState = IndexMetadataUpdater.removeStaleIdsWithoutRoutings(clusterState, staleShards, logger); + // clusterState.getMetadata().index(failedShards.get(0).getMessage()) RoutingNodes routingNodes = getMutableRoutingNodes(tmpState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java 
b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 59ef894958cbe..dc767fe3a2554 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.settings.Settings; import java.util.ArrayList; import java.util.List; @@ -62,7 +63,7 @@ public abstract class BaseGatewayShardAllocator { /** * Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist. - * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)} + * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger, Settings)} * to make decisions on assigning shards to nodes. 
* @param shardRouting the shard to allocate * @param allocation the allocation state container object @@ -71,9 +72,10 @@ public abstract class BaseGatewayShardAllocator { public void allocateUnassigned( ShardRouting shardRouting, RoutingAllocation allocation, - ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler + ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler, + Settings settings ) { - final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger); + final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger, settings); if (allocateUnassignedDecision.isDecisionTaken() == false) { // no decision was taken by this allocator @@ -106,7 +108,7 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation /** * Make a decision on the allocation of an unassigned shard. This method is used by - * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions + * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler, Settings)} to make decisions * about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated. 
* * @param unassignedShard the unassigned shard to allocate @@ -117,7 +119,8 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation public abstract AllocateUnassignedDecision makeAllocationDecision( ShardRouting unassignedShard, RoutingAllocation allocation, - Logger logger + Logger logger, + Settings settings ); /** diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index cdcf813d9ede0..e15c23f26da10 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -51,6 +51,7 @@ import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; @@ -79,6 +80,8 @@ public class GatewayAllocator implements ExistingShardsAllocator { private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; + protected final Settings settings; + private final ConcurrentMap< ShardId, AsyncShardFetch> asyncFetchStarted = ConcurrentCollections @@ -91,11 +94,13 @@ public class GatewayAllocator implements ExistingShardsAllocator { public GatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, - TransportNodesListShardStoreMetadata storeAction + TransportNodesListShardStoreMetadata storeAction, + Settings settings ) { this.rerouteService = rerouteService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); + this.settings = settings; } @Override @@ -111,6 +116,7 @@ protected GatewayAllocator() { 
this.rerouteService = null; this.primaryShardAllocator = null; this.replicaShardAllocator = null; + this.settings = null; } @Override @@ -165,7 +171,14 @@ public void allocateUnassigned( ) { assert primaryShardAllocator != null; assert replicaShardAllocator != null; - innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); + innerAllocatedUnassigned( + allocation, + primaryShardAllocator, + replicaShardAllocator, + shardRouting, + unassignedAllocationHandler, + this.settings + ); } // allow for testing infra to change shard allocators implementation @@ -174,13 +187,14 @@ protected static void innerAllocatedUnassigned( PrimaryShardAllocator primaryShardAllocator, ReplicaShardAllocator replicaShardAllocator, ShardRouting shardRouting, - ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler + ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler, + Settings settings ) { assert shardRouting.unassigned(); if (shardRouting.primary()) { - primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler); + primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings); } else { - replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler); + replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings); } } @@ -190,10 +204,10 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting assert routingAllocation.debugDecision(); if (unassignedShard.primary()) { assert primaryShardAllocator != null; - return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); + return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings); } else { assert replicaShardAllocator != null; - return 
replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); + return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings); } } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 744c5aad583f5..9792a21c953d9 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -48,9 +48,11 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.Decision.Type; +import org.opensearch.common.settings.Settings; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; +import org.opensearch.index.IndexSettings; import java.util.ArrayList; import java.util.Collection; @@ -89,11 +91,14 @@ private static boolean isResponsibleFor(final ShardRouting shard) { || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT); } + Settings settings; + @Override public AllocateUnassignedDecision makeAllocationDecision( final ShardRouting unassignedShard, final RoutingAllocation allocation, - final Logger logger + final Logger logger, + final Settings settings ) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for allocating this shard @@ -124,7 +129,9 @@ public AllocateUnassignedDecision makeAllocationDecision( // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); + final 
IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id()); + // final IndexSettings indexSettings = indexMetadata.getIndex(). final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT; assert inSyncAllocationIds.isEmpty() == false; @@ -135,6 +142,7 @@ public AllocateUnassignedDecision makeAllocationDecision( allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, + indexSettings, logger ); final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; @@ -328,6 +336,7 @@ protected static NodeShardsResult buildNodeShardsResult( Set ignoreNodes, Set inSyncAllocationIds, FetchResult shardState, + IndexSettings indexSettings, Logger logger ) { List nodeShardStates = new ArrayList<>(); @@ -398,11 +407,11 @@ protected static NodeShardsResult buildNodeShardsResult( } // If index has segrep enabled, then use replication checkpoint info to order the replicas - if (true) { - comparator = comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); - } + if (indexSettings.isSegRepEnabled()) { + comparator = HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR; + } - nodeShardStates.sort(comparator); + nodeShardStates.sort(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); if (logger.isTraceEnabled()) { logger.trace( diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index c0b831b6fe4d0..bad85f7cfb57b 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.Nullable; import org.opensearch.common.collect.Tuple; +import 
org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.StoreFileMetadata; @@ -180,7 +181,8 @@ private static boolean isResponsibleFor(final ShardRouting shard) { public AllocateUnassignedDecision makeAllocationDecision( final ShardRouting unassignedShard, final RoutingAllocation allocation, - final Logger logger + final Logger logger, + final Settings settings ) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for deciding on this shard diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 94b22e74dddbb..7118a3ace0f33 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -154,7 +154,9 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { try { final ShardId shardId = request.getShardId(); IndexService indexService = indicesService.indexService(request.getShardId().getIndex()); -// IndexSettings settings = indicesService.indexService(request.getShardId().getIndex()).getIndexSettings(); + final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + final IndexSettings indexSettings = new IndexSettings(metadata, settings); + // IndexSettings settings = indicesService.indexService(request.getShardId().getIndex()).getIndexSettings(); ReplicationCheckpoint replicationCheckpoint = null; try { IndexShard shard = indexService.getShard(shardId.getId()); @@ -179,9 +181,8 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { } else { // TODO: Fallback for BWC with older OpenSearch versions. 
// Remove once request.getCustomDataPath() always returns non-null - final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); if (metadata != null) { - customDataPath = new IndexSettings(metadata, settings).customDataPath(); + customDataPath = indexSettings.customDataPath(); } else { logger.trace("{} node doesn't have meta data for the requests index", shardId); throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java index 59a4a05f0db8e..49765d436d282 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java @@ -95,7 +95,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing throw new UnsupportedOperationException("cannot explain"); } } - }, null, null) + }, null, null, null) ); assertEquals(shard.currentNodeId(), cae.getCurrentNode().getId()); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java index 859d8ce3bb734..1709290a3ab4f 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -97,7 +97,8 @@ public void testClusterStateUpdateTask() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); ClusterState clusterState = 
createInitialClusterState(allocationService); ClusterRerouteRequest req = new ClusterRerouteRequest(); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java index e4b79ac54f8fd..8c349e60ac551 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java @@ -154,7 +154,8 @@ public void testErrorCondition() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -181,7 +182,8 @@ public void testPassNumRoutingShards() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -212,7 +214,8 @@ public void testPassNumRoutingShardsAndFail() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -249,7 +252,8 @@ public void testShrinkIndexSettings() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); diff --git 
a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java index 80e76d44b9c41..a6351676c3df2 100644 --- a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java @@ -177,7 +177,7 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte threadPool, new ActionFilters(new HashSet<>()), indexNameExpressionResolver, - new AllocationService(null, new TestGatewayAllocator(), null, null, null) + new AllocationService(null, new TestGatewayAllocator(), null, null, null, null) ); PlainActionFuture listener = new PlainActionFuture<>(); action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index ac03ab49bbbbd..97a157f933817 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -304,7 +304,8 @@ public void testValidateShrinkIndex() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -406,7 +407,8 @@ public void testValidateSplitIndex() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -546,7 +548,8 @@ private 
void runPrepareResizeIndexSettingsTest( new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); final RoutingTable initialRoutingTable = service.reroute(initialClusterState, "reroute").routingTable(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java index 38b71db27e02f..cb63a059eedad 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java @@ -159,7 +159,8 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } }, new EmptyClusterInfoService(), - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); final String unrealisticAllocatorName = "unrealistic"; @@ -262,7 +263,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } public void testExplainsNonAllocationOfShardWithUnknownAllocator() { - final AllocationService allocationService = new AllocationService(null, null, null, null); + final AllocationService allocationService = new AllocationService(null, null, null, null, null); allocationService.setExistingShardsAllocators( Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()) ); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index 1ba69694eaec1..0537840a38a06 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -426,7 +426,8 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } }, EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); Metadata.Builder metadataBuilder = Metadata.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java index 35c6aea425b88..34f4b5a690ac2 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java @@ -162,7 +162,8 @@ private static AllocationService newAllocationService(Settings settings, Set new SnapshotShardSizeInfo(snapshotShardSizes.build()) + () -> new SnapshotShardSizeInfo(snapshotShardSizes.build()), + Settings.EMPTY ); state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java index 3f7a998accafe..fd1035f040fc3 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java @@ -88,7 +88,8 @@ public void testRandomDecisions() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); int indices = 
scaledRandomIntBetween(1, 20); Builder metaBuilder = Metadata.builder(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java index 2d4422fdf6ced..39aa808f08d96 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java @@ -72,7 +72,8 @@ public void setUp() throws Exception { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index c3f54fa7580ac..9e1f1b386cbc7 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -140,7 +140,8 @@ public void testDiskThreshold() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); Metadata metadata = Metadata.builder() @@ -220,7 +221,8 @@ public void testDiskThreshold() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -252,7 +254,8 @@ public void testDiskThreshold() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + 
EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -323,7 +326,8 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); Metadata metadata = Metadata.builder() @@ -373,7 +377,8 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -437,7 +442,8 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -470,7 +476,8 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -572,7 +579,8 @@ public void testDiskThresholdWithShardSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); Metadata metadata = Metadata.builder() @@ -645,7 +653,8 @@ public void testUnknownDiskUsage() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); Metadata metadata = Metadata.builder() @@ -755,7 +764,8 @@ public void testShardRelocationsTakenIntoAccount() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), 
cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); Metadata metadata = Metadata.builder() @@ -1031,7 +1041,8 @@ public void testCanRemainWithShardRelocatingAway() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); // Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away // and therefor we will have sufficient disk space on node1. @@ -1145,7 +1156,8 @@ public void testForSingleDataNode() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); ClusterState result = strategy.reroute(clusterState, "reroute"); @@ -1268,7 +1280,8 @@ public void testWatermarksEnabledForSingleDataNode() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); ClusterState result = strategy.reroute(clusterState, "reroute"); @@ -1401,7 +1414,8 @@ public void testDiskThresholdWithSnapshotShardSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), clusterInfoService, - snapshotShardSizeInfoRef::get + snapshotShardSizeInfoRef::get, + Settings.EMPTY ); // reroute triggers snapshot shard size fetching diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java index 6a1b3c912ad4a..39a0e374f9fe3 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java @@ -227,7 +227,8 
@@ private static AllocationService createAllocationService(Settings.Builder settin new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java index 0b00d26182346..f10b11ff111a7 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java @@ -79,7 +79,8 @@ public void testFilterInitialRecovery() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); ClusterState state = createInitialClusterState( service, @@ -187,7 +188,8 @@ public void testTierFilterIgnored() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); ClusterState state = createInitialClusterState( service, @@ -246,7 +248,8 @@ private void filterSettingsUpdateHelper( new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY); RoutingTable routingTable = state.routingTable(); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java index 
2d8a26f8bbe87..60d60ab131390 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java @@ -88,7 +88,8 @@ private GatewayService createService(final Settings.Builder settings) { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + Settings.EMPTY ); return new GatewayService(settings.build(), allocationService, clusterService, null, null, null); } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index 5ab7812430a24..6cdcf9085c36c 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -63,6 +63,7 @@ import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -97,7 +98,7 @@ public void buildTestAllocator() { private void allocateAllUnassigned(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { - testAllocator.allocateUnassigned(iterator.next(), allocation, iterator); + testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, testAllocator.settings); } } @@ -218,8 +219,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() { allocId1, allocId2 ); - testAllocator.addData(node1, allocId1, randomBoolean(), new ReplicationCheckpoint(shardId, 1, 10, 
101, 1)); - testAllocator.addData(node2, allocId2, randomBoolean(), new ReplicationCheckpoint(shardId, 1, 10, 120, 2)); + this.testAllocator.enableSegmentReplication(); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 1, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 1, 10, 120, 2)); allocateAllUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -250,6 +252,7 @@ public void testPreferReplicaWithHigherPrimaryTermOnSameSegmentInfoVersion() { allocId2, allocId3 ); + this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 1000, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); @@ -693,6 +696,8 @@ class TestAllocator extends PrimaryShardAllocator { private Map data; + private Settings settings = Settings.EMPTY; + public TestAllocator clear() { data = null; return this; @@ -745,5 +750,9 @@ protected AsyncShardFetch.FetchResult(shardId, data, Collections.emptySet()); } + + public void enableSegmentReplication() { + this.settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index 36ac93524d6aa..fc8d57acdb302 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -96,6 +96,8 @@ public class ReplicaShardAllocatorTests extends OpenSearchAllocationTestCase { private TestAllocator testAllocator; + 
private final Settings settings = Settings.EMPTY; + @Before public void buildTestAllocator() { this.testAllocator = new TestAllocator(); @@ -104,7 +106,7 @@ public void buildTestAllocator() { private void allocateAllUnassigned(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { - testAllocator.allocateUnassigned(iterator.next(), allocation, iterator); + testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, settings); } } diff --git a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java index 7fe17e570d157..1885d82691d4f 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java @@ -168,7 +168,8 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th new TestGatewayAllocator(), new BalancedShardsAllocator(SETTINGS), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + SETTINGS ); shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor( allocationService, diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index a13d337fa4d26..b2a699da1cc96 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -104,7 +104,8 @@ public static MockAllocationService createAllocationService(Settings settings, C new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE, - 
SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES, + Settings.EMPTY ); } @@ -114,7 +115,8 @@ public static MockAllocationService createAllocationService(Settings settings, C new TestGatewayAllocator(), new BalancedShardsAllocator(settings), clusterInfoService, - SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES, + Settings.EMPTY ); } @@ -136,7 +138,8 @@ public static MockAllocationService createAllocationService( gatewayAllocator, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE, - snapshotsInfoService + snapshotsInfoService, + Settings.EMPTY ); } @@ -312,9 +315,10 @@ public MockAllocationService( GatewayAllocator gatewayAllocator, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService + SnapshotsInfoService snapshotsInfoService, + Settings settings ) { - super(allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService, snapshotsInfoService); + super(allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService, snapshotsInfoService, settings); } public void setNanoTimeOverride(long nanoTime) { diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index 8499f3d77ca9b..cdd1301060ad7 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -157,7 +157,14 @@ public void allocateUnassigned( UnassignedAllocationHandler unassignedAllocationHandler ) { currentNodes = allocation.nodes(); - innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); + innerAllocatedUnassigned( + allocation, + primaryShardAllocator, + replicaShardAllocator, + shardRouting, + 
unassignedAllocationHandler, + this.settings + ); } /** From 49097461897d28b2cd0436acc76bdc178bf1e5bf Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 3 Aug 2022 13:13:52 -0700 Subject: [PATCH 06/11] Add unit tests for primary term first replica promotion & comparator fix --- .../gateway/PrimaryShardAllocator.java | 4 +- .../gateway/PrimaryShardAllocatorTests.java | 60 +++++++++++++++---- 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 9792a21c953d9..bb7e7b2b55a06 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -408,10 +408,10 @@ protected static NodeShardsResult buildNodeShardsResult( // If index has segrep enabled, then use replication checkpoint info to order the replicas if (indexSettings.isSegRepEnabled()) { - comparator = HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR; + comparator = comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); } - nodeShardStates.sort(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); + nodeShardStates.sort(comparator); if (logger.isTraceEnabled()) { logger.trace( diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index 6cdcf9085c36c..3fb843de1e74b 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -208,20 +208,23 @@ public void testShardLockObtainFailedException() { } /** - * Tests that replica with highest segment info version will be selected as target + * Tests that replica with highest primary ter version will be selected as target */ - public void 
testPreferReplicaWithHighestSegmentInfoVersion() { + public void testPreferReplicaWithHighestPrimaryTerm() { String allocId1 = randomAlphaOfLength(10); String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( yesAllocationDeciders(), CLUSTER_RECOVERED, allocId1, - allocId2 + allocId2, + allocId3 ); this.testAllocator.enableSegmentReplication(); - testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 1, 10, 101, 1)); - testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 1, 10, 120, 2)); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2)); + testAllocator.addData(node3, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); allocateAllUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -239,9 +242,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() { } /** - * Tests that replica with equal segment info version but higher primary term will be selected as target + * Tests that replica with highest segment info version will be selected as target on equal primary terms */ - public void testPreferReplicaWithHigherPrimaryTermOnSameSegmentInfoVersion() { + public void testPreferReplicaWithHighestSegmentInfoVersion() { String allocId1 = randomAlphaOfLength(10); String allocId2 = randomAlphaOfLength(10); String allocId3 = randomAlphaOfLength(10); @@ -253,7 +256,41 @@ public void testPreferReplicaWithHigherPrimaryTermOnSameSegmentInfoVersion() { allocId3 ); this.testAllocator.enableSegmentReplication(); - testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 1000, 10, 101, 1)); + 
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3)); + testAllocator.addData(node3, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node2.getId()) + ); + // Assert node2's allocation id is used + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId2) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + + /** + * Tests that prefer allocation of older primary even though having lower replication checkpoint + */ + public void testOutOfSyncHighestRepCheckpointIsIgnored() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId3 + ); + this.testAllocator.enableSegmentReplication(); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); allocateAllUnassigned(allocation); @@ -262,12 +299,12 @@ public void testPreferReplicaWithHigherPrimaryTermOnSameSegmentInfoVersion() { 
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat( allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node2.getId()) + equalTo(node3.getId()) ); // Assert node2's allocation id is used with highest replication checkpoint assertThat( allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId2) + equalTo(allocId3) ); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -286,7 +323,8 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { allocId2, allocId3 ); - testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 1000, 10, 101, 1)); + this.testAllocator.enableSegmentReplication(); + testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); allocateAllUnassigned(allocation); From 907df84cdadc008b8b486c5ee5ea575169959c2c Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 3 Aug 2022 14:00:38 -0700 Subject: [PATCH 07/11] Fix NPE on empty IndexMetadata --- ...ransportNodesListGatewayStartedShards.java | 33 +++++++------------ .../gateway/PrimaryShardAllocatorTests.java | 1 - 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 7118a3ace0f33..09eea3dcaa96d 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -56,7 +56,6 @@ import 
org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -153,20 +152,7 @@ protected NodesGatewayStartedShards newResponse( protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { try { final ShardId shardId = request.getShardId(); - IndexService indexService = indicesService.indexService(request.getShardId().getIndex()); - final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); - final IndexSettings indexSettings = new IndexSettings(metadata, settings); - // IndexSettings settings = indicesService.indexService(request.getShardId().getIndex()).getIndexSettings(); - ReplicationCheckpoint replicationCheckpoint = null; - try { - IndexShard shard = indexService.getShard(shardId.getId()); - if (shard != null) { - replicationCheckpoint = shard.getLatestReplicationCheckpoint(); - } - } catch (Exception e) { - logger.warn("Error fetching replication checkpoint {}", replicationCheckpoint); - } - if (replicationCheckpoint == null) replicationCheckpoint = ReplicationCheckpoint.empty(shardId); + ReplicationCheckpoint replicationCheckpoint; logger.trace("{} loading local shard state info", shardId); ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( logger, @@ -181,8 +167,9 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { } else { // TODO: Fallback for BWC with older OpenSearch versions. 
// Remove once request.getCustomDataPath() always returns non-null + final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); if (metadata != null) { - customDataPath = indexSettings.customDataPath(); + customDataPath = new IndexSettings(metadata, settings).customDataPath(); } else { logger.trace("{} node doesn't have meta data for the requests index", shardId); throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); @@ -208,12 +195,11 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { exception ); String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - return new NodeGatewayStartedShards( clusterService.localNode(), allocationId, shardStateMetadata.primary, - replicationCheckpoint, + null, exception ); } @@ -221,6 +207,8 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + IndexShard shard = indicesService.getShardOrNull(shardId); + replicationCheckpoint = shard != null ? 
shard.getLatestReplicationCheckpoint() : null; return new NodeGatewayStartedShards( clusterService.localNode(), allocationId, @@ -229,7 +217,7 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { ); } logger.trace("{} no local shard info found", shardId); - return new NodeGatewayStartedShards(clusterService.localNode(), null, false, replicationCheckpoint); + return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null); } catch (Exception e) { throw new OpenSearchException("failed to load started shards", e); } @@ -390,7 +378,7 @@ public NodeGatewayStartedShards(StreamInput in) throws IOException { } else { storeException = null; } - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_3_0_0) && in.readBoolean()) { replicationCheckpoint = new ReplicationCheckpoint(in); } else { replicationCheckpoint = null; @@ -447,8 +435,11 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeBoolean(false); } - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_3_0_0) && replicationCheckpoint != null) { + out.writeBoolean(true); replicationCheckpoint.writeTo(out); + } else { + out.writeBoolean(false); } } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index 3fb843de1e74b..cfee43f1dce1a 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -275,7 +275,6 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() { assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } - /** * Tests that prefer allocation of older primary even though having lower replication checkpoint */ From a62d02c537b3e82524bf29dc29defbe277c2fe48 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 3 Aug 
2022 14:45:12 -0700 Subject: [PATCH 08/11] Remove settings from AllocationService and directly inject in GatewayAllocator --- .../routing/allocation/Allocators.java | 3 +- .../org/opensearch/cluster/ClusterModule.java | 8 +--- .../routing/allocation/AllocationService.java | 13 ++---- .../gateway/PrimaryShardAllocator.java | 7 +--- ...ransportNodesListGatewayStartedShards.java | 15 ++++--- .../ClusterAllocationExplainActionTests.java | 2 +- .../cluster/reroute/ClusterRerouteTests.java | 3 +- .../shrink/TransportResizeActionTests.java | 12 ++---- .../health/ClusterStateHealthTests.java | 2 +- .../MetadataCreateIndexServiceTests.java | 9 ++-- .../allocation/AllocationServiceTests.java | 5 +-- .../allocation/BalanceConfigurationTests.java | 3 +- .../DecisionsImpactOnClusterHealthTests.java | 3 +- .../MaxRetryAllocationDeciderTests.java | 3 +- .../NodeVersionAllocationDeciderTests.java | 6 +-- .../RandomAllocationDeciderTests.java | 3 +- .../ResizeAllocationDeciderTests.java | 3 +- .../decider/DiskThresholdDeciderTests.java | 42 +++++++------------ .../EnableAllocationShortCircuitTests.java | 3 +- .../decider/FilterAllocationDeciderTests.java | 9 ++-- .../gateway/GatewayServiceTests.java | 3 +- .../indices/cluster/ClusterStateChanges.java | 3 +- .../cluster/OpenSearchAllocationTestCase.java | 14 +++---- 23 files changed, 58 insertions(+), 116 deletions(-) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java index 4a12a6f342f42..d700b9dab2cf3 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/Allocators.java @@ -94,8 +94,7 @@ public static AllocationService createAllocationService(Settings settings, Clust NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE, - 
EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); } diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index e8976ac48de26..f8ba520e465e2 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -147,13 +147,7 @@ public ClusterModule( this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext); - this.allocationService = new AllocationService( - allocationDeciders, - shardsAllocator, - clusterInfoService, - snapshotsInfoService, - settings - ); + this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); } public static List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index dedd8e538bcaa..1c6e4732a2ab7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -55,7 +55,6 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.collect.ImmutableOpenMap; -import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; import org.opensearch.snapshots.SnapshotsInfoService; @@ -93,18 +92,15 @@ public class AllocationService { private final ClusterInfoService clusterInfoService; private SnapshotsInfoService 
snapshotsInfoService; - private final Settings settings; - // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator public AllocationService( AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService, - Settings settings + SnapshotsInfoService snapshotsInfoService ) { - this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, settings); + this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator)); } @@ -112,14 +108,12 @@ public AllocationService( AllocationDeciders allocationDeciders, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService, - Settings settings + SnapshotsInfoService snapshotsInfoService ) { this.allocationDeciders = allocationDeciders; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; - this.settings = settings; } /** @@ -225,7 +219,6 @@ public ClusterState applyFailedShards( return clusterState; } ClusterState tmpState = IndexMetadataUpdater.removeStaleIdsWithoutRoutings(clusterState, staleShards, logger); - // clusterState.getMetadata().index(failedShards.get(0).getMessage()) RoutingNodes routingNodes = getMutableRoutingNodes(tmpState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index bb7e7b2b55a06..d5f8bdbcf5d7c 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -91,8 +91,6 @@ 
private static boolean isResponsibleFor(final ShardRouting shard) { || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT); } - Settings settings; - @Override public AllocateUnassignedDecision makeAllocationDecision( final ShardRouting unassignedShard, @@ -129,9 +127,8 @@ public AllocateUnassignedDecision makeAllocationDecision( // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); - final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); + final IndexSettings indexSettings = settings != null ? new IndexSettings(indexMetadata, settings) : null; final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id()); - // final IndexSettings indexSettings = indexMetadata.getIndex(). final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT; assert inSyncAllocationIds.isEmpty() == false; @@ -407,7 +404,7 @@ protected static NodeShardsResult buildNodeShardsResult( } // If index has segrep enabled, then use replication checkpoint info to order the replicas - if (indexSettings.isSegRepEnabled()) { + if (indexSettings != null && indexSettings.isSegRepEnabled()) { comparator = comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 09eea3dcaa96d..36d62580caa64 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -361,12 +361,9 @@ public String getCustomDataPath() { * @opensearch.internal */ public static class 
NodeGatewayStartedShards extends BaseNodeResponse { - private final String allocationId; private final boolean primary; - private final Exception storeException; - private final ReplicationCheckpoint replicationCheckpoint; public NodeGatewayStartedShards(StreamInput in) throws IOException { @@ -435,11 +432,13 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeBoolean(false); } - if (out.getVersion().onOrAfter(Version.V_3_0_0) && replicationCheckpoint != null) { - out.writeBoolean(true); - replicationCheckpoint.writeTo(out); - } else { - out.writeBoolean(false); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (replicationCheckpoint != null) { + out.writeBoolean(true); + replicationCheckpoint.writeTo(out); + } else { + out.writeBoolean(false); + } } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java index 49765d436d282..59a4a05f0db8e 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java @@ -95,7 +95,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing throw new UnsupportedOperationException("cannot explain"); } } - }, null, null, null) + }, null, null) ); assertEquals(shard.currentNodeId(), cae.getCurrentNode().getId()); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java index 1709290a3ab4f..859d8ce3bb734 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ 
b/server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -97,8 +97,7 @@ public void testClusterStateUpdateTask() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); ClusterState clusterState = createInitialClusterState(allocationService); ClusterRerouteRequest req = new ClusterRerouteRequest(); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java index 8c349e60ac551..e4b79ac54f8fd 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java @@ -154,8 +154,7 @@ public void testErrorCondition() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -182,8 +181,7 @@ public void testPassNumRoutingShards() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -214,8 +212,7 @@ public void testPassNumRoutingShardsAndFail() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -252,8 
+249,7 @@ public void testShrinkIndexSettings() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); diff --git a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java index a6351676c3df2..80e76d44b9c41 100644 --- a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java @@ -177,7 +177,7 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte threadPool, new ActionFilters(new HashSet<>()), indexNameExpressionResolver, - new AllocationService(null, new TestGatewayAllocator(), null, null, null, null) + new AllocationService(null, new TestGatewayAllocator(), null, null, null) ); PlainActionFuture listener = new PlainActionFuture<>(); action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 97a157f933817..ac03ab49bbbbd 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -304,8 +304,7 @@ public void testValidateShrinkIndex() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -407,8 +406,7 @@ public 
void testValidateSplitIndex() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); @@ -548,8 +546,7 @@ private void runPrepareResizeIndexSettingsTest( new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); final RoutingTable initialRoutingTable = service.reroute(initialClusterState, "reroute").routingTable(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java index cb63a059eedad..38b71db27e02f 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java @@ -159,8 +159,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } }, new EmptyClusterInfoService(), - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); final String unrealisticAllocatorName = "unrealistic"; @@ -263,7 +262,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } public void testExplainsNonAllocationOfShardWithUnknownAllocator() { - final AllocationService allocationService = new AllocationService(null, null, null, null, null); + final AllocationService allocationService = new AllocationService(null, null, null, null); allocationService.setExistingShardsAllocators( Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()) ); diff --git 
a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index 0537840a38a06..1ba69694eaec1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -426,8 +426,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } }, EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); Metadata.Builder metadataBuilder = Metadata.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java index 34f4b5a690ac2..35c6aea425b88 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java @@ -162,8 +162,7 @@ private static AllocationService newAllocationService(Settings settings, Set new SnapshotShardSizeInfo(snapshotShardSizes.build()), - Settings.EMPTY + () -> new SnapshotShardSizeInfo(snapshotShardSizes.build()) ); state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java index fd1035f040fc3..3f7a998accafe 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/RandomAllocationDeciderTests.java @@ -88,8 +88,7 @@ public void testRandomDecisions() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); int indices = scaledRandomIntBetween(1, 20); Builder metaBuilder = Metadata.builder(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java index 39aa808f08d96..2d4422fdf6ced 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ResizeAllocationDeciderTests.java @@ -72,8 +72,7 @@ public void setUp() throws Exception { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 9e1f1b386cbc7..c3f54fa7580ac 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -140,8 +140,7 @@ public void testDiskThreshold() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); Metadata metadata = Metadata.builder() @@ -221,8 +220,7 @@ public void testDiskThreshold() { new TestGatewayAllocator(), new 
BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -254,8 +252,7 @@ public void testDiskThreshold() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -326,8 +323,7 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); Metadata metadata = Metadata.builder() @@ -377,8 +373,7 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -442,8 +437,7 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -476,8 +470,7 @@ public void testDiskThresholdWithAbsoluteSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); clusterState = strategy.reroute(clusterState, "reroute"); @@ -579,8 +572,7 @@ public void testDiskThresholdWithShardSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); Metadata metadata = Metadata.builder() @@ -653,8 +645,7 @@ public void 
testUnknownDiskUsage() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); Metadata metadata = Metadata.builder() @@ -764,8 +755,7 @@ public void testShardRelocationsTakenIntoAccount() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); Metadata metadata = Metadata.builder() @@ -1041,8 +1031,7 @@ public void testCanRemainWithShardRelocatingAway() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); // Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away // and therefor we will have sufficient disk space on node1. @@ -1156,8 +1145,7 @@ public void testForSingleDataNode() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); ClusterState result = strategy.reroute(clusterState, "reroute"); @@ -1280,8 +1268,7 @@ public void testWatermarksEnabledForSingleDataNode() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); ClusterState result = strategy.reroute(clusterState, "reroute"); @@ -1414,8 +1401,7 @@ public void testDiskThresholdWithSnapshotShardSizes() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), clusterInfoService, - snapshotShardSizeInfoRef::get, - Settings.EMPTY + snapshotShardSizeInfoRef::get ); // reroute triggers snapshot shard size fetching diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java index 39a0e374f9fe3..6a1b3c912ad4a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/EnableAllocationShortCircuitTests.java @@ -227,8 +227,7 @@ private static AllocationService createAllocationService(Settings.Builder settin new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java index f10b11ff111a7..0b00d26182346 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java @@ -79,8 +79,7 @@ public void testFilterInitialRecovery() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); ClusterState state = createInitialClusterState( service, @@ -188,8 +187,7 @@ public void testTierFilterIgnored() { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); ClusterState state = createInitialClusterState( service, @@ -248,8 +246,7 @@ private void filterSettingsUpdateHelper( new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - 
EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY); RoutingTable routingTable = state.routingTable(); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java index 60d60ab131390..2d8a26f8bbe87 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java @@ -88,8 +88,7 @@ private GatewayService createService(final Settings.Builder settings) { new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - Settings.EMPTY + EmptySnapshotsInfoService.INSTANCE ); return new GatewayService(settings.build(), allocationService, clusterService, null, null, null); } diff --git a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java index 1885d82691d4f..7fe17e570d157 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java @@ -168,8 +168,7 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th new TestGatewayAllocator(), new BalancedShardsAllocator(SETTINGS), EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE, - SETTINGS + EmptySnapshotsInfoService.INSTANCE ); shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor( allocationService, diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index b2a699da1cc96..a13d337fa4d26 100644 --- 
a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -104,8 +104,7 @@ public static MockAllocationService createAllocationService(Settings settings, C new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE, - SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES, - Settings.EMPTY + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES ); } @@ -115,8 +114,7 @@ public static MockAllocationService createAllocationService(Settings settings, C new TestGatewayAllocator(), new BalancedShardsAllocator(settings), clusterInfoService, - SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES, - Settings.EMPTY + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES ); } @@ -138,8 +136,7 @@ public static MockAllocationService createAllocationService( gatewayAllocator, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE, - snapshotsInfoService, - Settings.EMPTY + snapshotsInfoService ); } @@ -315,10 +312,9 @@ public MockAllocationService( GatewayAllocator gatewayAllocator, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService, - Settings settings + SnapshotsInfoService snapshotsInfoService ) { - super(allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService, snapshotsInfoService, settings); + super(allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService, snapshotsInfoService); } public void setNanoTimeOverride(long nanoTime) { From ac47ffd6736feebb144ceb65ccb614cdde690d34 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 3 Aug 2022 17:34:55 -0700 Subject: [PATCH 09/11] Add more unit tests and minor code clean up Signed-off-by: Suraj Singh --- .../gateway/PrimaryShardAllocator.java | 4 +- ...ransportNodesListGatewayStartedShards.java | 6 +-- .../gateway/PrimaryShardAllocatorTests.java | 46 ++++++++++++++++--- 3 files changed, 44 
insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index d5f8bdbcf5d7c..5cf50807d935f 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -318,8 +318,8 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS NodeGatewayStartedShards::primary ).reversed(); - private static final Comparator HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::replicationCheckpoint + private static final Comparator HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.nullsLast( + Comparator.comparing(NodeGatewayStartedShards::replicationCheckpoint) ); /** diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 36d62580caa64..3047e27b7a037 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -152,7 +152,6 @@ protected NodesGatewayStartedShards newResponse( protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { try { final ShardId shardId = request.getShardId(); - ReplicationCheckpoint replicationCheckpoint; logger.trace("{} loading local shard state info", shardId); ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( logger, @@ -207,13 +206,12 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); String allocationId = shardStateMetadata.allocationId != null ? 
shardStateMetadata.allocationId.getId() : null; - IndexShard shard = indicesService.getShardOrNull(shardId); - replicationCheckpoint = shard != null ? shard.getLatestReplicationCheckpoint() : null; + final IndexShard shard = indicesService.getShardOrNull(shardId); return new NodeGatewayStartedShards( clusterService.localNode(), allocationId, shardStateMetadata.primary, - replicationCheckpoint + shard != null ? shard.getLatestReplicationCheckpoint() : null ); } logger.trace("{} no local shard info found", shardId); diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index cfee43f1dce1a..d1ed1ec6404c5 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -224,7 +224,7 @@ public void testPreferReplicaWithHighestPrimaryTerm() { this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2)); - testAllocator.addData(node3, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); allocateAllUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -241,6 +241,40 @@ public void testPreferReplicaWithHighestPrimaryTerm() { assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } + /** + * Tests that, when one replica reports no replication checkpoint, the replica with the highest replication checkpoint is selected as target + */ + public void testPreferReplicaWithNullReplicationCheckpoint() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); +
String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + this.testAllocator.enableSegmentReplication(); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId()) + ); + // Assert node3's allocation id should be used as it has highest replication checkpoint + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId3) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + /** * Tests that replica with highest segment info version will be selected as target on equal primary terms */ @@ -258,7 +292,7 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() { this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3)); - testAllocator.addData(node3, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); allocateAllUnassigned(allocation); assertThat(allocation.routingNodesChanged(), 
equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -276,7 +310,7 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() { } /** - * Tests that prefer allocation of older primary even though having lower replication checkpoint + * Tests that an in-sync replica is preferred for allocation even though an out-of-sync copy has a higher replication checkpoint */ public void testOutOfSyncHighestRepCheckpointIsIgnored() { String allocId1 = randomAlphaOfLength(10); @@ -300,7 +334,7 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() { allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node3.getId()) ); - // Assert node2's allocation id is used with highest replication checkpoint + // Assert node3's allocation id is used assertThat( allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId3) @@ -309,7 +343,7 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() { } /** - * Tests that prefer allocation of older primary even though having lower replication checkpoint + * Tests that the previous primary is preferred for allocation over a replica with a higher replication checkpoint */ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { String allocId1 = randomAlphaOfLength(10); @@ -334,7 +368,7 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()) ); - // Assert node2's allocation id is used with highest replication checkpoint + // Assert node1's allocation id is used as it was the previous primary, despite its lower replication checkpoint assertThat( allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId1) From de07dac82eb4aa63121404765887982219a69731 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 8 Aug 2022 11:55:51 -0700 Subject: [PATCH 10/11] Address
review comments & integration test Signed-off-by: Suraj Singh --- .../gateway/BaseGatewayShardAllocator.java | 13 ++++----- .../opensearch/gateway/GatewayAllocator.java | 28 +++++------------- .../gateway/PrimaryShardAllocator.java | 29 +++++++++---------- .../gateway/ReplicaShardAllocator.java | 4 +-- ...ransportNodesListGatewayStartedShards.java | 4 ++- .../gateway/PrimaryShardAllocatorTests.java | 16 ++-------- .../gateway/ReplicaShardAllocatorTests.java | 4 +-- .../test/gateway/TestGatewayAllocator.java | 18 +++--------- 8 files changed, 36 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index dc767fe3a2554..59ef894958cbe 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -43,7 +43,6 @@ import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.common.settings.Settings; import java.util.ArrayList; import java.util.List; @@ -63,7 +62,7 @@ public abstract class BaseGatewayShardAllocator { /** * Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist. - * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger, Settings)} + * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)} * to make decisions on assigning shards to nodes. 
* @param shardRouting the shard to allocate * @param allocation the allocation state container object @@ -72,10 +71,9 @@ public abstract class BaseGatewayShardAllocator { public void allocateUnassigned( ShardRouting shardRouting, RoutingAllocation allocation, - ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler, - Settings settings + ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler ) { - final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger, settings); + final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger); if (allocateUnassignedDecision.isDecisionTaken() == false) { // no decision was taken by this allocator @@ -108,7 +106,7 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation /** * Make a decision on the allocation of an unassigned shard. This method is used by - * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler, Settings)} to make decisions + * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions * about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated. 
* * @param unassignedShard the unassigned shard to allocate @@ -119,8 +117,7 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation public abstract AllocateUnassignedDecision makeAllocationDecision( ShardRouting unassignedShard, RoutingAllocation allocation, - Logger logger, - Settings settings + Logger logger ); /** diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index e15c23f26da10..cdcf813d9ede0 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -51,7 +51,6 @@ import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasables; -import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; @@ -80,8 +79,6 @@ public class GatewayAllocator implements ExistingShardsAllocator { private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; - protected final Settings settings; - private final ConcurrentMap< ShardId, AsyncShardFetch> asyncFetchStarted = ConcurrentCollections @@ -94,13 +91,11 @@ public class GatewayAllocator implements ExistingShardsAllocator { public GatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, - TransportNodesListShardStoreMetadata storeAction, - Settings settings + TransportNodesListShardStoreMetadata storeAction ) { this.rerouteService = rerouteService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); - this.settings = settings; } @Override @@ -116,7 +111,6 @@ protected GatewayAllocator() { 
this.rerouteService = null; this.primaryShardAllocator = null; this.replicaShardAllocator = null; - this.settings = null; } @Override @@ -171,14 +165,7 @@ public void allocateUnassigned( ) { assert primaryShardAllocator != null; assert replicaShardAllocator != null; - innerAllocatedUnassigned( - allocation, - primaryShardAllocator, - replicaShardAllocator, - shardRouting, - unassignedAllocationHandler, - this.settings - ); + innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } // allow for testing infra to change shard allocators implementation @@ -187,14 +174,13 @@ protected static void innerAllocatedUnassigned( PrimaryShardAllocator primaryShardAllocator, ReplicaShardAllocator replicaShardAllocator, ShardRouting shardRouting, - ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler, - Settings settings + ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler ) { assert shardRouting.unassigned(); if (shardRouting.primary()) { - primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings); + primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler); } else { - replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings); + replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler); } } @@ -204,10 +190,10 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting assert routingAllocation.debugDecision(); if (unassignedShard.primary()) { assert primaryShardAllocator != null; - return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings); + return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); } else { assert replicaShardAllocator != null; - return 
replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings); + return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); } } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 5cf50807d935f..f6b20b7dfb581 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -48,11 +48,9 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.Decision.Type; -import org.opensearch.common.settings.Settings; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; -import org.opensearch.index.IndexSettings; import java.util.ArrayList; import java.util.Collection; @@ -95,8 +93,7 @@ private static boolean isResponsibleFor(final ShardRouting shard) { public AllocateUnassignedDecision makeAllocationDecision( final ShardRouting unassignedShard, final RoutingAllocation allocation, - final Logger logger, - final Settings settings + final Logger logger ) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for allocating this shard @@ -127,7 +124,6 @@ public AllocateUnassignedDecision makeAllocationDecision( // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); - final IndexSettings indexSettings = settings != null ? 
new IndexSettings(indexMetadata, settings) : null; final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id()); final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT; @@ -139,7 +135,6 @@ public AllocateUnassignedDecision makeAllocationDecision( allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, - indexSettings, logger ); final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; @@ -318,7 +313,7 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS NodeGatewayStartedShards::primary ).reversed(); - private static final Comparator HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.nullsLast( + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.nullsLast( Comparator.comparing(NodeGatewayStartedShards::replicationCheckpoint) ); @@ -333,7 +328,6 @@ protected static NodeShardsResult buildNodeShardsResult( Set ignoreNodes, Set inSyncAllocationIds, FetchResult shardState, - IndexSettings indexSettings, Logger logger ) { List nodeShardStates = new ArrayList<>(); @@ -391,21 +385,24 @@ protected static NodeShardsResult buildNodeShardsResult( } } - Comparator comparator; // allocation preference + /** + * Orders the active shards copies based on below comparators + * 1. No store exception + * 2. Shard copies previously primary shard + * 3. Shard copies with highest replication checkpoint. This comparator is NO-OP for doc rep enabled indices. 
+ */ + final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids Comparator matchingAllocationsFirst = Comparator.comparing( (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR); + .thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); - } - - // If index has segrep enabled, then use replication checkpoint info to order the replicas - if (indexSettings != null && indexSettings.isSegRepEnabled()) { - comparator = comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } nodeShardStates.sort(comparator); diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index bad85f7cfb57b..c0b831b6fe4d0 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.Nullable; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.StoreFileMetadata; @@ -181,8 +180,7 @@ private static boolean isResponsibleFor(final ShardRouting shard) { public AllocateUnassignedDecision makeAllocationDecision( final ShardRouting unassignedShard, final 
RoutingAllocation allocation, - final Logger logger, - final Settings settings + final Logger logger ) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for deciding on this shard diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 3047e27b7a037..953b4def9d653 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -473,7 +473,9 @@ public String toString() { if (storeException != null) { buf.append(",storeException=").append(storeException); } - buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + if (replicationCheckpoint != null) { + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + } buf.append("]"); return buf.toString(); } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index d1ed1ec6404c5..9982bb0d71f4f 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -63,7 +63,6 @@ import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -98,7 +97,7 @@ public void buildTestAllocator() { private void allocateAllUnassigned(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = 
allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { - testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, testAllocator.settings); + testAllocator.allocateUnassigned(iterator.next(), allocation, iterator); } } @@ -208,7 +207,7 @@ public void testShardLockObtainFailedException() { } /** - * Tests that replica with highest primary ter version will be selected as target + * Tests that replica with the highest primary term version will be selected as target */ public void testPreferReplicaWithHighestPrimaryTerm() { String allocId1 = randomAlphaOfLength(10); @@ -221,7 +220,6 @@ public void testPreferReplicaWithHighestPrimaryTerm() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); @@ -255,7 +253,6 @@ public void testPreferReplicaWithNullReplicationCheckpoint() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); testAllocator.addData(node2, allocId2, false); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2)); @@ -289,7 +286,6 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); @@ -322,7 +318,6 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() { allocId1, allocId3 
); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); @@ -356,7 +351,6 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); @@ -767,8 +761,6 @@ class TestAllocator extends PrimaryShardAllocator { private Map data; - private Settings settings = Settings.EMPTY; - public TestAllocator clear() { data = null; return this; @@ -821,9 +813,5 @@ protected AsyncShardFetch.FetchResult(shardId, data, Collections.emptySet()); } - - public void enableSegmentReplication() { - this.settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); - } } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index fc8d57acdb302..36ac93524d6aa 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -96,8 +96,6 @@ public class ReplicaShardAllocatorTests extends OpenSearchAllocationTestCase { private TestAllocator testAllocator; - private final Settings settings = Settings.EMPTY; - @Before public void buildTestAllocator() { this.testAllocator = new TestAllocator(); @@ -106,7 +104,7 @@ public void buildTestAllocator() { private void 
allocateAllUnassigned(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { - testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, settings); + testAllocator.allocateUnassigned(iterator.next(), allocation, iterator); } } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index cdd1301060ad7..a36dc26685eb4 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -71,9 +71,9 @@ public class TestGatewayAllocator extends GatewayAllocator { Map> knownAllocations = new HashMap<>(); - - Map shardIdNodeToReplicationCheckPointMap = new HashMap<>(); DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; + Map shardIdNodeToReplicationCheckPointMap = new HashMap<>(); + PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator() { @Override protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { @@ -103,10 +103,7 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR }; private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String nodeName) { - return shardIdNodeToReplicationCheckPointMap.getOrDefault( - getReplicationCheckPointKey(shardId, nodeName), - ReplicationCheckpoint.empty(shardId) - ); + return shardIdNodeToReplicationCheckPointMap.getOrDefault(getReplicationCheckPointKey(shardId, nodeName), null); } ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator() { @@ -157,14 +154,7 @@ public void allocateUnassigned( UnassignedAllocationHandler unassignedAllocationHandler ) { currentNodes = allocation.nodes(); - innerAllocatedUnassigned( - allocation, - primaryShardAllocator, - 
replicaShardAllocator, - shardRouting, - unassignedAllocationHandler, - this.settings - ); + innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } /** From 1f07d13b5bca6f21570b25100580abe131beb9ec Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 9 Aug 2022 12:23:15 -0700 Subject: [PATCH 11/11] Fix comparator on null ReplicationCheckpoint Signed-off-by: Suraj Singh --- .../gateway/PrimaryShardAllocator.java | 11 ++++--- .../gateway/PrimaryShardAllocatorTests.java | 33 +++++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index f6b20b7dfb581..4dc9396751fc9 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -313,8 +313,9 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS NodeGatewayStartedShards::primary ).reversed(); - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.nullsLast( - Comparator.comparing(NodeGatewayStartedShards::replicationCheckpoint) + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( + NodeGatewayStartedShards::replicationCheckpoint, + Comparator.nullsLast(Comparator.naturalOrder()) ); /** @@ -387,9 +388,9 @@ protected static NodeShardsResult buildNodeShardsResult( /** * Orders the active shards copies based on below comparators - * 1. No store exception - * 2. Shard copies previously primary shard - * 3. Shard copies with highest replication checkpoint. This comparator is NO-OP for doc rep enabled indices. + * 1. No store exception i.e. shard copy is readable + * 2. Prefer previous primary shard + * 3. Prefer shard copy with the highest replication checkpoint. 
It is NO-OP for doc rep enabled indices. */ final Comparator comparator; // allocation preference if (matchAnyShard) { diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index 9982bb0d71f4f..3c39ec9f03b2a 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -272,6 +272,39 @@ public void testPreferReplicaWithNullReplicationCheckpoint() { assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } + /** + * Tests that null ReplicationCheckpoints are ignored and the previous primary is preferred + */ + public void testPreferReplicaWithAllNullReplicationCheckpoint() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, false, null, null); + testAllocator.addData(node2, allocId2, false, null, null); + testAllocator.addData(node3, allocId3, true, null, null); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId()) + ); + // Assert node3's allocation id should be used as it was previous primary + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId3) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); +
} + /** * Tests that replica with highest segment info version will be selected as target on equal primary terms */