Skip to content

Commit

Permalink
fix(spanner): unflake unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sakthivelmanii committed Jan 7, 2025
1 parent 6225efa commit aedcfd1
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,6 @@ CloseableIterator<PartialResultSet> startStream(
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down Expand Up @@ -992,7 +991,6 @@ CloseableIterator<PartialResultSet> startStream(
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ interface CloseableIterator<T> extends Iterator<T> {
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
return false;
}

/** it requests the initial prefetch chunks from gRPC stream */
default void requestPrefetchChunks() {};
}

static double valueProtoToFloat64(com.google.protobuf.Value proto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All @@ -39,6 +40,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
implements CloseableIterator<PartialResultSet> {
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
private final int prefetchChunks;
private AsyncResultSet.StreamMessageListener streamMessageListener;

private final ConsumerImpl consumer;
Expand All @@ -60,6 +62,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
GrpcStreamIterator(
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
this.statement = statement;
this.prefetchChunks = prefetchChunks;
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
// One extra to allow for END_OF_STREAM message.
this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1);
Expand Down Expand Up @@ -102,6 +105,13 @@ public void close(@Nullable String message) {
}
}

@Override
@InternalApi
public void requestPrefetchChunks() {
Preconditions.checkState(call != null, "The StreamingCall object is not initialized");
call.request(prefetchChunks);
}

@Override
public boolean isWithBeginTransaction() {
return withBeginTransaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ private void startGrpcStreaming() {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
stream.requestPrefetchChunks();
}
}
}
Expand Down

0 comments on commit aedcfd1

Please sign in to comment.