diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index abcaa41e32..4ceb72a466 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -797,7 +797,6 @@ CloseableIterator startStream( isRouteToLeader()); session.markUsed(clock.instant()); stream.setCall(call, request.getTransaction().hasBegin()); - call.request(prefetchChunks); return stream; } @@ -992,7 +991,6 @@ CloseableIterator startStream( isRouteToLeader()); session.markUsed(clock.instant()); stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); - call.request(prefetchChunks); return stream; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 3dca970f96..f9f5062d20 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -158,6 +158,9 @@ interface CloseableIterator extends Iterator { default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { return false; } + + /** it requests the initial prefetch chunks from gRPC stream */ + default void requestPrefetchChunks() {}; } static double valueProtoToFloat64(com.google.protobuf.Value proto) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index 60a52b78f2..e4196e2ac6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -39,6 +40,7 @@ class GrpcStreamIterator extends AbstractIterator implements CloseableIterator { private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName()); static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); + private final int prefetchChunks; private AsyncResultSet.StreamMessageListener streamMessageListener; private final ConsumerImpl consumer; @@ -60,6 +62,7 @@ class GrpcStreamIterator extends AbstractIterator GrpcStreamIterator( Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { this.statement = statement; + this.prefetchChunks = prefetchChunks; this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed); // One extra to allow for END_OF_STREAM message. this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1); @@ -102,6 +105,13 @@ public void close(@Nullable String message) { } } + @Override + @InternalApi + public void requestPrefetchChunks() { + Preconditions.checkState(call != null, "The StreamingCall object is not initialized"); + call.request(prefetchChunks); + } + @Override public boolean isWithBeginTransaction() { return withBeginTransaction; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 39165da2d3..793f3bcbe3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -326,6 +326,7 @@ private void startGrpcStreaming() { // When start a new stream set the Span as current to make the gRPC Span a child of // this Span. stream = checkNotNull(startStream(resumeToken, streamMessageListener)); + stream.requestPrefetchChunks(); } } }