Skip to content

Commit

Permalink
update ConnectionEventListener (#19)
Browse files Browse the repository at this point in the history
* update ConnectionEventListener
  • Loading branch information
JoeCqupt authored Nov 5, 2024
1 parent 7134229 commit 2384fc9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

public interface ConnectionEventListener {

ConnectionEvent interest();

void onEvent(Connection connection);
void onEvent(ConnectionEvent connectionEvent, Connection connection);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class DefaultConnectionEventProcessor extends AbstractLifeCycle implement

protected LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

private Map<ConnectionEvent, List<ConnectionEventListener>> listeners = new ConcurrentHashMap<>();
private CopyOnWriteArrayList<ConnectionEventListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void startup() {
Expand All @@ -48,14 +48,7 @@ public void handleEvent(ConnectionEvent event, Connection connection) {
public void addConnectionEventListener(ConnectionEventListener listener) {
ensureStarted();
Validate.notNull(listener, "listener must not be null");
Validate.notNull(listener.interest(), "listener interest event must not be null");
ConnectionEvent event = listener.interest();
List<ConnectionEventListener> connectionEventListeners = listeners.get(event);
if (connectionEventListeners == null) {
listeners.computeIfAbsent(event, e -> new CopyOnWriteArrayList<>());
connectionEventListeners = listeners.get(event);
}
connectionEventListeners.add(listener);
listeners.addIfAbsent(listener);
}

@AllArgsConstructor
Expand All @@ -81,17 +74,15 @@ public void run() {
continue;
}

List<ConnectionEventListener> connectionEventListeners = listeners.get(event.connectionEvent);
if (connectionEventListeners != null) {
for (ConnectionEventListener listener : connectionEventListeners) {
try {
listener.onEvent(event.connection);
}
catch (Throwable t) {
log.warn("{} onEvent execute fail", listener, t);
}
for (ConnectionEventListener listener : listeners) {
try {
listener.onEvent(event.connectionEvent, event.connection);
}
catch (Throwable t) {
log.warn("{} onEvent execute fail", listener, t);
}
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,33 +281,36 @@ void testDisableReconnect() throws InterruptedException, TimeoutException {

@Test
void testConnectionEventListener() throws RemotingException, InterruptedException, TimeoutException {
AtomicBoolean connectFlag = new AtomicBoolean(false);
AtomicReference<Connection> connectionRef1 = new AtomicReference<>();

connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() {
@Override
public ConnectionEvent interest() {
return ConnectionEvent.CONNECT;
public void onEvent(ConnectionEvent connectionEvent, Connection connection) {
// threw exception will not affect others listener
throw new RuntimeException("test throw exception");
}
});

AtomicBoolean connectFlag = new AtomicBoolean(false);
AtomicReference<Connection> connectionRef1 = new AtomicReference<>();
connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() {
@Override
public void onEvent(Connection connection) {
connectionRef1.set(connection);
connectFlag.set(true);
public void onEvent(ConnectionEvent connectionEvent, Connection connection) {
if (ConnectionEvent.CONNECT == connectionEvent) {
connectionRef1.set(connection);
connectFlag.set(true);
}
}
});

AtomicBoolean closeFlag = new AtomicBoolean(false);
AtomicReference<Connection> connectionRef2 = new AtomicReference<>();
connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() {
@Override
public ConnectionEvent interest() {
return ConnectionEvent.CLOSE;
}

@Override
public void onEvent(Connection connection) {
connectionRef2.set(connection);
closeFlag.set(true);
public void onEvent(ConnectionEvent connectionEvent, Connection connection) {
if (ConnectionEvent.CLOSE == connectionEvent) {
connectionRef2.set(connection);
closeFlag.set(true);
}
}
});

Expand Down

0 comments on commit 2384fc9

Please sign in to comment.