Skip to content

Commit

Permalink
Remote state interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed May 23, 2024
1 parent 66df930 commit e28a949
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.concurrent.ExecutorService;

/**
* An extension of {@link RemoteObject} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractRemoteBlobStoreObject<T> implements RemoteObject<T> {

public static final String PATH_DELIMITER = "/";

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;

public AbstractRemoteBlobStoreObject(
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName,
ThreadPool threadPool
) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(ThreadPool.Names.GENERIC);
}

public abstract BlobPathParameters getBlobPathParameters();

public abstract String getFullBlobName();

public String getBlobFileName() {
if (getFullBlobName() == null) {
generateBlobFileName();
}
String[] pathTokens = getFullBlobName().split(PATH_DELIMITER);
return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1];
}

public abstract String generateBlobFileName();

public abstract UploadedMetadata getUploadedMetadata();

@Override
public CheckedRunnable<IOException> writeAsync(ActionListener<Void> listener) {
return () -> {
assert get() != null;
// TODO add implementation
};
}

@Override
public T read() throws IOException {
assert getFullBlobName() != null;
return deserialize(transferService.downloadBlob(getBlobPathForDownload(), getBlobFileName()));
}

@Override
public void readAsync(ActionListener<T> listener) {
executorService.execute(() -> {
try {
listener.onResponse(read());
} catch (Exception e) {
listener.onFailure(e);
}
});
}

public BlobPath getBlobPathForUpload() {
BlobPath blobPath = blobStoreRepository.basePath().add(encodeString(clusterName)).add("cluster-state").add(clusterUUID());
for (String token : getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
}

public BlobPath getBlobPathForDownload() {
String[] pathTokens = extractBlobPathTokens(getFullBlobName());
BlobPath blobPath = blobStoreRepository.basePath();
for (String token : pathTokens) {
blobPath = blobPath.add(token);
}
return blobPath;
}

protected Compressor getCompressor() {
return blobStoreRepository.getCompressor();
}

protected BlobStoreRepository getBlobStoreRepository() {
return this.blobStoreRepository;
}

private static String[] extractBlobPathTokens(String blobName) {
String[] blobNameTokens = blobName.split(PATH_DELIMITER);
return Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1);
}

private static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.util.List;

public class BlobPathParameters {

private List<String> pathTokens;
private String filePrefix;

public BlobPathParameters(List<String> pathTokens, String filePrefix) {
this.pathTokens = pathTokens;
this.filePrefix = filePrefix;
}

public List<String> getPathTokens() {
return pathTokens;
}

public String getFilePrefix() {
return filePrefix;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;

/**
* An interface to read/write and object from/to a remote storage. This interface is agnostic of the remote storage type.
* @param <T> The object type which can be upload to or download from remote storage.
*/
public interface RemoteObject<T> {
public T get();

public String clusterUUID();

public InputStream serialize() throws IOException;

public T deserialize(InputStream inputStream) throws IOException;

public CheckedRunnable<IOException> writeAsync(ActionListener<Void> listener);

public T read() throws IOException;

public void readAsync(ActionListener<T> listener);

}

0 comments on commit e28a949

Please sign in to comment.