From 25cfedef26ffca19427399f06943c33aef560903 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 6 Jan 2025 22:49:49 +0530 Subject: [PATCH] fix --- .../com/google/cloud/spanner/SessionPool.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index e560ca71deb..b66ff4f99f2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -55,6 +55,7 @@ import com.google.cloud.Tuple; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; +import com.google.cloud.spanner.AsyncResultSet.StreamMessageListener; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.Options.TransactionOption; @@ -266,6 +267,55 @@ private ResultSet wrap(final CachedResultSetSupplier resultSetSupplier) { return new ForwardingResultSet(resultSetSupplier) { private boolean beforeFirst = true; + @Override + public boolean initiateStreaming(StreamMessageListener streamMessageListener) { + while (true) { + try { + return internalInitiateStreaming(streamMessageListener); + } catch (SessionNotFoundException e) { + while (true) { + // Keep the replace-if-possible outside the try-block to let the exception bubble up + // if it's too late to replace the session. + replaceSessionIfPossible(e); + try { + replaceDelegate(resultSetSupplier.reload()); + break; + } catch (SessionNotFoundException snfe) { + e = snfe; + // retry on yet another session. + } + } + } + } + } + + private boolean internalInitiateStreaming(final StreamMessageListener streamMessageListener) { + try { + boolean ret = super.initiateStreaming(streamMessageListener); + if (beforeFirst) { + synchronized (lock) { + session.get().markUsed(); + beforeFirst = false; + sessionUsedForQuery = true; + } + } + if (!ret && isSingleUse) { + close(); + } + return ret; + } catch (SessionNotFoundException e) { + throw e; + } catch (SpannerException e) { + synchronized (lock) { + if (!closed && isSingleUse) { + session.get().setLastException(e); + AutoClosingReadContext.this.close(); + } + } + throw e; + } + } + @Override public boolean next() throws SpannerException { while (true) {