From 7067f9538b420546415c0a09fa2bb7a553ed05d4 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 18 Sep 2023 06:15:21 +0000 Subject: [PATCH] Add async read support for S3 plugin (#9694) * Add async read support for S3 plugin Signed-off-by: Kunal Kotwani (cherry picked from commit 03ddc8a6b07221b1ed60ad5627939d1f957b1c49) Signed-off-by: Kunal Kotwani * Move functionality to S3BlobContainer Signed-off-by: Kunal Kotwani --------- Signed-off-by: Kunal Kotwani (cherry picked from commit 70a582fcbe82e9b4250e29cbf7d6db470370906a) Signed-off-by: github-actions[bot] --- CHANGELOG.md | 1 + .../repositories/s3/S3BlobContainer.java | 109 +++++- .../repositories/s3/utils/HttpRangeUtils.java | 22 ++ .../s3/S3BlobStoreContainerTests.java | 310 +++++++++++++++++- .../s3/utils/HttpRangeUtilsTests.java | 29 ++ 5 files changed, 458 insertions(+), 13 deletions(-) create mode 100644 plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b5eda7234d901..075f993d19009 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added - Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) +- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694)) ### Dependencies - Bump OpenTelemetry from 1.26.0 to 1.30.1 ([#9950](https://github.com/opensearch-project/OpenSearch/pull/9950)) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index bb1643faecc95..2911a018df337 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ 
b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -32,6 +32,8 @@ package org.opensearch.repositories.s3; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -44,10 +46,15 @@ import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectAttributes; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Error; @@ -63,6 +70,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; import org.opensearch.common.StreamContext; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; @@ -75,11 +83,13 @@ import org.opensearch.common.blobstore.support.AbstractBlobContainer; import 
org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.s3.async.UploadRequest; +import org.opensearch.repositories.s3.utils.HttpRangeUtils; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -212,9 +222,45 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } } + @ExperimentalApi @Override public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) { - throw new UnsupportedOperationException(); + try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { + final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client(); + final String bucketName = blobStore.bucket(); + + final GetObjectAttributesResponse blobMetadata = getBlobMetadata(s3AsyncClient, bucketName, blobName).get(); + + final long blobSize = blobMetadata.objectSize(); + // objectParts() and checksum() may be null (e.g. blob not uploaded as multipart, or stored without a checksum); treat that as a single part + final int numberOfParts = blobMetadata.objectParts() == null ? 1 : blobMetadata.objectParts().totalPartsCount(); + final String blobChecksum = blobMetadata.checksum() == null ? null : blobMetadata.checksum().checksumCRC32(); + + final List<CompletableFuture<InputStreamContainer>> blobPartInputStreamFutures = new ArrayList<>(); + // S3 multipart files use 1 to n indexing + for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) { + blobPartInputStreamFutures.add(getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobName, partNumber)); + } + + CompletableFuture.allOf(blobPartInputStreamFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> { + if (throwable == null) { + listener.onResponse( + new ReadContext( + blobSize, + blobPartInputStreamFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()), +
blobChecksum + ) + ); + } else { + // Unwrap the failure when a cause is attached; otherwise report the throwable itself rather than a null cause + final Throwable cause = throwable.getCause() == null ? throwable : throwable.getCause(); + Exception ex = cause instanceof Exception ? (Exception) cause : new Exception(cause); + listener.onFailure(ex); + } + }); + } catch (Exception ex) { + listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex)); + } } public boolean remoteIntegrityCheckSupported() { @@ -633,4 +679,65 @@ static Tuple numberOfMultiparts(final long totalSize, final long par return Tuple.tuple(parts + 1, remaining); } } + + /** + * Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds + * the stream and its related metadata. + * @param s3AsyncClient Async client to be utilized to fetch the object part + * @param bucketName Name of the S3 bucket + * @param blobName Identifier of the blob for which the parts will be fetched + * @param partNumber Part number for the blob to be retrieved + * @return A future of {@link InputStreamContainer} containing the stream and stream metadata. + */ + CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer( + S3AsyncClient s3AsyncClient, + String bucketName, + String blobName, + int partNumber + ) { + final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder() + .bucket(bucketName) + .key(blobName) + .partNumber(partNumber); + + return SocketAccess.doPrivileged( + () -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream()) + .thenApply(S3BlobContainer::transformResponseToInputStreamContainer) + ); + } + + /** + * Transforms the stream response object from S3 into an {@link InputStreamContainer} + * @param streamResponse Response stream object from S3 + * @return {@link InputStreamContainer} containing the stream and stream metadata + */ + // Package-Private for testing.
+ static InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream<GetObjectResponse> streamResponse) { + final GetObjectResponse getObjectResponse = streamResponse.response(); + final String contentRange = getObjectResponse.contentRange(); + final Long contentLength = getObjectResponse.contentLength(); + if (contentRange == null || contentLength == null) { + throw SdkException.builder().message("Failed to fetch required metadata for blob part").build(); + } + final Long offset = HttpRangeUtils.getStartOffsetFromRangeHeader(contentRange); + return new InputStreamContainer(streamResponse, contentLength, offset); + } + + /** + * Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket. + * @param s3AsyncClient Async client to be utilized to fetch the metadata + * @param bucketName Name of the S3 bucket + * @param blobName Identifier of the blob for which the metadata will be fetched + * @return A future containing the metadata within {@link GetObjectAttributesResponse} + */ + CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3AsyncClient, String bucketName, String blobName) { + // Fetch blob metadata - part info, size, checksum + final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder() + .bucket(bucketName) + .key(blobName) + .objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS) + .build(); + + return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest)); + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java index 40aec7d52847b..2e2fc9b86a45b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java +++
b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java @@ -8,7 +8,29 @@ package org.opensearch.repositories.s3.utils; +import software.amazon.awssdk.core.exception.SdkException; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + public final class HttpRangeUtils { + private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-\\d+/(?:\\d+|\\*)$"); + + /** + * Parses the content range header string value to calculate the start (offset) of the HTTP response. + * Tests against the RFC9110 specification of content range string. + * Sample values: "bytes 0-10/200", "bytes 0-10/*" + * Details: RFC 9110, section 14.4 (Content-Range) + * @param headerValue Header content range string value from the HTTP response + * @return Start (Offset) value of the HTTP response + */ + public static Long getStartOffsetFromRangeHeader(String headerValue) { + Matcher matcher = RANGE_PATTERN.matcher(headerValue); + if (!matcher.find()) { + throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException()); + } + return Long.parseLong(matcher.group(1)); + } /** * Provides a byte range string per RFC 9110 diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 1c4936cae7eba..a87c060dcc60a 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -32,11 +32,15 @@ package org.opensearch.repositories.s3; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; +import
software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.Checksum; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedPart; @@ -44,6 +48,11 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesParts; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; @@ -61,15 +70,18 @@ import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; -import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; +import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.collect.Tuple; 
+import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; @@ -86,14 +98,19 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -882,17 +899,6 @@ public void onFailure(Exception e) {} } } - public void testAsyncBlobDownload() { - final S3BlobStore blobStore = mock(S3BlobStore.class); - final BlobPath blobPath = mock(BlobPath.class); - final String blobName = "test-blob"; - - final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> { - final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.readBlobAsync(blobName, new PlainActionFuture<>()); - }); - } - public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); } @@ -912,4 +918,284 @@ public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanPageSiz public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { testListBlobsByPrefixInLexicographicOrder(12, 2, 
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); } + + public void testReadBlobAsync() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String checksum = randomAlphaOfLength(10); + + final long objectSize = 100L; + final int objectPartCount = 10; + final int partSize = 10; + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + ); + final AsyncTransferManager asyncTransferManager = new AsyncTransferManager( + 10000L, + mock(ExecutorService.class), + mock(ExecutorService.class) + ); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = new BlobPath(); + + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.serverSideEncryption()).thenReturn(false); + when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference); + when(blobStore.getAsyncTransferManager()).thenReturn(asyncTransferManager); + + CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); + getObjectAttributesResponseCompletableFuture.complete( + GetObjectAttributesResponse.builder() + .checksum(Checksum.builder().checksumCRC32(checksum).build()) + .objectSize(objectSize) + .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) + .build() + ); + when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( + getObjectAttributesResponseCompletableFuture + ); + + mockObjectPartResponse(s3AsyncClient, bucketName, blobName, objectPartCount, partSize, objectSize); + + CountDownLatch countDownLatch = new CountDownLatch(1); + CountingCompletionListener readContextActionListener = new 
CountingCompletionListener<>(); + LatchedActionListener<ReadContext> listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.readBlobAsync(blobName, listener); + countDownLatch.await(); + + assertEquals(1, readContextActionListener.getResponseCount()); + assertEquals(0, readContextActionListener.getFailureCount()); + ReadContext readContext = readContextActionListener.getResponse(); + assertEquals(objectPartCount, readContext.getNumberOfParts()); + assertEquals(checksum, readContext.getBlobChecksum()); + assertEquals(objectSize, readContext.getBlobSize()); + + for (int partIndex = 0; partIndex < objectPartCount; partIndex++) { + InputStreamContainer inputStreamContainer = readContext.getPartStreams().get(partIndex); + final int offset = partIndex * partSize; + assertEquals(partSize, inputStreamContainer.getContentLength()); + assertEquals(offset, inputStreamContainer.getOffset()); + assertEquals(partSize, inputStreamContainer.getInputStream().readAllBytes().length); + } + } + + public void testReadBlobAsyncFailure() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String checksum = randomAlphaOfLength(10); + + final long objectSize = 100L; + final int objectPartCount = 10; + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + ); + final AsyncTransferManager asyncTransferManager = new AsyncTransferManager( + 10000L, + mock(ExecutorService.class), + mock(ExecutorService.class) + ); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = new BlobPath(); + + when(blobStore.bucket()).thenReturn(bucketName); +
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.serverSideEncryption()).thenReturn(false); + when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference); + when(blobStore.getAsyncTransferManager()).thenReturn(asyncTransferManager); + + CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); + getObjectAttributesResponseCompletableFuture.complete( + GetObjectAttributesResponse.builder() + .checksum(Checksum.builder().checksumCRC32(checksum).build()) + .objectSize(objectSize) + .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) + .build() + ); + when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenThrow(new RuntimeException()); + + CountDownLatch countDownLatch = new CountDownLatch(1); + CountingCompletionListener readContextActionListener = new CountingCompletionListener<>(); + LatchedActionListener listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.readBlobAsync(blobName, listener); + countDownLatch.await(); + + assertEquals(0, readContextActionListener.getResponseCount()); + assertEquals(1, readContextActionListener.getFailureCount()); + } + + public void testGetBlobMetadata() throws Exception { + final String checksum = randomAlphaOfLengthBetween(1, 10); + final long objectSize = 100L; + final int objectPartCount = 10; + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String bucketName = randomAlphaOfLengthBetween(1, 10); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = new BlobPath(); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + 
when(blobStore.serverSideEncryption()).thenReturn(false); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); + getObjectAttributesResponseCompletableFuture.complete( + GetObjectAttributesResponse.builder() + .checksum(Checksum.builder().checksumCRC32(checksum).build()) + .objectSize(objectSize) + .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) + .build() + ); + when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( + getObjectAttributesResponseCompletableFuture + ); + + CompletableFuture responseFuture = blobContainer.getBlobMetadata(s3AsyncClient, bucketName, blobName); + GetObjectAttributesResponse objectAttributesResponse = responseFuture.get(); + + assertEquals(checksum, objectAttributesResponse.checksum().checksumCRC32()); + assertEquals(Long.valueOf(objectSize), objectAttributesResponse.objectSize()); + assertEquals(Integer.valueOf(objectPartCount), objectAttributesResponse.objectParts().totalPartsCount()); + } + + public void testGetBlobPartInputStream() throws Exception { + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final long contentLength = 10L; + final String contentRange = "bytes 0-10/100"; + final InputStream inputStream = ResponseInputStream.nullInputStream(); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = new BlobPath(); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.serverSideEncryption()).thenReturn(false); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + GetObjectResponse getObjectResponse = 
GetObjectResponse.builder().contentLength(contentLength).contentRange(contentRange).build(); + + CompletableFuture> getObjectPartResponse = new CompletableFuture<>(); + ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); + getObjectPartResponse.complete(responseInputStream); + + when( + s3AsyncClient.getObject( + any(GetObjectRequest.class), + ArgumentMatchers.>>any() + ) + ).thenReturn(getObjectPartResponse); + + InputStreamContainer inputStreamContainer = blobContainer.getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobName, 0) + .get(); + + assertEquals(0, inputStreamContainer.getOffset()); + assertEquals(contentLength, inputStreamContainer.getContentLength()); + assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); + } + + public void testTransformResponseToInputStreamContainer() throws Exception { + final String contentRange = "bytes 0-10/100"; + final long contentLength = 10L; + final InputStream inputStream = ResponseInputStream.nullInputStream(); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + + GetObjectResponse getObjectResponse = GetObjectResponse.builder().contentLength(contentLength).build(); + + ResponseInputStream responseInputStreamNoRange = new ResponseInputStream<>(getObjectResponse, inputStream); + assertThrows(SdkException.class, () -> S3BlobContainer.transformResponseToInputStreamContainer(responseInputStreamNoRange)); + + getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).build(); + ResponseInputStream responseInputStreamNoContentLength = new ResponseInputStream<>( + getObjectResponse, + inputStream + ); + assertThrows(SdkException.class, () -> S3BlobContainer.transformResponseToInputStreamContainer(responseInputStreamNoContentLength)); + + getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).contentLength(contentLength).build(); + ResponseInputStream responseInputStream = new 
ResponseInputStream<>(getObjectResponse, inputStream); + InputStreamContainer inputStreamContainer = S3BlobContainer.transformResponseToInputStreamContainer(responseInputStream); + assertEquals(contentLength, inputStreamContainer.getContentLength()); + assertEquals(0, inputStreamContainer.getOffset()); + assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); + } + + private void mockObjectPartResponse( + S3AsyncClient s3AsyncClient, + String bucketName, + String blobName, + int totalNumberOfParts, + int partSize, + long objectSize + ) { + for (int partNumber = 1; partNumber <= totalNumberOfParts; partNumber++) { + final int start = (partNumber - 1) * partSize; + final int end = partNumber * partSize; + final String contentRange = "bytes " + start + "-" + end + "/" + objectSize; + final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(partSize)); + + GetObjectResponse getObjectResponse = GetObjectResponse.builder() + .contentLength((long) partSize) + .contentRange(contentRange) + .build(); + + CompletableFuture> getObjectPartResponse = new CompletableFuture<>(); + ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); + getObjectPartResponse.complete(responseInputStream); + + GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucketName).key(blobName).partNumber(partNumber).build(); + + when( + s3AsyncClient.getObject( + eq(getObjectRequest), + ArgumentMatchers.>>any() + ) + ).thenReturn(getObjectPartResponse); + } + } + + private static class CountingCompletionListener implements ActionListener { + private int responseCount; + private int failureCount; + private T response; + private Exception exception; + + @Override + public void onResponse(T response) { + this.response = response; + responseCount++; + } + + @Override + public void onFailure(Exception e) { + exception = e; + failureCount++; + } + + public int getResponseCount() { + return 
responseCount; + } + + public int getFailureCount() { + return failureCount; + } + + public T getResponse() { + return response; + } + + public Exception getException() { + return exception; + } + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java new file mode 100644 index 0000000000000..9a4267c5266e5 --- /dev/null +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.utils; + +import software.amazon.awssdk.core.exception.SdkException; + +import org.opensearch.test.OpenSearchTestCase; + +public final class HttpRangeUtilsTests extends OpenSearchTestCase { + + public void testFromHttpRangeHeader() { + String headerValue = "bytes 0-10/200"; + Long offset = HttpRangeUtils.getStartOffsetFromRangeHeader(headerValue); + assertEquals(0L, offset.longValue()); + + headerValue = "bytes 0-10/*"; + offset = HttpRangeUtils.getStartOffsetFromRangeHeader(headerValue); + assertEquals(0L, offset.longValue()); + + final String invalidHeaderValue = "bytes */*"; + assertThrows(SdkException.class, () -> HttpRangeUtils.getStartOffsetFromRangeHeader(invalidHeaderValue)); + } +}