From 2384fc9db2c1834ba24d9f6a9b2a6b57205d08bc Mon Sep 17 00:00:00 2001 From: jiangyuan Date: Tue, 5 Nov 2024 14:11:14 +0800 Subject: [PATCH] update ConnectionEventListener (#19) * update ConnectionEventListener --- .../connection/ConnectionEventListener.java | 4 +-- .../DefaultConnectionEventProcessor.java | 27 +++++---------- .../ClientConnectionManagerTest.java | 33 ++++++++++--------- 3 files changed, 28 insertions(+), 36 deletions(-) diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java index c04a23b..3cef8b8 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java @@ -2,8 +2,6 @@ public interface ConnectionEventListener { - ConnectionEvent interest(); - - void onEvent(Connection connection); + void onEvent(ConnectionEvent connectionEvent, Connection connection); } diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java index a7f2cfa..17b9ca2 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java @@ -21,7 +21,7 @@ public class DefaultConnectionEventProcessor extends AbstractLifeCycle implement protected LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - private Map> listeners = new ConcurrentHashMap<>(); + private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @Override public void startup() { @@ -48,14 +48,7 @@ public void handleEvent(ConnectionEvent event, Connection connection) { public void addConnectionEventListener(ConnectionEventListener listener) { ensureStarted(); Validate.notNull(listener, "listener must not be null"); - Validate.notNull(listener.interest(), "listener interest event must not be null"); - ConnectionEvent event = listener.interest(); - List connectionEventListeners = listeners.get(event); - if (connectionEventListeners == null) { - listeners.computeIfAbsent(event, e -> new CopyOnWriteArrayList<>()); - connectionEventListeners = listeners.get(event); - } - connectionEventListeners.add(listener); + listeners.addIfAbsent(listener); } @AllArgsConstructor @@ -81,17 +74,15 @@ public void run() { continue; } - List connectionEventListeners = listeners.get(event.connectionEvent); - if (connectionEventListeners != null) { - for (ConnectionEventListener listener : connectionEventListeners) { - try { - listener.onEvent(event.connection); - } - catch (Throwable t) { - log.warn("{} onEvent execute fail", listener, t); - } + for (ConnectionEventListener listener : listeners) { + try { + listener.onEvent(event.connectionEvent, event.connection); + } + catch (Throwable t) { + log.warn("{} onEvent execute fail", listener, t); } } + } } diff --git a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java index 63611ce..14e84b1 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java +++ b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java @@ -281,18 +281,24 @@ void testDisableReconnect() throws InterruptedException, TimeoutException { @Test void testConnectionEventListener() throws RemotingException, InterruptedException, TimeoutException { - AtomicBoolean connectFlag = new AtomicBoolean(false); - AtomicReference connectionRef1 = new AtomicReference<>(); + connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() { @Override - public ConnectionEvent interest() { - return ConnectionEvent.CONNECT; + public void onEvent(ConnectionEvent connectionEvent, Connection connection) { + // threw exception will not affect others listener + throw new RuntimeException("test throw exception"); } + }); + AtomicBoolean connectFlag = new AtomicBoolean(false); + AtomicReference connectionRef1 = new AtomicReference<>(); + connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() { @Override - public void onEvent(Connection connection) { - connectionRef1.set(connection); - connectFlag.set(true); + public void onEvent(ConnectionEvent connectionEvent, Connection connection) { + if (ConnectionEvent.CONNECT == connectionEvent) { + connectionRef1.set(connection); + connectFlag.set(true); + } } }); @@ -300,14 +306,11 @@ public void onEvent(Connection connection) { AtomicReference connectionRef2 = new AtomicReference<>(); connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() { @Override - public ConnectionEvent interest() { - return ConnectionEvent.CLOSE; - } - - @Override - public void onEvent(Connection connection) { - connectionRef2.set(connection); - closeFlag.set(true); + public void onEvent(ConnectionEvent connectionEvent, Connection connection) { + if (ConnectionEvent.CLOSE == connectionEvent) { + connectionRef2.set(connection); + closeFlag.set(true); + } } });