Skip to content

Commit

Permalink
imporve connection manager (#17)
Browse files Browse the repository at this point in the history
* improve  connection pool

* improve connection manager

* update reconnect
  • Loading branch information
JoeCqupt authored Oct 31, 2024
1 parent 0dc28d4 commit 571b373
Show file tree
Hide file tree
Showing 20 changed files with 331 additions and 422 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.github.x-infra-lab</groupId>
<artifactId>x-remoting</artifactId>
<version>0.0.1</version>
<version>0.0.2-RC1</version>
<packaging>jar</packaging>

<name>x-remoting</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,13 @@ public AbstractConnectionManager(ConnectionManagerConfig config) {
this.config = config;
}

@Override
public synchronized Connection getOrCreateIfAbsent(SocketAddress socketAddress) throws RemotingException {
ensureStarted();
Validate.notNull(socketAddress, "socketAddress can not be null");

ConnectionHolder connectionHolder = connections.get(socketAddress);
if (connectionHolder == null) {
connectionHolder = createConnectionHolder(socketAddress);
createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint());
}

return connectionHolder.get();
}

@Override
public void check(Connection connection) throws RemotingException {
ensureStarted();
Validate.notNull(connection, "connection can not be null");

if (connection.getChannel() == null || !connection.getChannel().isActive()) {
this.removeAndClose(connection);
this.close(connection);
throw new RemotingException("Check connection failed for address: " + connection.remoteAddress());
}
if (!connection.getChannel().isWritable()) {
Expand All @@ -60,7 +46,7 @@ public void check(Connection connection) throws RemotingException {
}

@Override
public synchronized void removeAndClose(Connection connection) {
public synchronized void close(Connection connection) {
ensureStarted();
Validate.notNull(connection, "connection can not be null");

Expand All @@ -70,25 +56,13 @@ public synchronized void removeAndClose(Connection connection) {
connection.close();
}
else {
connectionHolder.removeAndClose(connection);
connectionHolder.invalidate(connection);
if (connectionHolder.isEmpty()) {
connections.remove(socketAddress);
}
}
}

@Override
public Connection get(SocketAddress socketAddress) {
ensureStarted();
Validate.notNull(socketAddress, "socketAddress can not be null");

ConnectionHolder connectionHolder = connections.get(socketAddress);
if (connectionHolder == null) {
return null;
}
return connectionHolder.get();
}

@Override
public synchronized void add(Connection connection) {
ensureStarted();
Expand All @@ -111,7 +85,7 @@ public synchronized void shutdown() {
for (Map.Entry<SocketAddress, ConnectionHolder> entry : connections.entrySet()) {
SocketAddress socketAddress = entry.getKey();
ConnectionHolder connectionHolder = entry.getValue();
connectionHolder.removeAndCloseAll();
connectionHolder.close();
connections.remove(socketAddress);
}

Expand All @@ -125,10 +99,10 @@ protected ConnectionHolder createConnectionHolder(SocketAddress socketAddress) {

protected void createConnectionForHolder(SocketAddress socketAddress, ConnectionHolder connectionHolder, int size)
throws RemotingException {
for (int i = 0; i < size; i++) {
for (int i = connectionHolder.size(); i < size; i++) {
Connection connection = connectionFactory.create(socketAddress);
connectionHolder.add(connection);
}
}

}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,22 @@
package io.github.xinfra.lab.remoting.connection;

import io.github.xinfra.lab.remoting.common.NamedThreadFactory;
import io.github.xinfra.lab.remoting.annotation.AccessForTest;
import io.github.xinfra.lab.remoting.exception.RemotingException;
import io.github.xinfra.lab.remoting.protocol.Protocol;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@Slf4j
public class ClientConnectionManager extends AbstractConnectionManager {

private ExecutorService reconnector = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1024), new NamedThreadFactory("Reconnector-Worker"));

private Set<SocketAddress> disableReconnectSocketAddresses = new CopyOnWriteArraySet<>();
@AccessForTest
protected Reconnector reconnector;

public ClientConnectionManager(Protocol protocol) {
this.connectionFactory = new DefaultConnectionFactory(protocol, defaultChannelSuppliers());
Expand Down Expand Up @@ -64,78 +54,51 @@ private List<Supplier<ChannelHandler>> defaultChannelSuppliers() {
}

@Override
public synchronized void shutdown() {
for (SocketAddress socketAddress : connections.keySet()) {
disableReconnect(socketAddress);
public Connection connect(SocketAddress socketAddress) throws RemotingException {
ConnectionHolder connectionHolder = connections.get(socketAddress);
if (connectionHolder == null) {
connectionHolder = createConnectionHolder(socketAddress);
}
super.shutdown();
createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint());

reconnector.shutdownNow();
return connectionHolder.get();
}

@Override
public synchronized void reconnect(SocketAddress socketAddress) throws RemotingException {
public synchronized Connection get(SocketAddress socketAddress) throws RemotingException {
ensureStarted();
if (disableReconnectSocketAddresses.contains(socketAddress)) {
log.warn("socketAddress:{} is disable to reconnect", socketAddress);
throw new RemotingException("socketAddress is disable to reconnect:" + socketAddress);
}
Validate.notNull(socketAddress, "socketAddress can not be null");

ConnectionHolder connectionHolder = connections.get(socketAddress);
if (connectionHolder == null) {
connectionHolder = createConnectionHolder(socketAddress);
createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint());
}
else {
int needCreateNum = config.getConnectionNumPreEndpoint() - connectionHolder.size();
if (needCreateNum > 0) {
createConnectionForHolder(socketAddress, connectionHolder, needCreateNum);
}
return connect(socketAddress);
}

return connectionHolder.get();
}

@Override
public synchronized void disableReconnect(SocketAddress socketAddress) {
ensureStarted();
disableReconnectSocketAddresses.add(socketAddress);
public Reconnector reconnector() {
return reconnector;
}

@Override
public synchronized void enableReconnect(SocketAddress socketAddress) {
ensureStarted();
disableReconnectSocketAddresses.remove(socketAddress);
public void startup() {
super.startup();
reconnector = new DefaultReconnector(this);
reconnector.startup();
}

@Override
public synchronized Future<Void> asyncReconnect(SocketAddress socketAddress) {
ensureStarted();
if (disableReconnectSocketAddresses.contains(socketAddress)) {
log.warn("socketAddress:{} is disable to asyncReconnect", socketAddress);
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(
new RemotingException("socketAddress is disable to asyncReconnect:" + socketAddress));
return future;
}

Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
reconnect(socketAddress);
}
catch (Exception e) {
log.warn("reconnect socketAddress:{} fail", socketAddress, e);
throw e;
}
return null;
public synchronized void shutdown() {
if (reconnector() != null) {
for (SocketAddress socketAddress : connections.keySet()) {
reconnector().disableReconnect(socketAddress);
}
};

try {
return reconnector.submit(callable);
}
catch (Throwable t) {
log.warn("asyncReconnect submit failed.", t);
throw t;
super.shutdown();

reconnector().shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Connection connection = ctx.channel().attr(CONNECTION).get();
if (connectionManager != null && connectionManager.isStarted()) {
connectionManager.removeAndClose(connection);
connectionManager.close(connection);
}
userEventTriggered(ctx, ConnectionEvent.CLOSE);

Expand All @@ -58,8 +58,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
Connection connection = ctx.channel().attr(CONNECTION).get();
ConnectionEvent connectionEvent = (ConnectionEvent) evt;
if (connectionEvent == ConnectionEvent.CLOSE) {
if (connectionManager != null && connectionManager.isStarted()) {
connectionManager.asyncReconnect(connection.remoteAddress());
if (connectionManager != null && connectionManager.isStarted()
&& connectionManager.reconnector() != null) {
connectionManager.reconnector().reconnect(connection.remoteAddress());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import io.github.xinfra.lab.remoting.annotation.AccessForTest;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConnectionHolder {
public class ConnectionHolder implements Closeable {

@AccessForTest
protected CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<>();
Expand All @@ -16,14 +19,18 @@ public ConnectionHolder(ConnectionSelectStrategy connectionSelectStrategy) {
}

public Connection get() {
return connectionSelectStrategy.select(connections);
List<Connection> snapshot = new ArrayList<>(connections);
if (snapshot.size() > 0) {
return connectionSelectStrategy.select(new ArrayList<>(connections));
}
return null;
}

public void add(Connection connection) {
connections.add(connection);
connections.addIfAbsent(connection);
}

public void removeAndClose(Connection connection) {
public void invalidate(Connection connection) {
connections.remove(connection);
connection.close();
}
Expand All @@ -32,15 +39,16 @@ public boolean isEmpty() {
return connections.isEmpty();
}

public void removeAndCloseAll() {
public int size() {
return connections.size();
}

@Override
public void close() {
for (Connection connection : connections) {
connections.remove(connection);
connection.close();
}
}

public int size() {
return connections.size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,21 @@

import io.github.xinfra.lab.remoting.common.LifeCycle;
import io.github.xinfra.lab.remoting.exception.RemotingException;
import io.github.xinfra.lab.remoting.protocol.Protocol;

import java.net.SocketAddress;
import java.util.concurrent.Future;

public interface ConnectionManager extends LifeCycle {

Connection getOrCreateIfAbsent(SocketAddress socketAddress) throws RemotingException;
Connection connect(SocketAddress socketAddress) throws RemotingException;

Connection get(SocketAddress socketAddress);
Connection get(SocketAddress socketAddress) throws RemotingException;

void check(Connection connection) throws RemotingException;

void removeAndClose(Connection connection);
void close(Connection connection);

void add(Connection connection);

void reconnect(SocketAddress socketAddress) throws RemotingException;

void disableReconnect(SocketAddress socketAddress);

void enableReconnect(SocketAddress socketAddress);

Future<Void> asyncReconnect(SocketAddress socketAddress);
Reconnector reconnector();

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class DefaultConnectionFactory implements ConnectionFactory {
/**
* Q: why use Supplier to get ChannelHandler? A: some ChannelHandler is
* not @ChannelHandler.Sharable. need create instance every time
* @param channelHandlerSuppliers
*/
public DefaultConnectionFactory(Protocol protocol, List<Supplier<ChannelHandler>> channelHandlerSuppliers) {
this(protocol, channelHandlerSuppliers, new ConnectionConfig());
Expand All @@ -57,8 +56,6 @@ public DefaultConnectionFactory(Protocol protocol, List<Supplier<ChannelHandler>
/**
* Q: why use Supplier to get ChannelHandler? A: some ChannelHandler is
* not @ChannelHandler.Sharable. need create instance every time
* @param channelHandlerSuppliers
* @param connectionConfig
*/
public DefaultConnectionFactory(Protocol protocol, List<Supplier<ChannelHandler>> channelHandlerSuppliers,
ConnectionConfig connectionConfig) {
Expand Down
Loading

0 comments on commit 571b373

Please sign in to comment.