diff --git a/CHANGELOG.md b/CHANGELOG.md index 8456a4d466279..7ca7840800b60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679)) - Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819)) - Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776)) +- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 0ad2b511c4417..6c0940bfc4fd2 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -572,11 +572,27 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod assert existingNodes.isEmpty() == false; CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings()); - if (STRICT.equals(remoteStoreCompatibilityMode)) { - DiscoveryNode existingNode = existingNodes.get(0); + List<String> reposToSkip = new ArrayList<>(1); + Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream() + .filter( + node -> node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null + ) + .findFirst(); + // If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node. + // This ensures a new node with remote routing table repo is able to join the cluster. + if (remoteRoutingTableNode.isEmpty()) { + String joiningNodeRepoName = joiningNode.getAttributes() + .get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY); + if (joiningNodeRepoName != null) { + reposToSkip.add(joiningNodeRepoName); + } + } + + if (STRICT.equals(remoteStoreCompatibilityMode)) { + DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0)); if (joiningNode.isRemoteStoreNode()) { - ensureRemoteStoreNodesCompatibility(joiningNode, existingNode); + ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, reposToSkip); } else { if (existingNode.isRemoteStoreNode()) { throw new IllegalStateException( @@ -598,19 +614,25 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod throw new IllegalStateException(reason); } if (joiningNode.isRemoteStoreNode()) { - Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); - remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode)); + Optional<DiscoveryNode> remoteDN = remoteRoutingTableNode.isPresent() + ? remoteRoutingTableNode + : existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); + remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip)); } } } } - private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) { + private static void ensureRemoteStoreNodesCompatibility( + DiscoveryNode joiningNode, + DiscoveryNode existingNode, + List<String> reposToSkip + ) { if (joiningNode.isRemoteStoreNode()) { if (existingNode.isRemoteStoreNode()) { RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode); RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode); - if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) { + if (existingRemoteStoreNodeAttribute.equalsWithRepoSkip(joiningRemoteStoreNodeAttribute, reposToSkip) == false) { throw new IllegalStateException( "a remote store node [" + joiningNode diff --git a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java index 9b52bdd1b16c5..4b3dc7964a87b 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java @@ -51,8 +51,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.List; +import java.util.stream.Collectors; import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; @@ -164,6 +166,40 @@ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetadata other) { return true; } + /** + * Checks if this instance and the give instance share the same repositories, with option to skip checking for a list of repos. + * This will support + * @param other other repositories metadata + * @param reposToSkip list of repos to skip check for equality + * @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip. + */ + public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadata other, List<String> reposToSkip) { + if (other == null) { + return false; + } + List<RepositoryMetadata> currentRepositories = repositories.stream() + .filter(repo -> !reposToSkip.contains(repo.name())) + .collect(Collectors.toList()); + List<RepositoryMetadata> otherRepositories = other.repositories.stream() + .filter(repo -> !reposToSkip.contains(repo.name())) + .collect(Collectors.toList()); + + if (otherRepositories.size() != currentRepositories.size()) { + return false; + } + // Sort repos by name for ordered comparison + Comparator<RepositoryMetadata> compareByName = (o1, o2) -> o1.name().compareTo(o2.name()); + currentRepositories.sort(compareByName); + otherRepositories.sort(compareByName); + + for (int i = 0; i < currentRepositories.size(); i++) { + if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) { + return false; + } + } + return true; + } + @Override public int hashCode() { return repositories.hashCode(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java new file mode 100644 index 0000000000000..ba2208e17df1f --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.node.Node; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; +import java.util.function.Supplier; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; + +/** + * A Service which provides APIs to upload and download routing table from remote store. + * + * @opensearch.internal + */ +public class RemoteRoutingTableService extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class); + private final Settings settings; + private final Supplier<RepositoriesService> repositoriesService; + private BlobStoreRepository blobStoreRepository; + + public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) { + assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; + this.repositoriesService = repositoriesService; + this.settings = settings; + } + + @Override + protected void doClose() throws IOException { + if (blobStoreRepository != null) { + IOUtils.close(blobStoreRepository); + } + } + + @Override + protected void doStart() { + assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled"; + final String remoteStoreRepo = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + assert remoteStoreRepo != null : "Remote routing table repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + + @Override + protected void doStop() {} + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/package-info.java b/server/src/main/java/org/opensearch/cluster/routing/remote/package-info.java new file mode 100644 index 0000000000000..9fe016e783f20 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Package containing class to perform operations on remote routing table */ +package org.opensearch.cluster.routing.remote; diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 7a364de1c5dc6..238df1bd90113 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -36,6 +36,7 @@ protected FeatureFlagSettings( FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, FeatureFlags.TIERED_REMOTE_INDEX_SETTING, FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, - FeatureFlags.PLUGGABLE_CACHE_SETTING + FeatureFlags.PLUGGABLE_CACHE_SETTING, + FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 62cfbd861d4d9..82f43921d2d28 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -67,6 +67,11 @@ public class FeatureFlags { */ public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled"; + /** + * Gates the functionality of remote routing table. + */ + public static final String REMOTE_PUBLICATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.publication.enabled"; + public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting( REMOTE_STORE_MIGRATION_EXPERIMENTAL, false, @@ -89,6 +94,12 @@ public class FeatureFlags { public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope); + public static final Setting<Boolean> REMOTE_PUBLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting( + REMOTE_PUBLICATION_EXPERIMENTAL, + false, + Property.NodeScope + ); + private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of( REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, EXTENSIONS_SETTING, @@ -96,7 +107,8 @@ public class FeatureFlags { TELEMETRY_SETTING, DATETIME_FORMATTER_CACHING_SETTING, TIERED_REMOTE_INDEX_SETTING, - PLUGGABLE_CACHE_SETTING + PLUGGABLE_CACHE_SETTING, + REMOTE_PUBLICATION_EXPERIMENTAL_SETTING ); /** * Should store the settings from opensearch.yml. diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 835d68d82c6b3..c0e745e43f9b9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -18,6 +18,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; @@ -68,6 +69,7 @@ import static java.util.Objects.requireNonNull; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -202,6 +204,7 @@ public class RemoteClusterStateService implements Closeable { private final List<IndexMetadataUploadListener> indexMetadataUploadListeners; private BlobStoreRepository blobStoreRepository; private BlobStoreTransferService blobStoreTransferService; + private Optional<RemoteRoutingTableService> remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; private volatile TimeValue indexMetadataUploadTimeout; @@ -253,6 +256,9 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); this.indexMetadataUploadListeners = indexMetadataUploadListeners; + this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings) + ? Optional.of(new RemoteRoutingTableService(repositoriesService, settings)) + : Optional.empty(); } private BlobStoreTransferService getBlobStoreTransferService() { @@ -749,6 +755,9 @@ public void close() throws IOException { if (blobStoreRepository != null) { IOUtils.close(blobStoreRepository); } + if (this.remoteRoutingTableService.isPresent()) { + this.remoteRoutingTableService.get().close(); + } } public void start() { @@ -760,6 +769,7 @@ public void start() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; + this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start); } private ClusterMetadataManifest uploadManifest( @@ -933,6 +943,11 @@ public TimeValue getMetadataManifestUploadTimeout() { return this.metadataManifestUploadTimeout; } + // Package private for unit test + Optional<RemoteRoutingTableService> getRemoteRoutingTableService() { + return this.remoteRoutingTableService; + } + static String getManifestFileName(long term, long version, boolean committed, int codecVersion) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version> return String.join( diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index b10ec0d99c3d5..a0f745a4270c4 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -28,6 +29,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; + /** * This is an abstraction for validating and storing information specific to remote backed storage nodes. * @@ -46,6 +49,8 @@ public class RemoteStoreNodeAttribute { + "." + CryptoMetadata.SETTINGS_KEY; public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings."; + public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing_table.repository"; + private final RepositoriesMetadata repositoriesMetadata; public static List<String> SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = List.of( @@ -157,6 +162,10 @@ private Set<String> getValidatedRepositoryNames(DiscoveryNode node) { } else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) { repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)); } + if (node.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)) { + repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); + } + return repositoryNames; } @@ -187,6 +196,15 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) { && isRemoteClusterStateAttributePresent(settings); } + private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { + return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) + .isEmpty() == false; + } + + public static boolean isRemoteRoutingTableEnabled(Settings settings) { + return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings); + } + public RepositoriesMetadata getRepositoriesMetadata() { return this.repositoriesMetadata; } @@ -231,6 +249,21 @@ public int hashCode() { return hashCode; } + /** + * Checks if 2 instances are equal, with option to skip check for a list of repos. + * * + * @param o other instance + * @param reposToSkip list of repos to skip check for equality + * @return {@code true} iff both instances are equal, not including the repositories in both instances if they are part of reposToSkip. + */ + public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RemoteStoreNodeAttribute that = (RemoteStoreNodeAttribute) o; + return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 8dc7f3a472b11..f8f3ce76ae325 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -73,6 +73,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; @@ -1026,6 +1027,145 @@ public void testNodeJoinInMixedMode() { JoinTaskExecutor.ensureNodesCompatibility(joiningNode2, currentNodes, metadata); } + public void testRemoteRoutingTableRepoAbsentNodeJoin() { + + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); + } + + public void testRemoteRoutingTableNodeJoinRepoPresentInJoiningNode() { + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO)); + DiscoveryNode joiningNode = newDiscoveryNode(attr); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); + } + + public void testRemoteRoutingTableNodeJoinRepoPresentInExistingNode() { + Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO)); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + attr, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + assertThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()) + ); + } + + public void testRemoteRoutingTableNodeJoinRepoPresentInBothNode() { + Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO)); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + attr, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(attr); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); + } + + public void testRemoteRoutingTableNodeJoinNodeWithRemoteAndRoutingRepoDifference() { + Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO)); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + attr, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final DiscoveryNode existingNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode2).add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(attr); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); + } + + public void testRemoteRoutingTableNodeJoinNodeWithRemoteAndRoutingRepoDifferenceMixedMode() { + Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO)); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + attr, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final DiscoveryNode existingNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final Settings settings = Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .build(); + final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode2).add(existingNode).localNodeId(existingNode.getId()).build()) + .metadata(metadata) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(attr); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); + } + private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories) throws Exception { @@ -1067,6 +1207,7 @@ private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) { private static final String TRANSLOG_REPO = "translog-repo"; private static final String CLUSTER_STATE_REPO = "cluster-state-repo"; private static final String COMMON_REPO = "remote-repo"; + private static final String ROUTING_TABLE_REPO = "routing-table-repo"; private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) { return remoteStoreNodeAttributes(segmentRepoName, translogRepoName, CLUSTER_STATE_REPO); @@ -1131,6 +1272,28 @@ private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) { }; } + private Map<String, String> remoteRoutingTableAttributes(String repoName) { + String routingTableRepositoryTypeAttributeKey = String.format( + Locale.getDefault(), + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + repoName + ); + String routingTableRepositorySettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + repoName + ); + + return new HashMap<>() { + { + put(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName); + putIfAbsent(routingTableRepositoryTypeAttributeKey, "s3"); + putIfAbsent(routingTableRepositorySettingsAttributeKeyPrefix + "bucket", "state_bucket"); + putIfAbsent(routingTableRepositorySettingsAttributeKeyPrefix + "base_path", "/state/path"); + } + }; + } + private void validateAttributes(Map<String, String> remoteStoreNodeAttributes, ClusterState currentState, DiscoveryNode existingNode) { DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes); Exception e = assertThrows( diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java new file mode 100644 index 0000000000000..9a9cbfa153259 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.remote; + +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.repositories.FilterRepository; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.function.Supplier; + +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { + + private RemoteRoutingTableService remoteRoutingTableService; + private Supplier<RepositoriesService> repositoriesServiceSupplier; + private RepositoriesService repositoriesService; + private BlobStoreRepository blobStoreRepository; + + @Before + public void setup() { + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + + Settings settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") + .build(); + + blobStoreRepository = mock(BlobStoreRepository.class); + when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); + + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + + remoteRoutingTableService = new RemoteRoutingTableService(repositoriesServiceSupplier, settings); + } + + @After + public void teardown() throws Exception { + super.tearDown(); + remoteRoutingTableService.close(); + } + + public void testFailInitializationWhenRemoteRoutingDisabled() { + final Settings settings = Settings.builder().build(); + assertThrows(AssertionError.class, () -> new RemoteRoutingTableService(repositoriesServiceSupplier, settings)); + } + + public void testFailStartWhenRepositoryNotSet() { + doThrow(new RepositoryMissingException("repository missing")).when(repositoriesService).repository("routing_repository"); + assertThrows(RepositoryMissingException.class, () -> remoteRoutingTableService.start()); + } + + public void testFailStartWhenNotBlobRepository() { + final FilterRepository filterRepository = mock(FilterRepository.class); + when(repositoriesService.repository("routing_repository")).thenReturn(filterRepository); + assertThrows(AssertionError.class, () -> remoteRoutingTableService.start()); + } + +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 1b242b921c0d7..aa5996d734d27 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -33,6 +33,7 @@ import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.ParseField; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; @@ -86,6 +87,7 @@ import org.mockito.ArgumentMatchers; import static java.util.stream.Collectors.toList; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; @@ -99,6 +101,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -1496,6 +1499,33 @@ public void testGlobalMetadataUploadWaitTimeSetting() { assertEquals(globalMetadataUploadTimeout, remoteClusterStateService.getGlobalMetadataUploadTimeout().seconds()); } + public void testRemoteRoutingTableNotInitializedWhenDisabled() { + assertFalse(remoteClusterStateService.getRemoteRoutingTableService().isPresent()); + } + + public void testRemoteRoutingTableInitializedWhenEnabled() { + Settings newSettings = Settings.builder() + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + clusterSettings.applySettings(newSettings); + + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + + remoteClusterStateService = new RemoteClusterStateService( + "test-node-id", + repositoriesServiceSupplier, + newSettings, + clusterSettings, + () -> 0L, + threadPool, + List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)) + ); + assertTrue(remoteClusterStateService.getRemoteRoutingTableService().isPresent()); + } + private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException { mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false, Collections.emptyMap()); } diff --git a/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java b/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java index c4ba271d27ae9..de7f8977686a7 100644 --- a/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java +++ b/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java @@ -19,6 +19,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.List; import java.util.Locale; import java.util.Map; @@ -28,6 +29,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_CRYPTO_SETTINGS_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -148,4 +150,77 @@ public void testNoCryptoMetadata() throws UnknownHostException { RepositoryMetadata repositoryMetadata = remoteStoreNodeAttribute.getRepositoriesMetadata().repositories().get(0); assertNull(repositoryMetadata.cryptoMetadata()); } + + public void testEqualsWithRepoSkip() throws UnknownHostException { + String repoName = "remote-store-A"; + String repoTypeSettingKey = String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, repoName); + String repoSettingsKey = String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, repoName); + Map<String, String> attr = Map.of( + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + repoTypeSettingKey, + "s3", + repoSettingsKey, + "abc", + repoSettingsKey + "base_path", + "xyz" + ); + DiscoveryNode node = new DiscoveryNode( + "C", + new TransportAddress(InetAddress.getByName("localhost"), 9876), + attr, + emptySet(), + Version.CURRENT + ); + + RemoteStoreNodeAttribute remoteStoreNodeAttribute = new RemoteStoreNodeAttribute(node); + + String routingTableRepoName = "remote-store-B"; + String routingTableRepoTypeSettingKey = String.format( + Locale.ROOT, + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + routingTableRepoName + ); + String routingTableRepoSettingsKey = String.format( + Locale.ROOT, + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + routingTableRepoName + ); + + Map<String, String> attr2 = Map.of( + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + repoName, + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, + routingTableRepoName, + repoTypeSettingKey, + "s3", + repoSettingsKey, + "abc", + repoSettingsKey + "base_path", + "xyz", + routingTableRepoTypeSettingKey, + "s3", + routingTableRepoSettingsKey, + "xyz" + ); + DiscoveryNode node2 = new DiscoveryNode( + "C", + new TransportAddress(InetAddress.getByName("localhost"), 9876), + attr2, + emptySet(), + Version.CURRENT + ); + RemoteStoreNodeAttribute remoteStoreNodeAttribute2 = new RemoteStoreNodeAttribute(node2); + + assertFalse(remoteStoreNodeAttribute.equalsWithRepoSkip(remoteStoreNodeAttribute2, List.of())); + assertTrue(remoteStoreNodeAttribute.equalsWithRepoSkip(remoteStoreNodeAttribute2, List.of(routingTableRepoName))); + } }