Implement interface changes for s3 plugin to read/write blob with object metadata

Signed-off-by: Sandeep Kumawat <[email protected]>
Sandeep Kumawat committed Apr 8, 2024
1 parent 9b0f578 commit fe8ba21
Showing 16 changed files with 217 additions and 50 deletions.
@@ -11,6 +11,7 @@
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.InputStream;
import java.util.Map;

/**
* Model composed of an input stream and the total content length of the stream
@@ -23,17 +24,19 @@ public class InputStreamContainer {
private final InputStream inputStream;
private final long contentLength;
private final long offset;
private final Map<String, String> metadata;

/**
* Construct a new stream object
*
* @param inputStream The input stream that is to be encapsulated
* @param contentLength The total content length that is to be read from the stream
* @param offset The offset within the source content at which the stream starts
* @param metadata The object metadata associated with the stream content
*/
public InputStreamContainer(InputStream inputStream, long contentLength, long offset) {
public InputStreamContainer(InputStream inputStream, long contentLength, long offset, Map<String, String> metadata) {
this.inputStream = inputStream;
this.contentLength = contentLength;
this.offset = offset;
this.metadata = metadata;
}

/**
@@ -56,4 +59,11 @@ public long getContentLength() {
public long getOffset() {
return offset;
}

/**
* @return metadata of the source content.
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
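
For reference, the widened constructor now threads object metadata through the container alongside the stream. A minimal sketch of the new call shape (the payload and the metadata key are illustrative, not taken from the commit):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch: wrap a payload plus its object metadata in the extended container.
byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
InputStream stream = new ByteArrayInputStream(payload);
Map<String, String> metadata = Map.of("purpose", "example"); // illustrative key/value
InputStreamContainer container = new InputStreamContainer(stream, payload.length, 0L, metadata);
assert "example".equals(container.getMetadata().get("purpose"));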
@@ -73,6 +73,7 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
@@ -138,6 +139,13 @@ public boolean blobExists(String blobName) {
}
}

@ExperimentalApi
@Override
public BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
return new BlobDownloadResponse(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
}

@Override
public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));
@@ -172,9 +180,32 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, null);
} else {
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, null);
}
return null;
});
}

/**
* Write blob with its object metadata.
*/
@ExperimentalApi
@Override
public void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
});
@@ -190,7 +221,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled()
blobStore.isUploadRetryEnabled(),
writeContext.getMetadata()
);
try {
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
@@ -203,7 +235,8 @@
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
uploadRequest.getContentLength()
uploadRequest.getContentLength(),
uploadRequest.getMetadata()
);
completionListener.onResponse(null);
} catch (Exception ex) {
@@ -309,6 +342,18 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}

@ExperimentalApi
@Override
public void writeBlobAtomicWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
writeBlobWithMetadata(blobName, inputStream, metadata, blobSize, failIfAlreadyExists);
}

@Override
public DeleteResult delete() throws IOException {
final AtomicLong deletedBlobs = new AtomicLong();
@@ -542,8 +587,13 @@ private String buildKey(String blobName) {
/**
* Uploads a blob using a single upload request
*/
void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeSingleUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
@@ -560,6 +610,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));

if (metadata != null) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
@@ -583,8 +637,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
/**
* Uploads a blob using multipart upload requests.
*/
void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeMultipartUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

ensureMultiPartUploadSize(blobSize);
final long partSize = blobStore.bufferSizeInBytes();
@@ -609,6 +668,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

if (metadata != null) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
@@ -767,11 +830,12 @@ static InputStreamContainer transformResponseToInputStreamContainer(
final GetObjectResponse getObjectResponse = streamResponse.response();
final String contentRange = getObjectResponse.contentRange();
final Long contentLength = getObjectResponse.contentLength();
final Map<String, String> metadata = getObjectResponse.metadata();
if ((isMultipartObject && contentRange == null) || contentLength == null) {
throw SdkException.builder().message("Failed to fetch required metadata for blob part").build();
}
final long offset = isMultipartObject ? HttpRangeUtils.getStartOffsetFromRangeHeader(getObjectResponse.contentRange()) : 0L;
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset);
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset, metadata);
}

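Taken together, the container changes above give S3BlobContainer a symmetric pair: writeBlobWithMetadata attaches user-defined object metadata on upload, and readBlobWithMetadata hands the metadata back together with the stream on download. A hedged round-trip sketch; the BlobDownloadResponse accessors (getInputStream, getMetadata) are assumed names, since only its constructor appears in this diff:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

// Sketch: write a blob with metadata, then read both back.
void roundTrip(S3BlobContainer container, byte[] data) throws IOException {
    Map<String, String> metadata = Map.of("writer", "remote-store"); // illustrative entry
    // ByteArrayInputStream supports mark, satisfying the assertion in writeBlobWithMetadata.
    try (InputStream in = new ByteArrayInputStream(data)) {
        container.writeBlobWithMetadata("example-blob", in, metadata, data.length, false);
    }
    BlobDownloadResponse response = container.readBlobWithMetadata("example-blob");
    try (InputStream in = response.getInputStream()) { // assumed accessor
        Map<String, String> stored = response.getMetadata(); // assumed accessor
        assert "remote-store".equals(stored.get("writer"));
        in.readAllBytes();
    }
}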
@@ -48,6 +48,7 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;
private boolean eof;
private Map<String, String> metadata;

S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
@@ -122,6 +124,7 @@ private void openStream() throws IOException {
getObjectResponseInputStream.response().contentLength()
);
this.currentStream = getObjectResponseInputStream;
this.metadata = getObjectResponseInputStream.response().metadata();
this.isStreamAborted.set(false);
} catch (final SdkException e) {
if (e instanceof S3Exception) {
@@ -265,4 +268,8 @@ boolean isEof() {
boolean isAborted() {
return isStreamAborted.get();
}

public Map<String, String> getMetadata() {
return this.metadata;
}
}
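
Since readBlobWithMetadata calls getMetadata() immediately after constructing the stream, the constructor evidently opens the underlying GetObject request eagerly, and the metadata captured in openStream is available right away. A short consumption sketch (the blob key is illustrative):

import java.util.Map;

// Sketch: the retrying stream exposes the metadata captured when it was opened.
S3RetryingInputStream stream = new S3RetryingInputStream(blobStore, "base/path/example-blob");
try (stream) {
    Map<String, String> metadata = stream.getMetadata(); // from the GetObjectResponse
    byte[] bytes = stream.readAllBytes();
}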
@@ -129,6 +129,7 @@ private void uploadInParts(

CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder()
.bucket(uploadRequest.getBucket())
.metadata(uploadRequest.getMetadata())
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));
if (uploadRequest.doRemoteDataIntegrityCheck()) {
@@ -324,6 +325,7 @@ private void uploadInOneChunk(
) {
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(uploadRequest.getBucket())
.metadata(uploadRequest.getMetadata())
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));
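Note a small asymmetry with the synchronous path: these async builders receive uploadRequest.getMetadata() unconditionally, whereas executeSingleUpload and executeMultipartUpload guard against null first. A defensive variant in the style of the sync path would look like this (a sketch, not part of the commit):

CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder()
    .bucket(uploadRequest.getBucket())
    .key(uploadRequest.getKey());
if (uploadRequest.getMetadata() != null) { // mirror the sync path's null guard
    createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
}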
@@ -12,6 +12,7 @@
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
import java.util.Map;

/**
* A model encapsulating all details for an upload to S3
@@ -24,8 +25,8 @@ public class UploadRequest {
private final CheckedConsumer<Boolean, IOException> uploadFinalizer;
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;

private boolean uploadRetryEnabled;
private final Map<String, String> metadata;

/**
* Construct a new UploadRequest object
@@ -46,7 +47,8 @@ public UploadRequest(
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum,
boolean uploadRetryEnabled
boolean uploadRetryEnabled,
Map<String, String> metadata
) {
this.bucket = bucket;
this.key = key;
@@ -56,6 +58,7 @@
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
this.metadata = metadata;
}

public String getBucket() {
@@ -89,4 +92,8 @@ public Long getExpectedChecksum() {
public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

public Map<String, String> getMetadata() {
return metadata;
}
}
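
A construction sketch showing the new trailing argument. The leading parameters (bucket, key, content length, write priority) sit in the collapsed part of the hunk, so their order below is assumed from the visible field assignments:

import java.util.Map;
import org.opensearch.common.blobstore.stream.write.WritePriority;

UploadRequest request = new UploadRequest(
    "my-bucket",                   // bucket
    "base/path/example-blob",      // key
    1024L,                         // content length (assumed position)
    WritePriority.NORMAL,          // write priority (assumed position)
    uploadSuccess -> {},           // uploadFinalizer: no-op for the sketch
    false,                         // doRemoteDataIntegrityCheck
    null,                          // expectedChecksum
    true,                          // uploadRetryEnabled
    Map.of("purpose", "example")   // metadata, the new trailing parameter
);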
@@ -75,6 +75,7 @@

import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
@@ -471,7 +472,7 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
return new InputStreamContainer(inputStream, size, position, null);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
@@ -527,7 +528,7 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
return new InputStreamContainer(inputStream, size, position, null);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
@@ -649,7 +650,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
return new InputStreamContainer(inputStream, size, position, null);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
@@ -668,7 +669,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
} else {
assertNull(exceptionRef.get());
}
verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong());
verify(s3BlobContainer, times(1)).executeMultipartUpload(
any(S3BlobStore.class),
anyString(),
any(InputStream.class),
anyLong(),
anyMap()
);

if (expectException) {
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));
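The verification needs the fifth matcher because executeMultipartUpload now takes five arguments and Mockito matches on the full parameter list. One caveat: since Mockito 2, anyMap() matches only non-null maps, so this presumably relies on the code under test passing a non-null metadata map; a nullable matcher would be needed to cover the null case. The single-upload path would be verified the same way (sketch):

verify(s3BlobContainer, times(1)).executeSingleUpload(
    any(S3BlobStore.class),
    anyString(),
    any(InputStream.class),
    anyLong(),
    anyMap()
);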