diff --git a/eng/spotbugs-aggregate-report/pom.xml b/eng/spotbugs-aggregate-report/pom.xml index 0c100ad2014c8..7a2b72b62ce21 100644 --- a/eng/spotbugs-aggregate-report/pom.xml +++ b/eng/spotbugs-aggregate-report/pom.xml @@ -140,7 +140,7 @@ com.microsoft.azure azure-servicebus - 3.6.0-beta.1 + 3.6.0 diff --git a/eng/versioning/version_data.txt b/eng/versioning/version_data.txt index da53965c2792b..e7b147f2e7bc2 100644 --- a/eng/versioning/version_data.txt +++ b/eng/versioning/version_data.txt @@ -34,7 +34,7 @@ com.microsoft.azure:azure-keyvault-cryptography;1.2.4;1.3.0-beta.1 com.microsoft.azure:azure-keyvault-extensions;1.2.4;1.3.0-beta.1 com.microsoft.azure:azure-keyvault-test;1.2.3;1.2.4 com.microsoft.azure:azure-keyvault-webkey;1.2.4;1.3.0-beta.1 -com.microsoft.azure:azure-servicebus;3.5.1;3.6.0-beta.1 +com.microsoft.azure:azure-servicebus;3.5.1;3.6.0 com.microsoft.azure:azure-storage-blob;11.0.2;11.0.2 com.microsoft.azure.msi_auth_token_provider:azure-authentication-msi-token-provider;1.1.0-beta.1;1.1.0-beta.1 com.microsoft.azure:azure-eventgrid;1.4.0-beta.1;1.4.0-beta.1 diff --git a/sdk/servicebus/microsoft-azure-servicebus/README.md b/sdk/servicebus/microsoft-azure-servicebus/README.md index 35f0342a8562d..816bf1c1f6d6f 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/README.md +++ b/sdk/servicebus/microsoft-azure-servicebus/README.md @@ -19,7 +19,7 @@ The package can be downloaded from [Maven](https://search.maven.org/artifact/com com.microsoft.azure azure-servicebus - 3.5.1 + 3.6.0 ``` [//]: # ({x-version-update-end}) diff --git a/sdk/servicebus/microsoft-azure-servicebus/pom.xml b/sdk/servicebus/microsoft-azure-servicebus/pom.xml index bcbd34e1a115f..e83f978a78588 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/pom.xml +++ b/sdk/servicebus/microsoft-azure-servicebus/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.microsoft.azure azure-servicebus - 3.6.0-beta.1 + 3.6.0 Microsoft Azure SDK for Service Bus Java library for Azure Service Bus diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java index 6604c793e4fdd..1e8922096eb10 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java @@ -146,7 +146,7 @@ private synchronized void setHandlerRegistered() { private void receiveAndPumpMessage() { if (!this.getIsClosingOrClosed()) { - CompletableFuture receiveMessageFuture = this.innerReceiver.receiveAsync(this.messageHandlerOptions.getMessageWaitDuration()); + CompletableFuture receiveMessageFuture = receiveAsyncWrapper(this.innerReceiver, this.messageHandlerOptions.getMessageWaitDuration()); receiveMessageFuture.handleAsync((message, receiveEx) -> { if (receiveEx != null) { receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx); @@ -203,7 +203,7 @@ private void receiveAndPumpMessage() { dispositionPhase = ExceptionPhase.COMPLETE; if (this.messageHandlerOptions.isAutoComplete()) { TRACE_LOGGER.debug("Completing message with sequence number '{}'", message.getSequenceNumber()); - updateDispositionFuture = this.innerReceiver.completeAsync(message.getLockToken()); + updateDispositionFuture = completeAsyncWrapper(this.innerReceiver, message.getLockToken()); } else { updateDispositionFuture = CompletableFuture.completedFuture(null); } @@ -212,7 +212,7 @@ private void receiveAndPumpMessage() { dispositionPhase = ExceptionPhase.ABANDON; if (this.messageHandlerOptions.isAutoComplete()) { TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); - updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken()); + updateDispositionFuture = abandonAsyncWrapper(this.innerReceiver, message.getLockToken()); } else { updateDispositionFuture = CompletableFuture.completedFuture(null); } @@ -290,7 +290,7 @@ private void acceptSessionAndPumpMessages() { private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) { if (!this.getIsClosingOrClosed()) { IMessageSession session = sessionTracker.getSession(); - CompletableFuture receiverFuture = session.receiveAsync(this.sessionHandlerOptions.getMessageWaitDuration()); + CompletableFuture receiverFuture = receiveAsyncWrapper(session, this.sessionHandlerOptions.getMessageWaitDuration()); receiverFuture.handleAsync((message, receiveEx) -> { if (receiveEx != null) { receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx); @@ -350,7 +350,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) { dispositionPhase = ExceptionPhase.COMPLETE; if (this.sessionHandlerOptions.isAutoComplete()) { TRACE_LOGGER.debug("Completing message with sequence number '{}'", message.getSequenceNumber()); - updateDispositionFuture = session.completeAsync(message.getLockToken()); + updateDispositionFuture = completeAsyncWrapper(session, message.getLockToken()); } else { updateDispositionFuture = CompletableFuture.completedFuture(null); } @@ -359,7 +359,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) { dispositionPhase = ExceptionPhase.ABANDON; if (this.sessionHandlerOptions.isAutoComplete()) { TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); - updateDispositionFuture = session.abandonAsync(message.getLockToken()); + updateDispositionFuture = abandonAsyncWrapper(session, message.getLockToken()); } else { updateDispositionFuture = CompletableFuture.completedFuture(null); } @@ -568,7 +568,7 @@ protected void loop() { if (renewInterval != null && !renewInterval.isNegative()) { this.timerFuture = Timer.schedule(() -> { TRACE_LOGGER.debug("Renewing lock on '{}'", this.messageIdentifier); - this.innerReceiver.renewMessageLockAsync(message).handleAsync((v, renewLockEx) -> { + renewMessageLockAsyncWrapper(this.innerReceiver, message).handleAsync((v, renewLockEx) -> { if (renewLockEx != null) { renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx); TRACE_LOGGER.info("Renewing lock on '{}' failed", this.messageIdentifier, renewLockEx); @@ -622,7 +622,7 @@ protected void loop() { if (renewInterval != null && !renewInterval.isNegative()) { this.timerFuture = Timer.schedule(() -> { TRACE_LOGGER.debug("Renewing lock on '{}'", this.sessionIdentifier); - this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) -> { + renewSessionLockAsyncWrapper(this.session).handleAsync((v, renewLockEx) -> { if (renewLockEx != null) { renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx); TRACE_LOGGER.info("Renewing lock on '{}' failed", this.sessionIdentifier, renewLockEx); @@ -839,6 +839,58 @@ private void notifyExceptionToMessageHandler(Throwable ex, ExceptionPhase phase) this.customCodeExecutor.execute(() -> this.messageHandler.notifyException(ex, phase)); } } + + // These wrappers catch any synchronous exceptions and properly complete completablefutures with those excetions. + // Callers of these methods don't expect any synchronous exceptions. + private static CompletableFuture receiveAsyncWrapper(IMessageReceiver receiver, Duration serverWaitTime) { + try { + return receiver.receiveAsync(serverWaitTime); + } catch (Throwable t) { + CompletableFuture exceptionalFuture = new CompletableFuture(); + exceptionalFuture.completeExceptionally(t); + return exceptionalFuture; + } + } + + private static CompletableFuture completeAsyncWrapper(IMessageReceiver receiver, UUID lockToken) { + try { + return receiver.completeAsync(lockToken); + } catch (Throwable t) { + CompletableFuture exceptionalFuture = new CompletableFuture(); + exceptionalFuture.completeExceptionally(t); + return exceptionalFuture; + } + } + + private static CompletableFuture abandonAsyncWrapper(IMessageReceiver receiver, UUID lockToken) { + try { + return receiver.abandonAsync(lockToken); + } catch (Throwable t) { + CompletableFuture exceptionalFuture = new CompletableFuture(); + exceptionalFuture.completeExceptionally(t); + return exceptionalFuture; + } + } + + private static CompletableFuture renewMessageLockAsyncWrapper(IMessageReceiver receiver, IMessage message) { + try { + return receiver.renewMessageLockAsync(message); + } catch (Throwable t) { + CompletableFuture exceptionalFuture = new CompletableFuture(); + exceptionalFuture.completeExceptionally(t); + return exceptionalFuture; + } + } + + private static CompletableFuture renewSessionLockAsyncWrapper(IMessageSession session) { + try { + return session.renewSessionLockAsync(); + } catch (Throwable t) { + CompletableFuture exceptionalFuture = new CompletableFuture(); + exceptionalFuture.completeExceptionally(t); + return exceptionalFuture; + } + } @Override public int getPrefetchCount() { diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index d3cb55f12559a..ff98d196fc461 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -66,7 +66,7 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class); private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5); // service closes link long before this timeout expires - private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1); // Wakes up every 1 millisecond + private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(10); // Wakes up every few milliseconds private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(500); // Wakes up every 500 milliseconds private static final Duration ZERO_TIMEOUT_APPROXIMATION = Duration.ofMillis(200); private static final int CREDIT_FLOW_BATCH_SIZE = 50; // Arbitrarily chosen 50 to avoid sending too many flows in case prefetch count is large diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java index 7ed6de11467e8..6079e0fa44ff0 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java @@ -161,4 +161,10 @@ public void testSessionPumpRenewLock() throws InterruptedException, ServiceBusEx this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testSessionPumpRenewLock(this.sendClient, this.receiveClient); } + + @Test + public void testSessionPumpLockLost() throws InterruptedException, ServiceBusException { + this.createClients(ReceiveMode.PEEKLOCK); + MessageAndSessionPumpTests.testSessionPumpLockLost(this.sendClient, this.receiveClient); + } } diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java index 956fc3f2f9068..914e08e9ce9b6 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java @@ -255,6 +255,34 @@ public static void testSessionPumpRenewLock(IMessageSender sender, IMessageAndSe // So completes will pass before links are closed by teardown Thread.sleep(1000); } + + public static void testSessionPumpLockLost(IMessageSender sender, IMessageAndSessionPump sessionPump) throws InterruptedException, ServiceBusException { + int numSessions = 5; + int numMessagePerSession = 2; + ArrayList sessionIds = new ArrayList<>(); + for (int i = 0; i < numSessions; i++) { + String sessionId = StringUtil.getRandomString(); + sessionIds.add(sessionId); + for (int j = 0; j < numMessagePerSession; j++) { + Message message = new Message("AMQPMessage"); + message.setSessionId(sessionId); + sender.send(message); + } + } + + boolean autoComplete = true; + int sleepMinutes = 2; // This should be less than message lock duration of the queue or subscription + CountingSessionHandler sessionHandler = new CountingSessionHandler(sessionPump, !autoComplete, numSessions * numMessagePerSession, false, Duration.ofMinutes(sleepMinutes)); + // Register a handler that doesn't renew session lock + sessionPump.registerSessionHandler(sessionHandler, new SessionHandlerOptions(1, 1, autoComplete, Duration.ZERO), EXECUTOR_SERVICE); + // Sleep for a little longer than the handler sleep time. + // Session lock should be lost and a new session should be accepted. + Thread.sleep(1000 * 60 * 3); + + Assert.assertTrue("Another session not received by session pump, after session lock lost", sessionHandler.getReceivedSessions().size() > 1); + // So completes will pass before links are closed by teardown + Thread.sleep(1000); + } private static class CountingMessageHandler extends TestMessageHandler { private IMessageAndSessionPump messagePump;