Skip to content

Commit

Permalink
Implement interface changes for s3 plugin to read/write blob with obj… (
Browse files Browse the repository at this point in the history
opensearch-project#13113) (opensearch-project#13765)

* Implement interface changes for s3 plugin to read/write blob with object metadata
---------
Signed-off-by: Sandeep Kumawat <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
skumawat2025 authored and kkewwei committed Jul 24, 2024
1 parent 8bfa772 commit 8932c34
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -77,6 +78,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -139,6 +141,13 @@ public boolean blobExists(String blobName) {
}
}

@ExperimentalApi
@Override
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
}

@Override
public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));
Expand Down Expand Up @@ -170,12 +179,27 @@ public long readBlobPreferredLength() {
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, null);
}

/**
* Write blob with its object metadata.
*/
@ExperimentalApi
@Override
public void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists,
@Nullable Map<String, String> metadata
) throws IOException {
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
});
Expand All @@ -191,7 +215,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled()
blobStore.isUploadRetryEnabled(),
writeContext.getMetadata()
);
try {
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
Expand All @@ -211,7 +236,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
uploadRequest.getContentLength()
uploadRequest.getContentLength(),
uploadRequest.getMetadata()
);
completionListener.onResponse(null);
} catch (Exception ex) {
Expand Down Expand Up @@ -582,8 +608,13 @@ private String buildKey(String blobName) {
/**
* Uploads a blob using a single upload request
*/
void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeSingleUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
Expand All @@ -600,6 +631,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));

if (CollectionUtils.isNotEmpty(metadata)) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
Expand All @@ -623,8 +658,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
/**
* Uploads a blob using multipart upload requests.
*/
void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeMultipartUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

ensureMultiPartUploadSize(blobSize);
final long partSize = blobStore.bufferSizeInBytes();
Expand All @@ -649,6 +689,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

if (CollectionUtils.isNotEmpty(metadata)) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;
private boolean eof;
private Map<String, String> metadata;

S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
Expand Down Expand Up @@ -122,6 +124,7 @@ private void openStream() throws IOException {
getObjectResponseInputStream.response().contentLength()
);
this.currentStream = getObjectResponseInputStream;
this.metadata = getObjectResponseInputStream.response().metadata();
this.isStreamAborted.set(false);
} catch (final SdkException e) {
if (e instanceof S3Exception) {
Expand Down Expand Up @@ -265,4 +268,8 @@ boolean isEof() {
boolean isAborted() {
return isStreamAborted.get();
}

Map<String, String> getMetadata() {
return this.metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.CompletableFutureUtils;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -154,6 +155,10 @@ private void uploadInParts(
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));

if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
Expand Down Expand Up @@ -352,6 +357,10 @@ private void uploadInOneChunk(
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));

if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
putObjectRequestBuilder.metadata(uploadRequest.getMetadata());
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.opensearch.repositories.s3.async;

import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
import java.util.Map;

/**
* A model encapsulating all details for an upload to S3
Expand All @@ -25,6 +27,7 @@ public class UploadRequest {
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;
private final boolean uploadRetryEnabled;
private final Map<String, String> metadata;

/**
* Construct a new UploadRequest object
Expand All @@ -36,6 +39,7 @@ public class UploadRequest {
* @param uploadFinalizer An upload finalizer to call once all parts are uploaded
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
* @param metadata Metadata of the file being uploaded
*/
public UploadRequest(
String bucket,
Expand All @@ -45,7 +49,8 @@ public UploadRequest(
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum,
boolean uploadRetryEnabled
boolean uploadRetryEnabled,
@Nullable Map<String, String> metadata
) {
this.bucket = bucket;
this.key = key;
Expand All @@ -55,6 +60,7 @@ public UploadRequest(
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
this.metadata = metadata;
}

public String getBucket() {
Expand Down Expand Up @@ -88,4 +94,11 @@ public Long getExpectedChecksum() {
public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

/**
* @return metadata of the blob to be uploaded
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
Expand All @@ -79,6 +80,7 @@

import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -722,6 +724,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
.writePriority(writePriority)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.metadata(new HashMap<>())
.build();

s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
Expand All @@ -731,7 +734,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
} else {
assertNull(exceptionRef.get());
}
verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong());
verify(s3BlobContainer, times(1)).executeMultipartUpload(
any(S3BlobStore.class),
anyString(),
any(InputStream.class),
anyLong(),
anyMap()
);

if (expectException) {
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));
Expand Down
Loading

0 comments on commit 8932c34

Please sign in to comment.