Skip to content

Commit

Permalink
[ISSUE apache#7815] Use createChannelAsync for async invoke rpc (apac…
Browse files Browse the repository at this point in the history
…he#7816)

* async
  • Loading branch information
drpmma authored Feb 20, 2024
1 parent 6f37957 commit 3ee7bc2
Showing 1 changed file with 126 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -613,25 +613,33 @@ private void updateChannelLastResponseTime(final String addr) {
}
}

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
private ChannelFuture getAndCreateChannelAsync(final String addr) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel();
return getAndCreateNameserverChannelAsync();
}

ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannelFuture();
}

return this.createChannel(addr);
return this.createChannelAsync(addr);
}

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
ChannelFuture channelFuture = getAndCreateChannelAsync(addr);
if (channelFuture == null) {
return null;
}
return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel();
}

private Channel getAndCreateNameserverChannel() throws InterruptedException {
private ChannelFuture getAndCreateNameserverChannelAsync() throws InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannelFuture();
}
}

Expand All @@ -642,25 +650,19 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException {
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannelFuture();
}
}

if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);

this.namesrvAddrChoosed.set(newAddr);
LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) {
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);

this.namesrvAddrChoosed.set(newAddr);
LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
return this.createChannelAsync(newAddr);
}
} catch (Exception e) {
LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);
Expand All @@ -674,39 +676,23 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException {
return null;
}

private Channel createChannel(final String addr) throws InterruptedException {
private ChannelFuture createChannelAsync(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannelFuture();
}

if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {

if (cw.isOK()) {
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
if (cw.isOK() || !cw.getChannelFuture().isDone()) {
return cw.getChannelFuture();
} else {
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}

if (createNewConnection) {
String[] hostAndPort = getHostAndPort(addr);
ChannelFuture channelFuture = fetchBootstrap(addr)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
this.channelWrapperTables.put(channelFuture.channel(), cw);
}
return createChannel(addr).getChannelFuture();
} catch (Exception e) {
LOGGER.error("createChannel: create channel exception", e);
} finally {
Expand All @@ -716,84 +702,104 @@ private Channel createChannel(final String addr) throws InterruptedException {
LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}

if (cw != null) {
return waitChannelFuture(addr, cw);
}

return null;
}

private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString());
}
} else {
LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
return null;
private ChannelWrapper createChannel(String addr) {
String[] hostAndPort = getHostAndPort(addr);
ChannelFuture channelFuture = fetchBootstrap(addr)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
this.channelWrapperTables.put(channelFuture.channel(), cw);
return cw;
}

@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout");
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
if (channelFuture == null) {
invokeCallback.operationFail(new RemotingConnectException(addr));
return;
}
channelFuture.addListener(future -> {
if (future.isSuccess()) {
Channel channel = channelFuture.channel();
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
invokeCallback.operationFail(new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"));
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
} else {
this.closeChannel(addr, channel);
invokeCallback.operationFail(new RemotingConnectException(addr));
}
} else {
invokeCallback.operationFail(new RemotingConnectException(addr));
}
});
}

@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(channelRemoteAddr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", channelRemoteAddr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
if (channelFuture == null) {
throw new RemotingConnectException(addr);
}
channelFuture.addListener(future -> {
if (future.isSuccess()) {
Channel channel = channelFuture.channel();
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
doBeforeRpcHooks(channelRemoteAddr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} else {
this.closeChannel(addr, channel);
}
}
});
}

@Override
public CompletableFuture<RemotingCommand> invoke(String addr, RemotingCommand request,
long timeoutMillis) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
return invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> {
if (t == null) {
updateChannelLastResponseTime(addr);
}
}).thenApply(ResponseFuture::getResponseCommand);
} else {
this.closeChannel(addr, channel);
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
if (channelFuture == null) {
future.completeExceptionally(new RemotingConnectException(addr));
return future;
}
channelFuture.addListener(f -> {
if (f.isSuccess()) {
Channel channel = channelFuture.channel();
if (channel != null && channel.isActive()) {
invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> {
if (t == null) {
updateChannelLastResponseTime(addr);
}
}).thenApply(ResponseFuture::getResponseCommand).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(v);
}
});
} else {
this.closeChannel(addr, channel);
future.completeExceptionally(new RemotingConnectException(addr));
}
} else {
future.completeExceptionally(new RemotingConnectException(addr));
}
});
} catch (Throwable t) {
future.completeExceptionally(t);
}
Expand Down Expand Up @@ -824,18 +830,37 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
});
if (channelWrapper != null) {
if (nettyClientConfig.isEnableTransparentRetry()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
retryRequest.setBody(request.getBody());
Channel retryChannel;
if (channelWrapper.isOK()) {
retryChannel = channelWrapper.getChannel();
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
Channel retryChannel = channelWrapper.getChannel();
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
}
} else {
retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper);
}
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
ChannelFuture channelFuture = channelWrapper.getChannelFuture();
channelFuture.addListener(f -> {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
if (f.isSuccess()) {
Channel retryChannel0 = channelFuture.channel();
if (retryChannel0 != null && channel != retryChannel0) {
super.invokeImpl(retryChannel0, retryRequest, timeoutMillis - duration).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(v);
}
});
}
} else {
future.completeExceptionally(new RemotingConnectException(channelWrapper.channelAddress));
}
});
return future;
}
}
}
Expand Down

0 comments on commit 3ee7bc2

Please sign in to comment.