Implement interface changes for s3 plugin to read/write blob with obj… #13113

Merged

Changes from 2 commits
libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java

@@ -11,6 +11,7 @@
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.InputStream;
import java.util.Map;

/**
* Model composed of an input stream and the total content length of the stream
@@ -23,17 +24,20 @@
private final InputStream inputStream;
private final long contentLength;
private final long offset;
private final Map<String, String> metadata;

/**
* Construct a new stream object
*
* @param inputStream The input stream that is to be encapsulated
* @param contentLength The total content length that is to be read from the stream
* @param metadata The metadata of the blob. This will be the same for each part download.
*/
- public InputStreamContainer(InputStream inputStream, long contentLength, long offset) {
+ public InputStreamContainer(InputStream inputStream, long contentLength, long offset, Map<String, String> metadata) {
this.inputStream = inputStream;
this.contentLength = contentLength;
this.offset = offset;
this.metadata = metadata;
}

/**
@@ -56,4 +60,11 @@
public long getOffset() {
return offset;
}

/**
* @return metadata of the source content.
*/
public Map<String, String> getMetadata() {
return metadata;

// Codecov warning: added line InputStreamContainer.java#L68 was not covered by tests
}
}
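Note: a minimal sketch of how a caller might populate the new metadata argument. The payload and metadata keys below are illustrative, not taken from the PR; in the S3 plugin the map comes from the GetObject response.

import java.io.ByteArrayInputStream;
import java.util.Map;

import org.opensearch.common.io.InputStreamContainer;

public class InputStreamContainerExample {
    public static void main(String[] args) {
        byte[] data = "payload".getBytes();
        // Hypothetical metadata entries for illustration only.
        Map<String, String> metadata = Map.of("writer-node", "node-1");
        InputStreamContainer container = new InputStreamContainer(
            new ByteArrayInputStream(data), data.length, 0L, metadata);
        System.out.println(container.getMetadata()); // prints {writer-node=node-1}
    }
}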
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

@@ -73,6 +73,7 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
@@ -138,6 +139,13 @@
}
}

@ExperimentalApi
@Override
public BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
return new BlobDownloadResponse(s3RetryingInputStream, s3RetryingInputStream.getMetadata());

// Codecov warning: added lines S3BlobContainer.java#L145-L146 were not covered by tests
}
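Note: a hedged usage sketch for the new read path. The diff does not show BlobDownloadResponse itself, so the getInputStream()/getMetadata() accessor names below are assumptions.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobDownloadResponse;

class ReadWithMetadataSketch {
    static Map<String, String> readBlob(BlobContainer container) throws IOException {
        BlobDownloadResponse response = container.readBlobWithMetadata("example-blob");
        try (InputStream in = response.getInputStream()) {      // assumed accessor
            in.transferTo(OutputStream.nullOutputStream());     // drain the blob bytes
            return response.getMetadata();                      // assumed accessor
        }
    }
}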

@Override
public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));
@@ -169,12 +177,27 @@
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlobWithMetadata(blobName, inputStream, null, blobSize, failIfAlreadyExists);
}

/**
* Write blob with its object metadata.
*/
@ExperimentalApi
@Override
public void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
@Nullable Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
- executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
+ executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
- executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
+ executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
});
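Note: a sketch of the corresponding write call; blob name and metadata values are illustrative. A ByteArrayInputStream is used because the method asserts mark support on the stream.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;

import org.opensearch.common.blobstore.BlobContainer;

class WriteWithMetadataSketch {
    static void writeBlob(BlobContainer container) throws IOException {
        byte[] bytes = "payload".getBytes();
        // S3 persists these entries as user-defined object metadata (x-amz-meta-* headers).
        Map<String, String> metadata = Map.of("writer-node", "node-1");
        container.writeBlobWithMetadata("example-blob", new ByteArrayInputStream(bytes), metadata, bytes.length, false);
    }
}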
@@ -190,7 +213,8 @@
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
- blobStore.isUploadRetryEnabled()
+ blobStore.isUploadRetryEnabled(),
+ writeContext.getMetadata()
);
try {
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
Expand All @@ -203,7 +227,8 @@
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
- uploadRequest.getContentLength()
+ uploadRequest.getContentLength(),
+ uploadRequest.getMetadata()
);
completionListener.onResponse(null);
} catch (Exception ex) {
@@ -309,6 +334,18 @@
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}

@ExperimentalApi
@Override
public void writeBlobAtomicWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
writeBlobWithMetadata(blobName, inputStream, metadata, blobSize, failIfAlreadyExists);
}

// Codecov warning: added lines S3BlobContainer.java#L346-L347 were not covered by tests

@Override
public DeleteResult delete() throws IOException {
final AtomicLong deletedBlobs = new AtomicLong();
@@ -542,8 +579,13 @@
/**
* Uploads a blob using a single upload request
*/
- void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
-     throws IOException {
+ void executeSingleUpload(
+     final S3BlobStore blobStore,
+     final String blobName,
+     final InputStream input,
+     final long blobSize,
+     final Map<String, String> metadata
+ ) throws IOException {

// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
@@ -560,6 +602,10 @@
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));

if (metadata != null && !metadata.isEmpty()) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
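Note: outside this PR, the SDK request this hunk builds toward looks roughly like the following standalone sketch; the guard mirrors the one added above so an absent map never reaches the builder.

import java.util.Map;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;

class PutObjectMetadataSketch {
    static PutObjectRequest build(String bucket, String key, Map<String, String> metadata) {
        PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(key);
        // Each entry is sent as an x-amz-meta-<key> header on the upload.
        if (metadata != null && !metadata.isEmpty()) {
            builder.metadata(metadata);
        }
        return builder.build();
    }
}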
@@ -583,8 +629,13 @@
/**
* Uploads a blob using multipart upload requests.
*/
- void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
-     throws IOException {
+ void executeMultipartUpload(
+     final S3BlobStore blobStore,
+     final String blobName,
+     final InputStream input,
+     final long blobSize,
+     final Map<String, String> metadata
+ ) throws IOException {

ensureMultiPartUploadSize(blobSize);
final long partSize = blobStore.bufferSizeInBytes();
@@ -609,6 +660,10 @@
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

if (metadata != null && !metadata.isEmpty()) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
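Note: for multipart uploads, S3 accepts object metadata only on the CreateMultipartUpload call; the individual UploadPart requests carry none. A standalone sketch of the pattern this hunk applies (bucket/key illustrative):

import java.util.Map;

import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;

class MultipartMetadataSketch {
    static CreateMultipartUploadRequest build(String bucket, String key, Map<String, String> metadata) {
        CreateMultipartUploadRequest.Builder builder =
            CreateMultipartUploadRequest.builder().bucket(bucket).key(key);
        if (metadata != null && !metadata.isEmpty()) {
            builder.metadata(metadata); // attached once, at upload creation
        }
        return builder.build();
    }
}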
@@ -767,11 +822,12 @@
final GetObjectResponse getObjectResponse = streamResponse.response();
final String contentRange = getObjectResponse.contentRange();
final Long contentLength = getObjectResponse.contentLength();
final Map<String, String> metadata = getObjectResponse.metadata();
if ((isMultipartObject && contentRange == null) || contentLength == null) {
throw SdkException.builder().message("Failed to fetch required metadata for blob part").build();
}
final long offset = isMultipartObject ? HttpRangeUtils.getStartOffsetFromRangeHeader(getObjectResponse.contentRange()) : 0L;
- return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset);
+ return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset, metadata);
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java

@@ -48,6 +48,7 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -77,6 +78,7 @@
private long currentOffset;
private boolean closed;
private boolean eof;
private Map<String, String> metadata;

S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
@@ -122,6 +124,7 @@
getObjectResponseInputStream.response().contentLength()
);
this.currentStream = getObjectResponseInputStream;
this.metadata = getObjectResponseInputStream.response().metadata();
this.isStreamAborted.set(false);
} catch (final SdkException e) {
if (e instanceof S3Exception) {
@@ -265,4 +268,8 @@
boolean isAborted() {
return isStreamAborted.get();
}

Map<String, String> getMetadata() {
return this.metadata;

// Codecov warning: added line S3RetryingInputStream.java#L273 was not covered by tests
}
}
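Note: the metadata captured above comes from the SDK's GetObject response. A minimal standalone sketch of that SDK behavior (bucket and key are illustrative):

import java.io.IOException;
import java.util.Map;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

class GetObjectMetadataSketch {
    static Map<String, String> metadataOf(S3Client client, String bucket, String key) throws IOException {
        GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
        try (ResponseInputStream<GetObjectResponse> stream = client.getObject(request)) {
            // User-defined metadata is returned with the x-amz-meta- prefix already stripped.
            return stream.response().metadata();
        }
    }
}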
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java

@@ -131,6 +131,10 @@
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));

if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) {
createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());

// Codecov warning: added line AsyncTransferManager.java#L136 was not covered by tests
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
@@ -327,6 +331,10 @@
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));

if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) {
putObjectRequestBuilder.metadata(uploadRequest.getMetadata());

// Codecov warning: added line AsyncTransferManager.java#L336 was not covered by tests
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
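Note: the null/empty guard now appears at four call sites (single upload, multipart upload, and both async variants). A possible consolidation, not part of this PR:

import java.util.Map;
import java.util.function.Consumer;

final class MetadataUtil {
    private MetadataUtil() {}

    // Applies metadata only when present, matching the guard used at each call site.
    static void applyIfPresent(Map<String, String> metadata, Consumer<Map<String, String>> setter) {
        if (metadata != null && !metadata.isEmpty()) {
            setter.accept(metadata);
        }
    }
}

// usage: MetadataUtil.applyIfPresent(uploadRequest.getMetadata(), putObjectRequestBuilder::metadata);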
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java

@@ -9,9 +9,11 @@
package org.opensearch.repositories.s3.async;

import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
import java.util.Map;

/**
* A model encapsulating all details for an upload to S3
@@ -24,8 +26,8 @@ public class UploadRequest {
private final CheckedConsumer<Boolean, IOException> uploadFinalizer;
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;

private boolean uploadRetryEnabled;
private final Map<String, String> metadata;

/**
* Construct a new UploadRequest object
@@ -37,6 +39,7 @@ public class UploadRequest {
* @param uploadFinalizer An upload finalizer to call once all parts are uploaded
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
* @param metadata Metadata of the file being uploaded
*/
public UploadRequest(
String bucket,
@@ -46,7 +49,8 @@ public UploadRequest(
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum,
- boolean uploadRetryEnabled
+ boolean uploadRetryEnabled,
+ @Nullable Map<String, String> metadata
) {
this.bucket = bucket;
this.key = key;
@@ -56,6 +60,7 @@
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
this.metadata = metadata;
}

public String getBucket() {
@@ -89,4 +94,11 @@ public Long getExpectedChecksum() {
public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

/**
* @return metadata of the blob to be uploaded
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
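Note: a construction sketch for the widened constructor; every value below is illustrative. expectedChecksum is null here because doRemoteDataIntegrityCheck is false.

import java.util.Map;

import org.opensearch.common.blobstore.stream.write.WritePriority;

class UploadRequestSketch {
    static UploadRequest build() {
        return new UploadRequest(
            "my-bucket",                     // bucket
            "indices/0/segment_1",           // key
            1024L,                           // contentLength
            WritePriority.NORMAL,            // writePriority
            success -> {},                   // uploadFinalizer (no-op)
            false,                           // doRemoteDataIntegrityCheck
            null,                            // expectedChecksum
            true,                            // uploadRetryEnabled
            Map.of("writer-node", "node-1")  // metadata
        );
    }
}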
plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

@@ -57,6 +57,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
@@ -75,6 +76,7 @@

import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
@@ -471,7 +473,7 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
- return new InputStreamContainer(inputStream, size, position);
+ return new InputStreamContainer(inputStream, size, position, null);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
@@ -527,7 +529,7 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
- return new InputStreamContainer(inputStream, size, position);
+ return new InputStreamContainer(inputStream, size, position, null);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
@@ -649,7 +651,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
- return new InputStreamContainer(inputStream, size, position);
+ return new InputStreamContainer(inputStream, size, position, null);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
Expand All @@ -659,6 +661,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
.writePriority(WritePriority.HIGH)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.metadata(new HashMap<>())
.build();

s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
Expand All @@ -668,7 +671,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
} else {
assertNull(exceptionRef.get());
}
- verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong());
+ verify(s3BlobContainer, times(1)).executeMultipartUpload(
+     any(S3BlobStore.class),
+     anyString(),
+     any(InputStream.class),
+     anyLong(),
+     anyMap()
+ );

if (expectException) {
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));
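Note: the diff threads anyMap() through the existing mocks but adds no assertion on metadata content. A hypothetical follow-up test (not part of this diff) could capture the outgoing request and check that the map survives the call chain:

import java.util.Map;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import org.mockito.ArgumentCaptor;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.verify;

class MetadataPassthroughSketch {
    // 'client' is the mocked S3Client injected into the blob store under test.
    static void assertMetadataForwarded(S3Client client, Map<String, String> expected) {
        ArgumentCaptor<PutObjectRequest> captor = ArgumentCaptor.forClass(PutObjectRequest.class);
        verify(client).putObject(captor.capture(), any(RequestBody.class));
        assertEquals(expected, captor.getValue().metadata());
    }
}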