Skip to content

Commit

Permalink
Remove RemoteObject Store interface
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed May 22, 2024
1 parent 99687b0 commit 3e1f31f
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,37 @@

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

public abstract class AbstractRemoteBlobStoreObject<T> implements RemoteObject <T> {
public abstract class AbstractRemoteBlobStoreObject<T> implements RemoteObject<T> {

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;

public AbstractRemoteBlobStoreObject(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(ThreadPool.Names.GENERIC);
}

public abstract BlobPathParameters getBlobPathParameters();

public abstract String getFullBlobName();

public String getBlobFileName() {
Expand All @@ -24,8 +50,65 @@ public String getBlobFileName() {
String[] pathTokens = getFullBlobName().split(PATH_DELIMITER);
return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1];
}

public abstract String generateBlobFileName();
public abstract RemoteObjectStore<T> getBackingStore();

public abstract UploadedMetadata getUploadedMetadata();

@Override
public CheckedRunnable<IOException> writeAsync(ActionListener<Void> listener) {
return () -> {
assert get() != null;
InputStream inputStream = serialize();
transferService.uploadBlob(inputStream, getBlobPathForUpload(), getBlobFileName(), WritePriority.URGENT, listener);
};
}

@Override
public T read() throws IOException {
assert getFullBlobName() != null;
return deserialize(
transferService.downloadBlob(getBlobPathForDownload(), getBlobFileName()));
}

@Override
public void readAsync(ActionListener<T> listener) {
executorService.execute(() -> {
try {
listener.onResponse(read());
} catch (Exception e) {
listener.onFailure(e);
}
});
}

public BlobPath getBlobPathForUpload() {
BlobPath blobPath = blobStoreRepository.basePath().add(RemoteClusterStateUtils.encodeString(clusterName)).add("cluster-state").add(clusterUUID());
for (String token : getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
}

public BlobPath getBlobPathForDownload() {
String[] pathTokens = extractBlobPathTokens(getFullBlobName());
BlobPath blobPath = blobStoreRepository.basePath();
for (String token : pathTokens) {
blobPath = blobPath.add(token);
}
return blobPath;
}

protected Compressor getCompressor() {
return blobStoreRepository.getCompressor();
}

protected BlobStoreRepository getBlobStoreRepository() {
return this.blobStoreRepository;
}

private static String[] extractBlobPathTokens(String blobName) {
String[] blobNameTokens = blobName.split(PATH_DELIMITER);
return Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public void start() {
}
String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value();
remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(blobStoreRepository, clusterSettings,threadpool, getBlobStoreTransferService(), clusterName);
remoteIndexMetadataManager = new RemoteIndexMetadataManager(blobStoreRepository, clusterSettings,threadpool, clusterName, blobStoreRepository.getNamedXContentRegistry(), getBlobStoreTransferService());
remoteIndexMetadataManager = new RemoteIndexMetadataManager(blobStoreRepository, clusterSettings,threadpool, clusterName, getBlobStoreTransferService());
remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager(blobStoreRepository, threadpool);
remoteManifestManager = new RemoteManifestManager(blobStoreRepository, clusterSettings, nodeId);
remoteClusterStateCleanupManager.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_CURRENT_CODEC_VERSION;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.io.Streams;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

public class RemoteCoordinationMetadata extends AbstractRemoteBlobStoreObject<CoordinationMetadata> {

Expand All @@ -36,24 +36,21 @@ public class RemoteCoordinationMetadata extends AbstractRemoteBlobStoreObject<Co

private CoordinationMetadata coordinationMetadata;
private long metadataVersion;
private final RemoteObjectStore<CoordinationMetadata> backingStore;
private String blobName;
private final NamedXContentRegistry xContentRegistry;
private final String clusterUUID;

public RemoteCoordinationMetadata(CoordinationMetadata coordinationMetadata, long metadataVersion, String clusterUUID, RemoteObjectBlobStore<CoordinationMetadata> backingStore,
NamedXContentRegistry xContentRegistry) {
public RemoteCoordinationMetadata(CoordinationMetadata coordinationMetadata, long metadataVersion, String clusterUUID, BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.coordinationMetadata = coordinationMetadata;
this.metadataVersion = metadataVersion;
this.backingStore = backingStore;
this.xContentRegistry = xContentRegistry;
this.clusterUUID = clusterUUID;
}

public RemoteCoordinationMetadata(String blobName, String clusterUUID, RemoteObjectStore<CoordinationMetadata> backingStore, NamedXContentRegistry xContentRegistry) {
public RemoteCoordinationMetadata(String blobName, String clusterUUID, BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.blobName = blobName;
this.backingStore = backingStore;
this.xContentRegistry = xContentRegistry;
this.clusterUUID = clusterUUID;
}

Expand All @@ -77,10 +74,8 @@ public String generateBlobFileName() {
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION)
);
assert backingStore instanceof RemoteObjectBlobStore;
RemoteObjectBlobStore<CoordinationMetadata> blobStore = (RemoteObjectBlobStore<CoordinationMetadata>) backingStore;
// setting the full blob path with name for future access
this.blobName = blobStore.getBlobPathForUpload(this).buildAsString() + blobFileName;
this.blobName = getBlobPathForUpload().buildAsString() + blobFileName;
return blobFileName;
}

Expand All @@ -94,19 +89,14 @@ public String clusterUUID() {
return clusterUUID;
}

@Override
public RemoteObjectStore<CoordinationMetadata> getBackingStore() {
return backingStore;
}

@Override
public InputStream serialize() throws IOException {
return COORDINATION_METADATA_FORMAT.serialize(coordinationMetadata, generateBlobFileName(), getBackingStore().getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
return COORDINATION_METADATA_FORMAT.serialize(coordinationMetadata, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
}

@Override
public CoordinationMetadata deserialize(InputStream inputStream) throws IOException {
return COORDINATION_METADATA_FORMAT.deserialize(blobName, xContentRegistry, Streams.readFully(inputStream));
return COORDINATION_METADATA_FORMAT.deserialize(blobName, getBlobStoreRepository().getNamedXContentRegistry(), Streams.readFully(inputStream));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,34 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.Metadata.Custom;
import org.opensearch.common.io.Streams;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

public class RemoteCustomMetadata extends AbstractRemoteBlobStoreObject<Custom>{
public class RemoteCustomMetadata extends AbstractRemoteBlobStoreObject<Custom> {

public static final String CUSTOM_METADATA = "custom";
public static final String CUSTOM_DELIMITER = "--";

public final ChecksumBlobStoreFormat<Custom> customBlobStoreFormat;

private Custom custom;
private String customType;
private final String customType;
private long metadataVersion;
private final RemoteObjectStore<Custom> backingStore;
private String blobName;
private final NamedXContentRegistry xContentRegistry;
private final String clusterUUID;

public RemoteCustomMetadata(Custom custom, String customType, long metadataVersion, String clusterUUID, RemoteObjectBlobStore<Custom> backingStore,
NamedXContentRegistry xContentRegistry) {
public RemoteCustomMetadata(Custom custom, String customType, long metadataVersion, String clusterUUID, BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.custom = custom;
this.customType = customType;
this.metadataVersion = metadataVersion;
this.backingStore = backingStore;
this.xContentRegistry = xContentRegistry;
this.clusterUUID = clusterUUID;
this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>(
"custom",
Expand All @@ -54,11 +54,12 @@ public RemoteCustomMetadata(Custom custom, String customType, long metadataVersi
);
}

public RemoteCustomMetadata(String blobName, String customType, String clusterUUID, RemoteObjectStore<Custom> backingStore, NamedXContentRegistry xContentRegistry) {
public RemoteCustomMetadata(String blobName, String customType, String clusterUUID, BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.blobName = blobName;
this.customType = customType;
this.backingStore = backingStore;
this.xContentRegistry = xContentRegistry;
this.clusterUUID = clusterUUID;
this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>(
"custom",
Expand All @@ -80,18 +81,17 @@ public String getFullBlobName() {

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__<codec_version>
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__
// <codec_version>
String blobFileName = String.join(
DELIMITER,
getBlobPathParameters().getFilePrefix(),
RemoteStoreUtils.invertLong(metadataVersion),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION)
);
assert backingStore instanceof RemoteObjectBlobStore;
RemoteObjectBlobStore<Custom> blobStore = (RemoteObjectBlobStore<Custom>) backingStore;
// setting the full blob path with name for future access
this.blobName = blobStore.getBlobPathForUpload(this).buildAsString() + blobFileName;
this.blobName = getBlobPathForUpload().buildAsString() + blobFileName;
return blobFileName;
}

Expand All @@ -105,19 +105,14 @@ public String clusterUUID() {
return clusterUUID;
}

@Override
public RemoteObjectStore<Custom> getBackingStore() {
return backingStore;
}

@Override
public InputStream serialize() throws IOException {
return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getBackingStore().getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
}

@Override
public Custom deserialize(InputStream inputStream) throws IOException {
return customBlobStoreFormat.deserialize(blobName, xContentRegistry, Streams.readFully(inputStream));
return customBlobStoreFormat.deserialize(blobName, getBlobStoreRepository().getNamedXContentRegistry(), Streams.readFully(inputStream));
}

@Override
Expand Down
Loading

0 comments on commit 3e1f31f

Please sign in to comment.