diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 62a8a72901c..235349fce38 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -496,13 +496,7 @@ public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingComma public CompletableFuture invokeImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) { - String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); - doBeforeRpcHooks(channelRemoteAddr, request); - return invoke0(channel, request, timeoutMillis).whenComplete((v, t) -> { - if (t == null) { - doAfterRpcHooks(channelRemoteAddr, request, v.getResponseCommand()); - } - }); + return invoke0(channel, request, timeoutMillis); } protected CompletableFuture invoke0(final Channel channel, final RemotingCommand request, diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index f5157d03049..925c4f9cb2a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -804,6 +804,9 @@ public CompletableFuture invoke(String addr, RemotingCommand re public CompletableFuture invokeImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) { Stopwatch stopwatch = Stopwatch.createStarted(); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + doBeforeRpcHooks(channelRemoteAddr, request); + return super.invokeImpl(channel, request, timeoutMillis).thenCompose(responseFuture -> { RemotingCommand response = responseFuture.getResponseCommand(); if (response.getCode() == ResponseCode.GO_AWAY) { @@ -839,6 +842,10 @@ public CompletableFuture invokeImpl(final Channel channel, final } } return CompletableFuture.completedFuture(responseFuture); + }).whenComplete((v, t) -> { + if (t == null) { + doAfterRpcHooks(channelRemoteAddr, request, v.getResponseCommand()); + } }); }