Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to configure subscribeTimeout for BlockingOutputStreamAsyncRequestBody. #5000

Merged
merged 4 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-0e79eba.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Allow users to configure `subscribeTimeout` for BlockingOutputStreamAsyncRequestBody. See [#4893](https://github.com/aws/aws-sdk-java-v2/issues/4893)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLe
* <p>The caller is responsible for calling {@link OutputStream#close()} on the
* {@link BlockingOutputStreamAsyncRequestBody#outputStream()} when writing is complete.
*
* <p>By default, it will time out if streaming hasn't started within 10 seconds, and you can configure the timeout
* via {@link BlockingOutputStreamAsyncRequestBody#builder()}
* <p><b>Example Usage</b>
* <p>
* {@snippet :
Expand All @@ -440,9 +442,12 @@ static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLe
* // Wait for the service to respond.
* PutObjectResponse response = responseFuture.join();
* }
* @see BlockingOutputStreamAsyncRequestBody
*/
static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) {
return new BlockingOutputStreamAsyncRequestBody(contentLength);
return BlockingOutputStreamAsyncRequestBody.builder()
.contentLength(contentLength)
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.CancellableOutputStream;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.OutputStreamPublisher;

/**
Expand All @@ -46,13 +47,11 @@ public final class BlockingOutputStreamAsyncRequestBody implements AsyncRequestB
private final Long contentLength;
private final Duration subscribeTimeout;

BlockingOutputStreamAsyncRequestBody(Long contentLength) {
this(contentLength, Duration.ofSeconds(10));
}

BlockingOutputStreamAsyncRequestBody(Long contentLength, Duration subscribeTimeout) {
this.contentLength = contentLength;
this.subscribeTimeout = subscribeTimeout;
private BlockingOutputStreamAsyncRequestBody(Builder builder) {
this.contentLength = builder.contentLength;
this.subscribeTimeout = Validate.isPositiveOrNull(builder.subscribeTimeout, "subscribeTimeout") != null ?
builder.subscribeTimeout :
Duration.ofSeconds(10);
}

/**
Expand All @@ -69,6 +68,13 @@ public CancellableOutputStream outputStream() {
return delegate;
}

/**
* Creates a default builder for {@link BlockingOutputStreamAsyncRequestBody}.
*/
public static Builder builder() {
return new Builder();
}

@Override
public Optional<Long> contentLength() {
return Optional.ofNullable(contentLength);
Expand Down Expand Up @@ -99,4 +105,41 @@ private void waitForSubscriptionIfNeeded() {
throw new RuntimeException("Interrupted while waiting for subscription.", e);
}
}

public static final class Builder {
private Duration subscribeTimeout;
private Long contentLength;

private Builder() {
}

/**
* Defines how long it should wait for this AsyncRequestBody to be subscribed (to start streaming) before timing out.
* By default, it's 10 seconds.
*
* <p>You may want to increase it if the request may not be executed right away.
*
* @param subscribeTimeout the timeout
* @return Returns a reference to this object so that method calls can be chained together.
*/
public Builder subscribeTimeout(Duration subscribeTimeout) {
this.subscribeTimeout = subscribeTimeout;
return this;
}

/**
* The content length of the output stream.
*
* @param contentLength the content length
* @return Returns a reference to this object so that method calls can be chained together.
*/
public Builder contentLength(Long contentLength) {
this.contentLength = contentLength;
return this;
}

public BlockingOutputStreamAsyncRequestBody build() {
return new BlockingOutputStreamAsyncRequestBody(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public void outputStream_waitsForSubscription() throws IOException {

@Test
@Timeout(10)
public void outputStream_failsIfSubscriptionNeverComes() {
public void outputStream_overrideSubscribeTimeout_failsIfSubscriptionNeverComes() {
BlockingOutputStreamAsyncRequestBody requestBody =
new BlockingOutputStreamAsyncRequestBody(0L, Duration.ofSeconds(1));
BlockingOutputStreamAsyncRequestBody.builder().contentLength(0L).subscribeTimeout(Duration.ofSeconds(1)).build();
assertThatThrownBy(requestBody::outputStream).hasMessageContaining("The service request was not made");
}

Expand Down
Loading