From b32f14858502f6f3fc5bbdb34d6ba784c574a4c2 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 6 Jan 2025 23:18:22 +0530 Subject: [PATCH] fix(spanner): unflake unit tests --- .../spanner/ResumableStreamIterator.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 39165da2d38..eae2f7ed746 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -65,10 +65,11 @@ abstract class ResumableStreamIterator extends AbstractIterator buffer = new LinkedList<>(); + private final Object monitor = new Object(); private final int maxBufferSize; private final ISpan span; private final TraceWrapper tracer; - private CloseableIterator stream; + private volatile CloseableIterator stream; private ByteString resumeToken; private boolean finished; /** @@ -317,15 +318,17 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) { } private void startGrpcStreaming() { - if (stream == null) { - span.addAnnotation( - "Starting/Resuming stream", - "ResumeToken", - resumeToken == null ? "null" : resumeToken.toStringUtf8()); - try (IScope scope = tracer.withSpan(span)) { - // When start a new stream set the Span as current to make the gRPC Span a child of - // this Span. - stream = checkNotNull(startStream(resumeToken, streamMessageListener)); + synchronized (monitor) { + if (stream == null) { + span.addAnnotation( + "Starting/Resuming stream", + "ResumeToken", + resumeToken == null ? "null" : resumeToken.toStringUtf8()); + try (IScope scope = tracer.withSpan(span)) { + // When start a new stream set the Span as current to make the gRPC Span a child of + // this Span. + stream = checkNotNull(startStream(resumeToken, streamMessageListener)); + } } } }