Skip to content

Commit

Permalink
Use AutoClosable for to ReactorSender, ReactorReceiver and ServiceBus…
Browse files Browse the repository at this point in the history
…SessionReceiver (Azure#21834)
  • Loading branch information
YijunXieMS authored May 26, 2021
1 parent 9201432 commit 9d6deb6
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
final Mono<AmqpEndpointState> activeEndpoint = getEndpointStates()
.filter(state -> state == AmqpEndpointState.ACTIVE)
.next()
.timeout(operationTimeout, Mono.error(new AmqpException(false, String.format(
.timeout(operationTimeout, Mono.error(new AmqpException(true, String.format(
"Connection '%s' not opened within AmqpRetryOptions.tryTimeout(): %s", connectionId,
operationTimeout), handler.getErrorContext())));
return activeEndpoint.thenReturn(reactorConnection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
/**
* Handles receiving events from Event Hubs service and translating them to proton-j messages.
*/
public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable {
public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable, AutoCloseable {
private final String entityPath;
private final Receiver receiver;
private final ReceiveLinkHandler handler;
Expand Down Expand Up @@ -225,6 +225,7 @@ public void dispose() {
close();
}

@Override
public void close() {
closeAsync().block(retryOptions.getTryTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
/**
* Handles scheduling and transmitting events through proton-j to Event Hubs service.
*/
class ReactorSender implements AmqpSendLink, AsyncAutoCloseable {
class ReactorSender implements AmqpSendLink, AsyncAutoCloseable, AutoCloseable {
private final String entityPath;
private final Sender sender;
private final SendLinkHandler handler;
Expand Down Expand Up @@ -337,6 +337,7 @@ public void dispose() {
close();
}

@Override
public void close() {
closeAsync().block(retryOptions.getTryTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ void createCBSNodeTimeoutException() throws IOException {
assertTrue(error instanceof AmqpException);

final AmqpException amqpException = (AmqpException) error;
assertFalse(amqpException.isTransient());
assertTrue(amqpException.isTransient());
assertNull(amqpException.getErrorCondition());

assertNotNull(amqpException.getMessage());
Expand Down Expand Up @@ -583,7 +583,7 @@ void cannotCreateResourcesOnFailure() {
Assertions.fail("Exception was not the correct type: " + error);
}

assertFalse(amqpException.isTransient());
assertTrue(amqpException.isTransient());
};

when(event.getTransport()).thenReturn(transport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Represents an session that is received when "any" session is accepted from the service.
*/
class ServiceBusSessionReceiver implements AsyncAutoCloseable {
class ServiceBusSessionReceiver implements AsyncAutoCloseable, AutoCloseable {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final LockContainer<OffsetDateTime> lockContainer;
private final AtomicReference<OffsetDateTime> sessionLockedUntil = new AtomicReference<>();
Expand Down Expand Up @@ -212,6 +212,7 @@ public Mono<Void> closeAsync() {
return receiveLink.closeAsync().doFinally(signal -> subscriptions.dispose());
}

@Override
public void close() {
closeAsync().block(retryOptions.getTryTimeout());
}
Expand Down

0 comments on commit 9d6deb6

Please sign in to comment.