diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 491c04bab3adb..b0354af2e609c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Nullable; @@ -21,10 +23,12 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; @@ -35,7 +39,11 @@ import java.util.Base64; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -127,23 +135,12 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro } ensureRepositorySet(); - final List allUploadedIndexMetadata = new ArrayList<>(); - // todo parallel upload // any validations before/after upload ? - for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 - final String indexMetadataKey = writeIndexMetadata( - clusterState.getClusterName().value(), - clusterState.getMetadata().clusterUUID(), - indexMetadata, - indexMetadataFileName(indexMetadata) - ); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( - indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), - indexMetadataKey - ); - allUploadedIndexMetadata.add(uploadedIndexMetadata); + final List allUploadedIndexMetadata; + try { + allUploadedIndexMetadata = writeIndexMetadata(clusterState, new ArrayList<>(clusterState.metadata().indices().values())); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); } final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false); final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; @@ -194,6 +191,9 @@ public ClusterMetadataManifest writeIncrementalMetadata( final Map allUploadedIndexMetadata = previousManifest.getIndices() .stream() .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); + + List toUpload = new ArrayList<>(); + for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName()); if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { @@ -204,30 +204,25 @@ public ClusterMetadataManifest writeIncrementalMetadata( indexMetadata.getVersion() ); numIndicesUpdated++; - final String indexMetadataKey = writeIndexMetadata( - clusterState.getClusterName().value(), - clusterState.getMetadata().clusterUUID(), - indexMetadata, - indexMetadataFileName(indexMetadata) - ); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( - indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), - indexMetadataKey - ); - allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); + toUpload.add(indexMetadata); } else { numIndicesUnchanged++; } previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); } + try { + writeIndexMetadata(clusterState, toUpload); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { allUploadedIndexMetadata.remove(removedIndexName); } final ClusterMetadataManifest manifest = uploadManifest( clusterState, - allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), + new ArrayList<>(allUploadedIndexMetadata.values()), false ); final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; @@ -251,6 +246,72 @@ public ClusterMetadataManifest writeIncrementalMetadata( return manifest; } + private List writeIndexMetadata(ClusterState clusterState, List toUpload) throws IOException, + InterruptedException, TimeoutException { + List exceptionList = new ArrayList<>(toUpload.size()); + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + List result = new ArrayList<>(toUpload.size()); + + LatchedActionListener latchedActionListener = new LatchedActionListener<>(ActionListener.wrap((IndexMetadata t) -> { + logger.trace(String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", t.getIndex().toString())); + }, ex -> { + assert ex instanceof IndexMetadataTransferException; + logger.error( + () -> new ParameterizedMessage( + "Exception during transfer of IndexMetadata to Remote {}", + ((IndexMetadataTransferException) ex).getIndexMetadata().getIndex().toString() + ), + ex + ); + exceptionList.add(ex); + }), latch); + + for (IndexMetadata indexMetadata : toUpload) { + final UploadedIndexMetadata uploadedIndexMetadata = writeIndexMetadata(clusterState, latchedActionListener, indexMetadata); + result.add(uploadedIndexMetadata); + } + + try { + if (latch.await(20000, TimeUnit.MILLISECONDS) == false) { + TimeoutException ex = new TimeoutException("Timed out waiting for transfer of index metadata to complete"); + exceptionList.forEach(ex::addSuppressed); + throw ex; + } + } catch (InterruptedException ex) { + exceptionList.forEach(ex::addSuppressed); + Thread.currentThread().interrupt(); + throw ex; + } + + return result; + } + + private UploadedIndexMetadata writeIndexMetadata( + ClusterState clusterState, + LatchedActionListener latchedActionListener, + IndexMetadata indexMetadata + ) throws IOException { + final BlobContainer indexMetadataContainer = indexMetadataContainer( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID(), + indexMetadata.getIndexUUID() + ); + + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse(indexMetadata), + ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata, ex)) + ); + INDEX_METADATA_FORMAT.writeAsync( + indexMetadata, + indexMetadataContainer, + indexMetadataFileName(indexMetadata), + blobStoreRepository.getCompressor(), + completionListener + ); + final String indexMetadataKey = indexMetadataContainer.path().buildAsString() + indexMetadataFileName(indexMetadata); + return new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(), indexMetadataKey); + } + @Nullable public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { @@ -282,7 +343,12 @@ void ensureRepositorySet() { } final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings); assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; - final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + final Repository repository; + try { + repository = repositoriesService.get().repository(remoteStoreRepo); + } catch (RepositoryMissingException e) { + return; + } assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; } @@ -367,4 +433,17 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) { return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); } + class IndexMetadataTransferException extends RuntimeException { + + private final IndexMetadata indexMetadata; + + public IndexMetadataTransferException(IndexMetadata indexMetadata, Throwable cause) { + super(cause); + this.indexMetadata = indexMetadata; + } + + public IndexMetadata getIndexMetadata() { + return indexMetadata; + } + } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 9048757405108..28d01256ec3e3 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -43,6 +43,10 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.CheckedFunction; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -50,6 +54,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -167,6 +172,46 @@ public void write(final T obj, final BlobContainer blobContainer, final String n blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); } + /** + * Writes blob with resolving the blob name using {@link #blobName} method. + * Leverages the multipart upload if supported by the blobContainer. + * + * @param obj object to be serialized + * @param blobContainer blob container + * @param name blob name + * @param compressor whether to use compression + * @param listener listener to listen to write result + */ + public void writeAsync( + final T obj, + final BlobContainer blobContainer, + final String name, + final Compressor compressor, + ActionListener listener + ) throws IOException { + if (blobContainer instanceof VerifyingMultiStreamBlobContainer == false) { + write(obj, blobContainer, name, compressor); + return; + } + final String blobName = blobName(name); + final BytesReference bytes = serialize(obj, blobName, compressor); + + IndexInput input = new ByteArrayIndexInput("", BytesReference.toBytes(bytes)); + + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + blobName, + blobName, + bytes.length(), + true, + WritePriority.HIGH, + (size, position) -> new OffsetRangeIndexInputStream(input, size, position), + CodecUtil.checksumEntireFile(input), + true + ); + + ((VerifyingMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); + } + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try (