From 61d4d43c8a9af909dbdd44058b845e91b8a98f4b Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 28 Aug 2023 17:36:06 -0700 Subject: [PATCH] [Segment Replication] Add ClusterState utility to identify SEGMENT replication (#9593) * [Segment Replication] Add ClusterState utility to identify SEGMENT replication Signed-off-by: Suraj Singh * Address review comment Signed-off-by: Suraj Singh * Address review comments Signed-off-by: Suraj Singh --------- Signed-off-by: Suraj Singh --- .../action/get/TransportGetAction.java | 14 +------ .../org/opensearch/cluster/ClusterState.java | 16 ++++++++ .../opensearch/cluster/ClusterStateTests.java | 37 ++++++++++++++++++- 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 583815b91ae68..0c444732fb12b 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -36,7 +36,6 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.single.shard.TransportSingleShardAction; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; @@ -49,12 +48,10 @@ import org.opensearch.index.get.GetResult; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Optional; /** * Performs the get operation. @@ -92,20 +89,11 @@ protected boolean resolveIndex(GetRequest request) { return true; } - static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) { - return Optional.ofNullable(state.getMetadata().index(indexName)) - .map( - indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) - .equals(ReplicationType.SEGMENT) - ) - .orElse(false); - } - /** * Returns true if GET request should be routed to primary shards, else false. */ protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { - return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null; + return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index 1b87a60c2ccf5..2fd58d3db4975 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -61,6 +61,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.Discovery; +import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; import java.util.Collections; @@ -409,6 +410,21 @@ public boolean supersedes(ClusterState other) { } + /** + * Utility to identify whether input index belongs to SEGMENT replication in established cluster state. + * + * @param indexName Index name + * @return true if index belong SEGMENT replication, false otherwise + */ + public boolean isSegmentReplicationEnabled(String indexName) { + return Optional.ofNullable(this.getMetadata().index(indexName)) + .map( + indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) + .equals(ReplicationType.SEGMENT) + ) + .orElse(false); + } + /** * Metrics for cluster state. * diff --git a/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java b/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java index 63fe65d70d020..c4fb3271ae3ce 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java @@ -57,6 +57,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.TestCustomMetadata; @@ -73,6 +74,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -84,7 +86,7 @@ public void testSupersedes() { final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version); final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version); final DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).build(); - ClusterName name = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY); + ClusterName name = CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY); ClusterState noClusterManager1 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build(); ClusterState noClusterManager2 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build(); ClusterState withClusterManager1a = ClusterState.builder(name) @@ -115,6 +117,39 @@ public void testSupersedes() { ); } + public void testIsSegmentReplicationEnabled() { + final String indexName = "test"; + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); + Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(builder) + .numberOfShards(1) + .numberOfReplicas(1); + Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build()); + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + assertTrue(clusterState.isSegmentReplicationEnabled(indexName)); + } + + public void testIsSegmentReplicationDisabled() { + final String indexName = "test"; + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1); + Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build()); + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + assertFalse(clusterState.isSegmentReplicationEnabled(indexName)); + } + public void testBuilderRejectsNullCustom() { final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT); final String key = randomAlphaOfLength(10);