diff --git a/CHANGELOG.md b/CHANGELOG.md index 422a8cb8e7260..eda8fab74e87e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow test clusters to run with TLS ([#8900](https://github.com/opensearch-project/OpenSearch/pull/8900)) - Add jdk.incubator.vector module support for JDK 20+ ([#8601](https://github.com/opensearch-project/OpenSearch/pull/8601)) - [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081)) +- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java index dcbd52d311230..669190f4e2490 100644 --- a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java @@ -32,6 +32,7 @@ package org.opensearch.repositories.hdfs; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; @@ -46,6 +47,7 @@ import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.support.AbstractBlobContainer; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.io.Streams; import org.opensearch.repositories.hdfs.HdfsBlobStore.Operation; import java.io.FileNotFoundException; @@ -125,8 +127,23 @@ public InputStream readBlob(String blobName) throws IOException { } @Override - public InputStream readBlob(String blobName, long position, long length) { - throw new UnsupportedOperationException(); + public InputStream readBlob(String blobName, long position, long length) throws IOException { + return store.execute(fileContext -> { + final FSDataInputStream stream; + try { + stream = fileContext.open(new Path(path, blobName), bufferSize); + } catch (FileNotFoundException fnfe) { + throw new NoSuchFileException("[" + blobName + "] blob not found"); + } + // Seek to the desired start position, closing the stream if any error occurs + try { + stream.seek(position); + } catch (Exception e) { + stream.close(); + throw e; + } + return Streams.limitStream(new HDFSPrivilegedInputSteam(stream, securityContext), length); + }); } @Override