Skip to content

Commit

Permalink
Vijay idle receiver cpu (#18348)
Browse files Browse the repository at this point in the history
* Changing the message return daemon sleep to 10 milliseconds in CoreMessageReceiver.

* Fixing a bug in sessino pump that is causing the pump to stop accepting sessions if a session lock is lost.

* Updating service bus track1 SDK version to 3.6.0.

* Correcting a spelling.
  • Loading branch information
yvgopal authored Dec 30, 2020
1 parent 7ffbe01 commit 51ea22c
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 13 deletions.
2 changes: 1 addition & 1 deletion eng/spotbugs-aggregate-report/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>3.6.0-beta.1</version> <!-- {x-version-update;com.microsoft.azure:azure-servicebus;current} -->
<version>3.6.0</version> <!-- {x-version-update;com.microsoft.azure:azure-servicebus;current} -->
</dependency>
</dependencies>
<properties>
Expand Down
2 changes: 1 addition & 1 deletion eng/versioning/version_data.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ com.microsoft.azure:azure-keyvault-cryptography;1.2.4;1.3.0-beta.1
com.microsoft.azure:azure-keyvault-extensions;1.2.4;1.3.0-beta.1
com.microsoft.azure:azure-keyvault-test;1.2.3;1.2.4
com.microsoft.azure:azure-keyvault-webkey;1.2.4;1.3.0-beta.1
com.microsoft.azure:azure-servicebus;3.5.1;3.6.0-beta.1
com.microsoft.azure:azure-servicebus;3.5.1;3.6.0
com.microsoft.azure:azure-storage-blob;11.0.2;11.0.2
com.microsoft.azure.msi_auth_token_provider:azure-authentication-msi-token-provider;1.1.0-beta.1;1.1.0-beta.1
com.microsoft.azure:azure-eventgrid;1.4.0-beta.1;1.4.0-beta.1
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/microsoft-azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The package can be downloaded from [Maven](https://search.maven.org/artifact/com
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>3.5.1</version>
<version>3.6.0</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/microsoft-azure-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>3.6.0-beta.1</version> <!-- {x-version-update;com.microsoft.azure:azure-servicebus;current} -->
<version>3.6.0</version> <!-- {x-version-update;com.microsoft.azure:azure-servicebus;current} -->

<name>Microsoft Azure SDK for Service Bus</name>
<description>Java library for Azure Service Bus</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private synchronized void setHandlerRegistered() {

private void receiveAndPumpMessage() {
if (!this.getIsClosingOrClosed()) {
CompletableFuture<IMessage> receiveMessageFuture = this.innerReceiver.receiveAsync(this.messageHandlerOptions.getMessageWaitDuration());
CompletableFuture<IMessage> receiveMessageFuture = receiveAsyncWrapper(this.innerReceiver, this.messageHandlerOptions.getMessageWaitDuration());
receiveMessageFuture.handleAsync((message, receiveEx) -> {
if (receiveEx != null) {
receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx);
Expand Down Expand Up @@ -203,7 +203,7 @@ private void receiveAndPumpMessage() {
dispositionPhase = ExceptionPhase.COMPLETE;
if (this.messageHandlerOptions.isAutoComplete()) {
TRACE_LOGGER.debug("Completing message with sequence number '{}'", message.getSequenceNumber());
updateDispositionFuture = this.innerReceiver.completeAsync(message.getLockToken());
updateDispositionFuture = completeAsyncWrapper(this.innerReceiver, message.getLockToken());
} else {
updateDispositionFuture = CompletableFuture.completedFuture(null);
}
Expand All @@ -212,7 +212,7 @@ private void receiveAndPumpMessage() {
dispositionPhase = ExceptionPhase.ABANDON;
if (this.messageHandlerOptions.isAutoComplete()) {
TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber());
updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken());
updateDispositionFuture = abandonAsyncWrapper(this.innerReceiver, message.getLockToken());
} else {
updateDispositionFuture = CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -290,7 +290,7 @@ private void acceptSessionAndPumpMessages() {
private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
if (!this.getIsClosingOrClosed()) {
IMessageSession session = sessionTracker.getSession();
CompletableFuture<IMessage> receiverFuture = session.receiveAsync(this.sessionHandlerOptions.getMessageWaitDuration());
CompletableFuture<IMessage> receiverFuture = receiveAsyncWrapper(session, this.sessionHandlerOptions.getMessageWaitDuration());
receiverFuture.handleAsync((message, receiveEx) -> {
if (receiveEx != null) {
receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx);
Expand Down Expand Up @@ -350,7 +350,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
dispositionPhase = ExceptionPhase.COMPLETE;
if (this.sessionHandlerOptions.isAutoComplete()) {
TRACE_LOGGER.debug("Completing message with sequence number '{}'", message.getSequenceNumber());
updateDispositionFuture = session.completeAsync(message.getLockToken());
updateDispositionFuture = completeAsyncWrapper(session, message.getLockToken());
} else {
updateDispositionFuture = CompletableFuture.completedFuture(null);
}
Expand All @@ -359,7 +359,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
dispositionPhase = ExceptionPhase.ABANDON;
if (this.sessionHandlerOptions.isAutoComplete()) {
TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber());
updateDispositionFuture = session.abandonAsync(message.getLockToken());
updateDispositionFuture = abandonAsyncWrapper(session, message.getLockToken());
} else {
updateDispositionFuture = CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -568,7 +568,7 @@ protected void loop() {
if (renewInterval != null && !renewInterval.isNegative()) {
this.timerFuture = Timer.schedule(() -> {
TRACE_LOGGER.debug("Renewing lock on '{}'", this.messageIdentifier);
this.innerReceiver.renewMessageLockAsync(message).handleAsync((v, renewLockEx) -> {
renewMessageLockAsyncWrapper(this.innerReceiver, message).handleAsync((v, renewLockEx) -> {
if (renewLockEx != null) {
renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx);
TRACE_LOGGER.info("Renewing lock on '{}' failed", this.messageIdentifier, renewLockEx);
Expand Down Expand Up @@ -622,7 +622,7 @@ protected void loop() {
if (renewInterval != null && !renewInterval.isNegative()) {
this.timerFuture = Timer.schedule(() -> {
TRACE_LOGGER.debug("Renewing lock on '{}'", this.sessionIdentifier);
this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) -> {
renewSessionLockAsyncWrapper(this.session).handleAsync((v, renewLockEx) -> {
if (renewLockEx != null) {
renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx);
TRACE_LOGGER.info("Renewing lock on '{}' failed", this.sessionIdentifier, renewLockEx);
Expand Down Expand Up @@ -839,6 +839,58 @@ private void notifyExceptionToMessageHandler(Throwable ex, ExceptionPhase phase)
this.customCodeExecutor.execute(() -> this.messageHandler.notifyException(ex, phase));
}
}

// These wrappers catch any synchronous exceptions and properly complete completablefutures with those excetions.
// Callers of these methods don't expect any synchronous exceptions.
private static CompletableFuture<IMessage> receiveAsyncWrapper(IMessageReceiver receiver, Duration serverWaitTime) {
try {
return receiver.receiveAsync(serverWaitTime);
} catch (Throwable t) {
CompletableFuture<IMessage> exceptionalFuture = new CompletableFuture<IMessage>();
exceptionalFuture.completeExceptionally(t);
return exceptionalFuture;
}
}

private static CompletableFuture<Void> completeAsyncWrapper(IMessageReceiver receiver, UUID lockToken) {
try {
return receiver.completeAsync(lockToken);
} catch (Throwable t) {
CompletableFuture<Void> exceptionalFuture = new CompletableFuture<Void>();
exceptionalFuture.completeExceptionally(t);
return exceptionalFuture;
}
}

private static CompletableFuture<Void> abandonAsyncWrapper(IMessageReceiver receiver, UUID lockToken) {
try {
return receiver.abandonAsync(lockToken);
} catch (Throwable t) {
CompletableFuture<Void> exceptionalFuture = new CompletableFuture<Void>();
exceptionalFuture.completeExceptionally(t);
return exceptionalFuture;
}
}

private static CompletableFuture<Instant> renewMessageLockAsyncWrapper(IMessageReceiver receiver, IMessage message) {
try {
return receiver.renewMessageLockAsync(message);
} catch (Throwable t) {
CompletableFuture<Instant> exceptionalFuture = new CompletableFuture<Instant>();
exceptionalFuture.completeExceptionally(t);
return exceptionalFuture;
}
}

private static CompletableFuture<Void> renewSessionLockAsyncWrapper(IMessageSession session) {
try {
return session.renewSessionLockAsync();
} catch (Throwable t) {
CompletableFuture<Void> exceptionalFuture = new CompletableFuture<Void>();
exceptionalFuture.completeExceptionally(t);
return exceptionalFuture;
}
}

@Override
public int getPrefetchCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class);
private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5); // service closes link long before this timeout expires
private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1); // Wakes up every 1 millisecond
private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(10); // Wakes up every few milliseconds
private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(500); // Wakes up every 500 milliseconds
private static final Duration ZERO_TIMEOUT_APPROXIMATION = Duration.ofMillis(200);
private static final int CREDIT_FLOW_BATCH_SIZE = 50; // Arbitrarily chosen 50 to avoid sending too many flows in case prefetch count is large
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,10 @@ public void testSessionPumpRenewLock() throws InterruptedException, ServiceBusEx
this.createClients(ReceiveMode.PEEKLOCK);
MessageAndSessionPumpTests.testSessionPumpRenewLock(this.sendClient, this.receiveClient);
}

@Test
public void testSessionPumpLockLost() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
MessageAndSessionPumpTests.testSessionPumpLockLost(this.sendClient, this.receiveClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,34 @@ public static void testSessionPumpRenewLock(IMessageSender sender, IMessageAndSe
// So completes will pass before links are closed by teardown
Thread.sleep(1000);
}

public static void testSessionPumpLockLost(IMessageSender sender, IMessageAndSessionPump sessionPump) throws InterruptedException, ServiceBusException {
int numSessions = 5;
int numMessagePerSession = 2;
ArrayList<String> sessionIds = new ArrayList<>();
for (int i = 0; i < numSessions; i++) {
String sessionId = StringUtil.getRandomString();
sessionIds.add(sessionId);
for (int j = 0; j < numMessagePerSession; j++) {
Message message = new Message("AMQPMessage");
message.setSessionId(sessionId);
sender.send(message);
}
}

boolean autoComplete = true;
int sleepMinutes = 2; // This should be less than message lock duration of the queue or subscription
CountingSessionHandler sessionHandler = new CountingSessionHandler(sessionPump, !autoComplete, numSessions * numMessagePerSession, false, Duration.ofMinutes(sleepMinutes));
// Register a handler that doesn't renew session lock
sessionPump.registerSessionHandler(sessionHandler, new SessionHandlerOptions(1, 1, autoComplete, Duration.ZERO), EXECUTOR_SERVICE);
// Sleep for a little longer than the handler sleep time.
// Session lock should be lost and a new session should be accepted.
Thread.sleep(1000 * 60 * 3);

Assert.assertTrue("Another session not received by session pump, after session lock lost", sessionHandler.getReceivedSessions().size() > 1);
// So completes will pass before links are closed by teardown
Thread.sleep(1000);
}

private static class CountingMessageHandler extends TestMessageHandler {
private IMessageAndSessionPump messagePump;
Expand Down

0 comments on commit 51ea22c

Please sign in to comment.