From c14eabf53e8a59558e2b5fd106ea88dbac3ab3a1 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sat, 16 Nov 2024 00:45:16 +0800 Subject: [PATCH] feat(stream): fail pending request if server close stream without exception --- .../io/streamnative/oxia/client/batch/ReadBatch.java | 8 ++++++++ .../oxia/client/grpc/WriteStreamWrapper.java | 12 +++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java index 348e08bc..39b2cc11 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java @@ -23,6 +23,8 @@ import io.streamnative.oxia.proto.ReadResponse; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; + import lombok.NonNull; final class ReadBatch extends BatchBase implements Batch, StreamObserver { @@ -83,6 +85,12 @@ public void onError(Throwable batchError) { @Override public void onCompleted() { + // complete pending request if the server close stream without any response + gets.forEach(g -> { + if (!g.callback().isDone()) { + g.fail(new CancellationException()); + } + }); factory.getReadRequestLatencyHistogram().recordSuccess(System.nanoTime() - startSendTimeNanos); } diff --git a/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java b/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java index a12367cd..4f450d77 100644 --- a/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java +++ b/client/src/main/java/io/streamnative/oxia/client/grpc/WriteStreamWrapper.java @@ -22,6 +22,7 @@ import io.streamnative.oxia.proto.WriteResponse; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; @@ -63,7 +64,16 @@ public void onError(Throwable t) { } @Override - public void onCompleted() {} + public void onCompleted() { + synchronized (WriteStreamWrapper.this) { + // complete pending request if the server close stream without any response + pendingWrites.forEach(f -> { + if (!f.isDone()) { + f.completeExceptionally(new CancellationException()); + } + }); + } + } public CompletableFuture send(WriteRequest request) { synchronized (WriteStreamWrapper.this) {