Skip to content

Commit

Permalink
Skip Filter Allocation Decider during mixed mode for existing indices…
Browse files Browse the repository at this point in the history
…' replica copies

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Mar 28, 2024
1 parent 8ad0dc0 commit e55ec77
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Map;

Expand Down Expand Up @@ -102,14 +104,32 @@ public class FilterAllocationDecider extends AllocationDecider {
private volatile DiscoveryNodeFilters clusterRequireFilters;
private volatile DiscoveryNodeFilters clusterIncludeFilters;
private volatile DiscoveryNodeFilters clusterExcludeFilters;
private volatile RemoteStoreNodeService.Direction migrationDirection;
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode;

public FilterAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.getAsMap(settings));
setClusterExcludeFilters(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getAsMap(settings));
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings);

clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
clusterSettings.addSettingsUpdateConsumer(RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, this::setMigrationDirection);
clusterSettings.addSettingsUpdateConsumer(
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
this::setCompatibilityMode
);
}

private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) {
this.migrationDirection = migrationDirection;
}

private void setCompatibilityMode(RemoteStoreNodeService.CompatibilityMode compatibilityMode) {
this.compatibilityMode = compatibilityMode;
}

@Override
Expand All @@ -127,10 +147,26 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
"initial allocation of the shrunken index is only allowed on nodes [%s] that hold a copy of every shard in the index";
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
}

Decision decision = isMixedModeReplicaDecision(shardRouting, allocation);
if (decision != null) return decision;
}
return shouldFilter(shardRouting, node.node(), allocation);
}

public Decision isMixedModeReplicaDecision(ShardRouting shardRouting, RoutingAllocation allocation) {
assert shardRouting.unassigned();
if (shardRouting.primary() == false
&& shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED
&& (compatibilityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED))
&& (migrationDirection.equals(RemoteStoreNodeService.Direction.REMOTE_STORE))) {
String explanation =
"in mixed mode with remote store direction, allocation filters are not applicable for existing replica copies";
return allocation.decision(Decision.YES, NAME, explanation);
}
return null;
}

@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetadata, node.node(), allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ public class RemoteStoreMigrationAllocationDecider extends AllocationDecider {

public static final String NAME = "remote_store_migration";

private Direction migrationDirection;
private CompatibilityMode compatibilityMode;
private boolean remoteStoreBackedIndex;
volatile private Direction migrationDirection;
volatile private CompatibilityMode compatibilityMode;

public RemoteStoreMigrationAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
Expand Down Expand Up @@ -106,9 +105,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

// check for remote store backed indices
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
if (IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.exists(indexMetadata.getSettings())) {
remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());
}
boolean remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());
if (remoteStoreBackedIndex && targetNode.isRemoteStoreNode() == false) {
// allocations and relocations must be to a remote node
String reason = String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand All @@ -50,6 +51,8 @@
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.test.gateway.TestGatewayAllocator;

Expand All @@ -61,6 +64,9 @@
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;

public class FilterAllocationDeciderTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -406,4 +412,57 @@ public void testWildcardIPFilter() {
"test ip validation"
);
}

public void testMixedModeRemoteStoreAllocation() {
// For mixed mode remote store direction cluster's existing indices replica creation ,
// we don't consider filter allocation decider for replica of existing indices
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Settings initialSettings = Settings.builder()
.put("cluster.routing.allocation.exclude._id", "node2")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED)
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
.build();

FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings);
AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(
filterAllocationDecider,
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider()
)
);
AllocationService service = new AllocationService(
allocationDeciders,
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);
ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY);
RoutingTable routingTable = state.routingTable();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);
ShardRouting sr = ShardRouting.newUnassigned(
routingTable.index("idx").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
);
Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
sr,
state.getRoutingNodes().node("node2"),
allocation
);
assertEquals(decision.toString(), Type.YES, decision.type());

sr = ShardRouting.newUnassigned(
routingTable.index("idx").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);
decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
}
}

0 comments on commit e55ec77

Please sign in to comment.