diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index d93b398cf5da4..416286ace2002 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -108,7 +108,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption final Mono activeEndpoint = getEndpointStates() .filter(state -> state == AmqpEndpointState.ACTIVE) .next() - .timeout(operationTimeout, Mono.error(new AmqpException(false, String.format( + .timeout(operationTimeout, Mono.error(new AmqpException(true, String.format( "Connection '%s' not opened within AmqpRetryOptions.tryTimeout(): %s", connectionId, operationTimeout), handler.getErrorContext()))); return activeEndpoint.thenReturn(reactorConnection); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index c2600c0497e54..17f76c3fafe6a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -37,7 +37,7 @@ /** * Handles receiving events from Event Hubs service and translating them to proton-j messages. */ -public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable { +public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable, AutoCloseable { private final String entityPath; private final Receiver receiver; private final ReceiveLinkHandler handler; @@ -225,6 +225,7 @@ public void dispose() { close(); } + @Override public void close() { closeAsync().block(retryOptions.getTryTimeout()); } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index 9159290771fa6..2ee4b2eb2dfc9 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -62,7 +62,7 @@ /** * Handles scheduling and transmitting events through proton-j to Event Hubs service. */ -class ReactorSender implements AmqpSendLink, AsyncAutoCloseable { +class ReactorSender implements AmqpSendLink, AsyncAutoCloseable, AutoCloseable { private final String entityPath; private final Sender sender; private final SendLinkHandler handler; @@ -337,6 +337,7 @@ public void dispose() { close(); } + @Override public void close() { closeAsync().block(retryOptions.getTryTimeout()); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java index 53e4a60512621..f865fa579123c 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java @@ -422,7 +422,7 @@ void createCBSNodeTimeoutException() throws IOException { assertTrue(error instanceof AmqpException); final AmqpException amqpException = (AmqpException) error; - assertFalse(amqpException.isTransient()); + assertTrue(amqpException.isTransient()); assertNull(amqpException.getErrorCondition()); assertNotNull(amqpException.getMessage()); @@ -583,7 +583,7 @@ void cannotCreateResourcesOnFailure() { Assertions.fail("Exception was not the correct type: " + error); } - assertFalse(amqpException.isTransient()); + assertTrue(amqpException.isTransient()); }; when(event.getTransport()).thenReturn(transport); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index ae78b5abb7ffd..f865191c55092 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -29,7 +29,7 @@ /** * Represents an session that is received when "any" session is accepted from the service. */ -class ServiceBusSessionReceiver implements AsyncAutoCloseable { +class ServiceBusSessionReceiver implements AsyncAutoCloseable, AutoCloseable { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final LockContainer lockContainer; private final AtomicReference sessionLockedUntil = new AtomicReference<>(); @@ -212,6 +212,7 @@ public Mono closeAsync() { return receiveLink.closeAsync().doFinally(signal -> subscriptions.dispose()); } + @Override public void close() { closeAsync().block(retryOptions.getTryTimeout()); }