From e03b7015b32e43286c471a4a7e60fa44ace8ba56 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Thu, 31 Aug 2023 11:49:44 +0530 Subject: [PATCH] [Remote State] Create service to publish cluster state to remote store (#9160) * Upload all index metadata to remote store using BlobStoreRepository interface Signed-off-by: Sooraj Sinha Signed-off-by: Ivan Brusic --- CHANGELOG.md | 1 + .../common/settings/ClusterSettings.java | 7 +- .../opensearch/gateway/GatewayMetaState.java | 138 +++++- .../remote/ClusterMetadataManifest.java | 446 ++++++++++++++++++ .../remote/RemoteClusterStateService.java | 370 +++++++++++++++ .../gateway/remote/package-info.java | 12 + .../blobstore/BlobStoreRepository.java | 4 + .../GatewayMetaStatePersistedStateTests.java | 72 +++ .../remote/ClusterMetadataManifestTests.java | 149 ++++++ .../RemoteClusterStateServiceTests.java | 254 ++++++++++ 10 files changed, 1443 insertions(+), 10 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/package-info.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff22076f5530..e65cc30aecd7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081)) - Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513)) - Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379)) +- [Remote State] Create service to publish cluster state to remote store ([#9160](https://github.com/opensearch-project/OpenSearch/pull/9160)) - [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520)) - Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622)) - Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 6cb4992932b8e..e00e7e3bf4ea7 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -97,6 +97,7 @@ import org.opensearch.gateway.DanglingIndicesState; import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.PersistedClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; @@ -664,7 +665,11 @@ public void apply(Settings value, Settings current, Settings previous) { // Related to monitoring of task cancellation TaskCancellationMonitoringSettings.IS_ENABLED_SETTING, - TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING + TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING, + + // Remote cluster state settings + RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index af894bdbc117e..02f1e5049b95c 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -60,8 +60,11 @@ import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.io.IOUtils; import org.opensearch.env.NodeMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.plugins.MetadataUpgrader; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -84,19 +87,19 @@ /** * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts. * - * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that - * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link - * ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and - * non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster. + * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being + * loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be + * stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes + * receive the real cluster state from the elected cluster-manager after joining the cluster. * * @opensearch.internal */ public class GatewayMetaState implements Closeable { /** - * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially - * stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is - * restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state. + * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since + * it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a + * cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state. */ public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG"; @@ -234,8 +237,8 @@ Metadata upgradeMetadataForNode( } /** - * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current - * version. The MetadataIndexUpgradeService might also update obsolete settings if needed. + * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current version. The MetadataIndexUpgradeService + * might also update obsolete settings if needed. * * @return input metadata if no upgrade is needed or an upgraded metadata */ @@ -599,4 +602,121 @@ public void close() throws IOException { IOUtils.close(persistenceWriter.getAndSet(null)); } } + + /** + * Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}. + */ + public static class RemotePersistedState implements PersistedState { + + private static final Logger logger = LogManager.getLogger(RemotePersistedState.class); + + private ClusterState lastAcceptedState; + private ClusterMetadataManifest lastAcceptedManifest; + private final RemoteClusterStateService remoteClusterStateService; + + public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) { + this.remoteClusterStateService = remoteClusterStateService; + } + + @Override + public long getCurrentTerm() { + return lastAcceptedState != null ? lastAcceptedState.term() : 0L; + } + + @Override + public ClusterState getLastAcceptedState() { + return lastAcceptedState; + } + + @Override + public void setCurrentTerm(long currentTerm) { + // no-op + // For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes. + // But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required. + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + try { + if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. + logger.info("Cluster is not yet ready to publish state to remote store"); + lastAcceptedState = clusterState; + return; + } + final ClusterMetadataManifest manifest; + if (shouldWriteFullClusterState(clusterState)) { + manifest = remoteClusterStateService.writeFullMetadata(clusterState); + } else { + assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true + : "Previous manifest and previous ClusterState are not in sync"; + manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); + } + assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync"; + lastAcceptedManifest = manifest; + lastAcceptedState = clusterState; + } catch (RepositoryMissingException e) { + // TODO This logic needs to be modified once PR for repo registration during bootstrap is pushed + // https://github.com/opensearch-project/OpenSearch/pull/9105/ + // After the above PR is pushed, we can remove this silent failure and throw the exception instead. + logger.error("Remote repository is not yet registered"); + lastAcceptedState = clusterState; + } catch (Exception e) { + handleExceptionOnWrite(e); + } + } + + private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) { + assert manifest != null : "ClusterMetadataManifest is null"; + assert clusterState != null : "ClusterState is null"; + assert clusterState.metadata().indices().size() == manifest.getIndices().size() + : "Number of indices in last accepted state and manifest are different"; + manifest.getIndices().stream().forEach(md -> { + assert clusterState.metadata().indices().containsKey(md.getIndexName()) + : "Last accepted state does not contain the index : " + md.getIndexName(); + assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID()) + : "Last accepted state and manifest do not have same UUID for index : " + md.getIndexName(); + }); + return true; + } + + private boolean shouldWriteFullClusterState(ClusterState clusterState) { + if (lastAcceptedState == null + || lastAcceptedManifest == null + || lastAcceptedState.term() != clusterState.term() + || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) { + return true; + } + return false; + } + + @Override + public void markLastAcceptedStateAsCommitted() { + try { + if (lastAcceptedState == null + || lastAcceptedManifest == null + || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. + logger.trace("Cluster is not yet ready to publish state to remote store"); + return; + } + final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted( + lastAcceptedState, + lastAcceptedManifest + ); + lastAcceptedManifest = committedManifest; + } catch (Exception e) { + handleExceptionOnWrite(e); + } + } + + @Override + public void close() throws IOException { + remoteClusterStateService.close(); + } + + private void handleExceptionOnWrite(Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java new file mode 100644 index 0000000000000..cac77f9996438 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -0,0 +1,446 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.Version; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Manifest file which contains the details of the uploaded entity metadata + * + * @opensearch.internal + */ +public class ClusterMetadataManifest implements Writeable, ToXContentFragment { + + private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); + private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); + private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); + private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid"); + private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version"); + private static final ParseField NODE_ID_FIELD = new ParseField("node_id"); + private static final ParseField COMMITTED_FIELD = new ParseField("committed"); + private static final ParseField INDICES_FIELD = new ParseField("indices"); + + private static long term(Object[] fields) { + return (long) fields[0]; + } + + private static long version(Object[] fields) { + return (long) fields[1]; + } + + private static String clusterUUID(Object[] fields) { + return (String) fields[2]; + } + + private static String stateUUID(Object[] fields) { + return (String) fields[3]; + } + + private static Version opensearchVersion(Object[] fields) { + return Version.fromId((int) fields[4]); + } + + private static String nodeId(Object[] fields) { + return (String) fields[5]; + } + + private static boolean committed(Object[] fields) { + return (boolean) fields[6]; + } + + private static List indices(Object[] fields) { + return (List) fields[7]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> new ClusterMetadataManifest( + term(fields), + version(fields), + clusterUUID(fields), + stateUUID(fields), + opensearchVersion(fields), + nodeId(fields), + committed(fields), + indices(fields) + ) + ); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD); + PARSER.declareObjectArray( + ConstructingObjectParser.constructorArg(), + (p, c) -> UploadedIndexMetadata.fromXContent(p), + INDICES_FIELD + ); + } + + private final List indices; + private final long clusterTerm; + private final long stateVersion; + private final String clusterUUID; + private final String stateUUID; + private final Version opensearchVersion; + private final String nodeId; + private final boolean committed; + + public List getIndices() { + return indices; + } + + public long getClusterTerm() { + return clusterTerm; + } + + public long getStateVersion() { + return stateVersion; + } + + public String getClusterUUID() { + return clusterUUID; + } + + public String getStateUUID() { + return stateUUID; + } + + public Version getOpensearchVersion() { + return opensearchVersion; + } + + public String getNodeId() { + return nodeId; + } + + public boolean isCommitted() { + return committed; + } + + public ClusterMetadataManifest( + long clusterTerm, + long version, + String clusterUUID, + String stateUUID, + Version opensearchVersion, + String nodeId, + boolean committed, + List indices + ) { + this.clusterTerm = clusterTerm; + this.stateVersion = version; + this.clusterUUID = clusterUUID; + this.stateUUID = stateUUID; + this.opensearchVersion = opensearchVersion; + this.nodeId = nodeId; + this.committed = committed; + this.indices = Collections.unmodifiableList(indices); + } + + public ClusterMetadataManifest(StreamInput in) throws IOException { + this.clusterTerm = in.readVLong(); + this.stateVersion = in.readVLong(); + this.clusterUUID = in.readString(); + this.stateUUID = in.readString(); + this.opensearchVersion = Version.fromId(in.readInt()); + this.nodeId = in.readString(); + this.committed = in.readBoolean(); + this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(ClusterMetadataManifest manifest) { + return new Builder(manifest); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(CLUSTER_TERM_FIELD.getPreferredName(), getClusterTerm()) + .field(STATE_VERSION_FIELD.getPreferredName(), getStateVersion()) + .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()) + .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()) + .field(OPENSEARCH_VERSION_FIELD.getPreferredName(), getOpensearchVersion().id) + .field(NODE_ID_FIELD.getPreferredName(), getNodeId()) + .field(COMMITTED_FIELD.getPreferredName(), isCommitted()); + builder.startArray(INDICES_FIELD.getPreferredName()); + { + for (UploadedIndexMetadata uploadedIndexMetadata : indices) { + uploadedIndexMetadata.toXContent(builder, params); + } + } + builder.endArray(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(clusterTerm); + out.writeVLong(stateVersion); + out.writeString(clusterUUID); + out.writeString(stateUUID); + out.writeInt(opensearchVersion.id); + out.writeString(nodeId); + out.writeBoolean(committed); + out.writeCollection(indices); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClusterMetadataManifest that = (ClusterMetadataManifest) o; + return Objects.equals(indices, that.indices) + && clusterTerm == that.clusterTerm + && stateVersion == that.stateVersion + && Objects.equals(clusterUUID, that.clusterUUID) + && Objects.equals(stateUUID, that.stateUUID) + && Objects.equals(opensearchVersion, that.opensearchVersion) + && Objects.equals(nodeId, that.nodeId) + && Objects.equals(committed, that.committed); + } + + @Override + public int hashCode() { + return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, nodeId, committed); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Builder for ClusterMetadataManifest + * + * @opensearch.internal + */ + public static class Builder { + + private List indices; + private long clusterTerm; + private long stateVersion; + private String clusterUUID; + private String stateUUID; + private Version opensearchVersion; + private String nodeId; + private boolean committed; + + public Builder indices(List indices) { + this.indices = indices; + return this; + } + + public Builder clusterTerm(long clusterTerm) { + this.clusterTerm = clusterTerm; + return this; + } + + public Builder stateVersion(long stateVersion) { + this.stateVersion = stateVersion; + return this; + } + + public Builder clusterUUID(String clusterUUID) { + this.clusterUUID = clusterUUID; + return this; + } + + public Builder stateUUID(String stateUUID) { + this.stateUUID = stateUUID; + return this; + } + + public Builder opensearchVersion(Version opensearchVersion) { + this.opensearchVersion = opensearchVersion; + return this; + } + + public Builder nodeId(String nodeId) { + this.nodeId = nodeId; + return this; + } + + public Builder committed(boolean committed) { + this.committed = committed; + return this; + } + + public List getIndices() { + return indices; + } + + public Builder() { + indices = new ArrayList<>(); + } + + public Builder(ClusterMetadataManifest manifest) { + this.clusterTerm = manifest.clusterTerm; + this.stateVersion = manifest.stateVersion; + this.clusterUUID = manifest.clusterUUID; + this.stateUUID = manifest.stateUUID; + this.opensearchVersion = manifest.opensearchVersion; + this.nodeId = manifest.nodeId; + this.committed = manifest.committed; + this.indices = new ArrayList<>(manifest.indices); + } + + public ClusterMetadataManifest build() { + return new ClusterMetadataManifest( + clusterTerm, + stateVersion, + clusterUUID, + stateUUID, + opensearchVersion, + nodeId, + committed, + indices + ); + } + + } + + /** + * Metadata for uploaded index metadata + * + * @opensearch.internal + */ + public static class UploadedIndexMetadata implements Writeable, ToXContentFragment { + + private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); + private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); + private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + + private static String indexName(Object[] fields) { + return (String) fields[0]; + } + + private static String indexUUID(Object[] fields) { + return (String) fields[1]; + } + + private static String uploadedFilename(Object[] fields) { + return (String) fields[2]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "uploaded_index_metadata", + fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)) + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); + } + + private final String indexName; + private final String indexUUID; + private final String uploadedFilename; + + public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) { + this.indexName = indexName; + this.indexUUID = indexUUID; + this.uploadedFilename = uploadedFileName; + } + + public UploadedIndexMetadata(StreamInput in) throws IOException { + this.indexName = in.readString(); + this.indexUUID = in.readString(); + this.uploadedFilename = in.readString(); + } + + public String getUploadedFilename() { + return uploadedFilename; + } + + public String getIndexName() { + return indexName; + } + + public String getIndexUUID() { + return indexUUID; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject() + .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) + .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) + .endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeString(indexUUID); + out.writeString(uploadedFilename); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final UploadedIndexMetadata that = (UploadedIndexMetadata) o; + return Objects.equals(indexName, that.indexName) + && Objects.equals(indexUUID, that.indexUUID) + && Objects.equals(uploadedFilename, that.uploadedFilename); + } + + @Override + public int hashCode() { + return Objects.hash(indexName, indexUUID, uploadedFilename); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java new file mode 100644 index 0000000000000..491c04bab3adb --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -0,0 +1,370 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.Nullable; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; + +/** + * A Service which provides APIs to upload and download cluster metadata from remote store. + * + * @opensearch.internal + */ +public class RemoteClusterStateService implements Closeable { + + public static final String METADATA_NAME_FORMAT = "%s.dat"; + + public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "index-metadata", + METADATA_NAME_FORMAT, + IndexMetadata::fromXContent + ); + + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( + "cluster-metadata-manifest", + METADATA_MANIFEST_NAME_FORMAT, + ClusterMetadataManifest::fromXContent + ); + /** + * Used to specify if cluster state metadata should be published to remote store + */ + // TODO The remote state enabled and repository settings should be read from node attributes. + // Dependent on https://github.com/opensearch-project/OpenSearch/pull/9105/ + public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( + "cluster.remote_store.state.enabled", + false, + Property.NodeScope, + Property.Final + ); + /** + * Used to specify default repo to use for cluster state metadata upload + */ + public static final Setting REMOTE_CLUSTER_STATE_REPOSITORY_SETTING = Setting.simpleString( + "cluster.remote_store.state.repository", + "", + Property.NodeScope, + Property.Final + ); + private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + + private static final String DELIMITER = "__"; + + private final String nodeId; + private final Supplier repositoriesService; + private final Settings settings; + private final LongSupplier relativeTimeMillisSupplier; + private BlobStoreRepository blobStoreRepository; + private volatile TimeValue slowWriteLoggingThreshold; + + public RemoteClusterStateService( + String nodeId, + Supplier repositoriesService, + Settings settings, + ClusterSettings clusterSettings, + LongSupplier relativeTimeMillisSupplier + ) { + assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; + this.nodeId = nodeId; + this.repositoriesService = repositoriesService; + this.settings = settings; + this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; + this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); + clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + } + + /** + * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be + * invoked by the elected cluster manager when the remote cluster state is enabled. + * + * @return A manifest object which contains the details of uploaded entity metadata. + */ + @Nullable + public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) throws IOException { + final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + ensureRepositorySet(); + + final List allUploadedIndexMetadata = new ArrayList<>(); + // todo parallel upload + // any validations before/after upload ? + for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 + final String indexMetadataKey = writeIndexMetadata( + clusterState.getClusterName().value(), + clusterState.getMetadata().clusterUUID(), + indexMetadata, + indexMetadataFileName(indexMetadata) + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataKey + ); + allUploadedIndexMetadata.add(uploadedIndexMetadata); + } + final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false); + final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { + logger.warn( + "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", + durationMillis, + slowWriteLoggingThreshold, + allUploadedIndexMetadata.size() + ); + } else { + // todo change to debug + logger.info( + "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices", + durationMillis, + allUploadedIndexMetadata.size() + ); + } + return manifest; + } + + /** + * This method uploads the diff between the previous cluster state and the current cluster state. The previous manifest file is needed to create the new + * manifest. The new manifest file is created by using the unchanged metadata from the previous manifest and the new metadata changes from the current cluster + * state. + * + * @return The uploaded ClusterMetadataManifest file + */ + @Nullable + public ClusterMetadataManifest writeIncrementalMetadata( + ClusterState previousClusterState, + ClusterState clusterState, + ClusterMetadataManifest previousManifest + ) throws IOException { + final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); + final Map previousStateIndexMetadataVersionByName = new HashMap<>(); + for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { + previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); + } + + int numIndicesUpdated = 0; + int numIndicesUnchanged = 0; + final Map allUploadedIndexMetadata = previousManifest.getIndices() + .stream() + .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); + for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { + final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName()); + if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { + logger.trace( + "updating metadata for [{}], changing version from [{}] to [{}]", + indexMetadata.getIndex(), + previousVersion, + indexMetadata.getVersion() + ); + numIndicesUpdated++; + final String indexMetadataKey = writeIndexMetadata( + clusterState.getClusterName().value(), + clusterState.getMetadata().clusterUUID(), + indexMetadata, + indexMetadataFileName(indexMetadata) + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataKey + ); + allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); + } else { + numIndicesUnchanged++; + } + previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); + } + + for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { + allUploadedIndexMetadata.remove(removedIndexName); + } + final ClusterMetadataManifest manifest = uploadManifest( + clusterState, + allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), + false + ); + final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { + logger.warn( + "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + + "wrote metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, + slowWriteLoggingThreshold, + numIndicesUpdated, + numIndicesUnchanged + ); + } else { + logger.trace( + "writing cluster state took [{}ms]; " + "wrote metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, + numIndicesUpdated, + numIndicesUnchanged + ); + } + return manifest; + } + + @Nullable + public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) + throws IOException { + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + assert clusterState != null : "Last accepted cluster state is not set"; + assert previousManifest != null : "Last cluster metadata manifest is not set"; + return uploadManifest(clusterState, previousManifest.getIndices(), true); + } + + public ClusterState getLatestClusterState(String clusterUUID) { + // todo + return null; + } + + @Override + public void close() throws IOException { + if (blobStoreRepository != null) { + IOUtils.close(blobStoreRepository); + } + } + + // Visible for testing + void ensureRepositorySet() { + if (blobStoreRepository != null) { + return; + } + final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings); + assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + + private ClusterMetadataManifest uploadManifest( + ClusterState clusterState, + List uploadedIndexMetadata, + boolean committed + ) throws IOException { + synchronized (this) { + final String manifestFileName = getManifestFileName(clusterState.term(), clusterState.version()); + final ClusterMetadataManifest manifest = new ClusterMetadataManifest( + clusterState.term(), + clusterState.getVersion(), + clusterState.metadata().clusterUUID(), + clusterState.stateUUID(), + Version.CURRENT, + nodeId, + committed, + uploadedIndexMetadata + ); + writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); + return manifest; + } + } + + private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata uploadIndexMetadata, String fileName) + throws IOException { + final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, uploadIndexMetadata.getIndexUUID()); + INDEX_METADATA_FORMAT.write(uploadIndexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor()); + // returning full path + return indexMetadataContainer.path().buildAsString() + fileName; + } + + private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) + throws IOException { + final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); + CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); + } + + private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(clusterUUID) + .add("index") + .add(indexUUID) + ); + } + + private BlobContainer manifestContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(clusterUUID) + .add("manifest") + ); + } + + private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { + this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; + } + + private static String getManifestFileName(long term, long version) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637_456536447 + return String.join( + DELIMITER, + "manifest", + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + } + + private static String indexMetadataFileName(IndexMetadata indexMetadata) { + return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/package-info.java b/server/src/main/java/org/opensearch/gateway/remote/package-info.java new file mode 100644 index 0000000000000..286e739f66289 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing class to perform operations on remote cluster state + */ +package org.opensearch.gateway.remote; diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 108a022a2612b..ad8168f48558f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -801,6 +801,10 @@ public RepositoryMetadata getMetadata() { return metadata; } + public Compressor getCompressor() { + return compressor; + } + @Override public RepositoryStats stats() { final BlobStore store = blobStore.get(); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index c6b44eaa9d364..47fea55242240 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -59,6 +60,9 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.TestEnvironment; +import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -75,11 +79,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.mockito.Mockito; + import static org.opensearch.test.NodeRoles.nonClusterManagerNode; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase { @@ -647,6 +655,70 @@ Directory createDirectory(Path path) { } } + public void testRemotePersistedState() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any())).thenReturn(manifest); + + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest); + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService); + + assertThat(remotePersistedState.getLastAcceptedState(), nullValue()); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(0L)); + + final long clusterTerm = randomNonNegativeLong(); + final ClusterState clusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + remotePersistedState.setLastAcceptedState(clusterState); + Mockito.verify(remoteClusterStateService, times(0)).writeFullMetadata(Mockito.any()); + + assertThat(remotePersistedState.getLastAcceptedState(), equalTo(clusterState)); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + + final ClusterState secondClusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + remotePersistedState.setLastAcceptedState(secondClusterState); + Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(Mockito.any()); + + assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + + remotePersistedState.markLastAcceptedStateAsCommitted(); + Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any()); + + assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + + } + + public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any()); + + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService); + + final long clusterTerm = randomNonNegativeLong(); + final ClusterState clusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + remotePersistedState.setLastAcceptedState(clusterState); + + final ClusterState secondClusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(secondClusterState)); + } + private static BigArrays getBigArrays() { return usually() ? BigArrays.NON_RECYCLING_INSTANCE diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java new file mode 100644 index 0000000000000..eafa191581d65 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.Version; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.test.EqualsHashCodeTestUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.VersionUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ClusterMetadataManifestTests extends OpenSearchTestCase { + + public void testClusterMetadataManifestXContent() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid", + Version.CURRENT, + "test-node-id", + false, + Collections.singletonList(uploadedIndexMetadata) + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + + public void testClusterMetadataManifestSerializationEqualsHashCode() { + ClusterMetadataManifest initialManifest = new ClusterMetadataManifest( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + VersionUtils.randomOpenSearchVersion(random()), + randomAlphaOfLength(10), + randomBoolean(), + randomUploadedIndexMetadataList() + ); + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + initialManifest, + orig -> OpenSearchTestCase.copyWriteable( + orig, + new NamedWriteableRegistry(Collections.emptyList()), + ClusterMetadataManifest::new + ), + manifest -> { + ClusterMetadataManifest.Builder builder = ClusterMetadataManifest.builder(manifest); + switch (randomInt(7)) { + case 0: + builder.clusterTerm(randomNonNegativeLong()); + break; + case 1: + builder.stateVersion(randomNonNegativeLong()); + break; + case 2: + builder.clusterUUID(randomAlphaOfLength(10)); + break; + case 3: + builder.stateUUID(randomAlphaOfLength(10)); + break; + case 4: + builder.opensearchVersion(VersionUtils.randomOpenSearchVersion(random())); + break; + case 5: + builder.nodeId(randomAlphaOfLength(10)); + break; + case 6: + builder.committed(randomBoolean()); + break; + case 7: + builder.indices(randomUploadedIndexMetadataList()); + break; + } + return builder.build(); + } + ); + } + + private List randomUploadedIndexMetadataList() { + final int size = randomIntBetween(1, 10); + final List uploadedIndexMetadataList = new ArrayList<>(size); + while (uploadedIndexMetadataList.size() < size) { + assertTrue(uploadedIndexMetadataList.add(randomUploadedIndexMetadata())); + } + return uploadedIndexMetadataList; + } + + private UploadedIndexMetadata randomUploadedIndexMetadata() { + return new UploadedIndexMetadata(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + + public void testUploadedIndexMetadataSerializationEqualsHashCode() { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + uploadedIndexMetadata, + orig -> OpenSearchTestCase.copyWriteable(orig, new NamedWriteableRegistry(Collections.emptyList()), UploadedIndexMetadata::new), + metadata -> randomlyChangingUploadedIndexMetadata(uploadedIndexMetadata) + ); + } + + private UploadedIndexMetadata randomlyChangingUploadedIndexMetadata(UploadedIndexMetadata uploadedIndexMetadata) { + switch (randomInt(2)) { + case 0: + return new UploadedIndexMetadata( + randomAlphaOfLength(10), + uploadedIndexMetadata.getIndexUUID(), + uploadedIndexMetadata.getUploadedFilename() + ); + case 1: + return new UploadedIndexMetadata( + uploadedIndexMetadata.getIndexName(), + randomAlphaOfLength(10), + uploadedIndexMetadata.getUploadedFilename() + ); + case 2: + return new UploadedIndexMetadata( + uploadedIndexMetadata.getIndexName(), + uploadedIndexMetadata.getIndexUUID(), + randomAlphaOfLength(10) + ); + } + return uploadedIndexMetadata; + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java new file mode 100644 index 0000000000000..215673642cce5 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -0,0 +1,254 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.repositories.FilterRepository; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Assert; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import org.mockito.ArgumentMatchers; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteClusterStateServiceTests extends OpenSearchTestCase { + + private RemoteClusterStateService remoteClusterStateService; + private Supplier repositoriesServiceSupplier; + private RepositoriesService repositoriesService; + private BlobStoreRepository blobStoreRepository; + + @Before + public void setup() { + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + final Settings settings = Settings.builder() + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), "remote_store_repository") + .build(); + blobStoreRepository = mock(BlobStoreRepository.class); + when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); + remoteClusterStateService = new RemoteClusterStateService( + "test-node-id", + repositoriesServiceSupplier, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L + ); + } + + public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().build(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + Assert.assertThat(manifest, nullValue()); + } + + public void testFailInitializationWhenRemoteStateDisabled() throws IOException { + final Settings settings = Settings.builder().build(); + assertThrows( + AssertionError.class, + () -> new RemoteClusterStateService( + "test-node-id", + repositoriesServiceSupplier, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L + ) + ); + } + + public void testFailWriteFullMetadataWhenRepositoryNotSet() { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + doThrow(new RepositoryMissingException("repository missing")).when(repositoriesService).repository("remote_store_repository"); + assertThrows(RepositoryMissingException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); + } + + public void testFailWriteFullMetadataWhenNotBlobRepository() { + final FilterRepository filterRepository = mock(FilterRepository.class); + when(repositoriesService.repository("remote_store_repository")).thenReturn(filterRepository); + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); + } + + public void testWriteFullMetadataSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + } + + public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().build(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); + Assert.assertThat(manifest, nullValue()); + } + + public void testFailWriteIncrementalMetadataWhenTermChanged() { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(2L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + assertThrows( + AssertionError.class, + () -> remoteClusterStateService.writeIncrementalMetadata(previousClusterState, clusterState, null) + ); + } + + public void testWriteIncrementalMetadataSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + + remoteClusterStateService.ensureRepositorySet(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousManifest + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + } + + public void testMarkLastStateAsCommittedSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + remoteClusterStateService.ensureRepositorySet(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); + + final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + } + + private void mockBlobStoreObjects() { + final BlobStore blobStore = mock(BlobStore.class); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); + final BlobPath blobPath = mock(BlobPath.class); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobPath.add(anyString())).thenReturn(blobPath); + when(blobPath.buildAsString()).thenReturn("/blob/path/"); + final BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.path()).thenReturn(blobPath); + when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + } + + private static ClusterState.Builder generateClusterStateWithOneIndex() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder().put(indexMetadata, true).clusterUUID("cluster-uuid").coordinationMetadata(coordinationMetadata).build() + ); + } + + private static DiscoveryNodes nodesWithLocalNodeClusterManager() { + return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build(); + } + +}