From 571b373963787bde7d451694a57a3ced000fc8b3 Mon Sep 17 00:00:00 2001 From: jiangyuan Date: Fri, 1 Nov 2024 00:04:34 +0800 Subject: [PATCH] imporve connection manager (#17) * improve connection pool * improve connection manager * update reconnect --- pom.xml | 2 +- .../connection/AbstractConnectionManager.java | 38 +--- .../connection/ClientConnectionManager.java | 95 +++------- .../connection/ConnectionEventHandler.java | 7 +- .../remoting/connection/ConnectionHolder.java | 26 ++- .../connection/ConnectionManager.java | 16 +- .../connection/DefaultConnectionFactory.java | 3 - .../connection/DefaultReconnector.java | 102 +++++++++++ .../lab/remoting/connection/Reconnector.java | 16 ++ .../connection/ServerConnectionManager.java | 33 ++-- .../lab/remoting/rpc/client/RpcClient.java | 4 +- .../rpc/client/RpcClientRemoting.java | 55 ------ .../lab/remoting/rpc/client/RpcRemoting.java | 32 +++- .../lab/remoting/rpc/server/RpcServer.java | 5 +- .../rpc/server/RpcServerRemoting.java | 72 -------- .../ClientConnectionManagerTest.java | 172 +++++++----------- .../ConnectionEventHandlerTest.java | 36 ++-- .../ServerConnectionManagerTest.java | 19 +- .../lab/remoting/protocol/TestProtocol.java | 18 +- .../remoting/rpc/client/HeartBeatTest.java | 2 +- 20 files changed, 331 insertions(+), 422 deletions(-) create mode 100644 src/main/java/io/github/xinfra/lab/remoting/connection/DefaultReconnector.java create mode 100644 src/main/java/io/github/xinfra/lab/remoting/connection/Reconnector.java delete mode 100644 src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClientRemoting.java delete mode 100644 src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServerRemoting.java diff --git a/pom.xml b/pom.xml index 10933f6..cba2fa0 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ io.github.x-infra-lab x-remoting - 0.0.1 + 0.0.2-RC1 jar x-remoting diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/AbstractConnectionManager.java b/src/main/java/io/github/xinfra/lab/remoting/connection/AbstractConnectionManager.java index c7d6ecf..6573191 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/AbstractConnectionManager.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/AbstractConnectionManager.java @@ -29,27 +29,13 @@ public AbstractConnectionManager(ConnectionManagerConfig config) { this.config = config; } - @Override - public synchronized Connection getOrCreateIfAbsent(SocketAddress socketAddress) throws RemotingException { - ensureStarted(); - Validate.notNull(socketAddress, "socketAddress can not be null"); - - ConnectionHolder connectionHolder = connections.get(socketAddress); - if (connectionHolder == null) { - connectionHolder = createConnectionHolder(socketAddress); - createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint()); - } - - return connectionHolder.get(); - } - @Override public void check(Connection connection) throws RemotingException { ensureStarted(); Validate.notNull(connection, "connection can not be null"); if (connection.getChannel() == null || !connection.getChannel().isActive()) { - this.removeAndClose(connection); + this.close(connection); throw new RemotingException("Check connection failed for address: " + connection.remoteAddress()); } if (!connection.getChannel().isWritable()) { @@ -60,7 +46,7 @@ public void check(Connection connection) throws RemotingException { } @Override - public synchronized void removeAndClose(Connection connection) { + public synchronized void close(Connection connection) { ensureStarted(); Validate.notNull(connection, "connection can not be null"); @@ -70,25 +56,13 @@ public synchronized void removeAndClose(Connection connection) { connection.close(); } else { - connectionHolder.removeAndClose(connection); + connectionHolder.invalidate(connection); if (connectionHolder.isEmpty()) { connections.remove(socketAddress); } } } - @Override - public Connection get(SocketAddress socketAddress) { - ensureStarted(); - Validate.notNull(socketAddress, "socketAddress can not be null"); - - ConnectionHolder connectionHolder = connections.get(socketAddress); - if (connectionHolder == null) { - return null; - } - return connectionHolder.get(); - } - @Override public synchronized void add(Connection connection) { ensureStarted(); @@ -111,7 +85,7 @@ public synchronized void shutdown() { for (Map.Entry entry : connections.entrySet()) { SocketAddress socketAddress = entry.getKey(); ConnectionHolder connectionHolder = entry.getValue(); - connectionHolder.removeAndCloseAll(); + connectionHolder.close(); connections.remove(socketAddress); } @@ -125,10 +99,10 @@ protected ConnectionHolder createConnectionHolder(SocketAddress socketAddress) { protected void createConnectionForHolder(SocketAddress socketAddress, ConnectionHolder connectionHolder, int size) throws RemotingException { - for (int i = 0; i < size; i++) { + for (int i = connectionHolder.size(); i < size; i++) { Connection connection = connectionFactory.create(socketAddress); connectionHolder.add(connection); } } -} +} \ No newline at end of file diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManager.java b/src/main/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManager.java index 5a9e0bb..aa16e7f 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManager.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManager.java @@ -1,32 +1,22 @@ package io.github.xinfra.lab.remoting.connection; -import io.github.xinfra.lab.remoting.common.NamedThreadFactory; +import io.github.xinfra.lab.remoting.annotation.AccessForTest; import io.github.xinfra.lab.remoting.exception.RemotingException; import io.github.xinfra.lab.remoting.protocol.Protocol; import io.netty.channel.ChannelHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.Validate; import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @Slf4j public class ClientConnectionManager extends AbstractConnectionManager { - private ExecutorService reconnector = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(1024), new NamedThreadFactory("Reconnector-Worker")); - - private Set disableReconnectSocketAddresses = new CopyOnWriteArraySet<>(); + @AccessForTest + protected Reconnector reconnector; public ClientConnectionManager(Protocol protocol) { this.connectionFactory = new DefaultConnectionFactory(protocol, defaultChannelSuppliers()); @@ -64,78 +54,51 @@ private List> defaultChannelSuppliers() { } @Override - public synchronized void shutdown() { - for (SocketAddress socketAddress : connections.keySet()) { - disableReconnect(socketAddress); + public Connection connect(SocketAddress socketAddress) throws RemotingException { + ConnectionHolder connectionHolder = connections.get(socketAddress); + if (connectionHolder == null) { + connectionHolder = createConnectionHolder(socketAddress); } - super.shutdown(); + createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint()); - reconnector.shutdownNow(); + return connectionHolder.get(); } @Override - public synchronized void reconnect(SocketAddress socketAddress) throws RemotingException { + public synchronized Connection get(SocketAddress socketAddress) throws RemotingException { ensureStarted(); - if (disableReconnectSocketAddresses.contains(socketAddress)) { - log.warn("socketAddress:{} is disable to reconnect", socketAddress); - throw new RemotingException("socketAddress is disable to reconnect:" + socketAddress); - } + Validate.notNull(socketAddress, "socketAddress can not be null"); + ConnectionHolder connectionHolder = connections.get(socketAddress); if (connectionHolder == null) { - connectionHolder = createConnectionHolder(socketAddress); - createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint()); - } - else { - int needCreateNum = config.getConnectionNumPreEndpoint() - connectionHolder.size(); - if (needCreateNum > 0) { - createConnectionForHolder(socketAddress, connectionHolder, needCreateNum); - } + return connect(socketAddress); } + + return connectionHolder.get(); } @Override - public synchronized void disableReconnect(SocketAddress socketAddress) { - ensureStarted(); - disableReconnectSocketAddresses.add(socketAddress); + public Reconnector reconnector() { + return reconnector; } @Override - public synchronized void enableReconnect(SocketAddress socketAddress) { - ensureStarted(); - disableReconnectSocketAddresses.remove(socketAddress); + public void startup() { + super.startup(); + reconnector = new DefaultReconnector(this); + reconnector.startup(); } @Override - public synchronized Future asyncReconnect(SocketAddress socketAddress) { - ensureStarted(); - if (disableReconnectSocketAddresses.contains(socketAddress)) { - log.warn("socketAddress:{} is disable to asyncReconnect", socketAddress); - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally( - new RemotingException("socketAddress is disable to asyncReconnect:" + socketAddress)); - return future; - } - - Callable callable = new Callable() { - @Override - public Void call() throws Exception { - try { - reconnect(socketAddress); - } - catch (Exception e) { - log.warn("reconnect socketAddress:{} fail", socketAddress, e); - throw e; - } - return null; + public synchronized void shutdown() { + if (reconnector() != null) { + for (SocketAddress socketAddress : connections.keySet()) { + reconnector().disableReconnect(socketAddress); } - }; - try { - return reconnector.submit(callable); - } - catch (Throwable t) { - log.warn("asyncReconnect submit failed.", t); - throw t; + super.shutdown(); + + reconnector().shutdown(); } } diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandler.java b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandler.java index c2800f9..eda2b25 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandler.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandler.java @@ -45,7 +45,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception { Connection connection = ctx.channel().attr(CONNECTION).get(); if (connectionManager != null && connectionManager.isStarted()) { - connectionManager.removeAndClose(connection); + connectionManager.close(connection); } userEventTriggered(ctx, ConnectionEvent.CLOSE); @@ -58,8 +58,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc Connection connection = ctx.channel().attr(CONNECTION).get(); ConnectionEvent connectionEvent = (ConnectionEvent) evt; if (connectionEvent == ConnectionEvent.CLOSE) { - if (connectionManager != null && connectionManager.isStarted()) { - connectionManager.asyncReconnect(connection.remoteAddress()); + if (connectionManager != null && connectionManager.isStarted() + && connectionManager.reconnector() != null) { + connectionManager.reconnector().reconnect(connection.remoteAddress()); } } } diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionHolder.java b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionHolder.java index f261cf8..e3d48d3 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionHolder.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionHolder.java @@ -2,9 +2,12 @@ import io.github.xinfra.lab.remoting.annotation.AccessForTest; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -public class ConnectionHolder { +public class ConnectionHolder implements Closeable { @AccessForTest protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList<>(); @@ -16,14 +19,18 @@ public ConnectionHolder(ConnectionSelectStrategy connectionSelectStrategy) { } public Connection get() { - return connectionSelectStrategy.select(connections); + List snapshot = new ArrayList<>(connections); + if (snapshot.size() > 0) { + return connectionSelectStrategy.select(new ArrayList<>(connections)); + } + return null; } public void add(Connection connection) { - connections.add(connection); + connections.addIfAbsent(connection); } - public void removeAndClose(Connection connection) { + public void invalidate(Connection connection) { connections.remove(connection); connection.close(); } @@ -32,15 +39,16 @@ public boolean isEmpty() { return connections.isEmpty(); } - public void removeAndCloseAll() { + public int size() { + return connections.size(); + } + + @Override + public void close() { for (Connection connection : connections) { connections.remove(connection); connection.close(); } } - public int size() { - return connections.size(); - } - } diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionManager.java b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionManager.java index 08a11b1..b0683c4 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionManager.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionManager.java @@ -2,29 +2,21 @@ import io.github.xinfra.lab.remoting.common.LifeCycle; import io.github.xinfra.lab.remoting.exception.RemotingException; -import io.github.xinfra.lab.remoting.protocol.Protocol; import java.net.SocketAddress; -import java.util.concurrent.Future; public interface ConnectionManager extends LifeCycle { - Connection getOrCreateIfAbsent(SocketAddress socketAddress) throws RemotingException; + Connection connect(SocketAddress socketAddress) throws RemotingException; - Connection get(SocketAddress socketAddress); + Connection get(SocketAddress socketAddress) throws RemotingException; void check(Connection connection) throws RemotingException; - void removeAndClose(Connection connection); + void close(Connection connection); void add(Connection connection); - void reconnect(SocketAddress socketAddress) throws RemotingException; - - void disableReconnect(SocketAddress socketAddress); - - void enableReconnect(SocketAddress socketAddress); - - Future asyncReconnect(SocketAddress socketAddress); + Reconnector reconnector(); } diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionFactory.java b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionFactory.java index b264183..3adc1df 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionFactory.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionFactory.java @@ -48,7 +48,6 @@ public class DefaultConnectionFactory implements ConnectionFactory { /** * Q: why use Supplier to get ChannelHandler? A: some ChannelHandler is * not @ChannelHandler.Sharable. need create instance every time - * @param channelHandlerSuppliers */ public DefaultConnectionFactory(Protocol protocol, List> channelHandlerSuppliers) { this(protocol, channelHandlerSuppliers, new ConnectionConfig()); @@ -57,8 +56,6 @@ public DefaultConnectionFactory(Protocol protocol, List /** * Q: why use Supplier to get ChannelHandler? A: some ChannelHandler is * not @ChannelHandler.Sharable. need create instance every time - * @param channelHandlerSuppliers - * @param connectionConfig */ public DefaultConnectionFactory(Protocol protocol, List> channelHandlerSuppliers, ConnectionConfig connectionConfig) { diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultReconnector.java b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultReconnector.java new file mode 100644 index 0000000..f9332dc --- /dev/null +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultReconnector.java @@ -0,0 +1,102 @@ +package io.github.xinfra.lab.remoting.connection; + +import io.github.xinfra.lab.remoting.annotation.AccessForTest; +import io.github.xinfra.lab.remoting.common.AbstractLifeCycle; +import io.github.xinfra.lab.remoting.common.NamedThreadFactory; +import io.github.xinfra.lab.remoting.exception.RemotingException; +import lombok.extern.slf4j.Slf4j; + +import java.net.SocketAddress; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class DefaultReconnector extends AbstractLifeCycle implements Reconnector { + + private Set disabledAddresses = new CopyOnWriteArraySet<>(); + + @AccessForTest + protected LinkedBlockingQueue reconnectAddressQueue = new LinkedBlockingQueue<>(); + + @AccessForTest + protected ConnectionManager connectionManager; + + private final Thread reconeectThread = new NamedThreadFactory("reconnect-thread").newThread(new ReconnectTask()); + + public DefaultReconnector(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + + @Override + public void startup() { + super.startup(); + reconeectThread.start(); + } + + @Override + public void shutdown() { + super.shutdown(); + reconeectThread.interrupt(); + disabledAddresses.clear(); + reconnectAddressQueue.clear(); + } + + @Override + public synchronized void reconnect(SocketAddress socketAddress) throws RemotingException { + ensureStarted(); + reconnectAddressQueue.add(socketAddress); + } + + @Override + public synchronized void disableReconnect(SocketAddress socketAddress) { + ensureStarted(); + disabledAddresses.add(socketAddress); + } + + @Override + public synchronized void enableReconnect(SocketAddress socketAddress) { + ensureStarted(); + disabledAddresses.remove(socketAddress); + } + + class ReconnectTask implements Runnable { + + @Override + public void run() { + while (isStarted()) { + SocketAddress socketAddress = null; + try { + socketAddress = reconnectAddressQueue.take(); + } + catch (InterruptedException e) { + break; + } + + if (disabledAddresses.contains(socketAddress)) { + log.warn("reconnect to {} has been disabled", socketAddress); + } + else { + try { + connectionManager.connect(socketAddress); + } + catch (Throwable e) { + log.warn("reconnect {} fail.", socketAddress); + reconnectAddressQueue.add(socketAddress); + } + } + + try { + TimeUnit.SECONDS.sleep(1); + } + catch (InterruptedException e) { + break; + } + + } + } + + } + +} diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/Reconnector.java b/src/main/java/io/github/xinfra/lab/remoting/connection/Reconnector.java new file mode 100644 index 0000000..2832a2c --- /dev/null +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/Reconnector.java @@ -0,0 +1,16 @@ +package io.github.xinfra.lab.remoting.connection; + +import io.github.xinfra.lab.remoting.common.LifeCycle; +import io.github.xinfra.lab.remoting.exception.RemotingException; + +import java.net.SocketAddress; + +public interface Reconnector extends LifeCycle { + + void disableReconnect(SocketAddress socketAddress); + + void enableReconnect(SocketAddress socketAddress); + + void reconnect(SocketAddress socketAddress) throws RemotingException; + +} diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManager.java b/src/main/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManager.java index 1674f53..304065e 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManager.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManager.java @@ -1,33 +1,32 @@ package io.github.xinfra.lab.remoting.connection; +import io.github.xinfra.lab.remoting.exception.RemotingException; +import org.apache.commons.lang3.Validate; + import java.net.SocketAddress; -import java.util.concurrent.Future; public class ServerConnectionManager extends AbstractConnectionManager { @Override - public Connection getOrCreateIfAbsent(SocketAddress socketAddress) { - throw new UnsupportedOperationException(); - } - - @Override - public void reconnect(SocketAddress socketAddress) { - throw new UnsupportedOperationException(); - } - - @Override - public void disableReconnect(SocketAddress socketAddress) { - throw new UnsupportedOperationException(); + public Connection connect(SocketAddress socketAddress) throws RemotingException { + throw new UnsupportedOperationException("ServerConnectionManager not support connect"); } @Override - public void enableReconnect(SocketAddress socketAddress) { - throw new UnsupportedOperationException(); + public Connection get(SocketAddress socketAddress) { + ensureStarted(); + Validate.notNull(socketAddress, "socketAddress can not be null"); + + ConnectionHolder connectionHolder = connections.get(socketAddress); + if (connectionHolder == null) { + return null; + } + return connectionHolder.get(); } @Override - public Future asyncReconnect(SocketAddress socketAddress) { - throw new UnsupportedOperationException(); + public Reconnector reconnector() { + return null; } } diff --git a/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClient.java b/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClient.java index d879e10..05df04b 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClient.java +++ b/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClient.java @@ -22,7 +22,7 @@ public class RpcClient extends AbstractLifeCycle { @Getter private RpcProtocol protocol; - private RpcClientRemoting rpcClientRemoting; + private RpcRemoting rpcClientRemoting; @Getter private ClientConnectionManager connectionManager; @@ -46,7 +46,7 @@ else if (connectionManagerConfig != null) { this.connectionManager = new ClientConnectionManager(protocol); } - this.rpcClientRemoting = new RpcClientRemoting(protocol, connectionManager); + this.rpcClientRemoting = new RpcRemoting(protocol, connectionManager); } public RpcClient() { diff --git a/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClientRemoting.java b/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClientRemoting.java deleted file mode 100644 index e3f8857..0000000 --- a/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcClientRemoting.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.github.xinfra.lab.remoting.rpc.client; - -import io.github.xinfra.lab.remoting.connection.ClientConnectionManager; -import io.github.xinfra.lab.remoting.connection.Connection; -import io.github.xinfra.lab.remoting.connection.ConnectionManager; -import io.github.xinfra.lab.remoting.exception.RemotingException; -import io.github.xinfra.lab.remoting.rpc.RpcProtocol; - -import java.net.SocketAddress; - -public class RpcClientRemoting extends RpcRemoting { - - protected ConnectionManager connectionManager; - - public RpcClientRemoting(RpcProtocol protocol, ClientConnectionManager connectionManager) { - super(protocol); - this.connectionManager = connectionManager; - } - - public R syncCall(Object request, SocketAddress socketAddress, int timeoutMills) - throws InterruptedException, RemotingException { - - Connection connection = connectionManager.getOrCreateIfAbsent(socketAddress); - connectionManager.check(connection); - - return this.syncCall(request, connection, timeoutMills); - } - - public RpcInvokeFuture asyncCall(Object request, SocketAddress socketAddress, int timeoutMills) - throws RemotingException { - - Connection connection = connectionManager.getOrCreateIfAbsent(socketAddress); - connectionManager.check(connection); - - return super.asyncCall(request, connection, timeoutMills); - } - - public void asyncCall(Object request, SocketAddress socketAddress, int timeoutMills, - RpcInvokeCallBack rpcInvokeCallBack) throws RemotingException { - - Connection connection = connectionManager.getOrCreateIfAbsent(socketAddress); - connectionManager.check(connection); - - super.asyncCall(request, connection, timeoutMills, rpcInvokeCallBack); - } - - public void oneway(Object request, SocketAddress socketAddress) throws RemotingException { - - Connection connection = connectionManager.getOrCreateIfAbsent(socketAddress); - connectionManager.check(connection); - - super.oneway(request, connection); - } - -} diff --git a/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcRemoting.java b/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcRemoting.java index e94f35d..2eaa77f 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcRemoting.java +++ b/src/main/java/io/github/xinfra/lab/remoting/rpc/client/RpcRemoting.java @@ -3,6 +3,7 @@ import io.github.xinfra.lab.remoting.client.BaseRemoting; import io.github.xinfra.lab.remoting.client.InvokeFuture; import io.github.xinfra.lab.remoting.connection.Connection; +import io.github.xinfra.lab.remoting.connection.ConnectionManager; import io.github.xinfra.lab.remoting.exception.RemotingException; import io.github.xinfra.lab.remoting.exception.SerializeException; import io.github.xinfra.lab.remoting.protocol.Protocol; @@ -10,14 +11,23 @@ import io.github.xinfra.lab.remoting.rpc.message.RpcResponseMessage; import io.github.xinfra.lab.remoting.rpc.message.RpcResponses; +import java.net.SocketAddress; + public class RpcRemoting extends BaseRemoting { - public RpcRemoting(Protocol protocol) { + protected ConnectionManager connectionManager; + + public RpcRemoting(Protocol protocol, ConnectionManager connectionManager) { super(protocol); + this.connectionManager = connectionManager; } - public R syncCall(Object request, Connection connection, int timeoutMills) + public R syncCall(Object request, SocketAddress socketAddress, int timeoutMills) throws InterruptedException, RemotingException { + + Connection connection = connectionManager.get(socketAddress); + connectionManager.check(connection); + RpcRequestMessage requestMessage = buildRequestMessage(request); RpcResponseMessage responseMessage = (RpcResponseMessage) super.syncCall(requestMessage, connection, @@ -25,22 +35,34 @@ public R syncCall(Object request, Connection connection, int timeoutMills) return RpcResponses.getResponseObject(responseMessage); } - public RpcInvokeFuture asyncCall(Object request, Connection connection, int timeoutMills) + public RpcInvokeFuture asyncCall(Object request, SocketAddress socketAddress, int timeoutMills) throws RemotingException { + + Connection connection = connectionManager.get(socketAddress); + connectionManager.check(connection); + RpcRequestMessage requestMessage = buildRequestMessage(request); InvokeFuture invokeFuture = super.asyncCall(requestMessage, connection, timeoutMills); return new RpcInvokeFuture(invokeFuture); } - public void asyncCall(Object request, Connection connection, int timeoutMills, + public void asyncCall(Object request, SocketAddress socketAddress, int timeoutMills, RpcInvokeCallBack rpcInvokeCallBack) throws RemotingException { + + Connection connection = connectionManager.get(socketAddress); + connectionManager.check(connection); + RpcRequestMessage requestMessage = buildRequestMessage(request); super.asyncCall(requestMessage, connection, timeoutMills, rpcInvokeCallBack); } - public void oneway(Object request, Connection connection) throws RemotingException { + public void oneway(Object request, SocketAddress socketAddress) throws RemotingException { + + Connection connection = connectionManager.get(socketAddress); + connectionManager.check(connection); + RpcRequestMessage requestMessage = buildRequestMessage(request); super.oneway(requestMessage, connection); diff --git a/src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServer.java b/src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServer.java index af471c2..17b129c 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServer.java +++ b/src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServer.java @@ -5,6 +5,7 @@ import io.github.xinfra.lab.remoting.rpc.RpcProtocol; import io.github.xinfra.lab.remoting.rpc.client.RpcInvokeCallBack; import io.github.xinfra.lab.remoting.rpc.client.RpcInvokeFuture; +import io.github.xinfra.lab.remoting.rpc.client.RpcRemoting; import io.github.xinfra.lab.remoting.server.BaseRemotingServer; import lombok.Getter; @@ -17,7 +18,7 @@ public class RpcServer extends BaseRemotingServer { private RpcProtocol protocol; @Getter - private RpcServerRemoting rpcServerRemoting; + private RpcRemoting rpcServerRemoting; public RpcServer() { super(new RpcServerConfig()); @@ -31,7 +32,7 @@ public RpcServer(RpcServerConfig config) { public void startup() { super.startup(); protocol = new RpcProtocol(); - rpcServerRemoting = new RpcServerRemoting(protocol, connectionManager); + rpcServerRemoting = new RpcRemoting(protocol, connectionManager); } @Override diff --git a/src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServerRemoting.java b/src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServerRemoting.java deleted file mode 100644 index 3abd8e2..0000000 --- a/src/main/java/io/github/xinfra/lab/remoting/rpc/server/RpcServerRemoting.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.github.xinfra.lab.remoting.rpc.server; - -import io.github.xinfra.lab.remoting.connection.Connection; -import io.github.xinfra.lab.remoting.connection.ConnectionManager; -import io.github.xinfra.lab.remoting.connection.ServerConnectionManager; -import io.github.xinfra.lab.remoting.exception.RemotingException; -import io.github.xinfra.lab.remoting.rpc.RpcProtocol; -import io.github.xinfra.lab.remoting.rpc.client.RpcInvokeCallBack; -import io.github.xinfra.lab.remoting.rpc.client.RpcInvokeFuture; -import io.github.xinfra.lab.remoting.rpc.client.RpcRemoting; - -import java.net.SocketAddress; - -public class RpcServerRemoting extends RpcRemoting { - - private ConnectionManager connectionManager; - - public RpcServerRemoting(RpcProtocol protocol, ServerConnectionManager connectionManager) { - super(protocol); - this.connectionManager = connectionManager; - } - - public R syncCall(Object request, SocketAddress socketAddress, int timeoutMills) - throws InterruptedException, RemotingException { - - Connection connection = connectionManager.get(socketAddress); - if (null == connection) { - throw new RemotingException("Client address [" + socketAddress + "] not connected yet!"); - } - connectionManager.check(connection); - - return this.syncCall(request, connection, timeoutMills); - } - - public RpcInvokeFuture asyncCall(Object request, SocketAddress socketAddress, int timeoutMills) - throws RemotingException { - - Connection connection = connectionManager.get(socketAddress); - if (null == connection) { - throw new RemotingException("Client address [" + socketAddress + "] not connected yet!"); - } - - connectionManager.check(connection); - - return super.asyncCall(request, connection, timeoutMills); - } - - public void asyncCall(Object request, SocketAddress socketAddress, int timeoutMills, - RpcInvokeCallBack rpcInvokeCallBack) throws RemotingException { - - Connection connection = connectionManager.get(socketAddress); - if (null == connection) { - throw new RemotingException("Client address [" + socketAddress + "] not connected yet!"); - } - connectionManager.check(connection); - - super.asyncCall(request, connection, timeoutMills, rpcInvokeCallBack); - } - - public void oneway(Object request, SocketAddress socketAddress) throws RemotingException { - - Connection connection = connectionManager.get(socketAddress); - if (null == connection) { - throw new RemotingException("Client address [" + socketAddress + "] not connected yet!"); - } - - connectionManager.check(connection); - - super.oneway(request, connection); - } - -} diff --git a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java index f2be76a..31a604c 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java +++ b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java @@ -7,7 +7,6 @@ import io.github.xinfra.lab.remoting.protocol.TestProtocol; import io.netty.channel.Channel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -15,14 +14,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.mockito.ArgumentMatchers.eq; @@ -78,39 +75,16 @@ public void testNewInstance() { } - @Test - public void testGetOrCreateIfAbsent() throws RemotingException { - InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Connection connection1 = connectionManager.getOrCreateIfAbsent(address); - Assertions.assertNotNull(connection1); - - Connection connection2 = connectionManager.getOrCreateIfAbsent(address); - Assertions.assertTrue(connection1 == connection2); - } - - @Test - public void testGetOrCreateIfAbsentFail() { - // invalid socketAddress - InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort + 1); - Assertions.assertThrows(RemotingException.class, () -> { - connectionManager.getOrCreateIfAbsent(address); - }); - } - @Test public void testGet() throws RemotingException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - // no connection - Connection connection1 = connectionManager.get(address); - Assertions.assertNull(connection1); - // create connection - Connection connection2 = connectionManager.getOrCreateIfAbsent(address); - Assertions.assertNotNull(connection2); + Connection connection1 = connectionManager.get(address); + Assertions.assertNotNull(connection1); - connection1 = connectionManager.get(address); + Connection connection2 = connectionManager.get(address); Assertions.assertNotNull(connection1); Assertions.assertTrue(connection1 == connection2); @@ -121,17 +95,11 @@ public void testGetFail() throws RemotingException { // invalid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort + 1); - // no connection - Connection connection1 = connectionManager.get(address); - Assertions.assertNull(connection1); - // fail create connection Assertions.assertThrows(RemotingException.class, () -> { - connectionManager.getOrCreateIfAbsent(address); + connectionManager.get(address); }); - connection1 = connectionManager.get(address); - Assertions.assertNull(connection1); } @Test @@ -142,7 +110,7 @@ public void testCheck() throws RemotingException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(address); + Connection connection = connectionManager.get(address); connectionManager.check(connection); } @@ -150,7 +118,7 @@ public void testCheck() throws RemotingException { public void testCheckWritable() throws RemotingException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(address); + Connection connection = connectionManager.get(address); connectionManager.check(connection); // mock @@ -175,7 +143,7 @@ public void testCheckWritable() throws RemotingException { public void testCheckActive() throws RemotingException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(address); + Connection connection = connectionManager.get(address); connectionManager.check(connection); // mock @@ -187,58 +155,54 @@ public void testCheckActive() throws RemotingException { ConnectionManager spyConnectionManager = spy(connectionManager); - connectionManager.disableReconnect(address); + connectionManager.reconnector().disableReconnect(address); Assertions.assertThrows(RemotingException.class, () -> { spyConnectionManager.check(spyConnection); }); - verify(spyConnectionManager, times(1)).removeAndClose(eq(spyConnection)); + verify(spyConnectionManager, times(1)).close(eq(spyConnection)); } @Test - public void testRemoveAndClose() throws RemotingException { + public void testCloseConnection() throws RemotingException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(address); + Connection connection = connectionManager.get(address); - connectionManager.disableReconnect(address); - connectionManager.removeAndClose(connection); + connectionManager.reconnector().disableReconnect(address); + connectionManager.close(connection); Assertions.assertNull(((ClientConnectionManager) connectionManager).connections.get(address)); - Assertions.assertNull(connectionManager.get(address)); - // removeAndClose again - connectionManager.removeAndClose(connection); - - Connection mockConnection = mock(Connection.class); - // invalid - InetSocketAddress invalidAddress = new InetSocketAddress(remoteAddress, serverPort + 1); - Assertions.assertNull(connectionManager.get(invalidAddress)); - doReturn(invalidAddress).when(mockConnection).remoteAddress(); - connectionManager.removeAndClose(mockConnection); - verify(mockConnection, times(1)).close(); - connectionManager.removeAndClose(mockConnection); - verify(mockConnection, times(2)).close(); + // close again + connectionManager.close(connection); } @Test - public void testReconnect1() throws RemotingException { + public void testReconnect1() throws RemotingException, InterruptedException, TimeoutException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(address); + Connection connection = connectionManager.get(address); Assertions.assertNotNull(connection); Map connections = ((ClientConnectionManager) connectionManager).connections; Assertions.assertTrue(connections.containsKey(address)); - connections.remove(address); + connectionManager.close(connection); + + Assertions.assertTrue(!connections.containsKey(address)); + + connectionManager.reconnector().reconnect(address); + Wait.untilIsTrue(() -> { + if (connections.containsKey(address)) { + return true; + } + return false; + }, 30, 100); - connectionManager.reconnect(address); Assertions.assertTrue(connections.containsKey(address)); - Connection connection1 = connectionManager.get(address); - Assertions.assertNotNull(connection1); } @Test - public void testReconnect2() throws RemotingException { + public void testReconnect2() throws RemotingException, InterruptedException, TimeoutException { int numPreEndpoint = 3; ConnectionManagerConfig connectionManagerConfig = new ConnectionManagerConfig(); connectionManagerConfig.setConnectionNumPreEndpoint(numPreEndpoint); @@ -247,82 +211,72 @@ public void testReconnect2() throws RemotingException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(address); + Connection connection = connectionManager.get(address); Map connections = ((ClientConnectionManager) connectionManager).connections; ConnectionHolder connectionHolder = connections.get(address); Assertions.assertEquals(connectionHolder.size(), numPreEndpoint); - connectionHolder.connections.remove(connection); + connectionManager.close(connection); Assertions.assertEquals(connectionHolder.size(), numPreEndpoint - 1); - connectionManager.reconnect(address); + connectionManager.reconnector().reconnect(address); + Wait.untilIsTrue(() -> { + if (Objects.equals(connectionHolder.size(), numPreEndpoint)) { + return true; + } + return false; + }, 30, 100); + Assertions.assertEquals(connectionHolder.size(), numPreEndpoint); } @Test - public void testAsyncReconnect1() throws ExecutionException, InterruptedException, RemotingException { + public void testReconnect3() throws InterruptedException, RemotingException, TimeoutException { // valid socketAddress InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Map connections = ((ClientConnectionManager) connectionManager).connections; - connectionManager = spy(connectionManager); - Future future = connectionManager.asyncReconnect(address); - future.get(); - - verify(connectionManager, times(1)).reconnect(eq(address)); - - Assertions.assertTrue(connections.containsKey(address)); - Connection connection = connectionManager.get(address); - Assertions.assertNotNull(connection); - } + Reconnector reconnector = connectionManager.reconnector(); - @Test - public void testAsyncReconnect2() - throws InterruptedException, RemotingException, TimeoutException, UnknownHostException { - // valid socketAddress - InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); - Map connections = ((ClientConnectionManager) connectionManager).connections; + ConnectionManager spyConnectionManager = spy(connectionManager); + ((DefaultReconnector) reconnector).connectionManager = spyConnectionManager; - Connection connection = connectionManager.getOrCreateIfAbsent(address); - connectionManager.removeAndClose(connection); Assertions.assertTrue(!connections.containsKey(address)); + reconnector.reconnect(address); + Wait.untilIsTrue(() -> { - ConnectionHolder connectionHolder = connections.get(address); - if (connectionHolder != null && connectionHolder.get() != null) { + if (connections.containsKey(address)) { return true; } return false; - }, 100, 30); + }, 30, 100); - Assertions.assertTrue(connections.containsKey(address)); - connection = connectionManager.get(address); - Assertions.assertNotNull(connection); + verify(spyConnectionManager, times(1)).connect(eq(address)); } @Test void testDisableReconnect() throws RemotingException, ExecutionException, InterruptedException, TimeoutException { InetSocketAddress address = new InetSocketAddress(remoteAddress, serverPort); + Reconnector reconnector = connectionManager.reconnector(); + Map connections = ((ClientConnectionManager) connectionManager).connections; - connectionManager.reconnect(address); - connectionManager.asyncReconnect(address).get(3, TimeUnit.SECONDS); - - connectionManager.disableReconnect(address); - Assertions.assertThrowsExactly(RemotingException.class, () -> { - connectionManager.reconnect(address); - }); + reconnector.disableReconnect(address); + reconnector.reconnect(address); - ExecutionException executionException = Assertions.assertThrows(ExecutionException.class, () -> { - connectionManager.asyncReconnect(address).get(3, TimeUnit.SECONDS); - }); - Assertions.assertTrue(executionException.getCause() instanceof RemotingException); + Wait.untilIsTrue(() -> { + return ((DefaultReconnector) reconnector).reconnectAddressQueue.isEmpty() + && !connections.containsKey(address); + }, 100, 30); - connectionManager.enableReconnect(address); + reconnector.enableReconnect(address); + reconnector.reconnect(address); - connectionManager.reconnect(address); - connectionManager.asyncReconnect(address).get(3, TimeUnit.SECONDS); + Wait.untilIsTrue(() -> { + return ((DefaultReconnector) reconnector).reconnectAddressQueue.isEmpty() + && connections.containsKey(address); + }, 100, 30); } } diff --git a/src/test/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandlerTest.java b/src/test/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandlerTest.java index 0b02b05..c40b54b 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandlerTest.java +++ b/src/test/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandlerTest.java @@ -117,10 +117,13 @@ public void testChannelClose_withConnectionManager() throws Exception { connectionManager = spy(connectionManager); InetSocketAddress socketAddress = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(socketAddress); + Connection connection = connectionManager.get(socketAddress); connection = spy(connection); connection.getChannel().attr(CONNECTION).set(connection); + Reconnector reconnector = spy(connectionManager.reconnector()); + ((ClientConnectionManager) connectionManager).reconnector = reconnector; + ConnectionEventHandler connectionEventHandler = new ConnectionEventHandler(connectionManager); connectionEventHandler = spy(connectionEventHandler); @@ -133,10 +136,9 @@ public void testChannelClose_withConnectionManager() throws Exception { channelFuture.await(); Assertions.assertTrue(channelFuture.isDone()); - ConnectionManager tempConnectionManager = connectionManager; Wait.untilIsTrue(() -> { try { - verify(tempConnectionManager, times(1)).reconnect(eq(socketAddress)); + verify(reconnector, times(1)).reconnect(eq(socketAddress)); return true; } catch (Throwable e) { @@ -147,10 +149,10 @@ public void testChannelClose_withConnectionManager() throws Exception { verify(connectionEventHandler, times(1)).close(any(), any()); verify(connection, times(1)).onClose(); verify(connectionEventHandler, times(1)).channelInactive(any()); - verify(connectionManager, times(1)).removeAndClose(eq(connection)); + verify(connectionManager, times(1)).close(eq(connection)); verify(connectionEventHandler, times(1)).userEventTriggered(any(), eq(ConnectionEvent.CLOSE)); - verify(connectionManager, times(1)).asyncReconnect(eq(socketAddress)); + verify(reconnector, times(1)).reconnect(eq(socketAddress)); connectionManager.shutdown(); } @@ -162,10 +164,13 @@ public void testChannelInactive_withConnectionManager() throws Exception { connectionManager = spy(connectionManager); InetSocketAddress socketAddress = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(socketAddress); + Connection connection = connectionManager.get(socketAddress); connection = spy(connection); connection.getChannel().attr(CONNECTION).set(connection); + Reconnector reconnector = spy(connectionManager.reconnector()); + ((ClientConnectionManager) connectionManager).reconnector = reconnector; + ConnectionEventHandler connectionEventHandler = new ConnectionEventHandler(connectionManager); connectionEventHandler = spy(connectionEventHandler); @@ -177,10 +182,9 @@ public void testChannelInactive_withConnectionManager() throws Exception { ChannelFuture channelFuture = connection.getChannel().disconnect().await(); Assertions.assertTrue(channelFuture.isDone()); - ConnectionManager tempConnectionManager = connectionManager; Wait.untilIsTrue(() -> { try { - verify(tempConnectionManager, times(1)).reconnect(eq(socketAddress)); + verify(reconnector, times(1)).reconnect(eq(socketAddress)); return true; } catch (Throwable e) { @@ -193,10 +197,10 @@ public void testChannelInactive_withConnectionManager() throws Exception { verify(connection, times(2)).onClose(); verify(connectionEventHandler, times(1)).channelInactive(any()); - verify(connectionManager, times(1)).removeAndClose(eq(connection)); + verify(connectionManager, times(1)).close(eq(connection)); verify(connectionEventHandler, times(1)).userEventTriggered(any(), eq(ConnectionEvent.CLOSE)); - verify(connectionManager, times(1)).asyncReconnect(eq(socketAddress)); + verify(reconnector, times(1)).reconnect(eq(socketAddress)); connectionManager.shutdown(); } @@ -208,10 +212,13 @@ public void testChannelExceptionCaught_withConnectionManager() throws Exception connectionManager = spy(connectionManager); InetSocketAddress socketAddress = new InetSocketAddress(remoteAddress, serverPort); - Connection connection = connectionManager.getOrCreateIfAbsent(socketAddress); + Connection connection = connectionManager.get(socketAddress); connection = spy(connection); connection.getChannel().attr(CONNECTION).set(connection); + Reconnector reconnector = spy(connectionManager.reconnector()); + ((ClientConnectionManager) connectionManager).reconnector = reconnector; + ConnectionEventHandler connectionEventHandler = new ConnectionEventHandler(connectionManager); connectionEventHandler = spy(connectionEventHandler); @@ -222,10 +229,9 @@ public void testChannelExceptionCaught_withConnectionManager() throws Exception connection.getChannel().pipeline().fireExceptionCaught(new RuntimeException("testChannelExceptionCaught")); - ConnectionManager tempConnectionManager = connectionManager; Wait.untilIsTrue(() -> { try { - verify(tempConnectionManager, times(1)).reconnect(eq(socketAddress)); + verify(reconnector, times(1)).reconnect(eq(socketAddress)); return true; } catch (Throwable e) { @@ -237,10 +243,10 @@ public void testChannelExceptionCaught_withConnectionManager() throws Exception verify(connectionEventHandler, times(1)).close(any(), any()); verify(connection, times(1)).onClose(); verify(connectionEventHandler, times(1)).channelInactive(any()); - verify(connectionManager, times(1)).removeAndClose(eq(connection)); + verify(connectionManager, times(1)).close(eq(connection)); verify(connectionEventHandler, times(1)).userEventTriggered(any(), eq(ConnectionEvent.CLOSE)); - verify(connectionManager, times(1)).asyncReconnect(eq(socketAddress)); + verify(reconnector, times(1)).reconnect(eq(socketAddress)); connectionManager.shutdown(); } diff --git a/src/test/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManagerTest.java b/src/test/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManagerTest.java index 62cc5b8..645841b 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManagerTest.java +++ b/src/test/java/io/github/xinfra/lab/remoting/connection/ServerConnectionManagerTest.java @@ -57,15 +57,6 @@ public void after() { } } - @Test - public void testGetOrCreateIfAbsent() { - SocketAddress socketAddress = new InetSocketAddress(remoteAddress, serverPort); - Assertions.assertThrows(UnsupportedOperationException.class, () -> { - connectionManager.getOrCreateIfAbsent(socketAddress); - }); - - } - @Test public void testGet1() throws RemotingException { // valid socketAddress @@ -75,16 +66,10 @@ public void testGet1() throws RemotingException { Connection connection1 = connectionManager.get(socketAddress); Assertions.assertNull(connection1); - Assertions.assertThrows(UnsupportedOperationException.class, () -> { - connectionManager.getOrCreateIfAbsent(socketAddress); - }); - - connection1 = connectionManager.get(socketAddress); - Assertions.assertNull(connection1); } @Test - public void testAdd() { + public void testAdd() throws RemotingException { Connection connection1 = mock(Connection.class); SocketAddress socketAddress1 = new InetSocketAddress("localhost", 8080); doReturn(socketAddress1).when(connection1).remoteAddress(); @@ -98,7 +83,7 @@ public void testAdd() { Assertions.assertTrue(connection1 == connectionManager.get(socketAddress1)); Assertions.assertTrue(connection2 == connectionManager.get(socketAddress2)); - connectionManager.removeAndClose(connection1); + connectionManager.close(connection1); verify(connection1, times(1)).close(); Assertions.assertNull(connectionManager.get(socketAddress1)); Assertions.assertTrue(connection2 == connectionManager.get(socketAddress2)); diff --git a/src/test/java/io/github/xinfra/lab/remoting/protocol/TestProtocol.java b/src/test/java/io/github/xinfra/lab/remoting/protocol/TestProtocol.java index bd83397..eeb3b6b 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/protocol/TestProtocol.java +++ b/src/test/java/io/github/xinfra/lab/remoting/protocol/TestProtocol.java @@ -6,6 +6,7 @@ import io.github.xinfra.lab.remoting.message.MessageFactory; import io.github.xinfra.lab.remoting.message.MessageHandler; import io.github.xinfra.lab.remoting.rpc.message.RpcMessageFactory; +import io.netty.channel.ChannelHandlerContext; import lombok.Setter; import java.io.IOException; @@ -29,7 +30,22 @@ public class TestProtocol implements Protocol { private MessageFactory testMessageFactory; @Setter - private HeartbeatTrigger testHeartbeatTrigger; + private HeartbeatTrigger testHeartbeatTrigger = new HeartbeatTrigger() { + @Override + public void triggerHeartBeat(ChannelHandlerContext ctx) { + // do nothing + } + + @Override + public void setHeartbeatMaxFailCount(int failCount) { + + } + + @Override + public void setHeartbeatTimeoutMills(int timeoutMills) { + + } + }; @Override public byte[] protocolCode() { diff --git a/src/test/java/io/github/xinfra/lab/remoting/rpc/client/HeartBeatTest.java b/src/test/java/io/github/xinfra/lab/remoting/rpc/client/HeartBeatTest.java index 46623d3..f2c8e2a 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/rpc/client/HeartBeatTest.java +++ b/src/test/java/io/github/xinfra/lab/remoting/rpc/client/HeartBeatTest.java @@ -54,7 +54,7 @@ public void heartbeatTest1() throws RemotingException, InterruptedException, IOE BaseRemoting baseRemoting = new BaseRemoting(protocol); Message heartbeatRequestMessage = messageFactory.createHeartbeatRequestMessage(); - Connection connection = connectionManager.getOrCreateIfAbsent(remoteAddress); + Connection connection = connectionManager.get(remoteAddress); Message heartbeatResponseMessage = baseRemoting.syncCall(heartbeatRequestMessage, connection, 1000);