From 045afcc9831558cc2b935b2f16e0fa92903954eb Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Tue, 11 Jun 2024 20:13:46 +0900 Subject: [PATCH 01/26] Implement TimeoutStreamMessage for stream message timeout functionality --- .../com/linecorp/armeria/common/stream/StreamTimeoutMode.java | 2 ++ .../linecorp/armeria/common/stream/TimeoutStreamMessage.java | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java create mode 100644 core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java new file mode 100644 index 00000000000..907451280c3 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java @@ -0,0 +1,2 @@ +package com.linecorp.armeria.common.stream;public enum StreamTiemoutMode { +} diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java new file mode 100644 index 00000000000..964b27444dd --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -0,0 +1,2 @@ +package com.linecorp.armeria.common.stream;public class TimeoutStreamMessage { +} From 5818e6cb7970ec9a9bf6402c482d5de9ac064ef5 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Tue, 11 Jun 2024 21:33:46 +0900 Subject: [PATCH 02/26] feat: Implement TimeoutSubscriber for stream timeout handling --- .../common/stream/StreamTimeoutMode.java | 23 ++++- .../common/stream/TimeoutStreamMessage.java | 73 +++++++++++++- .../common/stream/TimeoutSubscriber.java | 98 +++++++++++++++++++ 3 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java index 907451280c3..6f12e68b404 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java @@ -1,2 +1,23 @@ -package com.linecorp.armeria.common.stream;public enum StreamTiemoutMode { +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.stream; + +public enum StreamTimeoutMode { + UNTIL_FIRST, + UNTIL_NEXT, + UNTIL_EOS } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 964b27444dd..199f6b9e17e 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -1,2 +1,73 @@ -package com.linecorp.armeria.common.stream;public class TimeoutStreamMessage { +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.stream; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import org.reactivestreams.Subscriber; + +import io.netty.util.concurrent.EventExecutor; + +public class TimeoutStreamMessage implements StreamMessage { + + private final StreamMessage delegate; + private final long timeoutMillis; + private final StreamTimeoutMode streamTimeoutMode; + + public TimeoutStreamMessage(StreamMessage delegate, Duration timeout, StreamTimeoutMode streamTimeoutMode) { + this.delegate = delegate; + this.timeoutMillis = timeout.toMillis(); + this.streamTimeoutMode = streamTimeoutMode; + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public long demand() { + return delegate.demand(); + } + + @Override + public CompletableFuture whenComplete() { + return delegate.whenComplete(); + } + + @Override + public void subscribe(Subscriber subscriber, EventExecutor executor, + SubscriptionOption... options) { + delegate.subscribe(new TimeoutSubscriber(subscriber, executor, timeoutMillis, streamTimeoutMode), executor, options); + } + + @Override + public void abort() { + delegate.abort(); + } + + @Override + public void abort(Throwable cause) { + delegate.abort(cause); + } } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java new file mode 100644 index 00000000000..ebde30b00cd --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.stream; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.ScheduledFuture; + +public class TimeoutSubscriber implements Subscriber { + + private final Subscriber delegate; + private final EventExecutor executor; + + private final StreamTimeoutMode streamTimeoutMode; + private ScheduledFuture timeoutFuture; + + private Subscription subscription; + private final long timeoutMillis; + + public TimeoutSubscriber(Subscriber delegate, EventExecutor executor, long timeoutMillis, StreamTimeoutMode streamTimeoutMode) { + this.delegate = requireNonNull(delegate, "delegate"); + this.executor = requireNonNull(executor, "executor"); + this.timeoutMillis = timeoutMillis; + this.streamTimeoutMode = requireNonNull(streamTimeoutMode, "streamTimeoutMode"); + } + + private ScheduledFuture scheduleTimeout() { + return executor.schedule(() -> { + subscription.cancel(); + delegate.onError(new TimeoutException( + String.format("Stream timed out after %d ms (timeout mode: %s)", timeoutMillis, streamTimeoutMode))); + }, timeoutMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + subscription = s; + timeoutFuture = scheduleTimeout(); + } + + @Override + public void onNext(T t) { + delegate.onNext(t); + if (timeoutFuture == null) { + return; + } + switch (streamTimeoutMode) { + case UNTIL_NEXT: + timeoutFuture.cancel(false); + timeoutFuture = scheduleTimeout(); + break; + case UNTIL_FIRST: + timeoutFuture.cancel(false); + timeoutFuture = null; + break; + case UNTIL_EOS: + break; + } + } + + @Override + public void onError(Throwable throwable) { + if(timeoutFuture != null) { + timeoutFuture.cancel(false); + } + delegate.onError(throwable); + } + + @Override + public void onComplete() { + if(timeoutFuture != null) { + timeoutFuture.cancel(false); + } + delegate.onComplete(); + } +} From 02cccc36008b069b075dbb3b64ed701ac7576298 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 12 Jun 2024 17:54:20 +0900 Subject: [PATCH 03/26] Add timeout method to StreamMessage --- .../linecorp/armeria/common/stream/StreamMessage.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java index 1eae26bc655..ca32ec1f2d7 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java @@ -28,6 +28,7 @@ import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -1203,4 +1204,12 @@ default StreamMessage subscribeOn(EventExecutor eventExecutor) { requireNonNull(eventExecutor, "eventExecutor"); return new SubscribeOnStreamMessage<>(this, eventExecutor); } + + default StreamMessage timeout(Duration timeout) { + return timeout(timeout, StreamTimeoutMode.UNTIL_NEXT); + } + + default StreamMessage timeout(Duration timeout, StreamTimeoutMode streamTimeoutMode) { + return new TimeoutStreamMessage<>(this, timeout, streamTimeoutMode); + } } From f47216ea687124a56fbc0b8f276c02ceaaf964ba Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 12 Jun 2024 18:23:39 +0900 Subject: [PATCH 04/26] refactor: Timeout, streamTimeoutMode variable names changed --- .../linecorp/armeria/common/HttpRequest.java | 15 +++++++++++++++ .../armeria/common/stream/StreamMessage.java | 11 +++++++---- .../common/stream/TimeoutStreamMessage.java | 16 +++++++++------- .../common/stream/TimeoutSubscriber.java | 17 +++++++++-------- 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java index 8967b6cc0c0..f4a64fcbf83 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java @@ -21,6 +21,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Formatter; import java.util.List; import java.util.Locale; @@ -47,6 +48,7 @@ import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage; import com.linecorp.armeria.common.stream.StreamMessage; +import com.linecorp.armeria.common.stream.StreamTimeoutMode; import com.linecorp.armeria.common.stream.SubscriptionOption; import com.linecorp.armeria.internal.common.DefaultHttpRequest; import com.linecorp.armeria.internal.common.DefaultSplitHttpRequest; @@ -816,4 +818,17 @@ default HttpRequest subscribeOn(EventExecutor eventExecutor) { requireNonNull(eventExecutor, "eventExecutor"); return of(headers(), HttpMessage.super.subscribeOn(eventExecutor)); } + +// @Override +// default HttpRequest timeout(Duration timeoutDuration) { +// requireNonNull(timeoutDuration, "timeoutDuration"); +// return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); +// } +// +// @Override +// default HttpRequest timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { +// requireNonNull(timeoutDuration, "timeoutDuration"); +// requireNonNull(timeoutMode, "timeoutMode"); +// return of(headers(), HttpMessage.super.timeout(timeoutDuration, timeoutMode)); +// } } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java index ca32ec1f2d7..79ee35c0bd7 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java @@ -1205,11 +1205,14 @@ default StreamMessage subscribeOn(EventExecutor eventExecutor) { return new SubscribeOnStreamMessage<>(this, eventExecutor); } - default StreamMessage timeout(Duration timeout) { - return timeout(timeout, StreamTimeoutMode.UNTIL_NEXT); + default StreamMessage timeout(Duration timeoutDuration) { + requireNonNull(timeoutDuration, "timeoutDuration"); + return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); } - default StreamMessage timeout(Duration timeout, StreamTimeoutMode streamTimeoutMode) { - return new TimeoutStreamMessage<>(this, timeout, streamTimeoutMode); + default StreamMessage timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { + requireNonNull(timeoutDuration, "timeoutDuration"); + requireNonNull(timeoutMode, "timeoutMode"); + return new TimeoutStreamMessage<>(this, timeoutDuration, timeoutMode); } } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 199f6b9e17e..04d31fba3a0 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.common.stream; +import static java.util.Objects.requireNonNull; + import java.time.Duration; import java.util.concurrent.CompletableFuture; @@ -26,13 +28,13 @@ public class TimeoutStreamMessage implements StreamMessage { private final StreamMessage delegate; - private final long timeoutMillis; - private final StreamTimeoutMode streamTimeoutMode; + private final Duration timeoutDuration; + private final StreamTimeoutMode timeoutMode; - public TimeoutStreamMessage(StreamMessage delegate, Duration timeout, StreamTimeoutMode streamTimeoutMode) { - this.delegate = delegate; - this.timeoutMillis = timeout.toMillis(); - this.streamTimeoutMode = streamTimeoutMode; + public TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { + this.delegate = requireNonNull(delegate, "delegate"); + this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); + this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); } @Override @@ -58,7 +60,7 @@ public CompletableFuture whenComplete() { @Override public void subscribe(Subscriber subscriber, EventExecutor executor, SubscriptionOption... options) { - delegate.subscribe(new TimeoutSubscriber(subscriber, executor, timeoutMillis, streamTimeoutMode), executor, options); + delegate.subscribe(new TimeoutSubscriber(subscriber, executor, timeoutDuration, timeoutMode), executor, options); } @Override diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index ebde30b00cd..d4548b04a2b 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -18,6 +18,7 @@ import static java.util.Objects.requireNonNull; +import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -32,25 +33,25 @@ public class TimeoutSubscriber implements Subscriber { private final Subscriber delegate; private final EventExecutor executor; - private final StreamTimeoutMode streamTimeoutMode; + private final StreamTimeoutMode timeoutMode; private ScheduledFuture timeoutFuture; private Subscription subscription; - private final long timeoutMillis; + private final Duration timeoutDuration; - public TimeoutSubscriber(Subscriber delegate, EventExecutor executor, long timeoutMillis, StreamTimeoutMode streamTimeoutMode) { + public TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { this.delegate = requireNonNull(delegate, "delegate"); this.executor = requireNonNull(executor, "executor"); - this.timeoutMillis = timeoutMillis; - this.streamTimeoutMode = requireNonNull(streamTimeoutMode, "streamTimeoutMode"); + this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); + this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); } private ScheduledFuture scheduleTimeout() { return executor.schedule(() -> { subscription.cancel(); delegate.onError(new TimeoutException( - String.format("Stream timed out after %d ms (timeout mode: %s)", timeoutMillis, streamTimeoutMode))); - }, timeoutMillis, TimeUnit.MILLISECONDS); + String.format("Stream timed out after %d ms (timeout mode: %s)", timeoutDuration.toMillis(), timeoutMode))); + }, timeoutDuration.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -66,7 +67,7 @@ public void onNext(T t) { if (timeoutFuture == null) { return; } - switch (streamTimeoutMode) { + switch (timeoutMode) { case UNTIL_NEXT: timeoutFuture.cancel(false); timeoutFuture = scheduleTimeout(); From 1d8a37f1b4b4a8dd75a9ed2f8c162d0613b0f968 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 12 Jun 2024 18:26:13 +0900 Subject: [PATCH 05/26] feat: Add timeout method to HttpRequest --- .../linecorp/armeria/common/HttpRequest.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java index f4a64fcbf83..42234b27ea2 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java @@ -819,16 +819,16 @@ default HttpRequest subscribeOn(EventExecutor eventExecutor) { return of(headers(), HttpMessage.super.subscribeOn(eventExecutor)); } -// @Override -// default HttpRequest timeout(Duration timeoutDuration) { -// requireNonNull(timeoutDuration, "timeoutDuration"); -// return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); -// } -// -// @Override -// default HttpRequest timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { -// requireNonNull(timeoutDuration, "timeoutDuration"); -// requireNonNull(timeoutMode, "timeoutMode"); -// return of(headers(), HttpMessage.super.timeout(timeoutDuration, timeoutMode)); -// } + @Override + default HttpRequest timeout(Duration timeoutDuration) { + requireNonNull(timeoutDuration, "timeoutDuration"); + return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); + } + + @Override + default HttpRequest timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { + requireNonNull(timeoutDuration, "timeoutDuration"); + requireNonNull(timeoutMode, "timeoutMode"); + return of(headers(), HttpMessage.super.timeout(timeoutDuration, timeoutMode)); + } } From 8cd5d4fc6700355716da46fbadc36541bd4f2b33 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 12 Jun 2024 22:11:04 +0900 Subject: [PATCH 06/26] feat: Add timeout method to HttpResponse --- .../com/linecorp/armeria/common/HttpResponse.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java b/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java index a92db4b2789..0eaa89372bd 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java @@ -52,6 +52,7 @@ import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage; import com.linecorp.armeria.common.stream.StreamMessage; +import com.linecorp.armeria.common.stream.StreamTimeoutMode; import com.linecorp.armeria.common.stream.SubscriptionOption; import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.internal.common.AbortedHttpResponse; @@ -1183,4 +1184,17 @@ default HttpResponse recover(Class causeClass, default HttpResponse subscribeOn(EventExecutor eventExecutor) { return of(HttpMessage.super.subscribeOn(eventExecutor)); } + + @Override + default HttpResponse timeout(Duration timeoutDuration) { + requireNonNull(timeoutDuration, "timeoutDuration"); + return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); + } + + @Override + default HttpResponse timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { + requireNonNull(timeoutDuration, "timeoutDuration"); + requireNonNull(timeoutMode, "timeoutMode"); + return of(HttpMessage.super.timeout(timeoutDuration, timeoutMode)); + } } From 926d53dcc9489478f9b5f78c3c612b929dd10ef5 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 19 Jun 2024 18:29:00 +0900 Subject: [PATCH 07/26] Refactor TimeoutSubscriber to use isCancelled() instead of null checks --- .../linecorp/armeria/common/stream/TimeoutSubscriber.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index d4548b04a2b..0518529a5d4 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -64,7 +64,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { delegate.onNext(t); - if (timeoutFuture == null) { + if (timeoutFuture.isCancelled()) { return; } switch (timeoutMode) { @@ -74,7 +74,6 @@ public void onNext(T t) { break; case UNTIL_FIRST: timeoutFuture.cancel(false); - timeoutFuture = null; break; case UNTIL_EOS: break; @@ -83,7 +82,7 @@ public void onNext(T t) { @Override public void onError(Throwable throwable) { - if(timeoutFuture != null) { + if(!timeoutFuture.isCancelled()) { timeoutFuture.cancel(false); } delegate.onError(throwable); @@ -91,7 +90,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { - if(timeoutFuture != null) { + if(!timeoutFuture.isCancelled()) { timeoutFuture.cancel(false); } delegate.onComplete(); From 4d0e10f9a55bcc3b4af1e5796b0fbac87adf4621 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Mon, 24 Jun 2024 21:20:03 +0900 Subject: [PATCH 08/26] Add TimeoutStreamMessageTest --- .../armeria/common/stream/TimeoutStreamMessageTest.java | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java new file mode 100644 index 00000000000..4c35a4f9003 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java @@ -0,0 +1,2 @@ +package com.linecorp.armeria.common.stream;public class TimeoutStreamMessageTest { +} From d1ea29749eb2eee52aa70e9549965149458fcb24 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Mon, 24 Jun 2024 22:04:16 +0900 Subject: [PATCH 09/26] refactor: Optimize timeout handling in TimeoutSubscriber --- .../common/stream/TimeoutSubscriber.java | 58 +++-- .../stream/TimeoutStreamMessageTest.java | 227 +++++++++++++++++- 2 files changed, 263 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index 0518529a5d4..0a092c57856 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -29,48 +29,69 @@ import io.netty.util.concurrent.ScheduledFuture; public class TimeoutSubscriber implements Subscriber { - + private static final String TIMEOUT_MESSAGE = "Stream timed out after %d ms (timeout mode: %s)"; private final Subscriber delegate; private final EventExecutor executor; - private final StreamTimeoutMode timeoutMode; + private final Duration timeoutDuration; private ScheduledFuture timeoutFuture; - + private Runnable timeoutTask; private Subscription subscription; - private final Duration timeoutDuration; + private long timeoutNanos; + private long lastOnNextTimeNanos; public TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { this.delegate = requireNonNull(delegate, "delegate"); this.executor = requireNonNull(executor, "executor"); this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); + timeoutNanos = timeoutDuration.toNanos(); this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); + timeoutTask = createTimeoutTask(); } - private ScheduledFuture scheduleTimeout() { - return executor.schedule(() -> { + private Runnable createTimeoutTask() { + return () -> { + if(timeoutMode == StreamTimeoutMode.UNTIL_NEXT) { + long currentTimeNanos = System.nanoTime(); + long elapsedNanos = currentTimeNanos - lastOnNextTimeNanos; + + if(elapsedNanos <= timeoutNanos) { + long delayNanos = timeoutNanos - (currentTimeNanos - lastOnNextTimeNanos); + timeoutFuture = createTimeoutSchedule(delayNanos, TimeUnit.NANOSECONDS); + return; + } + } subscription.cancel(); - delegate.onError(new TimeoutException( - String.format("Stream timed out after %d ms (timeout mode: %s)", timeoutDuration.toMillis(), timeoutMode))); - }, timeoutDuration.toMillis(), TimeUnit.MILLISECONDS); + delegate.onError(new TimeoutException(String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); + }; + } + + private ScheduledFuture createTimeoutSchedule(long delay, TimeUnit unit) { + return executor.schedule(timeoutTask, delay, unit); + } + + private void cancelSchedule() { + if(!timeoutFuture.isCancelled()) { + timeoutFuture.cancel(false); + } } @Override public void onSubscribe(Subscription s) { - delegate.onSubscribe(s); subscription = s; - timeoutFuture = scheduleTimeout(); + lastOnNextTimeNanos = System.nanoTime(); + timeoutFuture = createTimeoutSchedule(timeoutNanos, TimeUnit.NANOSECONDS); + delegate.onSubscribe(s); } @Override public void onNext(T t) { - delegate.onNext(t); if (timeoutFuture.isCancelled()) { return; } switch (timeoutMode) { case UNTIL_NEXT: - timeoutFuture.cancel(false); - timeoutFuture = scheduleTimeout(); + lastOnNextTimeNanos = System.nanoTime(); break; case UNTIL_FIRST: timeoutFuture.cancel(false); @@ -78,21 +99,18 @@ public void onNext(T t) { case UNTIL_EOS: break; } + delegate.onNext(t); } @Override public void onError(Throwable throwable) { - if(!timeoutFuture.isCancelled()) { - timeoutFuture.cancel(false); - } + cancelSchedule(); delegate.onError(throwable); } @Override public void onComplete() { - if(!timeoutFuture.isCancelled()) { - timeoutFuture.cancel(false); - } + cancelSchedule(); delegate.onComplete(); } } diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java index 4c35a4f9003..8a9ad76a18e 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java @@ -1,2 +1,225 @@ -package com.linecorp.armeria.common.stream;public class TimeoutStreamMessageTest { -} +package com.linecorp.armeria.common.stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.netty.util.concurrent.DefaultEventExecutor; +import io.netty.util.concurrent.EventExecutor; + +public class TimeoutStreamMessageTest { + private EventExecutor executor; + + @BeforeEach + public void setUp() { + executor = new DefaultEventExecutor(); + } + + @AfterEach + public void tearDown() { + executor.shutdownGracefully(); + } + + + @Test + public void timeoutNextMode() { + StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( + Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_NEXT); + CompletableFuture future = new CompletableFuture<>(); + + timeoutStreamMessage.subscribe(new Subscriber() { + private Subscription subscription; + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(String s) { + executor.schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + future.complete(null); + } + }, executor); + + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(TimeoutException.class); + } + + @Test + public void noTimeoutNextMode() throws Exception { + StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_NEXT); + + CompletableFuture future = new CompletableFuture<>(); + + timeoutStreamMessage.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(2); + } + + @Override + public void onNext(String s) { + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + future.complete(null); + } + }, executor); + + assertThat(future.get()).isNull(); + } + + @Test + public void timeoutFirstMode() { + StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_FIRST); + CompletableFuture future = new CompletableFuture<>(); + + timeoutStreamMessage.subscribe(new Subscriber() { + private Subscription subscription; + @Override + public void onSubscribe(Subscription s) { + subscription = s; + executor.schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); + } + + @Override + public void onNext(String s) { + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + future.complete(null); + } + }, executor); + + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(TimeoutException.class); + } + + @Test + public void noTimeoutModeFirst() throws Exception { + StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_FIRST); + CompletableFuture future = new CompletableFuture<>(); + + timeoutStreamMessage.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(2); + } + + @Override + public void onNext(String s) { + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + future.complete(null); + } + }, executor); + + assertThat(future.get()).isNull(); + } + + @Test + public void timeoutEOSMode() { + StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(2), StreamTimeoutMode.UNTIL_EOS); + CompletableFuture future = new CompletableFuture<>(); + + timeoutStreamMessage.subscribe(new Subscriber() { + private Subscription subscription; + @Override + public void onSubscribe(Subscription s) { + subscription = s; + executor.schedule(() -> subscription.request(1), 1, TimeUnit.SECONDS); + } + + @Override + public void onNext(String s) { + executor.schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + future.complete(null); + } + }, executor); + + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(TimeoutException.class); + } + + @Test + public void noTimeoutEOSMode() throws Exception { + StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(2), StreamTimeoutMode.UNTIL_EOS); + CompletableFuture future = new CompletableFuture<>(); + + timeoutStreamMessage.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(2); + } + + @Override + public void onNext(String s) { + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + future.complete(null); + } + }, executor); + + assertThat(future.get()).isNull(); + } +} \ No newline at end of file From 318ebf366d57762632de50fbdee7042cc48743f5 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 26 Jun 2024 02:49:55 +0900 Subject: [PATCH 10/26] refactor: deduplication, Runnable implementation, change boundary conditions --- .../common/stream/TimeoutStreamMessage.java | 8 +-- .../common/stream/TimeoutSubscriber.java | 49 +++++++++---------- .../stream/TimeoutStreamMessageTest.java | 45 ++++++++++------- 3 files changed, 55 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 04d31fba3a0..41e1446c9bb 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -31,7 +31,8 @@ public class TimeoutStreamMessage implements StreamMessage { private final Duration timeoutDuration; private final StreamTimeoutMode timeoutMode; - public TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { + public TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, + StreamTimeoutMode timeoutMode) { this.delegate = requireNonNull(delegate, "delegate"); this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); @@ -60,7 +61,8 @@ public CompletableFuture whenComplete() { @Override public void subscribe(Subscriber subscriber, EventExecutor executor, SubscriptionOption... options) { - delegate.subscribe(new TimeoutSubscriber(subscriber, executor, timeoutDuration, timeoutMode), executor, options); + delegate.subscribe(new TimeoutSubscriber(subscriber, executor, timeoutDuration, timeoutMode), + executor, options); } @Override @@ -72,4 +74,4 @@ public void abort() { public void abort(Throwable cause) { delegate.abort(cause); } -} +} \ No newline at end of file diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index 0a092c57856..c2b511bb9d1 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -28,54 +28,53 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ScheduledFuture; -public class TimeoutSubscriber implements Subscriber { +final class TimeoutSubscriber implements Runnable, Subscriber { private static final String TIMEOUT_MESSAGE = "Stream timed out after %d ms (timeout mode: %s)"; private final Subscriber delegate; private final EventExecutor executor; private final StreamTimeoutMode timeoutMode; private final Duration timeoutDuration; + private final long timeoutNanos; private ScheduledFuture timeoutFuture; - private Runnable timeoutTask; private Subscription subscription; - private long timeoutNanos; private long lastOnNextTimeNanos; - public TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { + TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, + StreamTimeoutMode timeoutMode) { this.delegate = requireNonNull(delegate, "delegate"); this.executor = requireNonNull(executor, "executor"); this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); timeoutNanos = timeoutDuration.toNanos(); this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); - timeoutTask = createTimeoutTask(); - } - - private Runnable createTimeoutTask() { - return () -> { - if(timeoutMode == StreamTimeoutMode.UNTIL_NEXT) { - long currentTimeNanos = System.nanoTime(); - long elapsedNanos = currentTimeNanos - lastOnNextTimeNanos; - - if(elapsedNanos <= timeoutNanos) { - long delayNanos = timeoutNanos - (currentTimeNanos - lastOnNextTimeNanos); - timeoutFuture = createTimeoutSchedule(delayNanos, TimeUnit.NANOSECONDS); - return; - } - } - subscription.cancel(); - delegate.onError(new TimeoutException(String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); - }; } private ScheduledFuture createTimeoutSchedule(long delay, TimeUnit unit) { - return executor.schedule(timeoutTask, delay, unit); + return executor.schedule(this, delay, unit); } private void cancelSchedule() { - if(!timeoutFuture.isCancelled()) { + if (!timeoutFuture.isCancelled()) { timeoutFuture.cancel(false); } } + @Override + public void run() { + if (timeoutMode == StreamTimeoutMode.UNTIL_NEXT) { + final long currentTimeNanos = System.nanoTime(); + final long elapsedNanos = currentTimeNanos - lastOnNextTimeNanos; + + if (elapsedNanos < timeoutNanos) { + final long delayNanos = timeoutNanos - elapsedNanos; + timeoutFuture = createTimeoutSchedule(delayNanos, TimeUnit.NANOSECONDS); + return; + } + } + subscription.cancel(); + delegate.onError(new TimeoutException( + String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); + } + @Override public void onSubscribe(Subscription s) { subscription = s; @@ -113,4 +112,4 @@ public void onComplete() { cancelSchedule(); delegate.onComplete(); } -} +} \ No newline at end of file diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java index 8a9ad76a18e..7817f31a559 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java @@ -18,7 +18,7 @@ import io.netty.util.concurrent.DefaultEventExecutor; import io.netty.util.concurrent.EventExecutor; -public class TimeoutStreamMessageTest { +class TimeoutStreamMessageTest { private EventExecutor executor; @BeforeEach @@ -31,15 +31,15 @@ public void tearDown() { executor.shutdownGracefully(); } - @Test public void timeoutNextMode() { - StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( + final StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_NEXT); - CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new CompletableFuture<>(); timeoutStreamMessage.subscribe(new Subscriber() { private Subscription subscription; + @Override public void onSubscribe(Subscription s) { subscription = s; @@ -68,10 +68,11 @@ public void onComplete() { } @Test - public void noTimeoutNextMode() throws Exception { - StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_NEXT); + void noTimeoutNextMode() throws Exception { + final StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( + Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_NEXT); - CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new CompletableFuture<>(); timeoutStreamMessage.subscribe(new Subscriber() { @Override @@ -98,12 +99,14 @@ public void onComplete() { } @Test - public void timeoutFirstMode() { - StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_FIRST); - CompletableFuture future = new CompletableFuture<>(); + void timeoutFirstMode() { + final StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( + Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_FIRST); + final CompletableFuture future = new CompletableFuture<>(); timeoutStreamMessage.subscribe(new Subscriber() { private Subscription subscription; + @Override public void onSubscribe(Subscription s) { subscription = s; @@ -132,9 +135,10 @@ public void onComplete() { } @Test - public void noTimeoutModeFirst() throws Exception { - StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_FIRST); - CompletableFuture future = new CompletableFuture<>(); + void noTimeoutModeFirst() throws Exception { + final StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( + Duration.ofSeconds(1), StreamTimeoutMode.UNTIL_FIRST); + final CompletableFuture future = new CompletableFuture<>(); timeoutStreamMessage.subscribe(new Subscriber() { @Override @@ -161,12 +165,14 @@ public void onComplete() { } @Test - public void timeoutEOSMode() { - StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(2), StreamTimeoutMode.UNTIL_EOS); - CompletableFuture future = new CompletableFuture<>(); + void timeoutEOSMode() { + final StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( + Duration.ofSeconds(2), StreamTimeoutMode.UNTIL_EOS); + final CompletableFuture future = new CompletableFuture<>(); timeoutStreamMessage.subscribe(new Subscriber() { private Subscription subscription; + @Override public void onSubscribe(Subscription s) { subscription = s; @@ -195,9 +201,10 @@ public void onComplete() { } @Test - public void noTimeoutEOSMode() throws Exception { - StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout(Duration.ofSeconds(2), StreamTimeoutMode.UNTIL_EOS); - CompletableFuture future = new CompletableFuture<>(); + void noTimeoutEOSMode() throws Exception { + final StreamMessage timeoutStreamMessage = StreamMessage.of("message1", "message2").timeout( + Duration.ofSeconds(2), StreamTimeoutMode.UNTIL_EOS); + final CompletableFuture future = new CompletableFuture<>(); timeoutStreamMessage.subscribe(new Subscriber() { @Override From b095d3346bc6c1f0f19ae55001f6002b831d3d14 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Fri, 28 Jun 2024 01:14:25 +0900 Subject: [PATCH 11/26] Add timeout unscheduling logic to abort(). Unsubscribing due to a timeout does not guarantee synchronization with upstream. Therefore, we implemented an attempt attemptTerminate() method. --- .../common/stream/TimeoutStreamMessage.java | 13 +++++++++-- .../common/stream/TimeoutSubscriber.java | 22 +++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 41e1446c9bb..f699b229d20 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -30,6 +30,7 @@ public class TimeoutStreamMessage implements StreamMessage { private final StreamMessage delegate; private final Duration timeoutDuration; private final StreamTimeoutMode timeoutMode; + private TimeoutSubscriber timeoutSubscriber; public TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { @@ -61,17 +62,25 @@ public CompletableFuture whenComplete() { @Override public void subscribe(Subscriber subscriber, EventExecutor executor, SubscriptionOption... options) { - delegate.subscribe(new TimeoutSubscriber(subscriber, executor, timeoutDuration, timeoutMode), - executor, options); + timeoutSubscriber = new TimeoutSubscriber(subscriber, executor, timeoutDuration, timeoutMode); + delegate.subscribe(timeoutSubscriber, executor, options); + } + + private void cancelSchedule() { + if (timeoutSubscriber != null) { + timeoutSubscriber.cancelSchedule(); + } } @Override public void abort() { + cancelSchedule(); delegate.abort(); } @Override public void abort(Throwable cause) { + cancelSchedule(); delegate.abort(cause); } } \ No newline at end of file diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index c2b511bb9d1..5b0095f7809 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -38,6 +38,7 @@ final class TimeoutSubscriber implements Runnable, Subscriber { private ScheduledFuture timeoutFuture; private Subscription subscription; private long lastOnNextTimeNanos; + private boolean isTerminated; TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { @@ -52,7 +53,15 @@ private ScheduledFuture createTimeoutSchedule(long delay, TimeUnit unit) { return executor.schedule(this, delay, unit); } - private void cancelSchedule() { + private boolean attemptTerminate() { + if (isTerminated) { + return false; + } + isTerminated = true; + return true; + } + + public void cancelSchedule() { if (!timeoutFuture.isCancelled()) { timeoutFuture.cancel(false); } @@ -70,6 +79,9 @@ public void run() { return; } } + if (!attemptTerminate()) { + return; + } subscription.cancel(); delegate.onError(new TimeoutException( String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); @@ -85,7 +97,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (timeoutFuture.isCancelled()) { + if (isTerminated || timeoutFuture.isCancelled()) { return; } switch (timeoutMode) { @@ -103,12 +115,18 @@ public void onNext(T t) { @Override public void onError(Throwable throwable) { + if(!attemptTerminate()) { + return; + } cancelSchedule(); delegate.onError(throwable); } @Override public void onComplete() { + if(!attemptTerminate()) { + return; + } cancelSchedule(); delegate.onComplete(); } From 621e1415a6d8d613482e6808a1bf738f92cd83ff Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Tue, 2 Jul 2024 00:06:00 +0900 Subject: [PATCH 12/26] Create Java Docs, Lint error correction --- .../armeria/common/stream/StreamMessage.java | 32 +++++++++ .../common/stream/StreamTimeoutMode.java | 17 +++++ .../common/stream/TimeoutStreamMessage.java | 67 ++++++++++++++++++- .../common/stream/TimeoutSubscriber.java | 9 +-- .../stream/TimeoutStreamMessageTest.java | 21 +++++- 5 files changed, 139 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java index 79ee35c0bd7..6daa684e9b5 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java @@ -1205,11 +1205,43 @@ default StreamMessage subscribeOn(EventExecutor eventExecutor) { return new SubscribeOnStreamMessage<>(this, eventExecutor); } + /** + * Configures a timeout for the stream based on the specified duration. + * The default timeout mode is {@link StreamTimeoutMode#UNTIL_NEXT}. + * + *

Example usage: + *

{@code
+     * StreamMessage stream = ...;
+     * StreamMessage timeoutStream = stream.timeout(Duration.ofSeconds(10));
+     * }
+ * + * @param timeoutDuration the duration before a timeout occurs + * @return a new {@link TimeoutStreamMessage} with the specified timeout duration and default mode + * @throws NullPointerException if {@code timeoutDuration} is null + */ default StreamMessage timeout(Duration timeoutDuration) { requireNonNull(timeoutDuration, "timeoutDuration"); return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); } + /** + * Configures a timeout for the stream based on the specified duration and mode. + * Internally, it creates and returns a {@link TimeoutStreamMessage} with the specified parameters. + * + *

Example usage: + *

{@code
+     * StreamMessage stream = ...;
+     * StreamMessage timeoutStream = stream.timeout(
+     *     Duration.ofSeconds(10),
+     *     StreamTimeoutMode.UNTIL_FIRST
+     * );
+     * }
+ * + * @param timeoutDuration the duration before a timeout occurs + * @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details) + * @return a new {@link TimeoutStreamMessage} with the specified timeout duration and mode applied + * @throws NullPointerException if {@code timeoutDuration} or {@code timeoutMode} is null + */ default StreamMessage timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { requireNonNull(timeoutDuration, "timeoutDuration"); requireNonNull(timeoutMode, "timeoutMode"); diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java index 6f12e68b404..482db9dfa85 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java @@ -16,6 +16,23 @@ package com.linecorp.armeria.common.stream; +import com.linecorp.armeria.common.TimeoutException; + +/** + * Stream Timeout Mode consists of three modes. + * + *
    + *
  • {@code UNTIL_FIRST} - Based on the first data chunk. + * If the first data chunk is not received within the specified time, + * a {@link TimeoutException} is thrown.
  • + *
  • {@code UNTIL_NEXT} - Based on each data chunk. + * If each data chunk is not received within the specified time after the previous chunk, + * a {@link TimeoutException} is thrown.
  • + *
  • {@code UNTIL_EOS} - Based on the entire stream. + * If all data chunks are not received within the specified time before the end of the stream, + * a {@link TimeoutException} is thrown.
  • + *
+ */ public enum StreamTimeoutMode { UNTIL_FIRST, UNTIL_NEXT, diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index f699b229d20..4c014ca688b 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -23,8 +23,20 @@ import org.reactivestreams.Subscriber; +import com.linecorp.armeria.common.TimeoutException; + import io.netty.util.concurrent.EventExecutor; +/** + * This class provides timeout functionality to a base StreamMessage. + * If data is not received within the specified time, a {@link TimeoutException} is thrown. + * + *

The timeout functionality helps to release resources and throw appropriate exceptions + * if the stream becomes inactive or data is not received within a certain time frame, + * thereby improving system efficiency. + * + * @param the type of the elements signaled + */ public class TimeoutStreamMessage implements StreamMessage { private final StreamMessage delegate; @@ -32,6 +44,13 @@ public class TimeoutStreamMessage implements StreamMessage { private final StreamTimeoutMode timeoutMode; private TimeoutSubscriber timeoutSubscriber; + /** + * Creates a new TimeoutStreamMessage with the specified base stream message and timeout settings. + * + * @param delegate the original stream message + * @param timeoutDuration the duration before a timeout occurs + * @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details) + */ public TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { this.delegate = requireNonNull(delegate, "delegate"); @@ -39,26 +58,57 @@ public TimeoutStreamMessage(StreamMessage delegate, Duration timeou this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); } + /** + * Returns {@code true} if this stream is still open. + * + * @return {@code true} if the stream is open, otherwise {@code false} + */ @Override public boolean isOpen() { return delegate.isOpen(); } + /** + * Returns {@code true} if this stream is closed and did not publish any elements. + * + * @return {@code true} if the stream is empty, otherwise {@code false} + */ @Override public boolean isEmpty() { return delegate.isEmpty(); } + /** + * Returns the current demand of this stream. + * + * @return the current demand + */ @Override public long demand() { return delegate.demand(); } + /** + * Returns a {@link CompletableFuture} that completes when this stream is complete, + * either successfully or exceptionally. + * + * @return a future that completes when the stream is complete + */ @Override public CompletableFuture whenComplete() { return delegate.whenComplete(); } + /** + * Creates a {@link TimeoutSubscriber} with timeout logic applied using the specified + * subscriber, executor, duration, and mode. + * Then subscribes to the original stream with the TimeoutSubscriber, executor, and options. + * + * @param subscriber the subscriber to this stream + * @param executor the executor for running timeout tasks and stream operations + * @param options subscription options + * @see StreamMessage#subscribe(Subscriber, EventExecutor, SubscriptionOption...) + */ @Override public void subscribe(Subscriber subscriber, EventExecutor executor, SubscriptionOption... options) { @@ -66,21 +116,36 @@ public void subscribe(Subscriber subscriber, EventExecutor executor, delegate.subscribe(timeoutSubscriber, executor, options); } + /** + * Cancels the timeout schedule if it is set. + */ private void cancelSchedule() { if (timeoutSubscriber != null) { timeoutSubscriber.cancelSchedule(); } } + /** + * Aborts the stream with an {@link AbortedStreamException} and prevents further subscriptions. + * Calling this method on a closed or aborted stream has no effect. + * Cancels the timeout schedule if it is set. + */ @Override public void abort() { cancelSchedule(); delegate.abort(); } + /** + * Aborts the stream with the specified {@link Throwable} and prevents further subscriptions. + * Calling this method on a closed or aborted stream has no effect. + * Cancels the timeout schedule if it is set. + * + * @param cause the cause of the abort + */ @Override public void abort(Throwable cause) { cancelSchedule(); delegate.abort(cause); } -} \ No newline at end of file +} diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index 5b0095f7809..4fc70c5f09e 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -20,11 +20,12 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import com.linecorp.armeria.common.TimeoutException; + import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ScheduledFuture; @@ -115,7 +116,7 @@ public void onNext(T t) { @Override public void onError(Throwable throwable) { - if(!attemptTerminate()) { + if (!attemptTerminate()) { return; } cancelSchedule(); @@ -124,10 +125,10 @@ public void onError(Throwable throwable) { @Override public void onComplete() { - if(!attemptTerminate()) { + if (!attemptTerminate()) { return; } cancelSchedule(); delegate.onComplete(); } -} \ No newline at end of file +} diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java index 7817f31a559..793b8de3ccc 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.linecorp.armeria.common.stream; import static org.assertj.core.api.Assertions.assertThat; @@ -7,7 +23,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -15,6 +30,8 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import com.linecorp.armeria.common.TimeoutException; + import io.netty.util.concurrent.DefaultEventExecutor; import io.netty.util.concurrent.EventExecutor; @@ -229,4 +246,4 @@ public void onComplete() { assertThat(future.get()).isNull(); } -} \ No newline at end of file +} From 6c86d2f77ddde8e62bffbc136c90630bcf8eb5a2 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Tue, 2 Jul 2024 17:29:54 +0900 Subject: [PATCH 13/26] Applying reviews 1. remove @throws from docs 2. add @UnstableApi with timeout method 3. public -> final 4. add @Nullable to timeoutSubscriber 5. cancel() override (add cancelSchedule()) 6. lastontime nano -> lasteventtime nano 7. createTimeoutSchedule -> scheduleTimeout 8. call onError after canceling --- .../armeria/common/stream/StreamMessage.java | 4 +-- .../common/stream/TimeoutStreamMessage.java | 6 ++-- .../common/stream/TimeoutSubscriber.java | 29 +++++++++++++------ 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java index 6daa684e9b5..1e166b8bece 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java @@ -1217,8 +1217,8 @@ default StreamMessage subscribeOn(EventExecutor eventExecutor) { * * @param timeoutDuration the duration before a timeout occurs * @return a new {@link TimeoutStreamMessage} with the specified timeout duration and default mode - * @throws NullPointerException if {@code timeoutDuration} is null */ + @UnstableApi default StreamMessage timeout(Duration timeoutDuration) { requireNonNull(timeoutDuration, "timeoutDuration"); return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); @@ -1240,8 +1240,8 @@ default StreamMessage timeout(Duration timeoutDuration) { * @param timeoutDuration the duration before a timeout occurs * @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details) * @return a new {@link TimeoutStreamMessage} with the specified timeout duration and mode applied - * @throws NullPointerException if {@code timeoutDuration} or {@code timeoutMode} is null */ + @UnstableApi default StreamMessage timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { requireNonNull(timeoutDuration, "timeoutDuration"); requireNonNull(timeoutMode, "timeoutMode"); diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 4c014ca688b..0543b5d6f47 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -24,6 +24,7 @@ import org.reactivestreams.Subscriber; import com.linecorp.armeria.common.TimeoutException; +import com.linecorp.armeria.common.annotation.Nullable; import io.netty.util.concurrent.EventExecutor; @@ -37,11 +38,12 @@ * * @param the type of the elements signaled */ -public class TimeoutStreamMessage implements StreamMessage { +final class TimeoutStreamMessage implements StreamMessage { private final StreamMessage delegate; private final Duration timeoutDuration; private final StreamTimeoutMode timeoutMode; + @Nullable private TimeoutSubscriber timeoutSubscriber; /** @@ -51,7 +53,7 @@ public class TimeoutStreamMessage implements StreamMessage { * @param timeoutDuration the duration before a timeout occurs * @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details) */ - public TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, + TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { this.delegate = requireNonNull(delegate, "delegate"); this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index 4fc70c5f09e..759e5676385 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -29,7 +29,7 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ScheduledFuture; -final class TimeoutSubscriber implements Runnable, Subscriber { +final class TimeoutSubscriber implements Runnable, Subscriber, Subscription { private static final String TIMEOUT_MESSAGE = "Stream timed out after %d ms (timeout mode: %s)"; private final Subscriber delegate; private final EventExecutor executor; @@ -38,7 +38,7 @@ final class TimeoutSubscriber implements Runnable, Subscriber { private final long timeoutNanos; private ScheduledFuture timeoutFuture; private Subscription subscription; - private long lastOnNextTimeNanos; + private long lastEventTimeNanos; private boolean isTerminated; TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, @@ -50,7 +50,7 @@ final class TimeoutSubscriber implements Runnable, Subscriber { this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); } - private ScheduledFuture createTimeoutSchedule(long delay, TimeUnit unit) { + private ScheduledFuture scheduleTimeout(long delay, TimeUnit unit) { return executor.schedule(this, delay, unit); } @@ -72,27 +72,27 @@ public void cancelSchedule() { public void run() { if (timeoutMode == StreamTimeoutMode.UNTIL_NEXT) { final long currentTimeNanos = System.nanoTime(); - final long elapsedNanos = currentTimeNanos - lastOnNextTimeNanos; + final long elapsedNanos = currentTimeNanos - lastEventTimeNanos; if (elapsedNanos < timeoutNanos) { final long delayNanos = timeoutNanos - elapsedNanos; - timeoutFuture = createTimeoutSchedule(delayNanos, TimeUnit.NANOSECONDS); + timeoutFuture = scheduleTimeout(delayNanos, TimeUnit.NANOSECONDS); return; } } if (!attemptTerminate()) { return; } - subscription.cancel(); delegate.onError(new TimeoutException( String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); + subscription.cancel(); } @Override public void onSubscribe(Subscription s) { subscription = s; - lastOnNextTimeNanos = System.nanoTime(); - timeoutFuture = createTimeoutSchedule(timeoutNanos, TimeUnit.NANOSECONDS); + lastEventTimeNanos = System.nanoTime(); + timeoutFuture = scheduleTimeout(timeoutNanos, TimeUnit.NANOSECONDS); delegate.onSubscribe(s); } @@ -103,7 +103,7 @@ public void onNext(T t) { } switch (timeoutMode) { case UNTIL_NEXT: - lastOnNextTimeNanos = System.nanoTime(); + lastEventTimeNanos = System.nanoTime(); break; case UNTIL_FIRST: timeoutFuture.cancel(false); @@ -131,4 +131,15 @@ public void onComplete() { cancelSchedule(); delegate.onComplete(); } + + @Override + public void request(long l) { + subscription.request(l); + } + + @Override + public void cancel() { + cancelSchedule(); + subscription.cancel(); + } } From f8248a73b13c8ffebf7abe625f33a84cf04ce2d9 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 3 Jul 2024 22:10:21 +0900 Subject: [PATCH 14/26] 1. Delete aptTerminate Method -> Replace with completed 2. onNext -> Delete isCancelled() from condition case UNTIL_FIRST: Change timeoutfuture.cancel(false) to cancelSchedule() --- .../common/stream/TimeoutStreamMessage.java | 4 +-- .../common/stream/TimeoutSubscriber.java | 32 ++++++++----------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 0543b5d6f47..251d5dd7920 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -54,7 +54,7 @@ final class TimeoutStreamMessage implements StreamMessage { * @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details) */ TimeoutStreamMessage(StreamMessage delegate, Duration timeoutDuration, - StreamTimeoutMode timeoutMode) { + StreamTimeoutMode timeoutMode) { this.delegate = requireNonNull(delegate, "delegate"); this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); @@ -114,7 +114,7 @@ public CompletableFuture whenComplete() { @Override public void subscribe(Subscriber subscriber, EventExecutor executor, SubscriptionOption... options) { - timeoutSubscriber = new TimeoutSubscriber(subscriber, executor, timeoutDuration, timeoutMode); + timeoutSubscriber = new TimeoutSubscriber<>(subscriber, executor, timeoutDuration, timeoutMode); delegate.subscribe(timeoutSubscriber, executor, options); } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index 759e5676385..deecfff6bd4 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -25,21 +25,25 @@ import org.reactivestreams.Subscription; import com.linecorp.armeria.common.TimeoutException; +import com.linecorp.armeria.common.annotation.Nullable; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ScheduledFuture; final class TimeoutSubscriber implements Runnable, Subscriber, Subscription { + private static final String TIMEOUT_MESSAGE = "Stream timed out after %d ms (timeout mode: %s)"; private final Subscriber delegate; private final EventExecutor executor; private final StreamTimeoutMode timeoutMode; private final Duration timeoutDuration; private final long timeoutNanos; + @Nullable private ScheduledFuture timeoutFuture; + @Nullable private Subscription subscription; private long lastEventTimeNanos; - private boolean isTerminated; + private boolean completed; TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { @@ -54,16 +58,8 @@ private ScheduledFuture scheduleTimeout(long delay, TimeUnit unit) { return executor.schedule(this, delay, unit); } - private boolean attemptTerminate() { - if (isTerminated) { - return false; - } - isTerminated = true; - return true; - } - - public void cancelSchedule() { - if (!timeoutFuture.isCancelled()) { + void cancelSchedule() { + if (timeoutFuture != null && !timeoutFuture.isCancelled()) { timeoutFuture.cancel(false); } } @@ -80,9 +76,7 @@ public void run() { return; } } - if (!attemptTerminate()) { - return; - } + completed = true; delegate.onError(new TimeoutException( String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); subscription.cancel(); @@ -98,7 +92,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (isTerminated || timeoutFuture.isCancelled()) { + if (completed) { return; } switch (timeoutMode) { @@ -106,7 +100,7 @@ public void onNext(T t) { lastEventTimeNanos = System.nanoTime(); break; case UNTIL_FIRST: - timeoutFuture.cancel(false); + cancelSchedule(); break; case UNTIL_EOS: break; @@ -116,18 +110,20 @@ public void onNext(T t) { @Override public void onError(Throwable throwable) { - if (!attemptTerminate()) { + if (completed) { return; } + completed = true; cancelSchedule(); delegate.onError(throwable); } @Override public void onComplete() { - if (!attemptTerminate()) { + if (completed) { return; } + completed = true; cancelSchedule(); delegate.onComplete(); } From b1d03c7b712e1aa8376c509a6ba3bf0ec695e8f9 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 3 Jul 2024 23:08:12 +0900 Subject: [PATCH 15/26] delegate.onSubscribe(s); -> delegate.onSubscribe(this); --- .../com/linecorp/armeria/common/stream/TimeoutSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index deecfff6bd4..32e8e7b728f 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -87,7 +87,7 @@ public void onSubscribe(Subscription s) { subscription = s; lastEventTimeNanos = System.nanoTime(); timeoutFuture = scheduleTimeout(timeoutNanos, TimeUnit.NANOSECONDS); - delegate.onSubscribe(s); + delegate.onSubscribe(this); } @Override From 45008d820de4dfbf8ee0e053ae7c868d589e08be Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Sun, 7 Jul 2024 20:08:24 +0900 Subject: [PATCH 16/26] 1. replace 2019 with 2024 2. add docs to StreamTimeoutMode parameter 3. use test code executor.get() --- .../common/stream/StreamTimeoutMode.java | 21 +++++++++- .../common/stream/TimeoutStreamMessage.java | 2 +- .../common/stream/TimeoutSubscriber.java | 2 +- .../stream/TimeoutStreamMessageTest.java | 40 +++++++------------ 4 files changed, 37 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java index 482db9dfa85..190b4f03657 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2024 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -34,7 +34,26 @@ * */ public enum StreamTimeoutMode { + + /** + * Based on the first data chunk. + * If the first data chunk is not received within the specified time, + * a {@link TimeoutException} is thrown. + */ UNTIL_FIRST, + + /** + * Based on each data chunk. + * If each data chunk is not received within the specified time after the previous chunk, + * a {@link TimeoutException} is thrown. + */ + UNTIL_NEXT, + + /** + * Based on the entire stream. + * If all data chunks are not received within the specified time before the end of the stream, + * a {@link TimeoutException} is thrown. + */ UNTIL_EOS } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 251d5dd7920..bcc83cd21a9 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2024 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java index 32e8e7b728f..97d0ea9e4bb 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2024 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java index 793b8de3ccc..4fc76e1b726 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2024 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -24,29 +24,19 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import com.linecorp.armeria.common.TimeoutException; +import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; -import io.netty.util.concurrent.DefaultEventExecutor; -import io.netty.util.concurrent.EventExecutor; class TimeoutStreamMessageTest { - private EventExecutor executor; - @BeforeEach - public void setUp() { - executor = new DefaultEventExecutor(); - } - - @AfterEach - public void tearDown() { - executor.shutdownGracefully(); - } + @RegisterExtension + static final EventLoopExtension executor = new EventLoopExtension(); @Test public void timeoutNextMode() { @@ -65,7 +55,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(String s) { - executor.schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); + executor.get().schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); } @Override @@ -77,7 +67,7 @@ public void onError(Throwable throwable) { public void onComplete() { future.complete(null); } - }, executor); + }, executor.get()); assertThatThrownBy(future::get) .isInstanceOf(ExecutionException.class) @@ -110,7 +100,7 @@ public void onError(Throwable throwable) { public void onComplete() { future.complete(null); } - }, executor); + }, executor.get()); assertThat(future.get()).isNull(); } @@ -127,7 +117,7 @@ void timeoutFirstMode() { @Override public void onSubscribe(Subscription s) { subscription = s; - executor.schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); + executor.get().schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); } @Override @@ -144,7 +134,7 @@ public void onError(Throwable throwable) { public void onComplete() { future.complete(null); } - }, executor); + }, executor.get()); assertThatThrownBy(future::get) .isInstanceOf(ExecutionException.class) @@ -176,7 +166,7 @@ public void onError(Throwable throwable) { public void onComplete() { future.complete(null); } - }, executor); + }, executor.get()); assertThat(future.get()).isNull(); } @@ -193,12 +183,12 @@ void timeoutEOSMode() { @Override public void onSubscribe(Subscription s) { subscription = s; - executor.schedule(() -> subscription.request(1), 1, TimeUnit.SECONDS); + executor.get().schedule(() -> subscription.request(1), 1, TimeUnit.SECONDS); } @Override public void onNext(String s) { - executor.schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); + executor.get().schedule(() -> subscription.request(1), 2, TimeUnit.SECONDS); } @Override @@ -210,7 +200,7 @@ public void onError(Throwable throwable) { public void onComplete() { future.complete(null); } - }, executor); + }, executor.get()); assertThatThrownBy(future::get) .isInstanceOf(ExecutionException.class) @@ -242,7 +232,7 @@ public void onError(Throwable throwable) { public void onComplete() { future.complete(null); } - }, executor); + }, executor.get()); assertThat(future.get()).isNull(); } From e4c39168e800632d9669c36566a3da4b81d24692 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Mon, 8 Jul 2024 19:44:28 +0900 Subject: [PATCH 17/26] 1. call delegate.onSubscribe() in TimeoutSubscriber's onSubscribe(), and set the timeout schedule 2. change TimeoutSubscriber class to TimeoutStreamMessage's inner class --- .../common/stream/TimeoutStreamMessage.java | 117 +++++++++++++++ .../common/stream/TimeoutSubscriber.java | 141 ------------------ 2 files changed, 117 insertions(+), 141 deletions(-) delete mode 100644 core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index bcc83cd21a9..c6ec333196a 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -20,13 +20,16 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import com.linecorp.armeria.common.TimeoutException; import com.linecorp.armeria.common.annotation.Nullable; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.ScheduledFuture; /** * This class provides timeout functionality to a base StreamMessage. @@ -150,4 +153,118 @@ public void abort(Throwable cause) { cancelSchedule(); delegate.abort(cause); } + + static final class TimeoutSubscriber implements Runnable, Subscriber, Subscription { + + private static final String TIMEOUT_MESSAGE = "Stream timed out after %d ms (timeout mode: %s)"; + private final Subscriber delegate; + private final EventExecutor executor; + private final StreamTimeoutMode timeoutMode; + private final Duration timeoutDuration; + private final long timeoutNanos; + @Nullable + private ScheduledFuture timeoutFuture; + @Nullable + private Subscription subscription; + private long lastEventTimeNanos; + private boolean completed; + + TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, + StreamTimeoutMode timeoutMode) { + this.delegate = requireNonNull(delegate, "delegate"); + this.executor = requireNonNull(executor, "executor"); + this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); + timeoutNanos = timeoutDuration.toNanos(); + this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); + } + + private ScheduledFuture scheduleTimeout(long delay, TimeUnit unit) { + return executor.schedule(this, delay, unit); + } + + void cancelSchedule() { + if (timeoutFuture != null && !timeoutFuture.isCancelled()) { + timeoutFuture.cancel(false); + } + } + + @Override + public void run() { + if (timeoutMode == StreamTimeoutMode.UNTIL_NEXT) { + final long currentTimeNanos = System.nanoTime(); + final long elapsedNanos = currentTimeNanos - lastEventTimeNanos; + + if (elapsedNanos < timeoutNanos) { + final long delayNanos = timeoutNanos - elapsedNanos; + timeoutFuture = scheduleTimeout(delayNanos, TimeUnit.NANOSECONDS); + return; + } + } + completed = true; + delegate.onError(new TimeoutException( + String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); + subscription.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + delegate.onSubscribe(this); + if(completed) { + return; + } + lastEventTimeNanos = System.nanoTime(); + timeoutFuture = scheduleTimeout(timeoutNanos, TimeUnit.NANOSECONDS); + } + + @Override + public void onNext(T t) { + if (completed) { + return; + } + switch (timeoutMode) { + case UNTIL_NEXT: + lastEventTimeNanos = System.nanoTime(); + break; + case UNTIL_FIRST: + cancelSchedule(); + break; + case UNTIL_EOS: + break; + } + delegate.onNext(t); + } + + @Override + public void onError(Throwable throwable) { + if (completed) { + return; + } + completed = true; + cancelSchedule(); + delegate.onError(throwable); + } + + @Override + public void onComplete() { + if (completed) { + return; + } + completed = true; + cancelSchedule(); + delegate.onComplete(); + } + + @Override + public void request(long l) { + subscription.request(l); + } + + @Override + public void cancel() { + completed = true; + cancelSchedule(); + subscription.cancel(); + } + } } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java deleted file mode 100644 index 97d0ea9e4bb..00000000000 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2024 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.common.stream; - -import static java.util.Objects.requireNonNull; - -import java.time.Duration; -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import com.linecorp.armeria.common.TimeoutException; -import com.linecorp.armeria.common.annotation.Nullable; - -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.ScheduledFuture; - -final class TimeoutSubscriber implements Runnable, Subscriber, Subscription { - - private static final String TIMEOUT_MESSAGE = "Stream timed out after %d ms (timeout mode: %s)"; - private final Subscriber delegate; - private final EventExecutor executor; - private final StreamTimeoutMode timeoutMode; - private final Duration timeoutDuration; - private final long timeoutNanos; - @Nullable - private ScheduledFuture timeoutFuture; - @Nullable - private Subscription subscription; - private long lastEventTimeNanos; - private boolean completed; - - TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, - StreamTimeoutMode timeoutMode) { - this.delegate = requireNonNull(delegate, "delegate"); - this.executor = requireNonNull(executor, "executor"); - this.timeoutDuration = requireNonNull(timeoutDuration, "timeoutDuration"); - timeoutNanos = timeoutDuration.toNanos(); - this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); - } - - private ScheduledFuture scheduleTimeout(long delay, TimeUnit unit) { - return executor.schedule(this, delay, unit); - } - - void cancelSchedule() { - if (timeoutFuture != null && !timeoutFuture.isCancelled()) { - timeoutFuture.cancel(false); - } - } - - @Override - public void run() { - if (timeoutMode == StreamTimeoutMode.UNTIL_NEXT) { - final long currentTimeNanos = System.nanoTime(); - final long elapsedNanos = currentTimeNanos - lastEventTimeNanos; - - if (elapsedNanos < timeoutNanos) { - final long delayNanos = timeoutNanos - elapsedNanos; - timeoutFuture = scheduleTimeout(delayNanos, TimeUnit.NANOSECONDS); - return; - } - } - completed = true; - delegate.onError(new TimeoutException( - String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); - subscription.cancel(); - } - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - lastEventTimeNanos = System.nanoTime(); - timeoutFuture = scheduleTimeout(timeoutNanos, TimeUnit.NANOSECONDS); - delegate.onSubscribe(this); - } - - @Override - public void onNext(T t) { - if (completed) { - return; - } - switch (timeoutMode) { - case UNTIL_NEXT: - lastEventTimeNanos = System.nanoTime(); - break; - case UNTIL_FIRST: - cancelSchedule(); - break; - case UNTIL_EOS: - break; - } - delegate.onNext(t); - } - - @Override - public void onError(Throwable throwable) { - if (completed) { - return; - } - completed = true; - cancelSchedule(); - delegate.onError(throwable); - } - - @Override - public void onComplete() { - if (completed) { - return; - } - completed = true; - cancelSchedule(); - delegate.onComplete(); - } - - @Override - public void request(long l) { - subscription.request(l); - } - - @Override - public void cancel() { - cancelSchedule(); - subscription.cancel(); - } -} From 13228b3d5cb5843dcef6f71bec4841e8d3869462 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Mon, 8 Jul 2024 20:19:19 +0900 Subject: [PATCH 18/26] lint --- .../linecorp/armeria/common/stream/TimeoutStreamMessage.java | 2 +- .../armeria/common/stream/TimeoutStreamMessageTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index c6ec333196a..8d153873eed 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -210,7 +210,7 @@ public void run() { public void onSubscribe(Subscription s) { subscription = s; delegate.onSubscribe(this); - if(completed) { + if (completed) { return; } lastEventTimeNanos = System.nanoTime(); diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java index 4fc76e1b726..28a2cc8b425 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java @@ -32,7 +32,6 @@ import com.linecorp.armeria.common.TimeoutException; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; - class TimeoutStreamMessageTest { @RegisterExtension From 73adfffa5d1eb81db365ae855dd501498a9bb64a Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Mon, 8 Jul 2024 22:19:33 +0900 Subject: [PATCH 19/26] Remove blank lines --- .../com/linecorp/armeria/common/stream/StreamTimeoutMode.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java index 190b4f03657..949324ca3d4 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java @@ -47,7 +47,6 @@ public enum StreamTimeoutMode { * If each data chunk is not received within the specified time after the previous chunk, * a {@link TimeoutException} is thrown. */ - UNTIL_NEXT, /** From 38b6eb9c6a6391b7a97d1be79dd6a7bd29f8bc30 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 10 Jul 2024 23:48:06 +0900 Subject: [PATCH 20/26] 1. Add `@UnstableApi` to `HttpResponse`, `HttpRequest`, and `StreamTimeoutMode` 2. Added leak prevention code when completed in onNext() - PooledObjects.close(t) --- .../src/main/java/com/linecorp/armeria/common/HttpRequest.java | 2 ++ .../main/java/com/linecorp/armeria/common/HttpResponse.java | 2 ++ .../com/linecorp/armeria/common/stream/StreamTimeoutMode.java | 2 ++ .../linecorp/armeria/common/stream/TimeoutStreamMessage.java | 3 ++- 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java index 42234b27ea2..58efe7f4ef0 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java @@ -819,12 +819,14 @@ default HttpRequest subscribeOn(EventExecutor eventExecutor) { return of(headers(), HttpMessage.super.subscribeOn(eventExecutor)); } + @UnstableApi @Override default HttpRequest timeout(Duration timeoutDuration) { requireNonNull(timeoutDuration, "timeoutDuration"); return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); } + @UnstableApi @Override default HttpRequest timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { requireNonNull(timeoutDuration, "timeoutDuration"); diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java b/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java index 0eaa89372bd..ab86d0c60eb 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java @@ -1185,12 +1185,14 @@ default HttpResponse subscribeOn(EventExecutor eventExecutor) { return of(HttpMessage.super.subscribeOn(eventExecutor)); } + @UnstableApi @Override default HttpResponse timeout(Duration timeoutDuration) { requireNonNull(timeoutDuration, "timeoutDuration"); return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); } + @UnstableApi @Override default HttpResponse timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { requireNonNull(timeoutDuration, "timeoutDuration"); diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java index 949324ca3d4..a81b8f5accb 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java @@ -17,6 +17,7 @@ package com.linecorp.armeria.common.stream; import com.linecorp.armeria.common.TimeoutException; +import com.linecorp.armeria.common.annotation.UnstableApi; /** * Stream Timeout Mode consists of three modes. @@ -33,6 +34,7 @@ * a {@link TimeoutException} is thrown. * */ +@UnstableApi public enum StreamTimeoutMode { /** diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 8d153873eed..7cce818749e 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -27,6 +27,7 @@ import com.linecorp.armeria.common.TimeoutException; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.unsafe.PooledObjects; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ScheduledFuture; @@ -220,6 +221,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { if (completed) { + PooledObjects.close(t); return; } switch (timeoutMode) { @@ -262,7 +264,6 @@ public void request(long l) { @Override public void cancel() { - completed = true; cancelSchedule(); subscription.cancel(); } From 566d725123903da4f66a734dce1e76f6e1a4d8c7 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Thu, 11 Jul 2024 16:27:12 +0900 Subject: [PATCH 21/26] 1. Added `boolean canceled` to prevent unnecessary scheduling. --- .../armeria/common/stream/TimeoutStreamMessage.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 7cce818749e..016c478c446 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -169,6 +169,7 @@ static final class TimeoutSubscriber implements Runnable, Subscriber, Subs private Subscription subscription; private long lastEventTimeNanos; private boolean completed; + private boolean canceled; TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { @@ -211,7 +212,7 @@ public void run() { public void onSubscribe(Subscription s) { subscription = s; delegate.onSubscribe(this); - if (completed) { + if (completed || canceled) { return; } lastEventTimeNanos = System.nanoTime(); @@ -220,7 +221,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (completed) { + if (completed || canceled) { PooledObjects.close(t); return; } @@ -264,6 +265,7 @@ public void request(long l) { @Override public void cancel() { + canceled = true; cancelSchedule(); subscription.cancel(); } From eb8d450db684aa989a642fb8651d3c90c457ac32 Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 17 Jul 2024 19:54:03 +0900 Subject: [PATCH 22/26] 1. Modify JavaDoc 2. Add a StreamTimeoutException 3. Overriding WebSocket's timeout method 4. Remove cancelSchedule() from abort() 5. Set timeoutFuture to null to avoid calling .isCancelled() which may access a volatile field. 6. Modify boolean cancels to volatile boolean cancels --- .../linecorp/armeria/common/HttpRequest.java | 3 - .../linecorp/armeria/common/HttpResponse.java | 3 - .../common/StreamTimeoutException.java | 32 +++++++++ .../armeria/common/stream/StreamMessage.java | 6 +- .../common/stream/TimeoutStreamMessage.java | 71 ++++--------------- .../armeria/common/websocket/WebSocket.java | 17 +++++ .../stream/TimeoutStreamMessageTest.java | 8 +-- 7 files changed, 67 insertions(+), 73 deletions(-) create mode 100644 core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java index 58efe7f4ef0..773e2927224 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpRequest.java @@ -822,15 +822,12 @@ default HttpRequest subscribeOn(EventExecutor eventExecutor) { @UnstableApi @Override default HttpRequest timeout(Duration timeoutDuration) { - requireNonNull(timeoutDuration, "timeoutDuration"); return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); } @UnstableApi @Override default HttpRequest timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { - requireNonNull(timeoutDuration, "timeoutDuration"); - requireNonNull(timeoutMode, "timeoutMode"); return of(headers(), HttpMessage.super.timeout(timeoutDuration, timeoutMode)); } } diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java b/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java index ab86d0c60eb..2c944f824ca 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpResponse.java @@ -1188,15 +1188,12 @@ default HttpResponse subscribeOn(EventExecutor eventExecutor) { @UnstableApi @Override default HttpResponse timeout(Duration timeoutDuration) { - requireNonNull(timeoutDuration, "timeoutDuration"); return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); } @UnstableApi @Override default HttpResponse timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { - requireNonNull(timeoutDuration, "timeoutDuration"); - requireNonNull(timeoutMode, "timeoutMode"); return of(HttpMessage.super.timeout(timeoutDuration, timeoutMode)); } } diff --git a/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java new file mode 100644 index 00000000000..b01382dbbef --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common; + +import com.linecorp.armeria.common.annotation.Nullable; + +/** + * A {@link TimeoutException} raised when a stream operation exceeds the configured timeout. + */ +public final class StreamTimeoutException extends TimeoutException { + + private static final long serialVersionUID = 7585558758307122722L; + + /** + * Creates a new instance with the specified {@code message}. + */ + public StreamTimeoutException(@Nullable String message) {super(message);} +} diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java index 1e166b8bece..47069dd982a 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java @@ -1216,17 +1216,15 @@ default StreamMessage subscribeOn(EventExecutor eventExecutor) { * } * * @param timeoutDuration the duration before a timeout occurs - * @return a new {@link TimeoutStreamMessage} with the specified timeout duration and default mode + * @return a new {@link StreamMessage} with the specified timeout duration and default mode */ @UnstableApi default StreamMessage timeout(Duration timeoutDuration) { - requireNonNull(timeoutDuration, "timeoutDuration"); return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); } /** * Configures a timeout for the stream based on the specified duration and mode. - * Internally, it creates and returns a {@link TimeoutStreamMessage} with the specified parameters. * *

Example usage: *

{@code
@@ -1239,7 +1237,7 @@ default StreamMessage timeout(Duration timeoutDuration) {
      *
      * @param timeoutDuration the duration before a timeout occurs
      * @param timeoutMode the mode in which the timeout is applied (see {@link StreamTimeoutMode} for details)
-     * @return a new {@link TimeoutStreamMessage} with the specified timeout duration and mode applied
+     * @return a new {@link StreamMessage} with the specified timeout duration and mode applied
      */
     @UnstableApi
     default StreamMessage timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java
index 016c478c446..9ca591f514d 100644
--- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java
+++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java
@@ -25,7 +25,7 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-import com.linecorp.armeria.common.TimeoutException;
+import com.linecorp.armeria.common.StreamTimeoutException;
 import com.linecorp.armeria.common.annotation.Nullable;
 import com.linecorp.armeria.unsafe.PooledObjects;
 
@@ -34,7 +34,7 @@
 
 /**
  * This class provides timeout functionality to a base StreamMessage.
- * If data is not received within the specified time, a {@link TimeoutException} is thrown.
+ * If data is not received within the specified time, a {@link StreamTimeoutException} is thrown.
  *
  * 

The timeout functionality helps to release resources and throw appropriate exceptions * if the stream becomes inactive or data is not received within a certain time frame, @@ -47,8 +47,6 @@ final class TimeoutStreamMessage implements StreamMessage { private final StreamMessage delegate; private final Duration timeoutDuration; private final StreamTimeoutMode timeoutMode; - @Nullable - private TimeoutSubscriber timeoutSubscriber; /** * Creates a new TimeoutStreamMessage with the specified base stream message and timeout settings. @@ -64,51 +62,28 @@ final class TimeoutStreamMessage implements StreamMessage { this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); } - /** - * Returns {@code true} if this stream is still open. - * - * @return {@code true} if the stream is open, otherwise {@code false} - */ @Override public boolean isOpen() { return delegate.isOpen(); } - /** - * Returns {@code true} if this stream is closed and did not publish any elements. - * - * @return {@code true} if the stream is empty, otherwise {@code false} - */ @Override public boolean isEmpty() { return delegate.isEmpty(); } - /** - * Returns the current demand of this stream. - * - * @return the current demand - */ @Override public long demand() { return delegate.demand(); } - /** - * Returns a {@link CompletableFuture} that completes when this stream is complete, - * either successfully or exceptionally. - * - * @return a future that completes when the stream is complete - */ @Override public CompletableFuture whenComplete() { return delegate.whenComplete(); } /** - * Creates a {@link TimeoutSubscriber} with timeout logic applied using the specified - * subscriber, executor, duration, and mode. - * Then subscribes to the original stream with the TimeoutSubscriber, executor, and options. + * Subscribes the given subscriber to this stream with timeout logic applied. * * @param subscriber the subscriber to this stream * @param executor the executor for running timeout tasks and stream operations @@ -118,40 +93,17 @@ public CompletableFuture whenComplete() { @Override public void subscribe(Subscriber subscriber, EventExecutor executor, SubscriptionOption... options) { - timeoutSubscriber = new TimeoutSubscriber<>(subscriber, executor, timeoutDuration, timeoutMode); - delegate.subscribe(timeoutSubscriber, executor, options); - } - - /** - * Cancels the timeout schedule if it is set. - */ - private void cancelSchedule() { - if (timeoutSubscriber != null) { - timeoutSubscriber.cancelSchedule(); - } + delegate.subscribe(new TimeoutSubscriber<>(subscriber, executor, timeoutDuration, timeoutMode), + executor, options); } - /** - * Aborts the stream with an {@link AbortedStreamException} and prevents further subscriptions. - * Calling this method on a closed or aborted stream has no effect. - * Cancels the timeout schedule if it is set. - */ @Override public void abort() { - cancelSchedule(); delegate.abort(); } - /** - * Aborts the stream with the specified {@link Throwable} and prevents further subscriptions. - * Calling this method on a closed or aborted stream has no effect. - * Cancels the timeout schedule if it is set. - * - * @param cause the cause of the abort - */ @Override public void abort(Throwable cause) { - cancelSchedule(); delegate.abort(cause); } @@ -169,7 +121,7 @@ static final class TimeoutSubscriber implements Runnable, Subscriber, Subs private Subscription subscription; private long lastEventTimeNanos; private boolean completed; - private boolean canceled; + private volatile boolean canceled; TimeoutSubscriber(Subscriber delegate, EventExecutor executor, Duration timeoutDuration, StreamTimeoutMode timeoutMode) { @@ -180,8 +132,8 @@ static final class TimeoutSubscriber implements Runnable, Subscriber, Subs this.timeoutMode = requireNonNull(timeoutMode, "timeoutMode"); } - private ScheduledFuture scheduleTimeout(long delay, TimeUnit unit) { - return executor.schedule(this, delay, unit); + private ScheduledFuture scheduleTimeout(long delay) { + return executor.schedule(this, delay, TimeUnit.NANOSECONDS); } void cancelSchedule() { @@ -198,12 +150,12 @@ public void run() { if (elapsedNanos < timeoutNanos) { final long delayNanos = timeoutNanos - elapsedNanos; - timeoutFuture = scheduleTimeout(delayNanos, TimeUnit.NANOSECONDS); + timeoutFuture = scheduleTimeout(delayNanos); return; } } completed = true; - delegate.onError(new TimeoutException( + delegate.onError(new StreamTimeoutException( String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); subscription.cancel(); } @@ -216,7 +168,7 @@ public void onSubscribe(Subscription s) { return; } lastEventTimeNanos = System.nanoTime(); - timeoutFuture = scheduleTimeout(timeoutNanos, TimeUnit.NANOSECONDS); + timeoutFuture = scheduleTimeout(timeoutNanos); } @Override @@ -231,6 +183,7 @@ public void onNext(T t) { break; case UNTIL_FIRST: cancelSchedule(); + timeoutFuture = null; break; case UNTIL_EOS: break; diff --git a/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java b/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java index 10ae7240377..ded8d4a3762 100644 --- a/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java +++ b/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java @@ -15,8 +15,13 @@ */ package com.linecorp.armeria.common.websocket; +import static java.util.Objects.requireNonNull; + +import java.time.Duration; + import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.stream.StreamMessage; +import com.linecorp.armeria.common.stream.StreamTimeoutMode; import com.linecorp.armeria.internal.common.websocket.WebSocketWrapper; /** @@ -39,4 +44,16 @@ static WebSocketWriter streaming() { static WebSocket of(StreamMessage delegate) { return new WebSocketWrapper(delegate); } + + @UnstableApi + @Override + default WebSocket timeout(Duration timeoutDuration) { + return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT); + } + + @UnstableApi + @Override + default WebSocket timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) { + return of(StreamMessage.super.timeout(timeoutDuration, timeoutMode)); + } } diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java index 28a2cc8b425..085372b27f6 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/TimeoutStreamMessageTest.java @@ -29,7 +29,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import com.linecorp.armeria.common.TimeoutException; +import com.linecorp.armeria.common.StreamTimeoutException; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; class TimeoutStreamMessageTest { @@ -70,7 +70,7 @@ public void onComplete() { assertThatThrownBy(future::get) .isInstanceOf(ExecutionException.class) - .hasCauseInstanceOf(TimeoutException.class); + .hasCauseInstanceOf(StreamTimeoutException.class); } @Test @@ -137,7 +137,7 @@ public void onComplete() { assertThatThrownBy(future::get) .isInstanceOf(ExecutionException.class) - .hasCauseInstanceOf(TimeoutException.class); + .hasCauseInstanceOf(StreamTimeoutException.class); } @Test @@ -203,7 +203,7 @@ public void onComplete() { assertThatThrownBy(future::get) .isInstanceOf(ExecutionException.class) - .hasCauseInstanceOf(TimeoutException.class); + .hasCauseInstanceOf(StreamTimeoutException.class); } @Test From 994ce960c49a97ae4cec09742462ad26efcfc5cc Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 17 Jul 2024 20:11:41 +0900 Subject: [PATCH 23/26] lint pass --- .../com/linecorp/armeria/common/StreamTimeoutException.java | 2 +- .../java/com/linecorp/armeria/common/websocket/WebSocket.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java index b01382dbbef..a6c6a5b5458 100644 --- a/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java +++ b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java @@ -28,5 +28,5 @@ public final class StreamTimeoutException extends TimeoutException { /** * Creates a new instance with the specified {@code message}. */ - public StreamTimeoutException(@Nullable String message) {super(message);} + public StreamTimeoutException(@Nullable String message) { super(message); } } diff --git a/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java b/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java index ded8d4a3762..85e68b3591e 100644 --- a/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java +++ b/core/src/main/java/com/linecorp/armeria/common/websocket/WebSocket.java @@ -15,8 +15,6 @@ */ package com.linecorp.armeria.common.websocket; -import static java.util.Objects.requireNonNull; - import java.time.Duration; import com.linecorp.armeria.common.annotation.UnstableApi; From cdeec5c0f3ffebd305bd5918ff4bccef47be972a Mon Sep 17 00:00:00 2001 From: jeong-yong-shin Date: Wed, 17 Jul 2024 20:29:07 +0900 Subject: [PATCH 24/26] lint pass --- .../com/linecorp/armeria/common/StreamTimeoutException.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java index a6c6a5b5458..cae609c0d11 100644 --- a/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java +++ b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java @@ -28,5 +28,7 @@ public final class StreamTimeoutException extends TimeoutException { /** * Creates a new instance with the specified {@code message}. */ - public StreamTimeoutException(@Nullable String message) { super(message); } + public StreamTimeoutException(@Nullable String message) { + super(message); + } } From a95f279da3456ead8abad9146b021e97dba2d2a2 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Thu, 25 Jul 2024 18:13:11 +0900 Subject: [PATCH 25/26] javadoc --- .../common/StreamTimeoutException.java | 5 +++++ .../armeria/common/stream/StreamMessage.java | 10 +++++++--- .../common/stream/StreamTimeoutMode.java | 20 +++++++++---------- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java index cae609c0d11..112183f3f78 100644 --- a/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java +++ b/core/src/main/java/com/linecorp/armeria/common/StreamTimeoutException.java @@ -16,10 +16,15 @@ package com.linecorp.armeria.common; +import java.time.Duration; + import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.stream.StreamMessage; /** * A {@link TimeoutException} raised when a stream operation exceeds the configured timeout. + * + * @see StreamMessage#timeout(Duration) */ public final class StreamTimeoutException extends TimeoutException { diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java index 47069dd982a..9565eaa8270 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java @@ -48,6 +48,7 @@ import com.linecorp.armeria.common.CommonPools; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.StreamTimeoutException; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.util.Exceptions; @@ -1206,12 +1207,14 @@ default StreamMessage subscribeOn(EventExecutor eventExecutor) { } /** - * Configures a timeout for the stream based on the specified duration. - * The default timeout mode is {@link StreamTimeoutMode#UNTIL_NEXT}. + * Configures a timeout for the stream based on the specified duration with + * {@link StreamTimeoutMode#UNTIL_NEXT}. If no events are received within the specified duration, + * the stream will be terminated with a {@link StreamTimeoutException}. * *

Example usage: *

{@code
      * StreamMessage stream = ...;
+     * // An item must be received within 10 seconds of the previous item to avoid a timeout.
      * StreamMessage timeoutStream = stream.timeout(Duration.ofSeconds(10));
      * }
* @@ -1224,7 +1227,8 @@ default StreamMessage timeout(Duration timeoutDuration) { } /** - * Configures a timeout for the stream based on the specified duration and mode. + * Configures a timeout for the stream based on the specified duration and mode. >If no events are received + * within the specified duration, the stream will be terminated with a {@link StreamTimeoutException}. * *

Example usage: *

{@code
diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java
index a81b8f5accb..620fec4bcad 100644
--- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java
+++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamTimeoutMode.java
@@ -16,22 +16,22 @@
 
 package com.linecorp.armeria.common.stream;
 
-import com.linecorp.armeria.common.TimeoutException;
+import com.linecorp.armeria.common.StreamTimeoutException;
 import com.linecorp.armeria.common.annotation.UnstableApi;
 
 /**
  * Stream Timeout Mode consists of three modes.
  *
  * 
    - *
  • {@code UNTIL_FIRST} - Based on the first data chunk. + *
  • {@link #UNTIL_FIRST} - Based on the first data chunk. * If the first data chunk is not received within the specified time, - * a {@link TimeoutException} is thrown.
  • - *
  • {@code UNTIL_NEXT} - Based on each data chunk. + * a {@link StreamTimeoutException} is thrown.
  • + *
  • {@link #UNTIL_NEXT} - Based on each data chunk. * If each data chunk is not received within the specified time after the previous chunk, - * a {@link TimeoutException} is thrown.
  • - *
  • {@code UNTIL_EOS} - Based on the entire stream. + * a {@link StreamTimeoutException} is thrown.
  • + *
  • {@link #UNTIL_EOS} - Based on the entire stream. * If all data chunks are not received within the specified time before the end of the stream, - * a {@link TimeoutException} is thrown.
  • + * a {@link StreamTimeoutException} is thrown. *
*/ @UnstableApi @@ -40,21 +40,21 @@ public enum StreamTimeoutMode { /** * Based on the first data chunk. * If the first data chunk is not received within the specified time, - * a {@link TimeoutException} is thrown. + * a {@link StreamTimeoutException} is thrown. */ UNTIL_FIRST, /** * Based on each data chunk. * If each data chunk is not received within the specified time after the previous chunk, - * a {@link TimeoutException} is thrown. + * a {@link StreamTimeoutException} is thrown. */ UNTIL_NEXT, /** * Based on the entire stream. * If all data chunks are not received within the specified time before the end of the stream, - * a {@link TimeoutException} is thrown. + * a {@link StreamTimeoutException} is thrown. */ UNTIL_EOS } From 256009268560539fbb404166d7d3333655a1e394 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Thu, 25 Jul 2024 18:27:18 +0900 Subject: [PATCH 26/26] nullaway --- .../com/linecorp/armeria/client/RestClientPreparation.java | 2 ++ .../armeria/client/TransformingRequestPreparation.java | 3 ++- .../linecorp/armeria/common/stream/TimeoutStreamMessage.java | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/RestClientPreparation.java b/core/src/main/java/com/linecorp/armeria/client/RestClientPreparation.java index 86aea236e12..6a045373b0a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/RestClientPreparation.java +++ b/core/src/main/java/com/linecorp/armeria/client/RestClientPreparation.java @@ -173,6 +173,7 @@ public RestClientPreparation content(MediaType contentType, String content) { @Override @FormatMethod + @SuppressWarnings("FormatStringAnnotation") public RestClientPreparation content(@FormatString String format, Object... content) { delegate.content(format, content); return this; @@ -180,6 +181,7 @@ public RestClientPreparation content(@FormatString String format, Object... cont @Override @FormatMethod + @SuppressWarnings("FormatStringAnnotation") public RestClientPreparation content(MediaType contentType, @FormatString String format, Object... content) { delegate.content(contentType, format, content); diff --git a/core/src/main/java/com/linecorp/armeria/client/TransformingRequestPreparation.java b/core/src/main/java/com/linecorp/armeria/client/TransformingRequestPreparation.java index 852131f6a5d..911cb7c62a5 100644 --- a/core/src/main/java/com/linecorp/armeria/client/TransformingRequestPreparation.java +++ b/core/src/main/java/com/linecorp/armeria/client/TransformingRequestPreparation.java @@ -25,6 +25,7 @@ import org.reactivestreams.Publisher; import com.google.errorprone.annotations.FormatMethod; +import com.google.errorprone.annotations.FormatString; import com.linecorp.armeria.common.Cookie; import com.linecorp.armeria.common.ExchangeType; @@ -186,7 +187,7 @@ public TransformingRequestPreparation content(String format, Object... con @Override @FormatMethod @SuppressWarnings("FormatStringAnnotation") - public TransformingRequestPreparation content(MediaType contentType, String format, + public TransformingRequestPreparation content(MediaType contentType, @FormatString String format, Object... content) { delegate.content(contentType, format, content); return this; diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java index 9ca591f514d..4e13c462d68 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/TimeoutStreamMessage.java @@ -157,6 +157,7 @@ public void run() { completed = true; delegate.onError(new StreamTimeoutException( String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); + assert subscription != null; subscription.cancel(); } @@ -213,6 +214,7 @@ public void onComplete() { @Override public void request(long l) { + assert subscription != null; subscription.request(l); } @@ -220,6 +222,7 @@ public void request(long l) { public void cancel() { canceled = true; cancelSchedule(); + assert subscription != null; subscription.cancel(); } }