Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Apr 4, 2024
1 parent 03821d5 commit 94a2f89
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteMigrationAllocationDeciderIT extends MigrationBaseTestCase {

// When the primary is on doc rep node, existing replica copy can get allocated on excluded docrep node.
public void testFilterAllocationSkipsReplica() throws IOException {
addRemote = false;
List<String> docRepNodes = internalCluster().startNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")
.build()
);
ensureGreen("test");

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertTrue(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", String.join(",", docRepNodes)))
.execute()
.actionGet()
.isAcknowledged()
);
internalCluster().stopRandomDataNode();
ensureGreen("test");
}

// When the primary is on remote node, new replica copy shouldn't get allocated on an excluded docrep node.
public void testFilterAllocationSkipsReplicaOnExcludedNode() throws IOException {
addRemote = false;
List<String> nodes = internalCluster().startNodes(2);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")
.build()
);
ensureGreen("test");
addRemote = true;

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
String remoteNode = internalCluster().startNode();

client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.execute()
.actionGet();
client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
assertEquals(remoteNode, primaryNodeName("test"));

assertTrue(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", String.join(",", nodes)))
.execute()
.actionGet()
.isAcknowledged()
);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName("test")));
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertTrue(clusterHealthResponse.isTimedOut());
ensureYellow("test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,22 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
}

Decision decision = isMixedModeReplicaDecision(shardRouting, allocation);
Decision decision = isRemoteStoreMigrationReplicaDecision(shardRouting, allocation);
if (decision != null) return decision;
}
return shouldFilter(shardRouting, node.node(), allocation);
}

public Decision isMixedModeReplicaDecision(ShardRouting shardRouting, RoutingAllocation allocation) {
public Decision isRemoteStoreMigrationReplicaDecision(ShardRouting shardRouting, RoutingAllocation allocation) {
assert shardRouting.unassigned();
boolean primaryOnRemote = RemoteStoreMigrationAllocationDecider.isPrimaryOnRemote(shardRouting.shardId(), allocation);
if (shardRouting.primary() == false
&& shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED
&& (compatibilityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED))
&& (migrationDirection.equals(RemoteStoreNodeService.Direction.REMOTE_STORE))) {
&& (migrationDirection.equals(RemoteStoreNodeService.Direction.REMOTE_STORE))
&& primaryOnRemote == false) {
String explanation =
"in mixed mode with remote store direction, allocation filters are not applicable for existing replica copies";
"in remote store migration, allocation filters are not applicable for replica copies whose primary is on doc rep node";
return allocation.decision(Decision.YES, NAME, explanation);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import org.opensearch.node.remotestore.RemoteStoreNodeService.Direction;
Expand Down Expand Up @@ -130,15 +131,20 @@ private Decision primaryShardDecision(ShardRouting primaryShardRouting, Discover
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, primaryShardRouting, targetNode, ""));
}

// Checks if primary shard is on a remote node.
static boolean isPrimaryOnRemote(ShardId shardId, RoutingAllocation allocation) {
ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(shardId);
if (primaryShardRouting != null) {
DiscoveryNode primaryShardNode = allocation.nodes().getNodes().get(primaryShardRouting.currentNodeId());
return primaryShardNode.isRemoteStoreNode();
}
return false;
}

private Decision replicaShardDecision(ShardRouting replicaShardRouting, DiscoveryNode targetNode, RoutingAllocation allocation) {
if (targetNode.isRemoteStoreNode()) {
ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(replicaShardRouting.shardId());
boolean primaryHasMigratedToRemote = false;
if (primaryShardRouting != null) {
DiscoveryNode primaryShardNode = allocation.nodes().getNodes().get(primaryShardRouting.currentNodeId());
primaryHasMigratedToRemote = primaryShardNode.isRemoteStoreNode();
}
if (primaryHasMigratedToRemote == false) {
boolean primaryOnRemote = RemoteStoreMigrationAllocationDecider.isPrimaryOnRemote(replicaShardRouting.shardId(), allocation);
if (primaryOnRemote == false) {
return allocation.decision(
Decision.NO,
NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ public void testMixedModeRemoteStoreAllocation() {
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);
ShardRouting sr = ShardRouting.newUnassigned(
routingTable.index("idx").shard(0).shardId(),
routingTable.index("sourceIndex").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
Expand All @@ -457,7 +457,7 @@ public void testMixedModeRemoteStoreAllocation() {
assertEquals(decision.toString(), Type.YES, decision.type());

sr = ShardRouting.newUnassigned(
routingTable.index("idx").shard(0).shardId(),
routingTable.index("sourceIndex").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
Expand Down

0 comments on commit 94a2f89

Please sign in to comment.