diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index e7189457c2398..dd4a1fe9a5b95 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -139,7 +139,7 @@ import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING; import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; /** * Service responsible for submitting create index requests @@ -978,7 +978,7 @@ private static void updateReplicationStrategy( indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings); } else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) { indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings); - } else if (isRemoteStoreAttributePresent(clusterSettings)) { + } else if (isRemoteDataAttributePresent(clusterSettings)) { indexReplicationType = ReplicationType.SEGMENT; } else { indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.getDefault(clusterSettings); @@ -992,7 +992,7 @@ private static void updateReplicationStrategy( * @param clusterSettings cluster level settings */ private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) { - if (isRemoteStoreAttributePresent(clusterSettings)) { + if (isRemoteDataAttributePresent(clusterSettings)) { settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true) .put( SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, @@ -1603,7 +1603,7 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu * @param clusterSettings cluster setting */ static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) { - if (isRemoteStoreAttributePresent(settings) == false + if (isRemoteDataAttributePresent(settings) == false || IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false || clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) { return; diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 7b2a6c34d3db6..7575c6ff5fb34 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -131,12 +131,8 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na } private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) { + Set repositoryNames = getValidatedRepositoryNames(node); List repositoryMetadataList = new ArrayList<>(); - Set repositoryNames = new HashSet<>(); - - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)); - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)); - repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)); for (String repositoryName : repositoryNames) { repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName)); @@ -145,12 +141,36 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) { return new RepositoriesMetadata(repositoryMetadataList); } + private Set getValidatedRepositoryNames(DiscoveryNode node) { + Set repositoryNames = new HashSet<>(); + if (node.getAttributes().containsKey(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY) + || node.getAttributes().containsKey(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) { + repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)); + repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)); + repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)); + } else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) { + repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)); + } + return repositoryNames; + } + public static boolean isRemoteStoreAttributePresent(Settings settings) { return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false; } + public static boolean isRemoteDataAttributePresent(Settings settings) { + return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false + || settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false; + } + + public static boolean isRemoteClusterStateAttributePresent(Settings settings) { + return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY) + .isEmpty() == false; + } + public static boolean isRemoteStoreClusterStateEnabled(Settings settings) { - return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings); + return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) + && isRemoteClusterStateAttributePresent(settings); } public RepositoriesMetadata getRepositoriesMetadata() { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index b4355b9328c98..76644e0309d18 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -455,7 +455,8 @@ public void testJoinClusterWithNonRemoteStoreNodeJoining() { } public void testJoinClusterWithRemoteStoreNodeJoining() { - DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + Map map = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + DiscoveryNode joiningNode = newDiscoveryNode(map); ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) .nodes(DiscoveryNodes.builder().add(joiningNode).build()) .build(); @@ -660,12 +661,94 @@ public void testPreventJoinClusterWithRemoteStoreNodeWithPartialAttributesJoinin ); assertTrue( e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]") + || e.getMessage() + .equals( + "a remote store node [" + + joiningNode + + "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node [" + + currentState.getNodes().getNodes().values().stream().findFirst().get() + + "]" + ) ); remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue()); } } + public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() { + Map existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + existingNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO)); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); + } + + public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() { + Map existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + existingNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO)); + Exception e = assertThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()) + ); + assertTrue( + e.getMessage() + .equals( + "a remote store node [" + + joiningNode + + "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node [" + + currentState.getNodes().getNodes().values().stream().findFirst().get() + + "]" + ) + ); + } + + public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() { + Map existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + existingNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + Exception e = assertThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()) + ); + assertTrue( + e.getMessage() + .equals( + "a remote store node [" + + joiningNode + + "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node [" + + currentState.getNodes().getNodes().values().stream().findFirst().get() + + "]" + ) + ); + } + public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception { Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); final AllocationService allocationService = mock(AllocationService.class); @@ -951,6 +1034,23 @@ private Map remoteStoreNodeAttributes(String segmentRepoName, St REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, translogRepoName ); + + return new HashMap<>() { + { + put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName); + put(segmentRepositoryTypeAttributeKey, "s3"); + put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket"); + put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path"); + put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName); + putIfAbsent(translogRepositoryTypeAttributeKey, "s3"); + putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket"); + putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path"); + putAll(remoteStateNodeAttributes(clusterStateRepo)); + } + }; + } + + private Map remoteStateNodeAttributes(String clusterStateRepo) { String clusterStateRepositoryTypeAttributeKey = String.format( Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, @@ -964,14 +1064,6 @@ private Map remoteStoreNodeAttributes(String segmentRepoName, St return new HashMap<>() { { - put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName); - put(segmentRepositoryTypeAttributeKey, "s3"); - put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket"); - put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path"); - put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName); - putIfAbsent(translogRepositoryTypeAttributeKey, "s3"); - putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket"); - putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path"); put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, clusterStateRepo); putIfAbsent(clusterStateRepositoryTypeAttributeKey, "s3"); putIfAbsent(clusterStateRepositorySettingsAttributeKeyPrefix + "bucket", "state_bucket"); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 131db733d92fe..2788b5aef0419 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -1910,7 +1910,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() { request, Settings.EMPTY, null, - Settings.builder().put("node.attr.remote_store.setting", "test").build(), + Settings.builder().put("node.attr.remote_store.segment.repository", "test").build(), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), Collections.emptySet(),