From 83a55257e7abf6ab144b5a1c24280524695df838 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Thu, 24 Oct 2024 09:26:16 +0530 Subject: [PATCH] rare reroute with npe --- .../coordination/RareClusterStateIT.java | 179 ++++++++++++++++++ .../action/shard/ShardStateAction.java | 1 + .../cluster/service/MasterService.java | 2 +- .../gateway/ReplicaShardAllocator.java | 4 +- .../gateway/ReplicaShardBatchAllocator.java | 28 +++ 5 files changed, 211 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java index b3cb15d028090..cc0264f375103 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; @@ -48,29 +49,39 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.index.Index; +import org.opensearch.core.transport.TransportResponse; import org.opensearch.discovery.Discovery; import org.opensearch.index.IndexService; import org.opensearch.index.mapper.DocumentMapper; import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.IndicesService; +import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.BlockClusterStateProcessing; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.opensearch.transport.TransportSettings; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.action.DocWriteResponse.Result.CREATED; +import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -409,4 +420,172 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED)); } + public void testDisassociateNodesWhileShardInit() throws InterruptedException { + final String clusterManagerName = internalCluster().startClusterManagerOnlyNode( + Settings.builder() + .put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s") + .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true) + .build() + ); + internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()); + String node2 = internalCluster().startDataOnlyNode( + Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build() + ); + + final ClusterService clusterService = internalCluster().clusterService(clusterManagerName); + blockShardStartedResponse(clusterManagerName, clusterService); + + final String index = "index"; + + // create index with 3 primary and 1 replica each + prepareCreate(index).setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + // .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries") + ).get(); + ensureGreen(index); + + // close to have some unassigned started shards shards.. + client().admin().indices().prepareClose(index).get(); + + // block so that replicas are always in init and not started + blockReplicaStart.set(true); + final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName); + clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterState.Builder builder = ClusterState.builder(currentState); + // open index + final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index)) + .state(IndexMetadata.State.OPEN) + .build(); + + builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true)); + builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index)); + ClusterState updatedState = builder.build(); + RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + routingTable.addAsRecovery(updatedState.metadata().index(index)); + updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); + ClusterState state = allocationService.reroute(updatedState, "reroute"); + return state; + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(e.getMessage(), e); + } + }); + + ensureYellow(index); + assertTrue(waitUntil(() -> { + ClusterState state = clusterService.state(); + return state.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3; + + })); + + logger.info("Initializing shards"); + logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)); + + // trigger 2nd reroute after shard in initialized + clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return allocationService.reroute(currentState, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + + ensureYellow(index); + assertTrue(waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3)); + clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // remove the primary node of replica shard which is in init + ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0); + ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId()); + + ClusterState.Builder builder = ClusterState.builder(currentState); + builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId())); + currentState = builder.build(); + logger.info("removed the node {}", primaryShard.currentNodeId()); + logger.info("shard {}", next); + ClusterState state = allocationService.disassociateDeadNodes(currentState, true, "reroute"); + return state; + } + + @Override + public void onFailure(String source, Exception e) {} + }); + assertTrue(waitUntil(() -> { + ClusterState state = clusterService.state(); + logger.info("current state {} ", state); + return clusterService.state().nodes().getSize() == 3; + + })); + + logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)); + blockReplicaStart.set(false); + + clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterState.Builder builder = ClusterState.builder(currentState); + final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index)) + .state(IndexMetadata.State.OPEN) + .build(); + builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true)); + builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index)); + ClusterState updatedState = builder.build(); + RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + routingTable.addAsRecovery(updatedState.metadata().index(index)); + updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); + + return allocationService.reroute(updatedState, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + + ensureGreen(index); + } + + AtomicBoolean blockReplicaStart = new AtomicBoolean(false); + + private void blockShardStartedResponse(String master, ClusterService service) { + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); + primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> { + + if (blockReplicaStart.get()) { + ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request; + String stringRep = req.toString(); + logger.info("ShardStateAction.StartedShardEntry {}", stringRep); + + String incomingRequest = req.toString(); + Optional matchReplica = service.state() + .routingTable() + .allShardsSatisfyingPredicate(r -> !r.primary()) + .getShardRoutings() + .stream() + .filter(r -> r.allocationId() != null) + .filter(r -> incomingRequest.contains(r.allocationId().getId())) + .findAny(); + + if (matchReplica.isPresent()) { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } else { + handler.messageReceived(request, channel, task); + } + } else { + handler.messageReceived(request, channel, task); + } + }); + } + + @Override + protected Collection> nodePlugins() { + return List.of(MockTransportService.TestPlugin.class); + } } diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java index cb5749a91d448..3bafb31dfa222 100644 --- a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java @@ -843,6 +843,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { * @opensearch.internal */ public static class StartedShardEntry extends TransportRequest { + final ShardId shardId; final String allocationId; final long primaryTerm; diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 713de8cdd0fda..31bf2affe4a84 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -917,7 +917,7 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterSt throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager"); } } catch (Exception e) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}", previousClusterState.version(), diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index c30ee8479ac97..9f4137b4d8172 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -101,7 +101,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch( RoutingNodes routingNodes = allocation.routingNodes(); ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); if (primaryShard == null) { - logger.trace("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard); + logger.info("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard); return null; } assert primaryShard.currentNodeId() != null; @@ -111,7 +111,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch( if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard - logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); + logger.info("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); return null; } diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 020a543ac5fc5..dc52f472b0c65 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -10,8 +10,10 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.NodeAllocationResult; @@ -33,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Allocates replica shards in a batch mode @@ -51,6 +54,10 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator { */ public void processExistingRecoveries(RoutingAllocation allocation, List> shardBatches) { List shardCancellationActions = new ArrayList<>(); + + Map initShardsFromRouting = allocation.routingTable().indicesRouting(); + Map> initShardsForIndex = new HashMap<>(); + // iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch for (List shardBatch : shardBatches) { List eligibleShards = new ArrayList<>(); @@ -58,6 +65,27 @@ public void processExistingRecoveries(RoutingAllocation allocation, List initShards = initShardsForIndex.putIfAbsent( + indexName, + initShardsFromRouting.get(shard.shardId().getIndexName()) + .shardsWithState(ShardRoutingState.INITIALIZING) + .stream() + .map(ShardRouting::shardId) + .collect(Collectors.toSet()) + ); + + if (initShards == null || !initShards.contains(shard.shardId())) { + logger.info("skipping the shardRouting {} as the state is updated in routing table", shard); + continue; + } + // need to iterate over all the nodes to find matching shard if (shouldSkipFetchForRecovery(shard)) { // shard should just be skipped for fetchData, no need to remove from batch