From baf2b5ab77922169d9c71dd7b79826115049b574 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Tue, 7 Jan 2025 02:10:45 +0530 Subject: [PATCH] fix(spanner): unflake unit tests --- .../java/com/google/cloud/spanner/AbstractReadContext.java | 2 -- .../java/com/google/cloud/spanner/AbstractResultSet.java | 2 ++ .../java/com/google/cloud/spanner/GrpcStreamIterator.java | 7 +++++++ .../com/google/cloud/spanner/ResumableStreamIterator.java | 1 + 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index abcaa41e32c..4ceb72a466d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -797,7 +797,6 @@ CloseableIterator startStream( isRouteToLeader()); session.markUsed(clock.instant()); stream.setCall(call, request.getTransaction().hasBegin()); - call.request(prefetchChunks); return stream; } @@ -992,7 +991,6 @@ CloseableIterator startStream( isRouteToLeader()); session.markUsed(clock.instant()); stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); - call.request(prefetchChunks); return stream; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 3dca970f96e..3ae1cecf813 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -158,6 +158,8 @@ interface CloseableIterator extends Iterator { default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { return false; } + + default void requestPrefetchChunks() {}; } static double valueProtoToFloat64(com.google.protobuf.Value proto) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index 60a52b78f25..411ee6c345d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -39,6 +39,7 @@ class GrpcStreamIterator extends AbstractIterator implements CloseableIterator { private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName()); static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); + private final int prefetchChunks; private AsyncResultSet.StreamMessageListener streamMessageListener; private final ConsumerImpl consumer; @@ -60,6 +61,7 @@ class GrpcStreamIterator extends AbstractIterator GrpcStreamIterator( Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { this.statement = statement; + this.prefetchChunks = prefetchChunks; this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed); // One extra to allow for END_OF_STREAM message. this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1); @@ -102,6 +104,11 @@ public void close(@Nullable String message) { } } + @Override + public void requestPrefetchChunks() { + call.request(prefetchChunks); + } + @Override public boolean isWithBeginTransaction() { return withBeginTransaction; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 39165da2d38..793f3bcbe32 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -326,6 +326,7 @@ private void startGrpcStreaming() { // When start a new stream set the Span as current to make the gRPC Span a child of // this Span. stream = checkNotNull(startStream(resumeToken, streamMessageListener)); + stream.requestPrefetchChunks(); } } }