diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 8e63133e87806..82229f244239f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -951,7 +951,7 @@ protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatew this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } - private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + protected void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index 45a0bd7b18afd..c6705a678e077 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -108,7 +108,7 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled( listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -143,6 +143,49 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() { System.nanoTime() ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.NORMAL, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(0, initializingShards.size()); + assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); + assertTrue(rerouteScheduled.get()); + } + + public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduledWithHighPriority() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", "high"); + // passing 0 for timed out latch such that all shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + setupStateAndService(metadata, routingTable); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { if (randomBoolean()) { listener.onFailure(new OpenSearchException("simulated")); @@ -193,7 +236,7 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -237,7 +280,7 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -284,7 +327,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -326,7 +369,7 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -371,7 +414,7 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -416,7 +459,7 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -462,7 +505,7 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -522,7 +565,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index ebc2e59fa5a30..be2486846d401 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.cluster.ClusterInfo; @@ -53,6 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; @@ -437,10 +437,51 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); + rerouteLatch.countDown(); + }; + CountDownLatch timedOutShardsLatch = new CountDownLatch(20); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 10); + assertEquals(1, rerouteLatch.getCount()); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 0); + assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners + final boolean terminated = terminate(threadPool); + assert terminated; + clusterService.close(); + } + + public void testCollectTimedOutShardsAndScheduleRerouteWithHighPriority_Success() throws InterruptedException { + createIndexAndUpdateClusterState(2, 5, 2); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); + final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } assertEquals(Priority.HIGH, priority); rerouteLatch.countDown(); }; @@ -448,11 +489,13 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.HIGH); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners @@ -466,22 +509,29 @@ public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onFailure(new OpenSearchException("simulated")); + listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); rerouteLatch.countDown(); }; CountDownLatch timedOutShardsLatch = new CountDownLatch(20); testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners