From 3f0909420fe0277973c1c762d447328eea8c86d1 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 22 May 2024 23:43:53 +0530 Subject: [PATCH] Remote state interfaces Signed-off-by: Sooraj Sinha --- .../remote/AbstractRemoteBlobStoreObject.java | 122 ++++++++++++++++++ .../gateway/remote/BlobPathParameters.java | 30 +++++ .../gateway/remote/RemoteObject.java | 30 +++++ 3 files changed, 182 insertions(+) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java b/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java new file mode 100644 index 0000000000000..1019de28ae751 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java @@ -0,0 +1,122 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.concurrent.ExecutorService; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.compress.Compressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An extension of {@link RemoteObject} class which caters to the use case of writing to and reading from a blob storage + * + * @param The class type which can be uploaded to or downloaded from a blob storage. + */ +public abstract class AbstractRemoteBlobStoreObject implements RemoteObject { + + public static final String PATH_DELIMITER = "/"; + + private final BlobStoreTransferService transferService; + private final BlobStoreRepository blobStoreRepository; + private final String clusterName; + private final ExecutorService executorService; + + public AbstractRemoteBlobStoreObject(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, + ThreadPool threadPool) { + this.transferService = blobStoreTransferService; + this.blobStoreRepository = blobStoreRepository; + this.clusterName = clusterName; + this.executorService = threadPool.executor(ThreadPool.Names.GENERIC); + } + + public abstract BlobPathParameters getBlobPathParameters(); + + public abstract String getFullBlobName(); + + public String getBlobFileName() { + if (getFullBlobName() == null) { + generateBlobFileName(); + } + String[] pathTokens = getFullBlobName().split(PATH_DELIMITER); + return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1]; + } + + public abstract String generateBlobFileName(); + + public abstract UploadedMetadata getUploadedMetadata(); + + @Override + public CheckedRunnable writeAsync(ActionListener listener) { + return () -> { + assert get() != null; + // TODO add implementation + }; + } + + @Override + public T read() throws IOException { + assert getFullBlobName() != null; + return deserialize( + transferService.downloadBlob(getBlobPathForDownload(), getBlobFileName())); + } + + @Override + public void readAsync(ActionListener listener) { + executorService.execute(() -> { + try { + listener.onResponse(read()); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + + public BlobPath getBlobPathForUpload() { + BlobPath blobPath = blobStoreRepository.basePath().add(encodeString(clusterName)).add("cluster-state").add(clusterUUID()); + for (String token : getBlobPathParameters().getPathTokens()) { + blobPath = blobPath.add(token); + } + return blobPath; + } + + public BlobPath getBlobPathForDownload() { + String[] pathTokens = extractBlobPathTokens(getFullBlobName()); + BlobPath blobPath = blobStoreRepository.basePath(); + for (String token : pathTokens) { + blobPath = blobPath.add(token); + } + return blobPath; + } + + protected Compressor getCompressor() { + return blobStoreRepository.getCompressor(); + } + + protected BlobStoreRepository getBlobStoreRepository() { + return this.blobStoreRepository; + } + + private static String[] extractBlobPathTokens(String blobName) { + String[] blobNameTokens = blobName.split(PATH_DELIMITER); + return Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1); + } + + private static String encodeString(String content) { + return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java b/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java new file mode 100644 index 0000000000000..4b47bd744ef1a --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import java.util.List; + +public class BlobPathParameters { + + private List pathTokens; + private String filePrefix; + + public BlobPathParameters(List pathTokens, String filePrefix) { + this.pathTokens = pathTokens; + this.filePrefix = filePrefix; + } + + public List getPathTokens() { + return pathTokens; + } + + public String getFilePrefix() { + return filePrefix; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java new file mode 100644 index 0000000000000..b961198a6f798 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import java.io.IOException; +import java.io.InputStream; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.core.action.ActionListener; + +/** + * An interface to read/write and object from/to a remote storage. This interface is agnostic of the remote storage type. + * @param The object type which can be upload to or download from remote storage. + */ +public interface RemoteObject { + public T get(); + public String clusterUUID(); + public InputStream serialize() throws IOException; + public T deserialize(InputStream inputStream) throws IOException; + + public CheckedRunnable writeAsync(ActionListener listener); + public T read() throws IOException; + public void readAsync(ActionListener listener); + +}