From 7bffae067aa741fb320143fc1305502f1f31a6f7 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Wed, 6 Mar 2024 20:13:25 -0800 Subject: [PATCH] Expose subscribeTimeout config for BlockingOutputStreamAsyncRequestBody --- .../feature-AWSSDKforJavav2-0e79eba.json | 6 ++ .../awssdk/core/async/AsyncRequestBody.java | 7 ++- .../BlockingOutputStreamAsyncRequestBody.java | 57 ++++++++++++++++--- ...ckingOutputStreamAsyncRequestBodyTest.java | 4 +- 4 files changed, 64 insertions(+), 10 deletions(-) create mode 100644 .changes/next-release/feature-AWSSDKforJavav2-0e79eba.json diff --git a/.changes/next-release/feature-AWSSDKforJavav2-0e79eba.json b/.changes/next-release/feature-AWSSDKforJavav2-0e79eba.json new file mode 100644 index 000000000000..3342d8419699 --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-0e79eba.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Allow users to configure `subscribeTimeout` for BlockingOutputStreamAsyncRequestBody. See [#4893](https://github.com/aws/aws-sdk-java-v2/issues/4893)" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 7f9d4be67b2d..cde394b6ed42 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -418,6 +418,8 @@ static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLe *

The caller is responsible for calling {@link OutputStream#close()} on the * {@link BlockingOutputStreamAsyncRequestBody#outputStream()} when writing is complete. * + *

By default, it will time out if streaming hasn't started within 10 seconds, and you can configure the timeout + * via {@link BlockingOutputStreamAsyncRequestBody#builder()} *

Example Usage *

* {@snippet : @@ -440,9 +442,12 @@ static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLe * // Wait for the service to respond. * PutObjectResponse response = responseFuture.join(); * } + * @see BlockingOutputStreamAsyncRequestBody */ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) { - return new BlockingOutputStreamAsyncRequestBody(contentLength); + return BlockingOutputStreamAsyncRequestBody.builder() + .contentLength(contentLength) + .build(); } /** diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBody.java index 872e25306cc8..c0a044ffa3ba 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBody.java @@ -27,6 +27,7 @@ import software.amazon.awssdk.core.exception.NonRetryableException; import software.amazon.awssdk.core.internal.util.NoopSubscription; import software.amazon.awssdk.utils.CancellableOutputStream; +import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.async.OutputStreamPublisher; /** @@ -46,13 +47,11 @@ public final class BlockingOutputStreamAsyncRequestBody implements AsyncRequestB private final Long contentLength; private final Duration subscribeTimeout; - BlockingOutputStreamAsyncRequestBody(Long contentLength) { - this(contentLength, Duration.ofSeconds(10)); - } - - BlockingOutputStreamAsyncRequestBody(Long contentLength, Duration subscribeTimeout) { - this.contentLength = contentLength; - this.subscribeTimeout = subscribeTimeout; + private BlockingOutputStreamAsyncRequestBody(Builder builder) { + this.contentLength = builder.contentLength; + this.subscribeTimeout = Validate.isPositiveOrNull(builder.subscribeTimeout, "subscribeTimeout") != null ? + builder.subscribeTimeout : + Duration.ofSeconds(10); } /** @@ -69,6 +68,13 @@ public CancellableOutputStream outputStream() { return delegate; } + /** + * Creates a default builder for {@link BlockingOutputStreamAsyncRequestBody}. + */ + public static Builder builder() { + return new Builder(); + } + @Override public Optional contentLength() { return Optional.ofNullable(contentLength); @@ -99,4 +105,41 @@ private void waitForSubscriptionIfNeeded() { throw new RuntimeException("Interrupted while waiting for subscription.", e); } } + + public static final class Builder { + private Duration subscribeTimeout; + private Long contentLength; + + private Builder() { + } + + /** + * Defines how long it should wait for this AsyncRequestBody to be subscribed (to start streaming) before timing out. + * By default, it's 10 seconds. + * + *

You may want to increase it if the request may not be executed right away. + * + * @param subscribeTimeout the timeout + * @return Returns a reference to this object so that method calls can be chained together. + */ + public Builder subscribeTimeout(Duration subscribeTimeout) { + this.subscribeTimeout = subscribeTimeout; + return this; + } + + /** + * The content length of the output stream. + * + * @param contentLength the content length + * @return Returns a reference to this object so that method calls can be chained together. + */ + public Builder contentLength(Long contentLength) { + this.contentLength = contentLength; + return this; + } + + public BlockingOutputStreamAsyncRequestBody build() { + return new BlockingOutputStreamAsyncRequestBody(this); + } + } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBodyTest.java index 5cfcbc593916..ed2aa9b1a7ca 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBodyTest.java @@ -50,9 +50,9 @@ public void outputStream_waitsForSubscription() throws IOException { @Test @Timeout(10) - public void outputStream_failsIfSubscriptionNeverComes() { + public void outputStream_overrideSubscribeTimeout_failsIfSubscriptionNeverComes() { BlockingOutputStreamAsyncRequestBody requestBody = - new BlockingOutputStreamAsyncRequestBody(0L, Duration.ofSeconds(1)); + BlockingOutputStreamAsyncRequestBody.builder().contentLength(0L).subscribeTimeout(Duration.ofSeconds(1)).build(); assertThatThrownBy(requestBody::outputStream).hasMessageContaining("The service request was not made"); }