From 551e3cb7aeb7543bc4d5e563aa1153ee2f3734e0 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Fri, 25 Aug 2023 12:49:03 +0530 Subject: [PATCH 01/10] Upload all index metadata to remote Signed-off-by: Sooraj Sinha --- .../common/settings/ClusterSettings.java | 7 +- .../opensearch/gateway/GatewayMetaState.java | 121 +++++- .../gateway/remote/ClusterMetadataMarker.java | 381 ++++++++++++++++++ .../remote/RemoteClusterStateService.java | 358 ++++++++++++++++ .../gateway/remote/package-info.java | 12 + .../blobstore/BlobStoreRepository.java | 4 + .../remote/ClusterMetadataMarkerTests.java | 46 +++ .../RemoteClusterStateServiceTests.java | 254 ++++++++++++ 8 files changed, 1173 insertions(+), 10 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/package-info.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 32d14a3519659..381e9d1e95ae4 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -96,6 +96,7 @@ import org.opensearch.gateway.DanglingIndicesState; import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.PersistedClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; @@ -660,7 +661,11 @@ public void apply(Settings value, Settings current, Settings previous) { // Related to monitoring of task cancellation TaskCancellationMonitoringSettings.IS_ENABLED_SETTING, - TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING + TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING, + + // Remote cluster state settings + RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index af894bdbc117e..c6ccb4e15a516 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -60,6 +60,8 @@ import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.io.IOUtils; import org.opensearch.env.NodeMetadata; +import org.opensearch.gateway.remote.ClusterMetadataMarker; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.plugins.MetadataUpgrader; import org.opensearch.threadpool.ThreadPool; @@ -84,19 +86,19 @@ /** * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts. * - * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that - * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link - * ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and - * non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster. + * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being + * loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be + * stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes + * receive the real cluster state from the elected cluster-manager after joining the cluster. * * @opensearch.internal */ public class GatewayMetaState implements Closeable { /** - * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially - * stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is - * restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state. + * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since + * it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a + * cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state. */ public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG"; @@ -234,8 +236,8 @@ Metadata upgradeMetadataForNode( } /** - * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current - * version. The MetadataIndexUpgradeService might also update obsolete settings if needed. + * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current version. The MetadataIndexUpgradeService + * might also update obsolete settings if needed. * * @return input metadata if no upgrade is needed or an upgraded metadata */ @@ -599,4 +601,105 @@ public void close() throws IOException { IOUtils.close(persistenceWriter.getAndSet(null)); } } + + /** + * Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}. + */ + public static class RemotePersistedState implements PersistedState { + + private ClusterState lastAcceptedState; + private ClusterMetadataMarker lastAcceptedMarker; + private final RemoteClusterStateService remoteClusterStateService; + + public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) { + this.remoteClusterStateService = remoteClusterStateService; + } + + @Override + public long getCurrentTerm() { + return lastAcceptedState != null ? lastAcceptedState.term() : 0L; + } + + @Override + public ClusterState getLastAcceptedState() { + if (lastAcceptedMarker != null) { + assert lastAcceptedState != null : "Last accepted state is not set"; + assert lastAcceptedState.metadata().indices().size() == lastAcceptedMarker.getIndices().size() + : "Number of indices in last accepted state and marker are different"; + lastAcceptedMarker.getIndices().stream().forEach(md -> { + assert lastAcceptedState.metadata().indices().containsKey(md.getIndexName()) + : "Last accepted state and marker are not in sync"; + assert lastAcceptedState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID()) + : "Last accepted state and marker are not in sync"; + }); + } + return lastAcceptedState; + } + + @Override + public void setCurrentTerm(long currentTerm) { + // no-op + // For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes. + // But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required. + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + try { + if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. + lastAcceptedState = clusterState; + return; + } + final ClusterMetadataMarker marker; + if (shouldWriteFullClusterState(clusterState)) { + marker = remoteClusterStateService.writeFullMetadata(clusterState); + } else { + marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker); + } + lastAcceptedMarker = marker; + lastAcceptedState = clusterState; + } catch (Exception e) { + handleExceptionOnWrite(e); + } + } + + private boolean shouldWriteFullClusterState(ClusterState clusterState) { + if (lastAcceptedState == null + || lastAcceptedMarker == null + || lastAcceptedState.term() != clusterState.term() + || lastAcceptedMarker.getOpensearchVersion() != Version.CURRENT) { + return true; + } + return false; + } + + @Override + public void markLastAcceptedStateAsCommitted() { + try { + if (lastAcceptedState == null + || lastAcceptedMarker == null + || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. + return; + } + final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted( + lastAcceptedState, + lastAcceptedMarker + ); + lastAcceptedMarker = committedMarker; + } catch (Exception e) { + handleExceptionOnWrite(e); + } + } + + @Override + public void close() throws IOException { + PersistedState.super.close(); + } + + private void handleExceptionOnWrite(Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java new file mode 100644 index 0000000000000..14be75ef166b6 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java @@ -0,0 +1,381 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.Version; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Marker file which contains the details of the uploaded entity metadata + * + * @opensearch.internal + */ +public class ClusterMetadataMarker implements Writeable, ToXContentFragment { + + private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); + private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); + private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); + private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid"); + private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version"); + private static final ParseField COMMITTED_FIELD = new ParseField("committed"); + private static final ParseField INDICES_FIELD = new ParseField("indices"); + + private static long term(Object[] fields) { + return (long) fields[0]; + } + + private static long version(Object[] fields) { + return (long) fields[1]; + } + + private static String clusterUUID(Object[] fields) { + return (String) fields[2]; + } + + private static String stateUUID(Object[] fields) { + return (String) fields[3]; + } + + private static Version opensearchVersion(Object[] fields) { + return Version.fromId((int) fields[4]); + } + + private static boolean committed(Object[] fields) { + return (boolean) fields[5]; + } + + private static List indices(Object[] fields) { + return (List) fields[6]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "cluster_metadata_marker", + fields -> new ClusterMetadataMarker( + term(fields), + version(fields), + clusterUUID(fields), + stateUUID(fields), + opensearchVersion(fields), + committed(fields), + indices(fields) + ) + ); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD); + PARSER.declareObjectArray( + ConstructingObjectParser.constructorArg(), + (p, c) -> UploadedIndexMetadata.fromXContent(p), + INDICES_FIELD + ); + } + + private final List indices; + private final long clusterTerm; + private final long stateVersion; + private final String clusterUUID; + private final String stateUUID; + private final Version opensearchVersion; + private final boolean committed; + + public List getIndices() { + return indices; + } + + public long getClusterTerm() { + return clusterTerm; + } + + public long getStateVersion() { + return stateVersion; + } + + public String getClusterUUID() { + return clusterUUID; + } + + public String getStateUUID() { + return stateUUID; + } + + public Version getOpensearchVersion() { + return opensearchVersion; + } + + public boolean isCommitted() { + return committed; + } + + public ClusterMetadataMarker( + long clusterTerm, + long version, + String clusterUUID, + String stateUUID, + Version opensearchVersion, + boolean committed, + List indices + ) { + this.clusterTerm = clusterTerm; + this.stateVersion = version; + this.clusterUUID = clusterUUID; + this.stateUUID = stateUUID; + this.opensearchVersion = opensearchVersion; + this.committed = committed; + this.indices = Collections.unmodifiableList(indices); + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(CLUSTER_TERM_FIELD.getPreferredName(), getClusterTerm()) + .field(STATE_VERSION_FIELD.getPreferredName(), getStateVersion()) + .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()) + .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()) + .field(OPENSEARCH_VERSION_FIELD.getPreferredName(), getOpensearchVersion().id) + .field(COMMITTED_FIELD.getPreferredName(), isCommitted()); + builder.startArray(INDICES_FIELD.getPreferredName()); + { + for (UploadedIndexMetadata uploadedIndexMetadata : indices) { + uploadedIndexMetadata.toXContent(builder, params); + } + } + builder.endArray(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(clusterTerm); + out.writeVLong(stateVersion); + out.writeString(clusterUUID); + out.writeString(stateUUID); + out.writeInt(opensearchVersion.id); + out.writeBoolean(committed); + out.writeCollection(indices); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClusterMetadataMarker that = (ClusterMetadataMarker) o; + return Objects.equals(indices, that.indices) + && clusterTerm == that.clusterTerm + && stateVersion == that.stateVersion + && Objects.equals(clusterUUID, that.clusterUUID) + && Objects.equals(stateUUID, that.stateUUID) + && Objects.equals(opensearchVersion, that.opensearchVersion) + && Objects.equals(committed, that.committed); + } + + @Override + public int hashCode() { + return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Builder for ClusterMetadataMarker + * + * @opensearch.internal + */ + public static class Builder { + + private List indices; + private long clusterTerm; + private long stateVersion; + private String clusterUUID; + private String stateUUID; + private Version opensearchVersion; + private boolean committed; + + public Builder indices(List indices) { + this.indices = indices; + return this; + } + + public Builder clusterTerm(long clusterTerm) { + this.clusterTerm = clusterTerm; + return this; + } + + public Builder stateVersion(long stateVersion) { + this.stateVersion = stateVersion; + return this; + } + + public Builder clusterUUID(String clusterUUID) { + this.clusterUUID = clusterUUID; + return this; + } + + public Builder stateUUID(String stateUUID) { + this.stateUUID = stateUUID; + return this; + } + + public Builder opensearchVersion(Version opensearchVersion) { + this.opensearchVersion = opensearchVersion; + return this; + } + + public Builder committed(boolean committed) { + this.committed = committed; + return this; + } + + public List getIndices() { + return indices; + } + + public Builder() { + indices = new ArrayList<>(); + } + + public ClusterMetadataMarker build() { + return new ClusterMetadataMarker(clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed, indices); + } + + } + + /** + * Metadata for uploaded index metadata + * + * @opensearch.internal + */ + public static class UploadedIndexMetadata implements Writeable, ToXContentFragment { + + private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); + private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); + private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + + private static String indexName(Object[] fields) { + return (String) fields[0]; + } + + private static String indexUUID(Object[] fields) { + return (String) fields[1]; + } + + private static String uploadedFilename(Object[] fields) { + return (String) fields[2]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "uploaded_index_metadata", + fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)) + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); + } + + private final String indexName; + private final String indexUUID; + private final String uploadedFilename; + + public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) { + this.indexName = indexName; + this.indexUUID = indexUUID; + this.uploadedFilename = uploadedFileName; + } + + public String getUploadedFilename() { + return uploadedFilename; + } + + public String getIndexName() { + return indexName; + } + + public String getIndexUUID() { + return indexUUID; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject() + .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) + .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) + .endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeString(indexUUID); + out.writeString(uploadedFilename); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final UploadedIndexMetadata that = (UploadedIndexMetadata) o; + return Objects.equals(indexName, that.indexName) + && Objects.equals(indexUUID, that.indexUUID) + && Objects.equals(uploadedFilename, that.uploadedFilename); + } + + @Override + public int hashCode() { + return Objects.hash(indexName, indexUUID, uploadedFilename); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java new file mode 100644 index 0000000000000..f5e0bb14e6743 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -0,0 +1,358 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.Nullable; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; + +/** + * A Service which provides APIs to upload and download cluster metadata from remote store. + * + * @opensearch.internal + */ +public class RemoteClusterStateService { + + public static final String METADATA_NAME_FORMAT = "%s.dat"; + + public static final String METADATA_MARKER_NAME_FORMAT = "%s"; + + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "index-metadata", + METADATA_NAME_FORMAT, + IndexMetadata::fromXContent + ); + + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MARKER_FORMAT = new ChecksumBlobStoreFormat<>( + "cluster-metadata-marker", + METADATA_MARKER_NAME_FORMAT, + ClusterMetadataMarker::fromXContent + ); + /** + * Used to specify if cluster state metadata should be publish to remote store + */ + public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( + "cluster.remote_store.state.enabled", + false, + Property.NodeScope, + Property.Final + ); + /** + * Used to specify default repo to use for cluster state metadata upload + */ + public static final Setting REMOTE_CLUSTER_STATE_REPOSITORY_SETTING = Setting.simpleString( + "cluster.remote_store.state.repository", + "", + Property.NodeScope, + Property.Final + ); + private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + + private static final String DELIMITER = "__"; + + private final Supplier repositoriesService; + private final Settings settings; + private final LongSupplier relativeTimeMillisSupplier; + private BlobStoreRepository blobStoreRepository; + private volatile TimeValue slowWriteLoggingThreshold; + + public RemoteClusterStateService( + Supplier repositoriesService, + Settings settings, + ClusterSettings clusterSettings, + LongSupplier relativeTimeMillisSupplier + ) { + this.repositoriesService = repositoriesService; + this.settings = settings; + this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; + this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); + clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + } + + /** + * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be + * invoked by the elected cluster manager when the remote cluster state is enabled. + * + * @return A metadata/marker object which contains the details of uploaded entity metadata. + */ + @Nullable + public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws IOException { + final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; + ensureRepositorySet(); + + final List allUploadedIndexMetadata = new ArrayList<>(); + // todo parallel upload + // any validations before/after upload ? + for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 + final String indexMetadataKey = writeIndexMetadata( + clusterState.getClusterName().value(), + clusterState.getMetadata().clusterUUID(), + indexMetadata, + indexMetadataFileName(indexMetadata) + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataKey + ); + allUploadedIndexMetadata.add(uploadedIndexMetadata); + } + final ClusterMetadataMarker marker = uploadMarker(clusterState, allUploadedIndexMetadata, false); + final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { + logger.warn( + "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", + durationMillis, + slowWriteLoggingThreshold, + allUploadedIndexMetadata.size() + ); + } else { + // todo change to debug + logger.info( + "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices", + durationMillis, + allUploadedIndexMetadata.size() + ); + } + return marker; + } + + /** + * This method uploads the diff between the previous cluster state and the current cluster state. The previous marker file is needed to create the new + * marker. The new marker file is created by using the unchanged metadata from the previous marker and the new metadata changes from the current cluster + * state. + * + * @return The uploaded ClusterMetadataMarker file + */ + @Nullable + public ClusterMetadataMarker writeIncrementalMetadata( + ClusterState previousClusterState, + ClusterState clusterState, + ClusterMetadataMarker previousMarker + ) throws IOException { + final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); + assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; + final Map previousStateIndexMetadataVersionByName = new HashMap<>(); + for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { + previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); + } + + int numIndicesUpdated = 0; + int numIndicesUnchanged = 0; + final Map allUploadedIndexMetadata = previousMarker.getIndices() + .stream() + .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); + for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { + final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName()); + if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { + logger.trace( + "updating metadata for [{}], changing version from [{}] to [{}]", + indexMetadata.getIndex(), + previousVersion, + indexMetadata.getVersion() + ); + numIndicesUpdated++; + final String indexMetadataKey = writeIndexMetadata( + clusterState.getClusterName().value(), + clusterState.getMetadata().clusterUUID(), + indexMetadata, + indexMetadataFileName(indexMetadata) + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataKey + ); + allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); + } else { + numIndicesUnchanged++; + } + previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); + } + + for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { + allUploadedIndexMetadata.remove(removedIndexName); + } + final ClusterMetadataMarker marker = uploadMarker( + clusterState, + allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), + false + ); + final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; + if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { + logger.warn( + "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + + "wrote metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, + slowWriteLoggingThreshold, + numIndicesUpdated, + numIndicesUnchanged + ); + } else { + // todo change to debug + logger.info( + "writing cluster state took [{}ms]; " + "wrote and metadata for [{}] indices and skipped [{}] unchanged indices", + durationMillis, + numIndicesUpdated, + numIndicesUnchanged + ); + } + return marker; + } + + @Nullable + public ClusterMetadataMarker markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataMarker previousMarker) + throws IOException { + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + assert clusterState != null : "Last accepted cluster state is not set"; + assert previousMarker != null : "Last cluster metadata marker is not set"; + assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; + return uploadMarker(clusterState, previousMarker.getIndices(), true); + } + + public ClusterState getLatestClusterState(String clusterUUID) { + // todo + return null; + } + + // Visible for testing + void ensureRepositorySet() { + if (blobStoreRepository != null) { + return; + } + final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings); + assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + + private ClusterMetadataMarker uploadMarker( + ClusterState clusterState, + List uploadedIndexMetadata, + boolean committed + ) throws IOException { + synchronized (this) { + final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version()); + final ClusterMetadataMarker marker = new ClusterMetadataMarker( + clusterState.term(), + clusterState.getVersion(), + clusterState.metadata().clusterUUID(), + clusterState.stateUUID(), + Version.CURRENT, + committed, + uploadedIndexMetadata + ); + writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName); + return marker; + } + } + + private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata uploadIndexMetadata, String fileName) + throws IOException { + final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, uploadIndexMetadata.getIndexUUID()); + INDEX_METADATA_FORMAT.write(uploadIndexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor()); + // returning full path + return indexMetadataContainer.path().buildAsString() + fileName; + } + + private void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker uploadMarker, String fileName) + throws IOException { + final BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID); + CLUSTER_METADATA_MARKER_FORMAT.write(uploadMarker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor()); + } + + private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(clusterUUID) + .add("index") + .add(indexUUID) + ); + } + + private BlobContainer markerContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(clusterUUID) + .add("marker") + ); + } + + private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { + this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; + } + + private static String getMarkerFileName(long term, long version) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker + return String.join( + DELIMITER, + "marker", + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + } + + private static String indexMetadataFileName(IndexMetadata indexMetadata) { + return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/package-info.java b/server/src/main/java/org/opensearch/gateway/remote/package-info.java new file mode 100644 index 0000000000000..286e739f66289 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing class to perform operations on remote cluster state + */ +package org.opensearch.gateway.remote; diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 108a022a2612b..ad8168f48558f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -801,6 +801,10 @@ public RepositoryMetadata getMetadata() { return metadata; } + public Compressor getCompressor() { + return compressor; + } + @Override public RepositoryStats stats() { final BlobStore store = blobStore.get(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java new file mode 100644 index 0000000000000..d2d7b85579a89 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.Version; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; + +public class ClusterMetadataMarkerTests extends OpenSearchTestCase { + + public void testXContent() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + ClusterMetadataMarker originalMarker = new ClusterMetadataMarker( + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid", + Version.CURRENT, + false, + Collections.singletonList(uploadedIndexMetadata) + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalMarker.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataMarker fromXContentMarker = ClusterMetadataMarker.fromXContent(parser); + assertEquals(originalMarker, fromXContentMarker); + } + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java new file mode 100644 index 0000000000000..243b4e0f55516 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -0,0 +1,254 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.repositories.FilterRepository; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Assert; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import org.mockito.ArgumentMatchers; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class RemoteClusterStateServiceTests extends OpenSearchTestCase { + + private RemoteClusterStateService remoteClusterStateService; + private Supplier repositoriesServiceSupplier; + private RepositoriesService repositoriesService; + private BlobStoreRepository blobStoreRepository; + + @Before + public void setup() { + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + final Settings settings = Settings.builder() + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), "remote_store_repository") + .build(); + blobStoreRepository = mock(BlobStoreRepository.class); + when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); + remoteClusterStateService = new RemoteClusterStateService( + repositoriesServiceSupplier, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L + ); + } + + public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().build(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState); + Assert.assertThat(marker, nullValue()); + } + + public void testFailWriteFullMetadataWhenRemoteStateDisabled() throws IOException { + final Settings settings = Settings.builder().build(); + remoteClusterStateService = spy( + new RemoteClusterStateService( + repositoriesServiceSupplier, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + () -> 0L + ) + ); + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); + } + + public void testFailWriteFullMetadataWhenRepositoryNotSet() { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + doThrow(new RepositoryMissingException("repository missing")).when(repositoriesService).repository("remote_store_repository"); + assertThrows(RepositoryMissingException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); + } + + public void testFailWriteFullMetadataWhenNotBlobRepository() { + final FilterRepository filterRepository = mock(FilterRepository.class); + when(repositoriesService.repository("remote_store_repository")).thenReturn(filterRepository); + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); + } + + public void testWriteFullMetadataSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(marker.getIndices().size(), is(1)); + assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm())); + assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion())); + assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); + assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + } + + public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().build(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); + Assert.assertThat(marker, nullValue()); + } + + public void testFailWriteIncrementalMetadataWhenTermChanged() { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(2L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + assertThrows( + AssertionError.class, + () -> remoteClusterStateService.writeIncrementalMetadata(previousClusterState, clusterState, null) + ); + } + + public void testWriteIncrementalMetadataSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + + final ClusterMetadataMarker previousMarker = ClusterMetadataMarker.builder().indices(Collections.emptyList()).build(); + + remoteClusterStateService.ensureRepositorySet(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousMarker + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(marker.getIndices().size(), is(1)); + assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm())); + assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion())); + assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); + assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + } + + public void testMarkLastStateAsCommittedSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + remoteClusterStateService.ensureRepositorySet(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + final ClusterMetadataMarker previousMarker = ClusterMetadataMarker.builder().indices(indices).build(); + + final ClusterMetadataMarker marker = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousMarker); + + final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(marker.getIndices().size(), is(1)); + assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm())); + assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion())); + assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); + assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + } + + private void mockBlobStoreObjects() { + final BlobStore blobStore = mock(BlobStore.class); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); + final BlobPath blobPath = mock(BlobPath.class); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobPath.add(anyString())).thenReturn(blobPath); + when(blobPath.buildAsString()).thenReturn("/blob/path/"); + final BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.path()).thenReturn(blobPath); + when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + } + + private static ClusterState.Builder generateClusterStateWithOneIndex() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder().put(indexMetadata, true).clusterUUID("cluster-uuid").coordinationMetadata(coordinationMetadata).build() + ); + } + + private static DiscoveryNodes nodesWithLocalNodeClusterManager() { + return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build(); + } + +} From ab5ff8c13bbc96b6e10be0eeee518c899bfdd909 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Sun, 27 Aug 2023 21:28:41 +0530 Subject: [PATCH 02/10] Add nodeId field in marker Signed-off-by: Sooraj Sinha --- .../opensearch/gateway/GatewayMetaState.java | 2 +- .../gateway/remote/ClusterMetadataMarker.java | 73 ++++++++++++- .../remote/RemoteClusterStateService.java | 15 ++- .../remote/ClusterMetadataMarkerTests.java | 101 +++++++++++++++++- .../RemoteClusterStateServiceTests.java | 2 + 5 files changed, 186 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index c6ccb4e15a516..ac48540f22f15 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -695,7 +695,7 @@ public void markLastAcceptedStateAsCommitted() { @Override public void close() throws IOException { - PersistedState.super.close(); + remoteClusterStateService.close(); } private void handleExceptionOnWrite(Exception e) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java index 14be75ef166b6..b7c89965c6ac1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java @@ -11,6 +11,7 @@ import org.opensearch.Version; import org.opensearch.core.ParseField; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ConstructingObjectParser; @@ -37,6 +38,7 @@ public class ClusterMetadataMarker implements Writeable, ToXContentFragment { private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid"); private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version"); + private static final ParseField NODE_ID_FIELD = new ParseField("node_id"); private static final ParseField COMMITTED_FIELD = new ParseField("committed"); private static final ParseField INDICES_FIELD = new ParseField("indices"); @@ -60,12 +62,16 @@ private static Version opensearchVersion(Object[] fields) { return Version.fromId((int) fields[4]); } + private static String nodeId(Object[] fields) { + return (String) fields[5]; + } + private static boolean committed(Object[] fields) { - return (boolean) fields[5]; + return (boolean) fields[6]; } private static List indices(Object[] fields) { - return (List) fields[6]; + return (List) fields[7]; } private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( @@ -76,6 +82,7 @@ private static List indices(Object[] fields) { clusterUUID(fields), stateUUID(fields), opensearchVersion(fields), + nodeId(fields), committed(fields), indices(fields) ) @@ -87,6 +94,7 @@ private static List indices(Object[] fields) { PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD); PARSER.declareObjectArray( ConstructingObjectParser.constructorArg(), @@ -101,6 +109,7 @@ private static List indices(Object[] fields) { private final String clusterUUID; private final String stateUUID; private final Version opensearchVersion; + private final String nodeId; private final boolean committed; public List getIndices() { @@ -127,6 +136,10 @@ public Version getOpensearchVersion() { return opensearchVersion; } + public String getNodeId() { + return nodeId; + } + public boolean isCommitted() { return committed; } @@ -137,6 +150,7 @@ public ClusterMetadataMarker( String clusterUUID, String stateUUID, Version opensearchVersion, + String nodeId, boolean committed, List indices ) { @@ -145,14 +159,30 @@ public ClusterMetadataMarker( this.clusterUUID = clusterUUID; this.stateUUID = stateUUID; this.opensearchVersion = opensearchVersion; + this.nodeId = nodeId; this.committed = committed; this.indices = Collections.unmodifiableList(indices); } + public ClusterMetadataMarker(StreamInput in) throws IOException { + this.clusterTerm = in.readVLong(); + this.stateVersion = in.readVLong(); + this.clusterUUID = in.readString(); + this.stateUUID = in.readString(); + this.opensearchVersion = Version.fromId(in.readInt()); + this.nodeId = in.readString(); + this.committed = in.readBoolean(); + this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); + } + public static Builder builder() { return new Builder(); } + public static Builder builder(ClusterMetadataMarker marker) { + return new Builder(marker); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(CLUSTER_TERM_FIELD.getPreferredName(), getClusterTerm()) @@ -160,6 +190,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()) .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()) .field(OPENSEARCH_VERSION_FIELD.getPreferredName(), getOpensearchVersion().id) + .field(NODE_ID_FIELD.getPreferredName(), getNodeId()) .field(COMMITTED_FIELD.getPreferredName(), isCommitted()); builder.startArray(INDICES_FIELD.getPreferredName()); { @@ -178,6 +209,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(clusterUUID); out.writeString(stateUUID); out.writeInt(opensearchVersion.id); + out.writeString(nodeId); out.writeBoolean(committed); out.writeCollection(indices); } @@ -197,12 +229,13 @@ public boolean equals(Object o) { && Objects.equals(clusterUUID, that.clusterUUID) && Objects.equals(stateUUID, that.stateUUID) && Objects.equals(opensearchVersion, that.opensearchVersion) + && Objects.equals(nodeId, that.nodeId) && Objects.equals(committed, that.committed); } @Override public int hashCode() { - return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed); + return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, nodeId, committed); } @Override @@ -227,6 +260,7 @@ public static class Builder { private String clusterUUID; private String stateUUID; private Version opensearchVersion; + private String nodeId; private boolean committed; public Builder indices(List indices) { @@ -259,6 +293,11 @@ public Builder opensearchVersion(Version opensearchVersion) { return this; } + public Builder nodeId(String nodeId) { + this.nodeId = nodeId; + return this; + } + public Builder committed(boolean committed) { this.committed = committed; return this; @@ -272,8 +311,28 @@ public Builder() { indices = new ArrayList<>(); } + public Builder(ClusterMetadataMarker marker) { + this.clusterTerm = marker.clusterTerm; + this.stateVersion = marker.stateVersion; + this.clusterUUID = marker.clusterUUID; + this.stateUUID = marker.stateUUID; + this.opensearchVersion = marker.opensearchVersion; + this.nodeId = marker.nodeId; + this.committed = marker.committed; + this.indices = new ArrayList<>(marker.indices); + } + public ClusterMetadataMarker build() { - return new ClusterMetadataMarker(clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed, indices); + return new ClusterMetadataMarker( + clusterTerm, + stateVersion, + clusterUUID, + stateUUID, + opensearchVersion, + nodeId, + committed, + indices + ); } } @@ -322,6 +381,12 @@ public UploadedIndexMetadata(String indexName, String indexUUID, String uploaded this.uploadedFilename = uploadedFileName; } + public UploadedIndexMetadata(StreamInput in) throws IOException { + this.indexName = in.readString(); + this.indexUUID = in.readString(); + this.uploadedFilename = in.readString(); + } + public String getUploadedFilename() { return uploadedFilename; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index f5e0bb14e6743..705210f39c2f1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -20,6 +20,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.RepositoriesService; @@ -27,6 +28,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import java.io.Closeable; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -46,7 +48,7 @@ * * @opensearch.internal */ -public class RemoteClusterStateService { +public class RemoteClusterStateService implements Closeable { public static final String METADATA_NAME_FORMAT = "%s.dat"; @@ -85,6 +87,7 @@ public class RemoteClusterStateService { private static final String DELIMITER = "__"; + private final String nodeId; private final Supplier repositoriesService; private final Settings settings; private final LongSupplier relativeTimeMillisSupplier; @@ -92,11 +95,13 @@ public class RemoteClusterStateService { private volatile TimeValue slowWriteLoggingThreshold; public RemoteClusterStateService( + String nodeId, Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier ) { + this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; this.relativeTimeMillisSupplier = relativeTimeMillisSupplier; @@ -264,6 +269,13 @@ public ClusterState getLatestClusterState(String clusterUUID) { return null; } + @Override + public void close() throws IOException { + if (blobStoreRepository != null) { + IOUtils.close(blobStoreRepository); + } + } + // Visible for testing void ensureRepositorySet() { if (blobStoreRepository != null) { @@ -289,6 +301,7 @@ private ClusterMetadataMarker uploadMarker( clusterState.metadata().clusterUUID(), clusterState.stateUUID(), Version.CURRENT, + nodeId, committed, uploadedIndexMetadata ); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java index d2d7b85579a89..22a3383ac035e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java @@ -11,18 +11,23 @@ import org.opensearch.Version; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.VersionUtils; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; public class ClusterMetadataMarkerTests extends OpenSearchTestCase { - public void testXContent() throws IOException { + public void testClusterMetadataMarkerXContent() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); ClusterMetadataMarker originalMarker = new ClusterMetadataMarker( 1L, @@ -30,6 +35,7 @@ public void testXContent() throws IOException { "test-cluster-uuid", "test-state-uuid", Version.CURRENT, + "test-node-id", false, Collections.singletonList(uploadedIndexMetadata) ); @@ -43,4 +49,97 @@ public void testXContent() throws IOException { assertEquals(originalMarker, fromXContentMarker); } } + + public void testClusterMetadataMarkerSerializationEqualsHashCode() { + ClusterMetadataMarker initialMarker = new ClusterMetadataMarker( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomAlphaOfLength(10), + randomAlphaOfLength(10), + VersionUtils.randomOpenSearchVersion(random()), + randomAlphaOfLength(10), + randomBoolean(), + randomUploadedIndexMetadataList() + ); + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + initialMarker, + orig -> OpenSearchTestCase.copyWriteable(orig, new NamedWriteableRegistry(Collections.emptyList()), ClusterMetadataMarker::new), + marker -> { + ClusterMetadataMarker.Builder builder = ClusterMetadataMarker.builder(marker); + switch (randomInt(7)) { + case 0: + builder.clusterTerm(randomNonNegativeLong()); + break; + case 1: + builder.stateVersion(randomNonNegativeLong()); + break; + case 2: + builder.clusterUUID(randomAlphaOfLength(10)); + break; + case 3: + builder.stateUUID(randomAlphaOfLength(10)); + break; + case 4: + builder.opensearchVersion(VersionUtils.randomOpenSearchVersion(random())); + break; + case 5: + builder.nodeId(randomAlphaOfLength(10)); + break; + case 6: + builder.committed(randomBoolean()); + break; + case 7: + builder.indices(randomUploadedIndexMetadataList()); + break; + } + return builder.build(); + } + ); + } + + private List randomUploadedIndexMetadataList() { + final int size = randomIntBetween(1, 10); + final List uploadedIndexMetadataList = new ArrayList<>(size); + while (uploadedIndexMetadataList.size() < size) { + assertTrue(uploadedIndexMetadataList.add(randomUploadedIndexMetadata())); + } + return uploadedIndexMetadataList; + } + + private UploadedIndexMetadata randomUploadedIndexMetadata() { + return new UploadedIndexMetadata(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + + public void testUploadedIndexMetadataSerializationEqualsHashCode() { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + uploadedIndexMetadata, + orig -> OpenSearchTestCase.copyWriteable(orig, new NamedWriteableRegistry(Collections.emptyList()), UploadedIndexMetadata::new), + metadata -> randomlyChangingUploadedIndexMetadata(uploadedIndexMetadata) + ); + } + + private UploadedIndexMetadata randomlyChangingUploadedIndexMetadata(UploadedIndexMetadata uploadedIndexMetadata) { + switch (randomInt(2)) { + case 0: + return new UploadedIndexMetadata( + randomAlphaOfLength(10), + uploadedIndexMetadata.getIndexUUID(), + uploadedIndexMetadata.getUploadedFilename() + ); + case 1: + return new UploadedIndexMetadata( + uploadedIndexMetadata.getIndexName(), + randomAlphaOfLength(10), + uploadedIndexMetadata.getUploadedFilename() + ); + case 2: + return new UploadedIndexMetadata( + uploadedIndexMetadata.getIndexName(), + uploadedIndexMetadata.getIndexUUID(), + randomAlphaOfLength(10) + ); + } + return uploadedIndexMetadata; + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 243b4e0f55516..2323f7abac3c9 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -66,6 +66,7 @@ public void setup() { blobStoreRepository = mock(BlobStoreRepository.class); when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); remoteClusterStateService = new RemoteClusterStateService( + "test-node-id", repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -83,6 +84,7 @@ public void testFailWriteFullMetadataWhenRemoteStateDisabled() throws IOExceptio final Settings settings = Settings.builder().build(); remoteClusterStateService = spy( new RemoteClusterStateService( + "test-node-id", repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), From 0486d2350abc46bf920dfdd607752f1dc6fe5c19 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Tue, 29 Aug 2023 12:56:07 +0530 Subject: [PATCH 03/10] Move assertion to setLastAcceptedState Signed-off-by: Sooraj Sinha --- .../opensearch/gateway/GatewayMetaState.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index ac48540f22f15..1c35f4b5ffc8d 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -622,17 +622,6 @@ public long getCurrentTerm() { @Override public ClusterState getLastAcceptedState() { - if (lastAcceptedMarker != null) { - assert lastAcceptedState != null : "Last accepted state is not set"; - assert lastAcceptedState.metadata().indices().size() == lastAcceptedMarker.getIndices().size() - : "Number of indices in last accepted state and marker are different"; - lastAcceptedMarker.getIndices().stream().forEach(md -> { - assert lastAcceptedState.metadata().indices().containsKey(md.getIndexName()) - : "Last accepted state and marker are not in sync"; - assert lastAcceptedState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID()) - : "Last accepted state and marker are not in sync"; - }); - } return lastAcceptedState; } @@ -657,6 +646,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } else { marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker); } + assert verifyMarkerAndClusterState(marker, clusterState) == true : "Marker and ClusterState are not in sync"; lastAcceptedMarker = marker; lastAcceptedState = clusterState; } catch (Exception e) { @@ -664,6 +654,20 @@ public void setLastAcceptedState(ClusterState clusterState) { } } + private boolean verifyMarkerAndClusterState(ClusterMetadataMarker marker, ClusterState clusterState) { + assert marker != null: "ClusterMetadataMarker is null"; + assert clusterState != null : "ClusterState is null"; + assert clusterState.metadata().indices().size() == marker.getIndices().size() + : "Number of indices in last accepted state and marker are different"; + marker.getIndices().stream().forEach(md -> { + assert clusterState.metadata().indices().containsKey(md.getIndexName()) + : "Last accepted state and marker are not in sync"; + assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID()) + : "Last accepted state and marker are not in sync"; + }); + return true; + } + private boolean shouldWriteFullClusterState(ClusterState clusterState) { if (lastAcceptedState == null || lastAcceptedMarker == null From 4824bb5b2014b61632b249d4eddec262d532421f Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 30 Aug 2023 11:52:44 +0530 Subject: [PATCH 04/10] Handle RepositoryMissingException during bootstrap Signed-off-by: Sooraj Sinha --- .../java/org/opensearch/gateway/GatewayMetaState.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 1c35f4b5ffc8d..85e6ee7baf3e7 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -64,6 +64,7 @@ import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.plugins.MetadataUpgrader; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -607,6 +608,8 @@ public void close() throws IOException { */ public static class RemotePersistedState implements PersistedState { + private static final Logger logger = LogManager.getLogger(RemotePersistedState.class); + private ClusterState lastAcceptedState; private ClusterMetadataMarker lastAcceptedMarker; private final RemoteClusterStateService remoteClusterStateService; @@ -637,6 +640,7 @@ public void setLastAcceptedState(ClusterState clusterState) { try { if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. + logger.trace("Cluster is not yet ready to publish state to remote store"); lastAcceptedState = clusterState; return; } @@ -649,6 +653,9 @@ public void setLastAcceptedState(ClusterState clusterState) { assert verifyMarkerAndClusterState(marker, clusterState) == true : "Marker and ClusterState are not in sync"; lastAcceptedMarker = marker; lastAcceptedState = clusterState; + } catch (RepositoryMissingException e) { + logger.error("Remote repository is not yet registered"); + lastAcceptedState = clusterState; } catch (Exception e) { handleExceptionOnWrite(e); } @@ -685,6 +692,7 @@ public void markLastAcceptedStateAsCommitted() { || lastAcceptedMarker == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. + logger.trace("Cluster is not yet ready to publish state to remote store"); return; } final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted( From 8b8d43de06824172f5cdcc2a6f97b419c267d695 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 30 Aug 2023 14:05:41 +0530 Subject: [PATCH 05/10] Add changelog Signed-off-by: Sooraj Sinha --- CHANGELOG.md | 5 +++-- .../main/java/org/opensearch/gateway/GatewayMetaState.java | 5 ++--- .../opensearch/gateway/remote/RemoteClusterStateService.java | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eabc17f81917f..280808f70330e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,7 +89,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212)) - [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081)) - Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513)) -- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379)) +- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379)) +- [Remote State] Create service to publish cluster state to remote store ([#9160](https://github.com/opensearch-project/OpenSearch/pull/9160)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) @@ -181,4 +182,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 85e6ee7baf3e7..26705c0742742 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -662,13 +662,12 @@ public void setLastAcceptedState(ClusterState clusterState) { } private boolean verifyMarkerAndClusterState(ClusterMetadataMarker marker, ClusterState clusterState) { - assert marker != null: "ClusterMetadataMarker is null"; + assert marker != null : "ClusterMetadataMarker is null"; assert clusterState != null : "ClusterState is null"; assert clusterState.metadata().indices().size() == marker.getIndices().size() : "Number of indices in last accepted state and marker are different"; marker.getIndices().stream().forEach(md -> { - assert clusterState.metadata().indices().containsKey(md.getIndexName()) - : "Last accepted state and marker are not in sync"; + assert clusterState.metadata().indices().containsKey(md.getIndexName()) : "Last accepted state and marker are not in sync"; assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID()) : "Last accepted state and marker are not in sync"; }); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 705210f39c2f1..51bfced442073 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -242,7 +242,7 @@ public ClusterMetadataMarker writeIncrementalMetadata( } else { // todo change to debug logger.info( - "writing cluster state took [{}ms]; " + "wrote and metadata for [{}] indices and skipped [{}] unchanged indices", + "writing cluster state took [{}ms]; " + "wrote metadata for [{}] indices and skipped [{}] unchanged indices", durationMillis, numIndicesUpdated, numIndicesUnchanged @@ -354,7 +354,7 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { } private static String getMarkerFileName(long term, long version) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/marker_2147483642_2147483637_456536447 return String.join( DELIMITER, "marker", From b1c7ab9abcae29e7b3b8d56d80e1c983830cf231 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 30 Aug 2023 16:18:45 +0530 Subject: [PATCH 06/10] Add assert for previous cluster state and marker Signed-off-by: Sooraj Sinha --- .../src/main/java/org/opensearch/gateway/GatewayMetaState.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 26705c0742742..26751ddc5478e 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -648,6 +648,8 @@ public void setLastAcceptedState(ClusterState clusterState) { if (shouldWriteFullClusterState(clusterState)) { marker = remoteClusterStateService.writeFullMetadata(clusterState); } else { + assert verifyMarkerAndClusterState(lastAcceptedMarker, lastAcceptedState) == true + : "Previous Marker and previous ClusterState are not in sync"; marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker); } assert verifyMarkerAndClusterState(marker, clusterState) == true : "Marker and ClusterState are not in sync"; From 8f8e034d30572fe7b0093dbef3d230237c3890c3 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 30 Aug 2023 18:24:13 +0530 Subject: [PATCH 07/10] Add RemotePersistedState tests Signed-off-by: Sooraj Sinha --- .../opensearch/gateway/GatewayMetaState.java | 10 ++- .../remote/RemoteClusterStateService.java | 9 +-- .../GatewayMetaStatePersistedStateTests.java | 72 +++++++++++++++++++ 3 files changed, 84 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 26751ddc5478e..c9e7ea973380b 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -640,7 +640,7 @@ public void setLastAcceptedState(ClusterState clusterState) { try { if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. - logger.trace("Cluster is not yet ready to publish state to remote store"); + logger.info("Cluster is not yet ready to publish state to remote store"); lastAcceptedState = clusterState; return; } @@ -656,6 +656,9 @@ assert verifyMarkerAndClusterState(lastAcceptedMarker, lastAcceptedState) == tru lastAcceptedMarker = marker; lastAcceptedState = clusterState; } catch (RepositoryMissingException e) { + // TODO This logic needs to be modified once PR for repo registration during bootstrap is pushed + // https://github.com/opensearch-project/OpenSearch/pull/9105/ + // After the above PR is pushed, we can remove this silent failure and throw the exception instead. logger.error("Remote repository is not yet registered"); lastAcceptedState = clusterState; } catch (Exception e) { @@ -669,9 +672,10 @@ private boolean verifyMarkerAndClusterState(ClusterMetadataMarker marker, Cluste assert clusterState.metadata().indices().size() == marker.getIndices().size() : "Number of indices in last accepted state and marker are different"; marker.getIndices().stream().forEach(md -> { - assert clusterState.metadata().indices().containsKey(md.getIndexName()) : "Last accepted state and marker are not in sync"; + assert clusterState.metadata().indices().containsKey(md.getIndexName()) + : "Last accepted state does not contain the index : " + md.getIndexName(); assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID()) - : "Last accepted state and marker are not in sync"; + : "Last accepted state and marker do not have same UUID for index : " + md.getIndexName(); }); return true; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 51bfced442073..17b7791277892 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -66,8 +66,10 @@ public class RemoteClusterStateService implements Closeable { ClusterMetadataMarker::fromXContent ); /** - * Used to specify if cluster state metadata should be publish to remote store + * Used to specify if cluster state metadata should be published to remote store */ + // TODO The remote state enabled and repository settings should be read from node attributes. + // Dependent on https://github.com/opensearch-project/OpenSearch/pull/9105/ public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( "cluster.remote_store.state.enabled", false, @@ -101,6 +103,7 @@ public RemoteClusterStateService( ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier ) { + assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; @@ -122,7 +125,6 @@ public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws logger.error("Local node is not elected cluster manager. Exiting"); return null; } - assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; ensureRepositorySet(); final List allUploadedIndexMetadata = new ArrayList<>(); @@ -240,8 +242,7 @@ public ClusterMetadataMarker writeIncrementalMetadata( numIndicesUnchanged ); } else { - // todo change to debug - logger.info( + logger.trace( "writing cluster state took [{}ms]; " + "wrote metadata for [{}] indices and skipped [{}] unchanged indices", durationMillis, numIndicesUpdated, diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index c6b44eaa9d364..b34b7f3ffbd1f 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -59,6 +60,9 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.TestEnvironment; +import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; +import org.opensearch.gateway.remote.ClusterMetadataMarker; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -75,11 +79,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.mockito.Mockito; + import static org.opensearch.test.NodeRoles.nonClusterManagerNode; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase { @@ -647,6 +655,70 @@ Directory createDirectory(Path path) { } } + public void testRemotePersistedState() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + final ClusterMetadataMarker marker = ClusterMetadataMarker.builder().clusterTerm(1L).stateVersion(5L).build(); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any())).thenReturn(marker); + + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(marker); + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService); + + assertThat(remotePersistedState.getLastAcceptedState(), nullValue()); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(0L)); + + final long clusterTerm = randomNonNegativeLong(); + final ClusterState clusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + remotePersistedState.setLastAcceptedState(clusterState); + Mockito.verify(remoteClusterStateService, times(0)).writeFullMetadata(Mockito.any()); + + assertThat(remotePersistedState.getLastAcceptedState(), equalTo(clusterState)); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + + final ClusterState secondClusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + remotePersistedState.setLastAcceptedState(secondClusterState); + Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(Mockito.any()); + + assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + + remotePersistedState.markLastAcceptedStateAsCommitted(); + Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any()); + + assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); + assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + + } + + public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any()); + + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService); + + final long clusterTerm = randomNonNegativeLong(); + final ClusterState clusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + remotePersistedState.setLastAcceptedState(clusterState); + + final ClusterState secondClusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(secondClusterState)); + } + private static BigArrays getBigArrays() { return usually() ? BigArrays.NON_RECYCLING_INSTANCE From 6998fbdc439048b9406e837dadbda745d4948737 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 30 Aug 2023 20:47:28 +0530 Subject: [PATCH 08/10] Rename ClusterMetadataMarker to ClusterMetadataManifest Signed-off-by: Sooraj Sinha --- .../opensearch/gateway/GatewayMetaState.java | 42 +++++----- ...rker.java => ClusterMetadataManifest.java} | 46 +++++------ .../remote/RemoteClusterStateService.java | 72 +++++++++--------- .../GatewayMetaStatePersistedStateTests.java | 8 +- ...java => ClusterMetadataManifestTests.java} | 30 ++++---- .../RemoteClusterStateServiceTests.java | 76 +++++++++---------- 6 files changed, 139 insertions(+), 135 deletions(-) rename server/src/main/java/org/opensearch/gateway/remote/{ClusterMetadataMarker.java => ClusterMetadataManifest.java} (91%) rename server/src/test/java/org/opensearch/gateway/remote/{ClusterMetadataMarkerTests.java => ClusterMetadataManifestTests.java} (82%) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index c9e7ea973380b..02f1e5049b95c 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -60,7 +60,7 @@ import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.io.IOUtils; import org.opensearch.env.NodeMetadata; -import org.opensearch.gateway.remote.ClusterMetadataMarker; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.plugins.MetadataUpgrader; @@ -611,7 +611,7 @@ public static class RemotePersistedState implements PersistedState { private static final Logger logger = LogManager.getLogger(RemotePersistedState.class); private ClusterState lastAcceptedState; - private ClusterMetadataMarker lastAcceptedMarker; + private ClusterMetadataManifest lastAcceptedManifest; private final RemoteClusterStateService remoteClusterStateService; public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) { @@ -644,16 +644,16 @@ public void setLastAcceptedState(ClusterState clusterState) { lastAcceptedState = clusterState; return; } - final ClusterMetadataMarker marker; + final ClusterMetadataManifest manifest; if (shouldWriteFullClusterState(clusterState)) { - marker = remoteClusterStateService.writeFullMetadata(clusterState); + manifest = remoteClusterStateService.writeFullMetadata(clusterState); } else { - assert verifyMarkerAndClusterState(lastAcceptedMarker, lastAcceptedState) == true - : "Previous Marker and previous ClusterState are not in sync"; - marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker); + assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true + : "Previous manifest and previous ClusterState are not in sync"; + manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); } - assert verifyMarkerAndClusterState(marker, clusterState) == true : "Marker and ClusterState are not in sync"; - lastAcceptedMarker = marker; + assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync"; + lastAcceptedManifest = manifest; lastAcceptedState = clusterState; } catch (RepositoryMissingException e) { // TODO This logic needs to be modified once PR for repo registration during bootstrap is pushed @@ -666,25 +666,25 @@ assert verifyMarkerAndClusterState(lastAcceptedMarker, lastAcceptedState) == tru } } - private boolean verifyMarkerAndClusterState(ClusterMetadataMarker marker, ClusterState clusterState) { - assert marker != null : "ClusterMetadataMarker is null"; + private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) { + assert manifest != null : "ClusterMetadataManifest is null"; assert clusterState != null : "ClusterState is null"; - assert clusterState.metadata().indices().size() == marker.getIndices().size() - : "Number of indices in last accepted state and marker are different"; - marker.getIndices().stream().forEach(md -> { + assert clusterState.metadata().indices().size() == manifest.getIndices().size() + : "Number of indices in last accepted state and manifest are different"; + manifest.getIndices().stream().forEach(md -> { assert clusterState.metadata().indices().containsKey(md.getIndexName()) : "Last accepted state does not contain the index : " + md.getIndexName(); assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID()) - : "Last accepted state and marker do not have same UUID for index : " + md.getIndexName(); + : "Last accepted state and manifest do not have same UUID for index : " + md.getIndexName(); }); return true; } private boolean shouldWriteFullClusterState(ClusterState clusterState) { if (lastAcceptedState == null - || lastAcceptedMarker == null + || lastAcceptedManifest == null || lastAcceptedState.term() != clusterState.term() - || lastAcceptedMarker.getOpensearchVersion() != Version.CURRENT) { + || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) { return true; } return false; @@ -694,17 +694,17 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState) { public void markLastAcceptedStateAsCommitted() { try { if (lastAcceptedState == null - || lastAcceptedMarker == null + || lastAcceptedManifest == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. logger.trace("Cluster is not yet ready to publish state to remote store"); return; } - final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted( + final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted( lastAcceptedState, - lastAcceptedMarker + lastAcceptedManifest ); - lastAcceptedMarker = committedMarker; + lastAcceptedManifest = committedManifest; } catch (Exception e) { handleExceptionOnWrite(e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java similarity index 91% rename from server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java rename to server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index b7c89965c6ac1..cac77f9996438 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -27,11 +27,11 @@ import java.util.Objects; /** - * Marker file which contains the details of the uploaded entity metadata + * Manifest file which contains the details of the uploaded entity metadata * * @opensearch.internal */ -public class ClusterMetadataMarker implements Writeable, ToXContentFragment { +public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); @@ -74,9 +74,9 @@ private static List indices(Object[] fields) { return (List) fields[7]; } - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "cluster_metadata_marker", - fields -> new ClusterMetadataMarker( + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> new ClusterMetadataManifest( term(fields), version(fields), clusterUUID(fields), @@ -144,7 +144,7 @@ public boolean isCommitted() { return committed; } - public ClusterMetadataMarker( + public ClusterMetadataManifest( long clusterTerm, long version, String clusterUUID, @@ -164,7 +164,7 @@ public ClusterMetadataMarker( this.indices = Collections.unmodifiableList(indices); } - public ClusterMetadataMarker(StreamInput in) throws IOException { + public ClusterMetadataManifest(StreamInput in) throws IOException { this.clusterTerm = in.readVLong(); this.stateVersion = in.readVLong(); this.clusterUUID = in.readString(); @@ -179,8 +179,8 @@ public static Builder builder() { return new Builder(); } - public static Builder builder(ClusterMetadataMarker marker) { - return new Builder(marker); + public static Builder builder(ClusterMetadataManifest manifest) { + return new Builder(manifest); } @Override @@ -222,7 +222,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final ClusterMetadataMarker that = (ClusterMetadataMarker) o; + final ClusterMetadataManifest that = (ClusterMetadataManifest) o; return Objects.equals(indices, that.indices) && clusterTerm == that.clusterTerm && stateVersion == that.stateVersion @@ -243,12 +243,12 @@ public String toString() { return Strings.toString(MediaTypeRegistry.JSON, this); } - public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException { + public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } /** - * Builder for ClusterMetadataMarker + * Builder for ClusterMetadataManifest * * @opensearch.internal */ @@ -311,19 +311,19 @@ public Builder() { indices = new ArrayList<>(); } - public Builder(ClusterMetadataMarker marker) { - this.clusterTerm = marker.clusterTerm; - this.stateVersion = marker.stateVersion; - this.clusterUUID = marker.clusterUUID; - this.stateUUID = marker.stateUUID; - this.opensearchVersion = marker.opensearchVersion; - this.nodeId = marker.nodeId; - this.committed = marker.committed; - this.indices = new ArrayList<>(marker.indices); + public Builder(ClusterMetadataManifest manifest) { + this.clusterTerm = manifest.clusterTerm; + this.stateVersion = manifest.stateVersion; + this.clusterUUID = manifest.clusterUUID; + this.stateUUID = manifest.stateUUID; + this.opensearchVersion = manifest.opensearchVersion; + this.nodeId = manifest.nodeId; + this.committed = manifest.committed; + this.indices = new ArrayList<>(manifest.indices); } - public ClusterMetadataMarker build() { - return new ClusterMetadataMarker( + public ClusterMetadataManifest build() { + return new ClusterMetadataManifest( clusterTerm, stateVersion, clusterUUID, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 17b7791277892..c0f55d60571ce 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -21,7 +21,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -52,7 +52,7 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_NAME_FORMAT = "%s.dat"; - public static final String METADATA_MARKER_NAME_FORMAT = "%s"; + public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", @@ -60,10 +60,10 @@ public class RemoteClusterStateService implements Closeable { IndexMetadata::fromXContent ); - public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MARKER_FORMAT = new ChecksumBlobStoreFormat<>( - "cluster-metadata-marker", - METADATA_MARKER_NAME_FORMAT, - ClusterMetadataMarker::fromXContent + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( + "cluster-metadata-manifest", + METADATA_MANIFEST_NAME_FORMAT, + ClusterMetadataManifest::fromXContent ); /** * Used to specify if cluster state metadata should be published to remote store @@ -116,10 +116,10 @@ public RemoteClusterStateService( * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be * invoked by the elected cluster manager when the remote cluster state is enabled. * - * @return A metadata/marker object which contains the details of uploaded entity metadata. + * @return A manifest object which contains the details of uploaded entity metadata. */ @Nullable - public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws IOException { + public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) throws IOException { final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -127,7 +127,7 @@ public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws } ensureRepositorySet(); - final List allUploadedIndexMetadata = new ArrayList<>(); + final List allUploadedIndexMetadata = new ArrayList<>(); // todo parallel upload // any validations before/after upload ? for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { @@ -145,7 +145,7 @@ public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws ); allUploadedIndexMetadata.add(uploadedIndexMetadata); } - final ClusterMetadataMarker marker = uploadMarker(clusterState, allUploadedIndexMetadata, false); + final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false); final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -162,21 +162,21 @@ public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws allUploadedIndexMetadata.size() ); } - return marker; + return manifest; } /** - * This method uploads the diff between the previous cluster state and the current cluster state. The previous marker file is needed to create the new - * marker. The new marker file is created by using the unchanged metadata from the previous marker and the new metadata changes from the current cluster + * This method uploads the diff between the previous cluster state and the current cluster state. The previous manifest file is needed to create the new + * manifest. The new manifest file is created by using the unchanged metadata from the previous manifest and the new metadata changes from the current cluster * state. * - * @return The uploaded ClusterMetadataMarker file + * @return The uploaded ClusterMetadataManifest file */ @Nullable - public ClusterMetadataMarker writeIncrementalMetadata( + public ClusterMetadataManifest writeIncrementalMetadata( ClusterState previousClusterState, ClusterState clusterState, - ClusterMetadataMarker previousMarker + ClusterMetadataManifest previousManifest ) throws IOException { final long startTimeMillis = relativeTimeMillisSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { @@ -192,7 +192,7 @@ public ClusterMetadataMarker writeIncrementalMetadata( int numIndicesUpdated = 0; int numIndicesUnchanged = 0; - final Map allUploadedIndexMetadata = previousMarker.getIndices() + final Map allUploadedIndexMetadata = previousManifest.getIndices() .stream() .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { @@ -226,7 +226,7 @@ public ClusterMetadataMarker writeIncrementalMetadata( for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { allUploadedIndexMetadata.remove(removedIndexName); } - final ClusterMetadataMarker marker = uploadMarker( + final ClusterMetadataManifest manifest = uploadManifest( clusterState, allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), false @@ -249,20 +249,20 @@ public ClusterMetadataMarker writeIncrementalMetadata( numIndicesUnchanged ); } - return marker; + return manifest; } @Nullable - public ClusterMetadataMarker markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataMarker previousMarker) + public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); return null; } assert clusterState != null : "Last accepted cluster state is not set"; - assert previousMarker != null : "Last cluster metadata marker is not set"; + assert previousManifest != null : "Last cluster metadata manifest is not set"; assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; - return uploadMarker(clusterState, previousMarker.getIndices(), true); + return uploadManifest(clusterState, previousManifest.getIndices(), true); } public ClusterState getLatestClusterState(String clusterUUID) { @@ -289,14 +289,14 @@ void ensureRepositorySet() { blobStoreRepository = (BlobStoreRepository) repository; } - private ClusterMetadataMarker uploadMarker( + private ClusterMetadataManifest uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, boolean committed ) throws IOException { synchronized (this) { - final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version()); - final ClusterMetadataMarker marker = new ClusterMetadataMarker( + final String manifestFileName = getManifestFileName(clusterState.term(), clusterState.version()); + final ClusterMetadataManifest manifest = new ClusterMetadataManifest( clusterState.term(), clusterState.getVersion(), clusterState.metadata().clusterUUID(), @@ -306,8 +306,8 @@ private ClusterMetadataMarker uploadMarker( committed, uploadedIndexMetadata ); - writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName); - return marker; + writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); + return manifest; } } @@ -319,10 +319,10 @@ private String writeIndexMetadata(String clusterName, String clusterUUID, IndexM return indexMetadataContainer.path().buildAsString() + fileName; } - private void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker uploadMarker, String fileName) + private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { - final BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID); - CLUSTER_METADATA_MARKER_FORMAT.write(uploadMarker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor()); + final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); + CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); } private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { @@ -338,15 +338,15 @@ private BlobContainer indexMetadataContainer(String clusterName, String clusterU ); } - private BlobContainer markerContainer(String clusterName, String clusterUUID) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker + private BlobContainer manifestContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest return blobStoreRepository.blobStore() .blobContainer( blobStoreRepository.basePath() .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) .add("cluster-state") .add(clusterUUID) - .add("marker") + .add("manifest") ); } @@ -354,11 +354,11 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } - private static String getMarkerFileName(long term, long version) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/marker_2147483642_2147483637_456536447 + private static String getManifestFileName(long term, long version) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637_456536447 return String.join( DELIMITER, - "marker", + "manifest", RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version), RemoteStoreUtils.invertLong(System.currentTimeMillis()) diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index b34b7f3ffbd1f..47fea55242240 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -61,7 +61,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.env.TestEnvironment; import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; -import org.opensearch.gateway.remote.ClusterMetadataMarker; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.test.OpenSearchTestCase; @@ -657,10 +657,10 @@ Directory createDirectory(Path path) { public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); - final ClusterMetadataMarker marker = ClusterMetadataMarker.builder().clusterTerm(1L).stateVersion(5L).build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any())).thenReturn(marker); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any())).thenReturn(manifest); - Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(marker); + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService); assertThat(remotePersistedState.getLastAcceptedState(), nullValue()); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java similarity index 82% rename from server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java rename to server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 22a3383ac035e..eafa191581d65 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -15,7 +15,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -25,11 +25,11 @@ import java.util.Collections; import java.util.List; -public class ClusterMetadataMarkerTests extends OpenSearchTestCase { +public class ClusterMetadataManifestTests extends OpenSearchTestCase { - public void testClusterMetadataMarkerXContent() throws IOException { + public void testClusterMetadataManifestXContent() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); - ClusterMetadataMarker originalMarker = new ClusterMetadataMarker( + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( 1L, 1L, "test-cluster-uuid", @@ -41,17 +41,17 @@ public void testClusterMetadataMarkerXContent() throws IOException { ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); - originalMarker.toXContent(builder, ToXContent.EMPTY_PARAMS); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { - final ClusterMetadataMarker fromXContentMarker = ClusterMetadataMarker.fromXContent(parser); - assertEquals(originalMarker, fromXContentMarker); + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser); + assertEquals(originalManifest, fromXContentManifest); } } - public void testClusterMetadataMarkerSerializationEqualsHashCode() { - ClusterMetadataMarker initialMarker = new ClusterMetadataMarker( + public void testClusterMetadataManifestSerializationEqualsHashCode() { + ClusterMetadataManifest initialManifest = new ClusterMetadataManifest( randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(10), @@ -62,10 +62,14 @@ public void testClusterMetadataMarkerSerializationEqualsHashCode() { randomUploadedIndexMetadataList() ); EqualsHashCodeTestUtils.checkEqualsAndHashCode( - initialMarker, - orig -> OpenSearchTestCase.copyWriteable(orig, new NamedWriteableRegistry(Collections.emptyList()), ClusterMetadataMarker::new), - marker -> { - ClusterMetadataMarker.Builder builder = ClusterMetadataMarker.builder(marker); + initialManifest, + orig -> OpenSearchTestCase.copyWriteable( + orig, + new NamedWriteableRegistry(Collections.emptyList()), + ClusterMetadataManifest::new + ), + manifest -> { + ClusterMetadataManifest.Builder builder = ClusterMetadataManifest.builder(manifest); switch (randomInt(7)) { case 0: builder.clusterTerm(randomNonNegativeLong()); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 2323f7abac3c9..86133be86ff36 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -22,7 +22,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; -import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; @@ -76,8 +76,8 @@ public void setup() { public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); - final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState); - Assert.assertThat(marker, nullValue()); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + Assert.assertThat(manifest, nullValue()); } public void testFailWriteFullMetadataWhenRemoteStateDisabled() throws IOException { @@ -111,11 +111,11 @@ public void testFailWriteFullMetadataWhenNotBlobRepository() { public void testWriteFullMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); - final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); - final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(indices) .clusterTerm(1L) .stateVersion(1L) @@ -123,20 +123,20 @@ public void testWriteFullMetadataSuccess() throws IOException { .clusterUUID("cluster-uuid") .build(); - assertThat(marker.getIndices().size(), is(1)); - assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); - assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); - assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue()); - assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm())); - assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion())); - assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); - assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); - final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); - Assert.assertThat(marker, nullValue()); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); + Assert.assertThat(manifest, nullValue()); } public void testFailWriteIncrementalMetadataWhenTermChanged() { @@ -159,18 +159,18 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) .build(); - final ClusterMetadataMarker previousMarker = ClusterMetadataMarker.builder().indices(Collections.emptyList()).build(); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); remoteClusterStateService.ensureRepositorySet(); - final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata( + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( previousClusterState, clusterState, - previousMarker + previousManifest ); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final List indices = List.of(uploadedIndexMetadata); - final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(indices) .clusterTerm(1L) .stateVersion(1L) @@ -178,14 +178,14 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { .clusterUUID("cluster-uuid") .build(); - assertThat(marker.getIndices().size(), is(1)); - assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); - assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); - assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue()); - assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm())); - assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion())); - assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); - assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } public void testMarkLastStateAsCommittedSuccess() throws IOException { @@ -194,11 +194,11 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { remoteClusterStateService.ensureRepositorySet(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); - final ClusterMetadataMarker previousMarker = ClusterMetadataMarker.builder().indices(indices).build(); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); - final ClusterMetadataMarker marker = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousMarker); + final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest); - final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(indices) .clusterTerm(1L) .stateVersion(1L) @@ -206,14 +206,14 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { .clusterUUID("cluster-uuid") .build(); - assertThat(marker.getIndices().size(), is(1)); - assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); - assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); - assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue()); - assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm())); - assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion())); - assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); - assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } private void mockBlobStoreObjects() { From 42af4dd8885c5d5bf329c2ea5e8f86a449791b99 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Thu, 31 Aug 2023 07:36:52 +0530 Subject: [PATCH 09/10] Remove redundant remote state assertions Signed-off-by: Sooraj Sinha --- .../opensearch/gateway/remote/RemoteClusterStateService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index c0f55d60571ce..491c04bab3adb 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -184,7 +184,6 @@ public ClusterMetadataManifest writeIncrementalMetadata( return null; } assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); - assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; final Map previousStateIndexMetadataVersionByName = new HashMap<>(); for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); @@ -261,7 +260,6 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat } assert clusterState != null : "Last accepted cluster state is not set"; assert previousManifest != null : "Last cluster metadata manifest is not set"; - assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; return uploadManifest(clusterState, previousManifest.getIndices(), true); } From 99734f1a98379c366b86bca4203c7e6490f67de8 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Thu, 31 Aug 2023 10:45:15 +0530 Subject: [PATCH 10/10] Fix Unit test after remove state enabled assertion Signed-off-by: Sooraj Sinha --- .../gateway/remote/RemoteClusterStateServiceTests.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 86133be86ff36..215673642cce5 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -44,7 +44,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class RemoteClusterStateServiceTests extends OpenSearchTestCase { @@ -80,10 +79,11 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException Assert.assertThat(manifest, nullValue()); } - public void testFailWriteFullMetadataWhenRemoteStateDisabled() throws IOException { + public void testFailInitializationWhenRemoteStateDisabled() throws IOException { final Settings settings = Settings.builder().build(); - remoteClusterStateService = spy( - new RemoteClusterStateService( + assertThrows( + AssertionError.class, + () -> new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, settings, @@ -91,8 +91,6 @@ public void testFailWriteFullMetadataWhenRemoteStateDisabled() throws IOExceptio () -> 0L ) ); - final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); - assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); } public void testFailWriteFullMetadataWhenRepositoryNotSet() {