Skip to content

Commit

Permalink
Move functionality to S3BlobContainer
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Sep 7, 2023
1 parent 7491708 commit 3bde359
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.repositories.s3;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand All @@ -44,11 +46,15 @@
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
Expand Down Expand Up @@ -82,8 +88,8 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -222,9 +228,8 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client();
final String bucketName = blobStore.bucket();
final AsyncTransferManager transferManager = blobStore.getAsyncTransferManager();

final GetObjectAttributesResponse blobMetadata = transferManager.getBlobMetadata(s3AsyncClient, bucketName, blobName).get();
final GetObjectAttributesResponse blobMetadata = getBlobMetadata(s3AsyncClient, bucketName, blobName).get();

final long blobSize = blobMetadata.objectSize();
final int numberOfParts = blobMetadata.objectParts().totalPartsCount();
Expand All @@ -234,26 +239,22 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
final List<CompletableFuture<InputStreamContainer>> blobPartInputStreamFutures = new ArrayList<>();
// S3 multipart files use 1 to n indexing
for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
int finalPartNumber = partNumber - 1;
CompletableFuture<InputStreamContainer> partInputStreamFuture = transferManager.getBlobPartInputStreamContainer(
s3AsyncClient,
bucketName,
blobName,
partNumber
).whenComplete((inputStreamContainer, error) -> {
if (error == null) {
blobPartStreams.add(finalPartNumber, inputStreamContainer);
}
});

blobPartInputStreamFutures.add(partInputStreamFuture);
blobPartInputStreamFutures.add(getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobName, partNumber));
}

CompletableFuture.allOf(blobPartInputStreamFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> {
if (throwable == null) {
listener.onResponse(new ReadContext(blobSize, blobPartStreams, blobChecksum));
listener.onResponse(
new ReadContext(
blobSize,
blobPartInputStreamFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()),
blobChecksum
)
);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
Exception ex = throwable.getCause() instanceof Exception
? (Exception) throwable.getCause()
: new Exception(throwable.getCause());
listener.onFailure(ex);
}
});
Expand Down Expand Up @@ -678,4 +679,65 @@ static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long par
return Tuple.tuple(parts + 1, remaining);
}
}

/**
* Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds
* the stream and its related metadata.
* @param s3AsyncClient Async client to be utilized to fetch the object part
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the parts will be fetched
* @param partNumber Part number for the blob to be retrieved
* @return A future of {@link InputStreamContainer} containing the stream and stream metadata.
*/
CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
S3AsyncClient s3AsyncClient,
String bucketName,
String blobName,
int partNumber
) {
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobName)
.partNumber(partNumber);

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(S3BlobContainer::transformResponseToInputStreamContainer)
);
}

/**
* Transforms the stream response object from S3 into an {@link InputStreamContainer}
* @param streamResponse Response stream object from S3
* @return {@link InputStreamContainer} containing the stream and stream metadata
*/
// Package-Private for testing.
static InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream<GetObjectResponse> streamResponse) {
final GetObjectResponse getObjectResponse = streamResponse.response();
final String contentRange = getObjectResponse.contentRange();
final Long contentLength = getObjectResponse.contentLength();
if (contentRange == null || contentLength == null) {
throw SdkException.builder().message("Failed to fetch required metadata for blob part").build();
}
final Long offset = HttpRangeUtils.getStartOffsetFromRangeHeader(getObjectResponse.contentRange());
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset);
}

/**
* Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket.
* @param s3AsyncClient Async client to be utilized to fetch the metadata
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the metadata will be fetched
* @return A future containing the metadata within {@link GetObjectAttributesResponse}
*/
CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3AsyncClient, String bucketName, String blobName) {
// Fetch blob metadata - part info, size, checksum
final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.build();

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@

package org.opensearch.repositories.s3.async;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
Expand All @@ -23,11 +20,6 @@
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CompletableFutureUtils;
Expand All @@ -37,16 +29,13 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.StreamContext;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.exception.CorruptFileException;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.io.CheckedContainer;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -364,67 +353,4 @@ private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest upl
return null;
});
}

/**
* Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds
* the stream and its related metadata.
* @param s3AsyncClient Async client to be utilized to fetch the object part
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the parts will be fetched
* @param partNumber Part number for the blob to be retrieved
* @return A future of {@link InputStreamContainer} containing the stream and stream metadata.
*/
@ExperimentalApi
public CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
S3AsyncClient s3AsyncClient,
String bucketName,
String blobName,
int partNumber
) {
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobName)
.partNumber(partNumber);

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(this::transformResponseToInputStreamContainer)
);
}

/**
* Transforms the stream response object from S3 into an {@link InputStreamContainer}
* @param streamResponse Response stream object from S3
* @return {@link InputStreamContainer} containing the stream and stream metadata
*/
// Package-Private for testing.
InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream<GetObjectResponse> streamResponse) {
final GetObjectResponse getObjectResponse = streamResponse.response();
final String contentRange = getObjectResponse.contentRange();
final Long contentLength = getObjectResponse.contentLength();
if (contentRange == null || contentLength == null) {
throw SdkException.builder().message("Failed to fetch required metadata for blob part").build();
}
final Tuple<Long, Long> s3ResponseRange = HttpRangeUtils.fromHttpRangeHeader(getObjectResponse.contentRange());
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), s3ResponseRange.v1());
}

/**
* Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket.
* @param s3AsyncClient Async client to be utilized to fetch the metadata
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the metadata will be fetched
* @return A future containing the metadata within {@link GetObjectAttributesResponse}
*/
@ExperimentalApi
public CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3AsyncClient, String bucketName, String blobName) {
// Fetch blob metadata - part info, size, checksum
final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.build();

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,26 @@

import software.amazon.awssdk.core.exception.SdkException;

import org.opensearch.common.collect.Tuple;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class HttpRangeUtils {
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-(\\d+)/(\\d+|.*)$");
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-\\d+[/\\d*]+$");

/**
* Parses the content range header string value to calculate the start and end of the HTTP response.
* Parses the content range header string value to calculate the start (offset) of the HTTP response.
* Tests against the RFC9110 specification of content range string.
* Sample values: "bytes 0-10/200", "bytes 0-10/*"
* <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-content-range">Details here</a>
* @param headerValue Header content range string value from the HTTP response
* @return Pair of values where v1 represents the lower and v2 represents the upper bound of the stream
* @return Start (Offset) value of the HTTP response
*/
public static Tuple<Long, Long> fromHttpRangeHeader(String headerValue) {
public static Long getStartOffsetFromRangeHeader(String headerValue) {
Matcher matcher = RANGE_PATTERN.matcher(headerValue);
if (!matcher.find()) {
throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException());
}
return new Tuple<>(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2)));
return Long.parseLong(matcher.group(1));
}

/**
Expand Down
Loading

0 comments on commit 3bde359

Please sign in to comment.