Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for reroute operation during node-left #16468

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
Expand All @@ -48,29 +49,39 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.index.Index;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexService;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.BlockClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -409,4 +420,172 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
}

public void testDisassociateNodesWhileShardInit() throws InterruptedException {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(
Settings.builder()
.put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
String node2 = internalCluster().startDataOnlyNode(
Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()
);

final ClusterService clusterService = internalCluster().clusterService(clusterManagerName);
blockShardStartedResponse(clusterManagerName, clusterService);

final String index = "index";

// create index with 3 primary and 1 replica each
prepareCreate(index).setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
// .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")
).get();
ensureGreen(index);

// close to have some unassigned started shards shards..
client().admin().indices().prepareClose(index).get();

// block so that replicas are always in init and not started
blockReplicaStart.set(true);
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName);
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterState.Builder builder = ClusterState.builder(currentState);
// open index
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
.state(IndexMetadata.State.OPEN)
.build();

builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metadata().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
ClusterState state = allocationService.reroute(updatedState, "reroute");
return state;
}

@Override
public void onFailure(String source, Exception e) {
logger.error(e.getMessage(), e);
}
});

ensureYellow(index);
assertTrue(waitUntil(() -> {
ClusterState state = clusterService.state();
return state.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3;

}));

logger.info("Initializing shards");
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));

// trigger 2nd reroute after shard in initialized
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return allocationService.reroute(currentState, "reroute");
}

@Override
public void onFailure(String source, Exception e) {}
});

ensureYellow(index);
assertTrue(waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3));
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// remove the primary node of replica shard which is in init
ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0);
ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId());

ClusterState.Builder builder = ClusterState.builder(currentState);
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId()));
currentState = builder.build();
logger.info("removed the node {}", primaryShard.currentNodeId());
logger.info("shard {}", next);
ClusterState state = allocationService.disassociateDeadNodes(currentState, true, "reroute");
return state;
}

@Override
public void onFailure(String source, Exception e) {}
});
assertTrue(waitUntil(() -> {
ClusterState state = clusterService.state();
logger.info("current state {} ", state);
return clusterService.state().nodes().getSize() == 3;

}));

logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
blockReplicaStart.set(false);

clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterState.Builder builder = ClusterState.builder(currentState);
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
.state(IndexMetadata.State.OPEN)
.build();
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metadata().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();

return allocationService.reroute(updatedState, "reroute");
}

@Override
public void onFailure(String source, Exception e) {}
});

ensureGreen(index);
}

AtomicBoolean blockReplicaStart = new AtomicBoolean(false);

private void blockShardStartedResponse(String master, ClusterService service) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> {

if (blockReplicaStart.get()) {
ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request;
String stringRep = req.toString();
logger.info("ShardStateAction.StartedShardEntry {}", stringRep);

String incomingRequest = req.toString();
Optional<ShardRouting> matchReplica = service.state()
.routingTable()
.allShardsSatisfyingPredicate(r -> !r.primary())
.getShardRoutings()
.stream()
.filter(r -> r.allocationId() != null)
.filter(r -> incomingRequest.contains(r.allocationId().getId()))
.findAny();

if (matchReplica.isPresent()) {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} else {
handler.messageReceived(request, channel, task);
}
} else {
handler.messageReceived(request, channel, task);
}
});
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,13 @@ private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integ
for (Map.Entry<ShardId, V> shardData : shardDataFromNode.entrySet()) {
if (shardData.getValue() != null) {
ShardId shardId = shardData.getKey();
if (emptyShardResponsePredicate.test(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
if (shardIdKey.get(shardId) != null) {// the response might be for shard which is no longer present in cache
if (emptyShardResponsePredicate.test(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
Expand Down Expand Up @@ -51,13 +52,25 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
*/
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
List<Runnable> shardCancellationActions = new ArrayList<>();
Map<ShardId, List<ShardRouting>> initReplicasFromRouting = new HashMap<>();
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).stream().filter(r -> !r.primary()).forEach(r -> {
initReplicasFromRouting.putIfAbsent(r.shardId(), new ArrayList<>());
initReplicasFromRouting.get(r.shardId()).add(r);
});

// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
for (List<ShardRouting> shardBatch : shardBatches) {
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> ineligibleShards = new ArrayList<>();
// iterate over shards to check for match for each of those
for (ShardRouting shard : shardBatch) {
if (shard != null && !shard.primary()) {
// check if the shard is in Initializing state in RoutingTable
// as the batch is not refreshed yet
if (!initReplicasFromRouting.containsKey(shard.shardId())) {
logger.trace("skipping the shardRouting {} as the state is updated in routing table", shard);
continue;
}
// need to iterate over all the nodes to find matching shard
if (shouldSkipFetchForRecovery(shard)) {
// shard should just be skipped for fetchData, no need to remove from batch
Expand All @@ -72,11 +85,19 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<List<Sh
continue; // still fetching
}
for (ShardRouting shard : eligibleShards) {
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState);

Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(shard, allocation, nodeShardStores);
if (cancellationAction != null) {
shardCancellationActions.add(cancellationAction);
for (ShardRouting initShardsFromAllocation : initReplicasFromRouting.get(shard.shardId())) {
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(
initShardsFromAllocation,
shardState
);
Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(
initShardsFromAllocation,
allocation,
nodeShardStores
);
if (cancellationAction != null) {
shardCancellationActions.add(cancellationAction);
}
}
}
}
Expand Down
Loading