Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamMessage.timeout() #5761

Merged
merged 28 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
045afcc
Implement TimeoutStreamMessage for stream message timeout functionality
sjy982 Jun 11, 2024
5818e6c
feat: Implement TimeoutSubscriber for stream timeout handling
sjy982 Jun 11, 2024
02cccc3
Add timeout method to StreamMessage
sjy982 Jun 12, 2024
f47216e
refactor: Timeout, streamTimeoutMode variable names changed
sjy982 Jun 12, 2024
1d8a37f
feat: Add timeout method to HttpRequest
sjy982 Jun 12, 2024
8cd5d4f
feat: Add timeout method to HttpResponse
sjy982 Jun 12, 2024
926d53d
Refactor TimeoutSubscriber to use isCancelled() instead of null checks
sjy982 Jun 19, 2024
4d0e10f
Add TimeoutStreamMessageTest
sjy982 Jun 24, 2024
d1ea297
refactor: Optimize timeout handling in TimeoutSubscriber
sjy982 Jun 24, 2024
318ebf3
refactor: deduplication, Runnable implementation, change boundary con…
sjy982 Jun 25, 2024
b095d33
Add timeout unscheduling logic to abort().
sjy982 Jun 27, 2024
621e141
Create Java Docs,
sjy982 Jul 1, 2024
6c86d2f
Applying reviews
sjy982 Jul 2, 2024
f8248a7
1. Delete aptTerminate Method -> Replace with completed
sjy982 Jul 3, 2024
b1d03c7
delegate.onSubscribe(s); -> delegate.onSubscribe(this);
sjy982 Jul 3, 2024
45008d8
1. replace 2019 with 2024
sjy982 Jul 7, 2024
e4c3916
1. call delegate.onSubscribe() in TimeoutSubscriber's onSubscribe(), …
sjy982 Jul 8, 2024
13228b3
lint
sjy982 Jul 8, 2024
73adfff
Remove blank lines
sjy982 Jul 8, 2024
38b6eb9
1. Add `@UnstableApi` to `HttpResponse`, `HttpRequest`, and `StreamTi…
sjy982 Jul 10, 2024
566d725
1. Added `boolean canceled` to prevent unnecessary scheduling.
sjy982 Jul 11, 2024
eb8d450
1. Modify JavaDoc
sjy982 Jul 17, 2024
994ce96
lint pass
sjy982 Jul 17, 2024
cdeec5c
lint pass
sjy982 Jul 17, 2024
a95f279
javadoc
ikhoon Jul 25, 2024
efb32b3
Merge branch 'main' into pr-5761-sjy982-feature/stream-message-timeout
ikhoon Jul 25, 2024
2560092
nullaway
ikhoon Jul 25, 2024
24245a4
Merge branch 'main' into feature/stream-message-timeout
jrhee17 Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/HttpRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Formatter;
import java.util.List;
import java.util.Locale;
Expand All @@ -47,6 +48,7 @@
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.stream.StreamTimeoutMode;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.internal.common.DefaultHttpRequest;
import com.linecorp.armeria.internal.common.DefaultSplitHttpRequest;
Expand Down Expand Up @@ -816,4 +818,17 @@ default HttpRequest subscribeOn(EventExecutor eventExecutor) {
requireNonNull(eventExecutor, "eventExecutor");
return of(headers(), HttpMessage.super.subscribeOn(eventExecutor));
}

@Override
default HttpRequest timeout(Duration timeoutDuration) {
sjy982 marked this conversation as resolved.
Show resolved Hide resolved
requireNonNull(timeoutDuration, "timeoutDuration");
return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT);
}

@Override
sjy982 marked this conversation as resolved.
Show resolved Hide resolved
default HttpRequest timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
requireNonNull(timeoutDuration, "timeoutDuration");
requireNonNull(timeoutMode, "timeoutMode");
return of(headers(), HttpMessage.super.timeout(timeoutDuration, timeoutMode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.stream.StreamTimeoutMode;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.common.AbortedHttpResponse;
Expand Down Expand Up @@ -1183,4 +1184,17 @@ default <T extends Throwable> HttpResponse recover(Class<T> causeClass,
default HttpResponse subscribeOn(EventExecutor eventExecutor) {
return of(HttpMessage.super.subscribeOn(eventExecutor));
}

@Override
default HttpResponse timeout(Duration timeoutDuration) {
requireNonNull(timeoutDuration, "timeoutDuration");
return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT);
}

@Override
default HttpResponse timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
requireNonNull(timeoutDuration, "timeoutDuration");
requireNonNull(timeoutMode, "timeoutMode");
return of(HttpMessage.super.timeout(timeoutDuration, timeoutMode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -1203,4 +1204,47 @@ default StreamMessage<T> subscribeOn(EventExecutor eventExecutor) {
requireNonNull(eventExecutor, "eventExecutor");
return new SubscribeOnStreamMessage<>(this, eventExecutor);
}

/**
* Configures a timeout for the stream based on the specified duration.
* The default timeout mode is {@link StreamTimeoutMode#UNTIL_NEXT}.
*
* <p>Example usage:
* <pre>{@code
* StreamMessage<String> stream = ...;
* StreamMessage<String> timeoutStream = stream.timeout(Duration.ofSeconds(10));
* }</pre>
*
* @param timeoutDuration the duration before a timeout occurs
* @return a new {@link TimeoutStreamMessage} with the specified timeout duration and default mode
sjy982 marked this conversation as resolved.
Show resolved Hide resolved
*/
@UnstableApi
default StreamMessage<T> timeout(Duration timeoutDuration) {
minwoox marked this conversation as resolved.
Show resolved Hide resolved
requireNonNull(timeoutDuration, "timeoutDuration");
return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT);
}

/**
* Configures a timeout for the stream based on the specified duration and mode.
* Internally, it creates and returns a {@link TimeoutStreamMessage} with the specified parameters.
sjy982 marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>Example usage:
* <pre>{@code
* StreamMessage<String> stream = ...;
* StreamMessage<String> timeoutStream = stream.timeout(
* Duration.ofSeconds(10),
* StreamTimeoutMode.UNTIL_FIRST
* );
* }</pre>
*
* @param timeoutDuration the duration before a timeout occurs
* @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details)
* @return a new {@link TimeoutStreamMessage} with the specified timeout duration and mode applied
*/
@UnstableApi
default StreamMessage<T> timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
minwoox marked this conversation as resolved.
Show resolved Hide resolved
requireNonNull(timeoutDuration, "timeoutDuration");
requireNonNull(timeoutMode, "timeoutMode");
return new TimeoutStreamMessage<>(this, timeoutDuration, timeoutMode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2019 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.TimeoutException;

/**
* Stream Timeout Mode consists of three modes.
*
* <ul>
* <li>{@code UNTIL_FIRST} - Based on the first data chunk.
* If the first data chunk is not received within the specified time,
* a {@link TimeoutException} is thrown.</li>
* <li>{@code UNTIL_NEXT} - Based on each data chunk.
* If each data chunk is not received within the specified time after the previous chunk,
* a {@link TimeoutException} is thrown.</li>
* <li>{@code UNTIL_EOS} - Based on the entire stream.
* If all data chunks are not received within the specified time before the end of the stream,
* a {@link TimeoutException} is thrown.</li>
* </ul>
*/
public enum StreamTimeoutMode {
UNTIL_FIRST,
UNTIL_NEXT,
UNTIL_EOS
sjy982 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2019 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.stream;

import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.reactivestreams.Subscriber;

import com.linecorp.armeria.common.TimeoutException;
import com.linecorp.armeria.common.annotation.Nullable;

import io.netty.util.concurrent.EventExecutor;

/**
* This class provides timeout functionality to a base StreamMessage.
* If data is not received within the specified time, a {@link TimeoutException} is thrown.
*
* <p>The timeout functionality helps to release resources and throw appropriate exceptions
* if the stream becomes inactive or data is not received within a certain time frame,
* thereby improving system efficiency.
*
* @param <T> the type of the elements signaled
*/
final class TimeoutStreamMessage<T> implements StreamMessage<T> {

private final StreamMessage<? extends T> delegate;
private final Duration timeoutDuration;
private final StreamTimeoutMode timeoutMode;
@Nullable
private TimeoutSubscriber<T> timeoutSubscriber;
minwoox marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new TimeoutStreamMessage with the specified base stream message and timeout settings.
*
* @param delegate the original stream message
* @param timeoutDuration the duration before a timeout occurs
* @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details)
*/
TimeoutStreamMessage(StreamMessage<? extends T> delegate, Duration timeoutDuration,
StreamTimeoutMode timeoutMode) {
this.delegate = requireNonNull(delegate, "delegate");
this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration");
this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode");
}

/**
* Returns {@code true} if this stream is still open.
*
* @return {@code true} if the stream is open, otherwise {@code false}
*/
sjy982 marked this conversation as resolved.
Show resolved Hide resolved
@Override
public boolean isOpen() {
return delegate.isOpen();
}

/**
* Returns {@code true} if this stream is closed and did not publish any elements.
*
* @return {@code true} if the stream is empty, otherwise {@code false}
*/
@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

/**
* Returns the current demand of this stream.
*
* @return the current demand
*/
@Override
public long demand() {
return delegate.demand();
}

/**
* Returns a {@link CompletableFuture} that completes when this stream is complete,
* either successfully or exceptionally.
*
* @return a future that completes when the stream is complete
*/
@Override
public CompletableFuture<Void> whenComplete() {
return delegate.whenComplete();
}

/**
* Creates a {@link TimeoutSubscriber} with timeout logic applied using the specified
* subscriber, executor, duration, and mode.
* Then subscribes to the original stream with the TimeoutSubscriber, executor, and options.
*
* @param subscriber the subscriber to this stream
* @param executor the executor for running timeout tasks and stream operations
* @param options subscription options
* @see StreamMessage#subscribe(Subscriber, EventExecutor, SubscriptionOption...)
*/
@Override
public void subscribe(Subscriber<? super T> subscriber, EventExecutor executor,
SubscriptionOption... options) {
timeoutSubscriber = new TimeoutSubscriber<>(subscriber, executor, timeoutDuration, timeoutMode);
delegate.subscribe(timeoutSubscriber, executor, options);
}

/**
* Cancels the timeout schedule if it is set.
*/
private void cancelSchedule() {
if (timeoutSubscriber != null) {
timeoutSubscriber.cancelSchedule();
}
}

/**
* Aborts the stream with an {@link AbortedStreamException} and prevents further subscriptions.
* Calling this method on a closed or aborted stream has no effect.
* Cancels the timeout schedule if it is set.
*/
@Override
public void abort() {
sjy982 marked this conversation as resolved.
Show resolved Hide resolved
cancelSchedule();
delegate.abort();
}

/**
* Aborts the stream with the specified {@link Throwable} and prevents further subscriptions.
* Calling this method on a closed or aborted stream has no effect.
* Cancels the timeout schedule if it is set.
*
* @param cause the cause of the abort
*/
@Override
public void abort(Throwable cause) {
cancelSchedule();
delegate.abort(cause);
}
}
Loading
Loading