From 4632f4b17e5bbd6077104d7b68f2bd28336a5922 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Wed, 18 Oct 2023 17:11:26 +0530 Subject: [PATCH] Upload global cluster state to remote store (#10404) * Upload global cluster state to remote store Signed-off-by: Dhwanil Patel --- CHANGELOG.md | 5 +- .../remote/ClusterMetadataManifest.java | 128 ++++++++- .../remote/RemoteClusterStateService.java | 201 +++++++++++++- .../blobstore/BlobStoreRepository.java | 8 +- .../blobstore/ChecksumBlobStoreFormat.java | 15 +- .../coordination/CoordinationStateTests.java | 2 + .../remote/ClusterMetadataManifestTests.java | 31 +++ .../RemoteClusterStateServiceTests.java | 260 +++++++++++++++++- .../snapshots/BlobStoreFormatTests.java | 6 +- 9 files changed, 617 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3471564dba15e..243423223ca52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618)) - [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10541](https://github.com/opensearch-project/OpenSearch/pull/10541)) - [Admission control] Add Resource usage collector service and resource usage tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890)) -- Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557)) +- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557)) +- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 @@ -121,4 +122,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 40b16f3d6323b..97b37d9532f85 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -33,6 +33,9 @@ */ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { + public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest. + public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file. + private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); @@ -40,6 +43,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version"); private static final ParseField NODE_ID_FIELD = new ParseField("node_id"); private static final ParseField COMMITTED_FIELD = new ParseField("committed"); + private static final ParseField CODEC_VERSION_FIELD = new ParseField("codec_version"); + private static final ParseField GLOBAL_METADATA_FIELD = new ParseField("global_metadata"); private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid"); private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed"); @@ -84,7 +89,33 @@ private static boolean clusterUUIDCommitted(Object[] fields) { return (boolean) fields[9]; } - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + private static int codecVersion(Object[] fields) { + return (int) fields[10]; + } + + private static String globalMetadataFileName(Object[] fields) { + return (String) fields[11]; + } + + private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> new ClusterMetadataManifest( + term(fields), + version(fields), + clusterUUID(fields), + stateUUID(fields), + opensearchVersion(fields), + nodeId(fields), + committed(fields), + CODEC_V0, + null, + indices(fields), + previousClusterUUID(fields), + clusterUUIDCommitted(fields) + ) + ); + + private static final ConstructingObjectParser PARSER_V1 = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> new ClusterMetadataManifest( term(fields), @@ -94,29 +125,45 @@ private static boolean clusterUUIDCommitted(Object[] fields) { opensearchVersion(fields), nodeId(fields), committed(fields), + codecVersion(fields), + globalMetadataFileName(fields), indices(fields), previousClusterUUID(fields), clusterUUIDCommitted(fields) ) ); + private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V1; + static { - PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD); - PARSER.declareObjectArray( + declareParser(PARSER_V0, CODEC_V0); + declareParser(PARSER_V1, CODEC_V1); + } + + private static void declareParser(ConstructingObjectParser parser, long codec_version) { + parser.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD); + parser.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); + parser.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD); + parser.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD); + parser.declareObjectArray( ConstructingObjectParser.constructorArg(), (p, c) -> UploadedIndexMetadata.fromXContent(p), INDICES_FIELD ); - PARSER.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED); + parser.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID); + parser.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED); + + if (codec_version >= CODEC_V1) { + parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD); + } } + private final int codecVersion; + private final String globalMetadataFileName; private final List indices; private final long clusterTerm; private final long stateVersion; @@ -168,6 +215,14 @@ public boolean isClusterUUIDCommitted() { return clusterUUIDCommitted; } + public int getCodecVersion() { + return codecVersion; + } + + public String getGlobalMetadataFileName() { + return globalMetadataFileName; + } + public ClusterMetadataManifest( long clusterTerm, long version, @@ -176,6 +231,8 @@ public ClusterMetadataManifest( Version opensearchVersion, String nodeId, boolean committed, + int codecVersion, + String globalMetadataFileName, List indices, String previousClusterUUID, boolean clusterUUIDCommitted @@ -187,6 +244,8 @@ public ClusterMetadataManifest( this.opensearchVersion = opensearchVersion; this.nodeId = nodeId; this.committed = committed; + this.codecVersion = codecVersion; + this.globalMetadataFileName = globalMetadataFileName; this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; @@ -203,6 +262,13 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); this.previousClusterUUID = in.readString(); this.clusterUUIDCommitted = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.codecVersion = in.readInt(); + this.globalMetadataFileName = in.readString(); + } else { + this.codecVersion = CODEC_V0; // Default codec + this.globalMetadataFileName = null; + } } public static Builder builder() { @@ -231,6 +297,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endArray(); builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID()); builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted()); + if (onOrAfterCodecVersion(CODEC_V1)) { + builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); + builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); + } return builder; } @@ -246,6 +316,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(indices); out.writeString(previousClusterUUID); out.writeBoolean(clusterUUIDCommitted); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeInt(codecVersion); + out.writeString(globalMetadataFileName); + } } @Override @@ -266,12 +340,16 @@ public boolean equals(Object o) { && Objects.equals(nodeId, that.nodeId) && Objects.equals(committed, that.committed) && Objects.equals(previousClusterUUID, that.previousClusterUUID) - && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted); + && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted) + && Objects.equals(globalMetadataFileName, that.globalMetadataFileName) + && Objects.equals(codecVersion, that.codecVersion); } @Override public int hashCode() { return Objects.hash( + codecVersion, + globalMetadataFileName, indices, clusterTerm, stateVersion, @@ -290,8 +368,16 @@ public String toString() { return Strings.toString(MediaTypeRegistry.JSON, this); } + public boolean onOrAfterCodecVersion(int codecVersion) { + return this.codecVersion >= codecVersion; + } + + public static ClusterMetadataManifest fromXContentV0(XContentParser parser) throws IOException { + return PARSER_V0.parse(parser, null); + } + public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); + return CURRENT_PARSER.parse(parser, null); } /** @@ -301,6 +387,8 @@ public static ClusterMetadataManifest fromXContent(XContentParser parser) throws */ public static class Builder { + private String globalMetadataFileName; + private int codecVersion; private List indices; private long clusterTerm; private long stateVersion; @@ -317,6 +405,16 @@ public Builder indices(List indices) { return this; } + public Builder codecVersion(int codecVersion) { + this.codecVersion = codecVersion; + return this; + } + + public Builder globalMetadataFileName(String globalMetadataFileName) { + this.globalMetadataFileName = globalMetadataFileName; + return this; + } + public Builder clusterTerm(long clusterTerm) { this.clusterTerm = clusterTerm; return this; @@ -378,6 +476,8 @@ public Builder(ClusterMetadataManifest manifest) { this.opensearchVersion = manifest.opensearchVersion; this.nodeId = manifest.nodeId; this.committed = manifest.committed; + this.globalMetadataFileName = manifest.globalMetadataFileName; + this.codecVersion = manifest.codecVersion; this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; @@ -392,6 +492,8 @@ public ClusterMetadataManifest build() { opensearchVersion, nodeId, committed, + codecVersion, + globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 0cf97de53d5f3..2092c2a0aac3f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -15,6 +15,7 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; @@ -27,6 +28,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -55,6 +57,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -80,7 +83,9 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + // TODO make this two variable as dynamic setting [issue: #10688] public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; + public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000; public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", @@ -88,11 +93,27 @@ public class RemoteClusterStateService implements Closeable { IndexMetadata::fromXContent ); + public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "metadata", + METADATA_NAME_FORMAT, + Metadata::fromXContent + ); + + /** + * Manifest format compatible with older codec v0, where codec version was missing. + */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V0 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0); + + /** + * Manifest format compatible with codec v1, where we introduced codec versions/global metadata. + */ public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( "cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContent ); + /** * Used to specify if cluster state metadata should be published to remote store */ @@ -105,9 +126,11 @@ public class RemoteClusterStateService implements Closeable { public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state"; public static final String INDEX_PATH_TOKEN = "index"; + public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata"; public static final String MANIFEST_PATH_TOKEN = "manifest"; public static final String MANIFEST_FILE_PREFIX = "manifest"; - public static final String INDEX_METADATA_FILE_PREFIX = "metadata"; + public static final String METADATA_FILE_PREFIX = "metadata"; + public static final int SPLITED_MANIFEST_FILE_LENGTH = 6; // file name manifest__term__version__C/P__timestamp__codecversion private final String nodeId; private final Supplier repositoriesService; @@ -121,7 +144,17 @@ public class RemoteClusterStateService implements Closeable { private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; - public static final int MANIFEST_CURRENT_CODEC_VERSION = 1; + public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V1; + public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; + + // ToXContent Params with gateway mode. + // We are using gateway context mode to persist all custom metadata. + public static final ToXContent.Params FORMAT_PARAMS; + static { + Map params = new HashMap<>(1); + params.put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY); + FORMAT_PARAMS = new ToXContent.MapParams(params); + } public RemoteClusterStateService( String nodeId, @@ -162,12 +195,22 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri return null; } + // TODO: we can upload global metadata and index metadata in parallel. [issue: #10645] + // Write globalMetadata + String globalMetadataFile = writeGlobalMetadata(clusterState); + // any validations before/after upload ? final List allUploadedIndexMetadata = writeIndexMetadataParallel( clusterState, new ArrayList<>(clusterState.metadata().indices().values()) ); - final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, false); + final ClusterMetadataManifest manifest = uploadManifest( + clusterState, + allUploadedIndexMetadata, + previousClusterUUID, + globalMetadataFile, + false + ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -206,6 +249,22 @@ public ClusterMetadataManifest writeIncrementalMetadata( return null; } assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); + + // Write Global Metadata + final boolean updateGlobalMetadata = Metadata.isGlobalStateEquals( + previousClusterState.metadata(), + clusterState.metadata() + ) == false; + String globalMetadataFile; + // For migration case from codec V0 to V1, we have added null check on global metadata file, + // If file is empty and codec is 1 then write global metadata. + if (updateGlobalMetadata || previousManifest.getGlobalMetadataFileName() == null) { + globalMetadataFile = writeGlobalMetadata(clusterState); + } else { + globalMetadataFile = previousManifest.getGlobalMetadataFileName(); + } + + // Write Index Metadata final Map previousStateIndexMetadataVersionByName = new HashMap<>(); for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); @@ -248,6 +307,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), previousManifest.getPreviousClusterUUID(), + globalMetadataFile, false ); deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); @@ -273,6 +333,59 @@ public ClusterMetadataManifest writeIncrementalMetadata( return manifest; } + /** + * Uploads provided ClusterState's global Metadata to remote store in parallel. + * The call is blocking so the method waits for upload to finish and then return. + * + * @param clusterState current ClusterState + * @return String file name where globalMetadata file is stored. + */ + private String writeGlobalMetadata(ClusterState clusterState) throws IOException { + + AtomicReference result = new AtomicReference(); + final BlobContainer globalMetadataContainer = globalMetadataContainer( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ); + final String globalMetadataFilename = globalMetadataFileName(clusterState.metadata()); + + // latch to wait until upload is not finished + CountDownLatch latch = new CountDownLatch(1); + + LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> { + logger.trace(String.format(Locale.ROOT, "GlobalMetadata uploaded successfully.")); + result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); + }, ex -> { throw new GlobalMetadataTransferException(ex.getMessage(), ex); }), latch); + + GLOBAL_METADATA_FORMAT.writeAsync( + clusterState.metadata(), + globalMetadataContainer, + globalMetadataFilename, + blobStoreRepository.getCompressor(), + completionListener, + FORMAT_PARAMS + ); + + try { + if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + // TODO: We should add metrics where transfer is timing out. [Issue: #10687] + GlobalMetadataTransferException ex = new GlobalMetadataTransferException( + String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") + ); + throw ex; + } + } catch (InterruptedException ex) { + GlobalMetadataTransferException exception = new GlobalMetadataTransferException( + String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"), + ex + ); + Thread.currentThread().interrupt(); + throw exception; + } + + return result.get(); + } + /** * Uploads provided IndexMetadata's to remote store in parallel. The call is blocking so the method waits for upload to finish and then return. * @@ -381,7 +494,8 @@ private void writeIndexMetadataAsync( indexMetadataContainer, indexMetadataFilename, blobStoreRepository.getCompressor(), - completionListener + completionListener, + FORMAT_PARAMS ); } @@ -398,6 +512,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), + previousManifest.getGlobalMetadataFileName(), true ); deleteStaleClusterUUIDs(clusterState, committedManifest); @@ -426,6 +541,7 @@ private ClusterMetadataManifest uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, String previousClusterUUID, + String globalClusterMetadataFileName, boolean committed ) throws IOException { synchronized (this) { @@ -438,6 +554,8 @@ private ClusterMetadataManifest uploadManifest( Version.CURRENT, nodeId, committed, + MANIFEST_CURRENT_CODEC_VERSION, + globalClusterMetadataFileName, uploadedIndexMetadata, previousClusterUUID, clusterState.metadata().clusterUUIDCommitted() @@ -469,6 +587,12 @@ private BlobContainer indexMetadataContainer(String clusterName, String clusterU .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(INDEX_PATH_TOKEN).add(indexUUID)); } + private BlobContainer globalMetadataContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/ + return blobStoreRepository.blobStore() + .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(GLOBAL_METADATA_PATH_TOKEN)); + } + private BlobContainer manifestContainer(String clusterName, String clusterUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); @@ -510,7 +634,7 @@ static String indexMetadataFileName(IndexMetadata indexMetadata) { // version> return String.join( DELIMITER, - INDEX_METADATA_FILE_PREFIX, + METADATA_FILE_PREFIX, RemoteStoreUtils.invertLong(indexMetadata.getVersion()), RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION) // Keep the codec version at last place only, during read we reads last @@ -518,6 +642,17 @@ static String indexMetadataFileName(IndexMetadata indexMetadata) { ); } + private static String globalMetadataFileName(Metadata metadata) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/metadata______ + return String.join( + DELIMITER, + METADATA_FILE_PREFIX, + RemoteStoreUtils.invertLong(metadata.version()), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) + ); + } + private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); } @@ -601,7 +736,8 @@ public String getLastKnownUUIDFromRemote(String clusterName) { return validChain.get(0); } catch (IOException e) { throw new IllegalStateException( - String.format(Locale.ROOT, "Error while fetching previous UUIDs from remote store for cluster name: %s", clusterName) + String.format(Locale.ROOT, "Error while fetching previous UUIDs from remote store for cluster name: %s", clusterName), + e ); } } @@ -622,7 +758,8 @@ private Map getLatestManifestForAllClusterUUIDs manifest.ifPresent(clusterMetadataManifest -> manifestsByClusterUUID.put(clusterUUID, clusterMetadataManifest)); } catch (Exception e) { throw new IllegalStateException( - String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID) + String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID), + e ); } } @@ -788,7 +925,7 @@ private Optional getLatestManifestFileName(String clusterName, String cl private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) throws IllegalStateException { try { - return RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.read( + return getClusterMetadataManifestBlobStoreFormat(filename).read( manifestContainer(clusterName, clusterUUID), filename, blobStoreRepository.getNamedXContentRegistry() @@ -798,6 +935,29 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste } } + private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat(String fileName) { + long codecVersion = getManifestCodecVersion(fileName); + if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { + return CLUSTER_METADATA_MANIFEST_FORMAT; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V0; + } + + throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); + } + + private int getManifestCodecVersion(String fileName) { + String[] splitName = fileName.split(DELIMITER); + if (splitName.length == SPLITED_MANIFEST_FILE_LENGTH) { + return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. + } else if (splitName.length < SPLITED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 + // is used. + return ClusterMetadataManifest.CODEC_V0; + } else { + throw new IllegalArgumentException("Manifest file name is corrupted"); + } + } + public static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } @@ -816,6 +976,20 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { } } + /** + * Exception for GlobalMetadata transfer failures to remote + */ + static class GlobalMetadataTransferException extends RuntimeException { + + public GlobalMetadataTransferException(String errorDesc) { + super(errorDesc); + } + + public GlobalMetadataTransferException(String errorDesc, Throwable cause) { + super(errorDesc, cause); + } + } + /** * Purges all remote cluster state against provided cluster UUIDs * @@ -907,6 +1081,7 @@ private void deleteClusterMetadata( Set filesToKeep = new HashSet<>(); Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); + Set staleGlobalMetadataPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( clusterName, @@ -915,6 +1090,7 @@ private void deleteClusterMetadata( ); clusterMetadataManifest.getIndices() .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( @@ -923,6 +1099,14 @@ private void deleteClusterMetadata( blobMetadata.name() ); staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); + if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) { + String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + globalMetadataSplitPath[globalMetadataSplitPath.length - 1] + ) + ); + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { staleIndexMetadataPaths.add( @@ -938,6 +1122,7 @@ private void deleteClusterMetadata( return; } + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); } catch (IllegalStateException e) { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 69883e0d19c8d..c48efa9d3d2ee 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -173,6 +173,7 @@ import java.util.stream.Stream; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; +import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; /** * BlobStore - based implementation of Snapshot Repository @@ -3333,7 +3334,12 @@ private void writeShardIndexBlobAtomic( () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path()) ); final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration)); - writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor), true); + writeAtomic( + shardContainer, + blobName, + INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS), + true + ); } // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 7e1960171043a..17cb68f798094 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -83,7 +83,7 @@ public final class ChecksumBlobStoreFormat { // Serialization parameters to specify correct context for metadata serialization - private static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS; + public static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS; static { Map snapshotOnlyParams = new HashMap<>(); @@ -171,7 +171,7 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr */ public void write(final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor) throws IOException { final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor); + final BytesReference bytes = serialize(obj, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS); blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); } @@ -184,13 +184,15 @@ public void write(final T obj, final BlobContainer blobContainer, final String n * @param name blob name * @param compressor whether to use compression * @param listener listener to listen to write result + * @param params ToXContent params */ public void writeAsync( final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor, - ActionListener listener + ActionListener listener, + final ToXContent.Params params ) throws IOException { if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { write(obj, blobContainer, name, compressor); @@ -198,7 +200,7 @@ public void writeAsync( return; } final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor); + final BytesReference bytes = serialize(obj, blobName, compressor, params); final String resourceDescription = "ChecksumBlobStoreFormat.writeAsync(blob=\"" + blobName + "\")"; try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { long expectedChecksum; @@ -230,7 +232,8 @@ public void writeAsync( } } - public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params) + throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( @@ -254,7 +257,7 @@ public void close() throws IOException { ) ) { builder.startObject(); - obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS); + obj.toXContent(builder, params); builder.endObject(); } CodecUtil.writeFooter(indexOutput); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index f37823d2c0c7d..1c0dc7fc1ca2d 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -938,6 +938,8 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep Version.CURRENT, randomAlphaOfLength(10), false, + 1, + randomAlphaOfLength(10), Collections.emptyList(), randomAlphaOfLength(10), true diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 66426c2a880a3..6c9a3201656d7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -26,6 +26,33 @@ public class ClusterMetadataManifestTests extends OpenSearchTestCase { + public void testClusterMetadataManifestXContentV0() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid", + Version.CURRENT, + "test-node-id", + false, + ClusterMetadataManifest.CODEC_V0, + null, + Collections.singletonList(uploadedIndexMetadata), + "prev-cluster-uuid", + true + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV0(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + public void testClusterMetadataManifestXContent() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( @@ -36,6 +63,8 @@ public void testClusterMetadataManifestXContent() throws IOException { Version.CURRENT, "test-node-id", false, + ClusterMetadataManifest.CODEC_V1, + "test-global-metadata-file", Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", true @@ -60,6 +89,8 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { Version.CURRENT, "B10RX1f5RJenMQvYccCgSQ", true, + 1, + "test-global-metadata-file", randomUploadedIndexMetadataList(), "yfObdx8KSMKKrXf8UyHhM", true diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 119d19cc34981..ddee3bcf10382 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -66,10 +66,11 @@ import org.mockito.ArgumentMatchers; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -232,14 +233,15 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 1); - assertEquals(writeContextArgumentCaptor.getAllValues().size(), 1); + assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 2); + assertEquals(writeContextArgumentCaptor.getAllValues().size(), 2); WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue(); byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes(); @@ -263,7 +265,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { } - public void testWriteFullMetadataInParallelFailure() throws IOException { + public void testWriteFullMetadataFailureForGlobalMetadata() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); @@ -274,6 +276,27 @@ public void testWriteFullMetadataInParallelFailure() throws IOException { return null; }).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture()); + remoteClusterStateService.start(); + assertThrows( + RemoteClusterStateService.GlobalMetadataTransferException.class, + () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) + ); + } + + public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onFailure(new RuntimeException("Cannot upload to remote")); + return null; + }).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture()); + remoteClusterStateService.start(); assertThrows( RemoteClusterStateService.IndexMetadataTransferException.class, @@ -338,6 +361,207 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } + /* + * Here we will verify the migration of manifest file from codec V0 and V1. + * + * Initially codec version is 0 and global metadata is also null, we will perform index metadata update. + * In final manifest codec version should be 1 and + * global metadata should be updated, even if it was not changed in this cluster state update + */ + public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOException { + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .nodes(nodesWithLocalNodeClusterManager()) + .build(); + + // Update only index metadata + final IndexMetadata indexMetadata = new IndexMetadata.Builder("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(0).build(); + Metadata newMetadata = Metadata.builder(previousClusterState.metadata()).put(indexMetadata, true).build(); + ClusterState newClusterState = ClusterState.builder(previousClusterState).metadata(newMetadata).build(); + + // previous manifest with codec 0 and null global metadata + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .codecVersion(ClusterMetadataManifest.CODEC_V0) + .globalMetadataFileName(null) + .indices(Collections.emptyList()) + .build(); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifestAfterUpdate = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + newClusterState, + previousManifest + ); + + // global metadata is updated + assertThat(manifestAfterUpdate.getGlobalMetadataFileName(), notNullValue()); + // Manifest file with codec version with 1 is updated. + assertThat(manifestAfterUpdate.getCodecVersion(), is(ClusterMetadataManifest.CODEC_V1)); + } + + public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .codecVersion(2) + .globalMetadataFileName("global-metadata-file") + .indices(Collections.emptyList()) + .build(); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousManifest + ); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(Collections.emptyList()) + .globalMetadataFileName("mock-filename") + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .build(); + + assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + } + + /* + * Here we will verify index metadata is not uploaded again if change is only in global metadata + */ + public void testGlobalMetadataOnlyUpdated() throws IOException { + // setup + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() + .codecVersion(2) + .globalMetadataFileName("global-metadata-file") + .indices(Collections.emptyList()) + .build(); + remoteClusterStateService.start(); + + // Initial cluster state with index. + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + // Updating remote cluster state with changing index metadata + final ClusterMetadataManifest manifestAfterIndexMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + initialClusterState, + clusterState, + initialManifest + ); + + // new cluster state where only global metadata is different + Metadata newMetadata = Metadata.builder(clusterState.metadata()) + .persistentSettings(Settings.builder().put("cluster.blocks.read_only", true).build()) + .build(); + ClusterState newClusterState = ClusterState.builder(clusterState).metadata(newMetadata).build(); + + // updating remote cluster state with global metadata + final ClusterMetadataManifest manifestAfterGlobalMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + clusterState, + newClusterState, + manifestAfterIndexMetadataUpdate + ); + + // Verify that index metadata information is same in manifest files + assertThat(manifestAfterIndexMetadataUpdate.getIndices().size(), is(manifestAfterGlobalMetadataUpdate.getIndices().size())); + assertThat( + manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexName(), + is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexName()) + ); + assertThat( + manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexUUID(), + is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexUUID()) + ); + + // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata update + assertThat( + manifestAfterIndexMetadataUpdate.getIndices().get(0).getUploadedFilename(), + is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getUploadedFilename()) + ); + + // global metadata file would have changed + assertFalse( + manifestAfterIndexMetadataUpdate.getGlobalMetadataFileName() + .equalsIgnoreCase(manifestAfterGlobalMetadataUpdate.getGlobalMetadataFileName()) + ); + } + + /* + * Here we will verify global metadata is not uploaded again if change is only in index metadata + */ + public void testIndexMetadataOnlyUpdated() throws IOException { + // setup + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() + .codecVersion(2) + .indices(Collections.emptyList()) + .build(); + remoteClusterStateService.start(); + + // Initial cluster state with global metadata. + final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); + + // Updating remote cluster state with changing global metadata + final ClusterMetadataManifest manifestAfterGlobalMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + initialClusterState, + clusterState, + initialManifest + ); + + // new cluster state where only Index metadata is different + final IndexMetadata indexMetadata = new IndexMetadata.Builder("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(0).build(); + Metadata newMetadata = Metadata.builder(clusterState.metadata()).put(indexMetadata, true).build(); + ClusterState newClusterState = ClusterState.builder(clusterState).metadata(newMetadata).build(); + + // updating remote cluster state with index metadata + final ClusterMetadataManifest manifestAfterIndexMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + clusterState, + newClusterState, + manifestAfterGlobalMetadataUpdate + ); + + // Verify that global metadata information is same in manifest files after updating index Metadata + // since timestamp is part of file name, if file name is same we can confirm that file is not update in index metadata update + assertThat( + manifestAfterIndexMetadataUpdate.getGlobalMetadataFileName(), + is(manifestAfterGlobalMetadataUpdate.getGlobalMetadataFileName()) + ); + + // Index metadata would have changed + assertThat(manifestAfterGlobalMetadataUpdate.getIndices().size(), is(0)); + assertThat(manifestAfterIndexMetadataUpdate.getIndices().size(), is(1)); + } + public void testReadLatestMetadataManifestFailedIOException() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); @@ -402,6 +626,7 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") + .codecVersion(ClusterMetadataManifest.CODEC_V0) .build(); BlobContainer blobContainer = mockBlobStoreObjects(); @@ -458,6 +683,7 @@ public void testReadLatestMetadataManifestSuccess() throws IOException { .clusterUUID("cluster-uuid") .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .codecVersion(ClusterMetadataManifest.CODEC_V0) .previousClusterUUID("prev-cluster-uuid") .build(); @@ -504,6 +730,7 @@ public void testReadLatestIndexMetadataSuccess() throws IOException { .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") + .codecVersion(ClusterMetadataManifest.CODEC_V0) .build(); mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata)); @@ -691,7 +918,7 @@ public void testFileNames() { String indexMetadataFileName = RemoteClusterStateService.indexMetadataFileName(indexMetadata); String[] splittedIndexMetadataFileName = indexMetadataFileName.split(DELIMITER); assertThat(indexMetadataFileName.split(DELIMITER).length, is(4)); - assertThat(splittedIndexMetadataFileName[0], is(INDEX_METADATA_FILE_PREFIX)); + assertThat(splittedIndexMetadataFileName[0], is(METADATA_FILE_PREFIX)); assertThat(splittedIndexMetadataFileName[1], is(RemoteStoreUtils.invertLong(indexMetadata.getVersion()))); assertThat(splittedIndexMetadataFileName[3], is(String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION))); @@ -820,6 +1047,7 @@ private ClusterMetadataManifest generateClusterMetadataManifest( .previousClusterUUID(previousClusterUUID) .committed(true) .clusterUUIDCommitted(true) + .globalMetadataFileName("test-global-metadata") .build(); } @@ -859,7 +1087,8 @@ private void mockBlobContainer( BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize( clusterMetadataManifest, "manifestFileName", - blobStoreRepository.getCompressor() + blobStoreRepository.getCompressor(), + FORMAT_PARAMS ); when(blobContainer.readBlob("manifestFileName")).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes())); @@ -873,7 +1102,8 @@ private void mockBlobContainer( BytesReference bytesIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.serialize( indexMetadata, fileName, - blobStoreRepository.getCompressor() + blobStoreRepository.getCompressor(), + FORMAT_PARAMS ); when(blobContainer.readBlob(fileName + ".dat")).thenReturn( new ByteArrayInputStream(bytesIndexMetadata.streamInput().readAllBytes()) @@ -884,6 +1114,22 @@ private void mockBlobContainer( }); } + private static ClusterState.Builder generateClusterStateWithGlobalMetadata() { + final Settings clusterSettings = Settings.builder().put("cluster.blocks.read_only", true).build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder() + .persistentSettings(clusterSettings) + .clusterUUID("cluster-uuid") + .coordinationMetadata(coordinationMetadata) + .build() + ); + } + private static ClusterState.Builder generateClusterStateWithOneIndex() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index 03f0d27188027..c114b56bd0b39 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -152,14 +152,16 @@ public void onFailure(Exception e) { mockBlobContainer, "check-smile", CompressorRegistry.none(), - actionListener + actionListener, + ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS ); checksumSMILE.writeAsync( new BlobObj("checksum smile compressed"), mockBlobContainer, "check-smile-comp", CompressorRegistry.getCompressor(DeflateCompressor.NAME), - actionListener + actionListener, + ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS ); latch.await();