-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add async blob read and download support using multiple streams (#9592)
Signed-off-by: Kunal Kotwani <[email protected]>
- Loading branch information
1 parent
1126d2f
commit 6765b16
Showing
15 changed files
with
745 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
46 changes: 46 additions & 0 deletions
46
server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read; | ||
|
||
import org.opensearch.common.annotation.ExperimentalApi; | ||
import org.opensearch.common.io.InputStreamContainer; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* ReadContext is used to encapsulate all data needed by <code>BlobContainer#readBlobAsync</code> | ||
*/ | ||
@ExperimentalApi | ||
public class ReadContext { | ||
private final long blobSize; | ||
private final List<InputStreamContainer> partStreams; | ||
private final String blobChecksum; | ||
|
||
public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String blobChecksum) { | ||
this.blobSize = blobSize; | ||
this.partStreams = partStreams; | ||
this.blobChecksum = blobChecksum; | ||
} | ||
|
||
public String getBlobChecksum() { | ||
return blobChecksum; | ||
} | ||
|
||
public int getNumberOfParts() { | ||
return partStreams.size(); | ||
} | ||
|
||
public long getBlobSize() { | ||
return blobSize; | ||
} | ||
|
||
public List<InputStreamContainer> getPartStreams() { | ||
return partStreams; | ||
} | ||
} |
47 changes: 47 additions & 0 deletions
47
...ain/java/org/opensearch/common/blobstore/stream/read/listener/FileCompletionListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read.listener; | ||
|
||
import org.opensearch.common.annotation.InternalApi; | ||
import org.opensearch.core.action.ActionListener; | ||
|
||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
/** | ||
* FileCompletionListener listens for completion of fetch on all the streams for a file, where | ||
* individual streams are handled using {@link FilePartWriter}. The {@link FilePartWriter}(s) | ||
* hold a reference to the file completion listener to be notified. | ||
*/ | ||
@InternalApi | ||
class FileCompletionListener implements ActionListener<Integer> { | ||
|
||
private final int numberOfParts; | ||
private final String fileName; | ||
private final AtomicInteger completedPartsCount; | ||
private final ActionListener<String> completionListener; | ||
|
||
public FileCompletionListener(int numberOfParts, String fileName, ActionListener<String> completionListener) { | ||
this.completedPartsCount = new AtomicInteger(); | ||
this.numberOfParts = numberOfParts; | ||
this.fileName = fileName; | ||
this.completionListener = completionListener; | ||
} | ||
|
||
@Override | ||
public void onResponse(Integer unused) { | ||
if (completedPartsCount.incrementAndGet() == numberOfParts) { | ||
completionListener.onResponse(fileName); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
completionListener.onFailure(e); | ||
} | ||
} |
90 changes: 90 additions & 0 deletions
90
...er/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read.listener; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.common.annotation.InternalApi; | ||
import org.opensearch.common.io.Channels; | ||
import org.opensearch.common.io.InputStreamContainer; | ||
import org.opensearch.core.action.ActionListener; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
/** | ||
* FilePartWriter transfers the provided stream into the specified file path using a {@link FileChannel} | ||
* instance. It performs offset based writes to the file and notifies the {@link FileCompletionListener} on completion. | ||
*/ | ||
@InternalApi | ||
class FilePartWriter implements Runnable { | ||
|
||
private final int partNumber; | ||
private final InputStreamContainer blobPartStreamContainer; | ||
private final Path fileLocation; | ||
private final AtomicBoolean anyPartStreamFailed; | ||
private final ActionListener<Integer> fileCompletionListener; | ||
private static final Logger logger = LogManager.getLogger(FilePartWriter.class); | ||
|
||
// 8 MB buffer for transfer | ||
private static final int BUFFER_SIZE = 8 * 1024 * 2024; | ||
|
||
public FilePartWriter( | ||
int partNumber, | ||
InputStreamContainer blobPartStreamContainer, | ||
Path fileLocation, | ||
AtomicBoolean anyPartStreamFailed, | ||
ActionListener<Integer> fileCompletionListener | ||
) { | ||
this.partNumber = partNumber; | ||
this.blobPartStreamContainer = blobPartStreamContainer; | ||
this.fileLocation = fileLocation; | ||
this.anyPartStreamFailed = anyPartStreamFailed; | ||
this.fileCompletionListener = fileCompletionListener; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
// Ensures no writes to the file if any stream fails. | ||
if (anyPartStreamFailed.get() == false) { | ||
try (FileChannel outputFileChannel = FileChannel.open(fileLocation, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { | ||
try (InputStream inputStream = blobPartStreamContainer.getInputStream()) { | ||
long streamOffset = blobPartStreamContainer.getOffset(); | ||
final byte[] buffer = new byte[BUFFER_SIZE]; | ||
int bytesRead; | ||
while ((bytesRead = inputStream.read(buffer)) != -1) { | ||
Channels.writeToChannel(buffer, 0, bytesRead, outputFileChannel, streamOffset); | ||
streamOffset += bytesRead; | ||
} | ||
} | ||
} catch (IOException e) { | ||
processFailure(e); | ||
return; | ||
} | ||
fileCompletionListener.onResponse(partNumber); | ||
} | ||
} | ||
|
||
void processFailure(Exception e) { | ||
try { | ||
Files.deleteIfExists(fileLocation); | ||
} catch (IOException ex) { | ||
// Die silently | ||
logger.info("Failed to delete file {} on stream failure: {}", fileLocation, ex); | ||
} | ||
if (anyPartStreamFailed.getAndSet(true) == false) { | ||
fileCompletionListener.onFailure(e); | ||
} | ||
} | ||
} |
65 changes: 65 additions & 0 deletions
65
...c/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read.listener; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.common.annotation.InternalApi; | ||
import org.opensearch.common.blobstore.stream.read.ReadContext; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.nio.file.Path; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
/** | ||
* ReadContextListener orchestrates the async file fetch from the {@link org.opensearch.common.blobstore.BlobContainer} | ||
* using a {@link ReadContext} callback. On response, it spawns off the download using multiple streams which are | ||
* spread across a {@link ThreadPool} executor. | ||
*/ | ||
@InternalApi | ||
public class ReadContextListener implements ActionListener<ReadContext> { | ||
|
||
private final String fileName; | ||
private final Path fileLocation; | ||
private final ThreadPool threadPool; | ||
private final ActionListener<String> completionListener; | ||
private static final Logger logger = LogManager.getLogger(ReadContextListener.class); | ||
|
||
public ReadContextListener(String fileName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) { | ||
this.fileName = fileName; | ||
this.fileLocation = fileLocation; | ||
this.threadPool = threadPool; | ||
this.completionListener = completionListener; | ||
} | ||
|
||
@Override | ||
public void onResponse(ReadContext readContext) { | ||
logger.trace("Streams received for blob {}", fileName); | ||
final int numParts = readContext.getNumberOfParts(); | ||
final AtomicBoolean anyPartStreamFailed = new AtomicBoolean(); | ||
FileCompletionListener fileCompletionListener = new FileCompletionListener(numParts, fileName, completionListener); | ||
|
||
for (int partNumber = 0; partNumber < numParts; partNumber++) { | ||
FilePartWriter filePartWriter = new FilePartWriter( | ||
partNumber, | ||
readContext.getPartStreams().get(partNumber), | ||
fileLocation, | ||
anyPartStreamFailed, | ||
fileCompletionListener | ||
); | ||
threadPool.executor(ThreadPool.Names.GENERIC).submit(filePartWriter); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
completionListener.onFailure(e); | ||
} | ||
} |
Oops, something went wrong.