From 0c839c3e35b4086acb88a276de9d10ece637d318 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Wed, 23 Aug 2023 14:42:12 -0700 Subject: [PATCH] Fix range reads in respository-s3 (#9516) The `readBlob(String, long, long)` method in the S3 repository has been broken since the upgrade to AWS SDK v2. The cause is that the SDK v2 returns the content range length details in a string formatting per the [RFC 9110][1] spec. For example: ``` bytes 0-100/200 ``` However, the code was attempting to parse it as: ``` bytes=0-100 ``` The fix here is to not parse this string at all and instead use `GetObjectResponse#contentLength`. Note that the incorrect format matches how a range is specified in a _request_ per the [byte ranges][2] section of the RFC and that is likely the source of the confusion. We lack any dedicated integration testing of this method so the bug was not caught by any tests. Additionally, the range read is only used by the searchable snapshot feature currently and we have no automated integration testing with the different repository implementations. One other complicating factor is that due to a fallback path that returns `Long.MAX_VALUE - 1` when the range is failed to be parsed, the bug will only manifest as a long overflow error when requesting past the first block and therefore wasn't caught with very simple manual testing. [1]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-range [2]: https://www.rfc-editor.org/rfc/rfc9110.html#name-byte-ranges Signed-off-by: Andrew Ross --- CHANGELOG.md | 1 + .../hdfs/HdfsBlobStoreRepositoryTests.java | 4 +++ .../s3/S3RetryingInputStream.java | 26 +----------------- .../repositories/s3/utils/HttpRangeUtils.java | 27 ++++++------------- .../s3/S3RetryingInputStreamTests.java | 5 ++-- ...earchBlobStoreRepositoryIntegTestCase.java | 21 +++++++++++++++ 6 files changed, 37 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 199461fd93cd7..80721b0f1b3e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -161,6 +161,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix flaky ResourceAwareTasksTests.testBasicTaskResourceTracking test ([#8993](https://github.com/opensearch-project/OpenSearch/pull/8993)) - Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403)) +- Fix range reads in respository-s3 ([9512](https://github.com/opensearch-project/OpenSearch/issues/9512)) ### Security diff --git a/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java b/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java index 0df39636b8ffa..6ff18b20036a8 100644 --- a/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java +++ b/plugins/repository-hdfs/src/test/java/org/opensearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java @@ -66,4 +66,8 @@ protected Settings repositorySettings() { protected Collection> nodePlugins() { return Collections.singletonList(HdfsPlugin.class); } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9513") + @Override + public void testReadRange() {} } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java index 6d41a72ac9af8..3a35f6135f28b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java @@ -40,7 +40,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.util.io.IOUtils; import org.opensearch.repositories.s3.utils.HttpRangeUtils; @@ -120,7 +119,7 @@ private void openStream() throws IOException { ); this.currentStreamLastOffset = Math.addExact( Math.addExact(start, currentOffset), - getStreamLength(getObjectResponseInputStream.response()) + getObjectResponseInputStream.response().contentLength() ); this.currentStream = getObjectResponseInputStream; this.isStreamAborted.set(false); @@ -134,29 +133,6 @@ private void openStream() throws IOException { } } - private long getStreamLength(final GetObjectResponse getObjectResponse) { - try { - // Returns the content range of the object if response contains the Content-Range header. - if (getObjectResponse.contentRange() != null) { - final Tuple s3ResponseRange = HttpRangeUtils.fromHttpRangeHeader(getObjectResponse.contentRange()); - assert s3ResponseRange.v2() >= s3ResponseRange.v1() : s3ResponseRange.v2() + " vs " + s3ResponseRange.v1(); - assert s3ResponseRange.v1() == start + currentOffset : "Content-Range start value [" - + s3ResponseRange.v1() - + "] exceeds start [" - + start - + "] + current offset [" - + currentOffset - + ']'; - assert s3ResponseRange.v2() == end : "Content-Range end value [" + s3ResponseRange.v2() + "] exceeds end [" + end + ']'; - return s3ResponseRange.v2() - s3ResponseRange.v1() + 1L; - } - return getObjectResponse.contentLength(); - } catch (Exception e) { - assert false : e; - return Long.MAX_VALUE - 1L; // assume a large stream so that the underlying stream is aborted on closing, unless eof is reached - } - } - @Override public int read() throws IOException { ensureOpen(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java index 97b9829124d0d..40aec7d52847b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java @@ -8,25 +8,14 @@ package org.opensearch.repositories.s3.utils; -import software.amazon.awssdk.core.exception.SdkException; - -import org.opensearch.common.collect.Tuple; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class HttpRangeUtils { - - private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$"); - - public static Tuple fromHttpRangeHeader(String headerValue) { - Matcher matcher = RANGE_PATTERN.matcher(headerValue); - if (!matcher.find()) { - throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException()); - } - return new Tuple<>(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2))); - } - +public final class HttpRangeUtils { + + /** + * Provides a byte range string per RFC 9110 + * @param start start position (inclusive) + * @param end end position (inclusive) + * @return A 'bytes=start-end' string + */ public static String toHttpRangeHeader(long start, long end) { return "bytes=" + start + "-" + end; } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java index 8be1d72c95b15..b38d5119b4108 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java @@ -38,7 +38,6 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import org.opensearch.common.io.Streams; -import org.opensearch.repositories.s3.utils.HttpRangeUtils; import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; @@ -104,11 +103,11 @@ public void testRangeInputStreamIsAborted() throws IOException { } private S3RetryingInputStream createInputStream(final byte[] data, final Long start, final Long length) throws IOException { - long end = Math.addExact(start, length - 1); + final long end = Math.addExact(start, length - 1); final S3Client client = mock(S3Client.class); when(client.getObject(any(GetObjectRequest.class))).thenReturn( new ResponseInputStream<>( - GetObjectResponse.builder().contentLength(length).contentRange(HttpRangeUtils.toHttpRangeHeader(start, end)).build(), + GetObjectResponse.builder().contentLength(length).build(), new ByteArrayInputStream(data, Math.toIntExact(start), Math.toIntExact(length)) ) ); diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java index 7dfe6781bf669..789858ca38fad 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java @@ -165,6 +165,27 @@ public void testWriteRead() throws IOException { } } + public void testReadRange() throws IOException { + try (BlobStore store = newBlobStore()) { + final BlobContainer container = store.blobContainer(new BlobPath()); + final byte[] data = randomBytes(4096); + + // Pick a subrange starting somewhere between position 100 and 1000 + // and ending somewhere between 100 bytes past that position and + // 100 bytes before the end + final int startOffset = randomIntBetween(100, 1000); + final int endOffset = randomIntBetween(startOffset + 100, data.length - 100); + final byte[] subrangeData = Arrays.copyOfRange(data, startOffset, endOffset); + + writeBlob(container, "foobar", new BytesArray(data), randomBoolean()); + try (InputStream stream = container.readBlob("foobar", startOffset, subrangeData.length)) { + final byte[] actual = stream.readAllBytes(); + assertArrayEquals(subrangeData, actual); + } + container.delete(); + } + } + public void testList() throws IOException { try (BlobStore store = newBlobStore()) { final BlobContainer container = store.blobContainer(new BlobPath());