From f41dc14696bfa3b20299b300321f7302002f1d69 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Sat, 21 Oct 2023 15:51:26 +0530 Subject: [PATCH] Fix custom metadata not getting stored for remote store not supporting async write Signed-off-by: Dhwanil Patel --- .../RemoteStoreClusterStateRestoreIT.java | 28 +++++++++++-------- .../remote/RemoteClusterStateService.java | 8 +++++- .../blobstore/ChecksumBlobStoreFormat.java | 25 +++++++++++++++-- 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java index c2cb7cc60f152..f0863966fa222 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.file.Files; +import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -233,8 +234,7 @@ public void testFullClusterRestoreGlobalMetadata() throws Exception { String prevClusterUUID = clusterService().state().metadata().clusterUUID(); // Create global metadata - register a custom repo - // TODO - uncomment after all customs is also uploaded for all repos - https://github.com/opensearch-project/OpenSearch/issues/10691 - // registerCustomRepository(); + Path repoPath = registerCustomRepository(); // Create global metadata - persistent settings updatePersistentSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 34).build()); @@ -263,30 +263,36 @@ public void testFullClusterRestoreGlobalMetadata() throws Exception { verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false); // validate global metadata restored - verifyRestoredRepositories(); + verifyRestoredRepositories(repoPath); verifyRestoredIndexTemplate(); } - private void registerCustomRepository() { + private Path registerCustomRepository() { + Path path = randomRepoPath(); assertAcked( client().admin() .cluster() .preparePutRepository("custom-repo") .setType("fs") - .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", false)) + .setSettings(Settings.builder().put("location", path).put("compress", false)) .get() ); + return path; } - private void verifyRestoredRepositories() { + private void verifyRestoredRepositories(Path repoPath) { RepositoriesMetadata repositoriesMetadata = clusterService().state().metadata().custom(RepositoriesMetadata.TYPE); - assertEquals(2, repositoriesMetadata.repositories().size()); // includes remote store repo as well + assertEquals(3, repositoriesMetadata.repositories().size()); // includes remote store repo as well assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_NAME).settings())); assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_2_NAME).settings())); - // TODO - uncomment after all customs is also uploaded for all repos - https://github.com/opensearch-project/OpenSearch/issues/10691 - // assertEquals("fs", repositoriesMetadata.repository("custom-repo").type()); - // assertEquals(Settings.builder().put("location", randomRepoPath()).put("compress", false).build(), - // repositoriesMetadata.repository("custom-repo").settings()); + assertEquals("fs", repositoriesMetadata.repository("custom-repo").type()); + assertEquals( + Settings.builder().put("location", repoPath).put("compress", false).build(), + repositoriesMetadata.repository("custom-repo").settings() + ); + + // repo cleanup post verification + clusterAdmin().prepareDeleteRepository("custom-repo").get(); } private void addClusterLevelReadOnlyBlock() throws InterruptedException, ExecutionException { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 96ce2fc779ea0..5eff9860d5218 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -568,7 +568,13 @@ private ClusterMetadataManifest uploadManifest( private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); - CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); + CLUSTER_METADATA_MANIFEST_FORMAT.write( + uploadManifest, + metadataManifestContainer, + fileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); } private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 17cb68f798094..e280141c12bc1 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -170,8 +170,29 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr * @param compressor whether to use compression */ public void write(final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor) throws IOException { + write(obj, blobContainer, name, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS); + } + + /** + * Writes blob with resolving the blob name using {@link #blobName} method. + *

+ * The blob will optionally by compressed. + * + * @param obj object to be serialized + * @param blobContainer blob container + * @param name blob name + * @param compressor whether to use compression + * @param params ToXContent params + */ + public void write( + final T obj, + final BlobContainer blobContainer, + final String name, + final Compressor compressor, + final ToXContent.Params params + ) throws IOException { final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS); + final BytesReference bytes = serialize(obj, blobName, compressor, params); blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); } @@ -195,7 +216,7 @@ public void writeAsync( final ToXContent.Params params ) throws IOException { if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { - write(obj, blobContainer, name, compressor); + write(obj, blobContainer, name, compressor, params); listener.onResponse(null); return; }