Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip Filter Allocation Decider during mixed mode for existing indices… #12960

Merged
merged 2 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteMigrationAllocationDeciderIT extends MigrationBaseTestCase {

// When the primary is on doc rep node, existing replica copy can get allocated on excluded docrep node.
public void testFilterAllocationSkipsReplica() throws IOException {
addRemote = false;
List<String> docRepNodes = internalCluster().startNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")
.build()
);
ensureGreen("test");

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertTrue(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", String.join(",", docRepNodes)))
.execute()
.actionGet()
.isAcknowledged()
);
internalCluster().stopRandomDataNode();
ensureGreen("test");
}

// When the primary is on remote node, new replica copy shouldn't get allocated on an excluded docrep node.
public void testFilterAllocationSkipsReplicaOnExcludedNode() throws IOException {
addRemote = false;
List<String> nodes = internalCluster().startNodes(2);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")
.build()
);
ensureGreen("test");
addRemote = true;

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
String remoteNode = internalCluster().startNode();

client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.execute()
.actionGet();
client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
assertEquals(remoteNode, primaryNodeName("test"));

assertTrue(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", String.join(",", nodes)))
.execute()
.actionGet()
.isAcknowledged()
);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName("test")));
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertTrue(clusterHealthResponse.isTimedOut());
ensureYellow("test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Map;

Expand Down Expand Up @@ -102,14 +104,32 @@
private volatile DiscoveryNodeFilters clusterRequireFilters;
private volatile DiscoveryNodeFilters clusterIncludeFilters;
private volatile DiscoveryNodeFilters clusterExcludeFilters;
private volatile RemoteStoreNodeService.Direction migrationDirection;
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode;
gbbafna marked this conversation as resolved.
Show resolved Hide resolved

public FilterAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.getAsMap(settings));
setClusterExcludeFilters(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getAsMap(settings));
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings);

clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
clusterSettings.addSettingsUpdateConsumer(RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, this::setMigrationDirection);
clusterSettings.addSettingsUpdateConsumer(
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
this::setCompatibilityMode
);
}

private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) {
this.migrationDirection = migrationDirection;
}

Check warning on line 129 in server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java#L128-L129

Added lines #L128 - L129 were not covered by tests

private void setCompatibilityMode(RemoteStoreNodeService.CompatibilityMode compatibilityMode) {
this.compatibilityMode = compatibilityMode;

Check warning on line 132 in server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java#L132

Added line #L132 was not covered by tests
}

@Override
Expand All @@ -127,10 +147,28 @@
"initial allocation of the shrunken index is only allowed on nodes [%s] that hold a copy of every shard in the index";
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
}

Decision decision = isRemoteStoreMigrationReplicaDecision(shardRouting, allocation);
if (decision != null) return decision;
}
return shouldFilter(shardRouting, node.node(), allocation);
}

public Decision isRemoteStoreMigrationReplicaDecision(ShardRouting shardRouting, RoutingAllocation allocation) {
assert shardRouting.unassigned();
boolean primaryOnRemote = RemoteStoreMigrationAllocationDecider.isPrimaryOnRemote(shardRouting.shardId(), allocation);
if (shardRouting.primary() == false
&& shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED
&& (compatibilityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED))
&& (migrationDirection.equals(RemoteStoreNodeService.Direction.REMOTE_STORE))
&& primaryOnRemote == false) {
String explanation =
"in remote store migration, allocation filters are not applicable for replica copies whose primary is on doc rep node";
return allocation.decision(Decision.YES, NAME, explanation);
}
return null;
}

@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetadata, node.node(), allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import org.opensearch.node.remotestore.RemoteStoreNodeService.Direction;
Expand All @@ -60,9 +61,8 @@ public class RemoteStoreMigrationAllocationDecider extends AllocationDecider {

public static final String NAME = "remote_store_migration";

private Direction migrationDirection;
private CompatibilityMode compatibilityMode;
private boolean remoteStoreBackedIndex;
volatile private Direction migrationDirection;
volatile private CompatibilityMode compatibilityMode;

public RemoteStoreMigrationAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
Expand Down Expand Up @@ -106,9 +106,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

// check for remote store backed indices
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
if (IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.exists(indexMetadata.getSettings())) {
remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());
}
boolean remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());
if (remoteStoreBackedIndex && targetNode.isRemoteStoreNode() == false) {
// allocations and relocations must be to a remote node
String reason = String.format(
Expand All @@ -133,15 +131,20 @@ private Decision primaryShardDecision(ShardRouting primaryShardRouting, Discover
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, primaryShardRouting, targetNode, ""));
}

// Checks if primary shard is on a remote node.
static boolean isPrimaryOnRemote(ShardId shardId, RoutingAllocation allocation) {
ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(shardId);
if (primaryShardRouting != null) {
DiscoveryNode primaryShardNode = allocation.nodes().getNodes().get(primaryShardRouting.currentNodeId());
return primaryShardNode.isRemoteStoreNode();
}
return false;
}

private Decision replicaShardDecision(ShardRouting replicaShardRouting, DiscoveryNode targetNode, RoutingAllocation allocation) {
if (targetNode.isRemoteStoreNode()) {
ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(replicaShardRouting.shardId());
boolean primaryHasMigratedToRemote = false;
if (primaryShardRouting != null) {
DiscoveryNode primaryShardNode = allocation.nodes().getNodes().get(primaryShardRouting.currentNodeId());
primaryHasMigratedToRemote = primaryShardNode.isRemoteStoreNode();
}
if (primaryHasMigratedToRemote == false) {
boolean primaryOnRemote = RemoteStoreMigrationAllocationDecider.isPrimaryOnRemote(replicaShardRouting.shardId(), allocation);
if (primaryOnRemote == false) {
return allocation.decision(
Decision.NO,
NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand All @@ -50,6 +51,8 @@
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.test.gateway.TestGatewayAllocator;

Expand All @@ -61,6 +64,9 @@
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;

public class FilterAllocationDeciderTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -406,4 +412,57 @@ public void testWildcardIPFilter() {
"test ip validation"
);
}

public void testMixedModeRemoteStoreAllocation() {
// For mixed mode remote store direction cluster's existing indices replica creation ,
// we don't consider filter allocation decider for replica of existing indices
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Settings initialSettings = Settings.builder()
.put("cluster.routing.allocation.exclude._id", "node2")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED)
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
.build();

FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings);
AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(
filterAllocationDecider,
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider()
)
);
AllocationService service = new AllocationService(
allocationDeciders,
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);
ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY);
RoutingTable routingTable = state.routingTable();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);
ShardRouting sr = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
);
Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
sr,
state.getRoutingNodes().node("node2"),
allocation
);
assertEquals(decision.toString(), Type.YES, decision.type());

sr = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);
decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
}
}
Loading