From 03013c5c124efc73e843aa3d75292d39db47b83c Mon Sep 17 00:00:00 2001 From: Vikas Bansal <43470111+vikasvb90@users.noreply.github.com> Date: Thu, 19 Oct 2023 08:40:13 +0530 Subject: [PATCH] Added close on buffered stream in s3 async upload for additional cleanup (#10710) Signed-off-by: vikasvb90 --- .../s3/async/AsyncPartsHandler.java | 44 +++++++++++------- .../s3/async/AsyncTransferManager.java | 20 +++++--- .../s3/async/AsyncTransferManagerTests.java | 46 ++++++++++++++----- 3 files changed, 75 insertions(+), 35 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index 86bb70e5a40a2..6007d9f9c8a1c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -29,6 +29,7 @@ import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -140,28 +141,39 @@ private static void uploadPart( ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH ? priorityExecutorService : executorService; + // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered + // data can be retried instead of retrying whole file by the application. + InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); CompletableFuture uploadPartResponseFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.uploadPart( uploadPartRequest, - AsyncRequestBody.fromInputStream( - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. - new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)), - inputStreamContainer.getContentLength(), - streamReadExecutor - ) + AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) ) ); - CompletableFuture convertFuture = uploadPartResponseFuture.thenApply( - uploadPartResponse -> convertUploadPartResponse( - completedParts, - inputStreamContainers, - uploadPartResponse, - partNumber, - uploadRequest.doRemoteDataIntegrityCheck() - ) - ); + CompletableFuture convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> { + try { + inputStream.close(); + } catch (IOException ex) { + log.error( + () -> new ParameterizedMessage( + "Failed to close stream while uploading a part of idx {} and file {}.", + uploadPartRequest.partNumber(), + uploadPartRequest.key() + ), + ex + ); + } + }) + .thenApply( + uploadPartResponse -> convertUploadPartResponse( + completedParts, + inputStreamContainers, + uploadPartResponse, + partNumber, + uploadRequest.doRemoteDataIntegrityCheck() + ) + ); futures.add(convertFuture); CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index db04636b89d50..a52745e33073e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -40,6 +40,7 @@ import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Base64; import java.util.List; @@ -310,17 +311,22 @@ private void uploadInOneChunk( ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH ? priorityExecutorService : executorService; + // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered + // data can be retried instead of retrying whole file by the application. + InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); CompletableFuture putObjectFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.putObject( putObjectRequestBuilder.build(), - AsyncRequestBody.fromInputStream( - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. - new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)), - inputStreamContainer.getContentLength(), - streamReadExecutor - ) + AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) ).handle((resp, throwable) -> { + try { + inputStream.close(); + } catch (IOException e) { + log.error( + () -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()), + e + ); + } if (throwable != null) { Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class); if (unwrappedThrowable != null) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 607453986ab16..97a746cdeed93 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -37,9 +37,14 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -71,17 +76,16 @@ public void testOneChunkUpload() { putObjectResponseCompletableFuture ); + AtomicReference streamRef = new AtomicReference<>(); CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing }, false, null), - new StreamContext( - (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), - ByteSizeUnit.MB.toBytes(1), - ByteSizeUnit.MB.toBytes(1), - 1 - ), + new StreamContext((partIdx, partSize, position) -> { + streamRef.set(new ZeroInputStream(partSize)); + return new InputStreamContainer(streamRef.get(), partSize, position); + }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1), new StatsMetricPublisher() ); @@ -92,6 +96,14 @@ public void testOneChunkUpload() { } verify(s3AsyncClient, times(1)).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)); + + boolean closeError = false; + try { + streamRef.get().available(); + } catch (IOException e) { + closeError = e.getMessage().equals("Stream closed"); + } + assertTrue("InputStream was still open after upload", closeError); } public void testOneChunkUploadCorruption() { @@ -162,17 +174,17 @@ public void testMultipartUpload() { abortMultipartUploadResponseCompletableFuture ); + List streams = new ArrayList<>(); CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing }, true, 3376132981L), - new StreamContext( - (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), - ByteSizeUnit.MB.toBytes(1), - ByteSizeUnit.MB.toBytes(1), - 5 - ), + new StreamContext((partIdx, partSize, position) -> { + InputStream stream = new ZeroInputStream(partSize); + streams.add(stream); + return new InputStreamContainer(stream, partSize, position); + }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5), new StatsMetricPublisher() ); @@ -182,6 +194,16 @@ public void testMultipartUpload() { fail("did not expect resultFuture to fail"); } + streams.forEach(stream -> { + boolean closeError = false; + try { + stream.available(); + } catch (IOException e) { + closeError = e.getMessage().equals("Stream closed"); + } + assertTrue("InputStream was still open after upload", closeError); + }); + verify(s3AsyncClient, times(1)).createMultipartUpload(any(CreateMultipartUploadRequest.class)); verify(s3AsyncClient, times(5)).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)); verify(s3AsyncClient, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));