Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Cluster State] Remote state interfaces #13785

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;

/**
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> {

protected String blobFileName;

protected String blobName;
private final String clusterUUID;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private String[] pathTokens;

public AbstractRemoteWritableBlobEntity(
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
this.clusterUUID = clusterUUID;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
}

Check warning on line 41 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L37-L41

Added lines #L37 - L41 were not covered by tests

public abstract BlobPathParameters getBlobPathParameters();

public String getFullBlobName() {
return blobName;

Check warning on line 46 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L46

Added line #L46 was not covered by tests
}

public String getBlobFileName() {
if (blobFileName == null) {
String[] pathTokens = getBlobPathTokens();

Check warning on line 51 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L51

Added line #L51 was not covered by tests
if (pathTokens == null || pathTokens.length < 1) {
return null;

Check warning on line 53 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L53

Added line #L53 was not covered by tests
}
blobFileName = pathTokens[pathTokens.length - 1];

Check warning on line 55 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L55

Added line #L55 was not covered by tests
}
return blobFileName;

Check warning on line 57 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L57

Added line #L57 was not covered by tests
}

public String[] getBlobPathTokens() {
if (pathTokens != null) {
return pathTokens;

Check warning on line 62 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L62

Added line #L62 was not covered by tests
}
if (blobName == null) {
return null;

Check warning on line 65 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L65

Added line #L65 was not covered by tests
}
pathTokens = blobName.split(PATH_DELIMITER);
return pathTokens;

Check warning on line 68 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L67-L68

Added lines #L67 - L68 were not covered by tests
}

public abstract String generateBlobFileName();

public String clusterUUID() {
return clusterUUID;

Check warning on line 74 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L74

Added line #L74 was not covered by tests
}

public abstract UploadedMetadata getUploadedMetadata();

public void setFullBlobName(BlobPath blobPath) {
soosinha marked this conversation as resolved.
Show resolved Hide resolved
this.blobName = blobPath.buildAsString() + blobFileName;
}

Check warning on line 81 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L80-L81

Added lines #L80 - L81 were not covered by tests

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;

Check warning on line 84 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L84

Added line #L84 was not covered by tests
}

protected Compressor getCompressor() {
return compressor;

Check warning on line 88 in server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java#L88

Added line #L88 was not covered by tests
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import java.util.List;

/**
* Parameters which can be used to construct a blob path
*
*/
public class BlobPathParameters {

private final List<String> pathTokens;
private final String filePrefix;

public BlobPathParameters(final List<String> pathTokens, final String filePrefix) {
this.pathTokens = pathTokens;
this.filePrefix = filePrefix;
}

Check warning on line 25 in server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java#L22-L25

Added lines #L22 - L25 were not covered by tests

public List<String> getPathTokens() {
return pathTokens;

Check warning on line 28 in server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java#L28

Added line #L28 was not covered by tests
}

public String getFilePrefix() {
return filePrefix;

Check warning on line 32 in server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java#L32

Added line #L32 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.action.ActionListener;

import java.io.IOException;

/**
* An interface to read/write an object from/to a remote storage. This interface is agnostic of the remote storage type.
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T.
*/
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets make this ExperimentalApi for the current release


public void writeAsync(U entity, ActionListener<Void> listener);

public T read(U entity) throws IOException;

public void readAsync(U entity, ActionListener<T> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;
soosinha marked this conversation as resolved.
Show resolved Hide resolved

import java.io.IOException;
import java.io.InputStream;

/**
* An interface to which provides defines the serialization/deserialization methods for objects to be uploaded to or downloaded from remote store.
* This interface is agnostic of the remote storage type.
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
*/
public interface RemoteWriteableEntity<T> {
/**
* @return An InputStream created by serializing the entity T
* @throws IOException Exception encountered while serialization
*/
public InputStream serialize() throws IOException;

/**
* @param inputStream The InputStream which is used to read the serialized entity
* @return The entity T after deserialization
* @throws IOException Exception encountered while deserialization
*/
public T deserialize(InputStream inputStream) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/**
* Common remote store package
*/
package org.opensearch.common.remote;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
* Utility class for Remote Cluster State
*/
public class RemoteClusterStateUtils {

Check warning on line 17 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java#L17

Added line #L17 was not covered by tests
public static final String PATH_DELIMITER = "/";

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));

Check warning on line 21 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java#L21

Added line #L21 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
* Abstract class for a blob type storage
*
* @param <T> The entity which can be uploaded to / downloaded from blob store
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity.
*/
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {

Check warning on line 31 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L31

Added line #L31 was not covered by tests

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;

public RemoteClusterStateBlobStore(
final BlobStoreTransferService blobStoreTransferService,
final BlobStoreRepository blobStoreRepository,
final String clusterName,
final ThreadPool threadPool,
final String executor
) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(executor);
}

Check warning on line 49 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L44-L49

Added lines #L44 - L49 were not covered by tests

@Override
public void writeAsync(final U entity, final ActionListener<Void> listener) {
try {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);

Check warning on line 56 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L54-L56

Added lines #L54 - L56 were not covered by tests
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT,
// listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

Check warning on line 64 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L61-L64

Added lines #L61 - L64 were not covered by tests

public T read(final U entity) throws IOException {
// TODO Add timing logs and tracing
assert entity.getFullBlobName() != null;
soosinha marked this conversation as resolved.
Show resolved Hide resolved
return entity.deserialize(transferService.downloadBlob(getBlobPathForDownload(entity), entity.getBlobFileName()));

Check warning on line 69 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L69

Added line #L69 was not covered by tests
}

@Override
public void readAsync(final U entity, final ActionListener<T> listener) {
soosinha marked this conversation as resolved.
Show resolved Hide resolved
executorService.execute(() -> {

Check warning on line 74 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L74

Added line #L74 was not covered by tests
try {
listener.onResponse(read(entity));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

Check warning on line 81 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L76-L81

Added lines #L76 - L81 were not covered by tests

private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
BlobPath blobPath = blobStoreRepository.basePath()
.add(RemoteClusterStateUtils.encodeString(clusterName))
.add("cluster-state")
.add(obj.clusterUUID());

Check warning on line 87 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L84-L87

Added lines #L84 - L87 were not covered by tests
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;

Check warning on line 91 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L89-L91

Added lines #L89 - L91 were not covered by tests
}

private BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) {
String[] pathTokens = obj.getBlobPathTokens();
BlobPath blobPath = new BlobPath();

Check warning on line 96 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L95-L96

Added lines #L95 - L96 were not covered by tests
if (pathTokens == null || pathTokens.length < 1) {
return blobPath;

Check warning on line 98 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L98

Added line #L98 was not covered by tests
}
// Iterate till second last path token to get the blob folder
for (int i = 0; i < pathTokens.length - 1; i++) {
blobPath = blobPath.add(pathTokens[i]);

Check warning on line 102 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L102

Added line #L102 was not covered by tests
}
return blobPath;

Check warning on line 104 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java#L104

Added line #L104 was not covered by tests
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package containing models for remote cluster state
*/
package org.opensearch.gateway.remote.model;
Loading