Skip to content

Commit

Permalink
Update to executeBlocking API change
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jul 19, 2023
1 parent 9f5ec82 commit efab6dc
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 311 deletions.
26 changes: 8 additions & 18 deletions src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,13 @@ private Future<Void> start(ContextInternal ctx, int attempts) {
}

public void tryConnect(ContextInternal ctx,int attempts,Promise<Void> promise){
ctx.<Void>executeBlocking(exePromise -> {
ctx.<Void>executeBlocking(() -> {
try {
connect().onComplete(exePromise);
connect();
return null;
} catch (IOException | TimeoutException e) {
log.error("Could not connect to rabbitmq", e);
exePromise.fail(e);
throw e;
}
}).onSuccess(h->{
promise.complete();
Expand All @@ -557,13 +558,9 @@ public void tryConnect(ContextInternal ctx,int attempts,Promise<Void> promise){
@Override
public Future<Void> stop() {
log.info("Stopping rabbitmq client");
return vertx.executeBlocking(future -> {
try {
disconnect();
future.complete();
} catch (IOException e) {
future.fail(e);
}
return vertx.executeBlocking(() -> {
disconnect();
return null;
});
}

Expand All @@ -589,14 +586,7 @@ private <T> Future<T> forChannel(ChannelHandler<T> channelHandler) {
return ctx.failedFuture(e);
}
}
return vertx.executeBlocking(future -> {
try {
T t = channelHandler.handle(channel);
future.complete(t);
} catch (Throwable t) {
future.fail(t);
}
});
return vertx.executeBlocking(() -> channelHandler.handle(channel));
}

private Future<Void> connect() throws IOException, TimeoutException {
Expand Down
Loading

0 comments on commit efab6dc

Please sign in to comment.