From 955fcb73ac8344fd2f0a253364073539e598562b Mon Sep 17 00:00:00 2001 From: lx169075 Date: Mon, 24 May 2021 19:53:40 +0800 Subject: [PATCH] fix multi new connect coming at same time, epoll lost event --- cup/net/asyn/conn.py | 79 +++++++++++++++++++++++---------------- cup/net/asyn/msgcenter.py | 6 +-- 2 files changed, 49 insertions(+), 36 deletions(-) diff --git a/cup/net/asyn/conn.py b/cup/net/asyn/conn.py index cc26187..a08330a 100644 --- a/cup/net/asyn/conn.py +++ b/cup/net/asyn/conn.py @@ -5,12 +5,9 @@ """ :descrition: connection related module - 1. There's only 1 thread reading/receiving data from the interface. - 2. There might have more than 1 thred writing data into the network queue. 1 thread per context(ip, port). - Notice that _do_write will only TRY to send out some data. It might encounter TCP/IP stack full of data in the SEND buffer-queue of the network interface @@ -121,9 +118,7 @@ def global_sock_keepalive(self, It activates after 1 second (after_idle_sec) of idleness, then sends a keepalive ping once every 3 seconds (interval_sec), and closes the connection after 5 failed ping (max_fails), or 15 sec - Notice, this will set all sockets this way. - :param sock: socket :param after_idle_sec: @@ -187,16 +182,17 @@ def bind(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._set_sock_params(sock) sock.bind((self._bind_ip, self._bind_port)) - self._set_sock_nonblocking(sock) + + ##self._set_sock_nonblocking(sock) log.info( 'bind port info:(ip:%s, port:%s)' % ( self._bind_ip, self._bind_port ) ) - self._epoll.register( - sock.fileno(), - select.EPOLLIN | select.EPOLLET | select.EPOLLOUT | select.EPOLLERR - ) + #self._epoll.register( + # sock.fileno(), + # select.EPOLLIN | select.EPOLLET | select.EPOLLOUT | select.EPOLLERR + #) self._bind_sock = sock def push_msg2sendqueue(self, msg): @@ -331,15 +327,15 @@ def _handle_new_conn(self, newsock, peer): context.set_sock(newsock) context.set_conn_man(self) context.set_peerinfo(peer) - self._epoll.register( - newsock.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR - ) self._rwlock.acquire_writelock() self._fileno2context[newsock.fileno()] = context self._peer2context[peer] = context self._context2fileno_peer[context] = (newsock.fileno(), peer) self._rwlock.release_writelock() - log.info('a new connection: {0}'.format(peer)) + self._epoll.register( + newsock.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR + ) + log.info('a new connection: {0}, register new fd:{1}'.format(peer, newsock.fileno())) def cleanup_error_context(self, context): """clean up error context""" @@ -371,16 +367,6 @@ def _cleanup_context(send_queue, peerinfo): self._rwlock.release_readlock() if fileno_peer is None: return - try: - sock = context.get_sock() - sock.close() - context.set_sock(None) - except socket.error as error: - log.info( - 'failed to close the socket, err_msg:%s' % str(error) - ) - except Exception as error: - log.warn('failed to close socket:{0}'.format(error)) try: self._epoll.unregister(fileno_peer[0]) except Exception as error: # pylint: disable=W0703 @@ -394,10 +380,21 @@ def _cleanup_context(send_queue, peerinfo): del self._peer2context[fileno_peer[1]] del self._context2fileno_peer[context] except KeyError: + log.warn(traceback.format_exc()) pass self._rwlock.release_writelock() - log.info('socket {0} closed successfully'.format(peerinfo)) + try: + sock = context.get_sock() + sock.close() + context.set_sock(None) + log.info('socket {0} closed successfully, fd:{1}'.format(peerinfo, fileno_peer[0])) + except socket.error as error: + log.info( + 'failed to close the socket, err_msg:%s' % str(error) + ) + except Exception as error: + log.warn(traceback.format_exc()) pass # pylint: disable=W0212 self._thdpool.add_1job(_cleanup_context, context._send_queue, peerinfo) @@ -431,6 +428,12 @@ def close_socket(self, msg, recv_socket): ) return + def listen_new_connect(self): + while not self._stopsign: + newsock, addr = self._bind_sock.accept() + self._handle_new_conn(newsock, addr) + log.info('listen new connect thread exit') + def poll(self): """ start to poll @@ -439,7 +442,10 @@ def poll(self): self._executor.run() log.info('thdpool and executor start') misc.check_not_none(self._bind_sock) - self._bind_sock.listen(10) + self._bind_sock.listen(128) + self._executor.queue_exec( + self.listen_new_connect, + urgency=executor.URGENCY_HIGH) self._executor.delay_exec( 2, # todo set the check_time to ? self.do_check_msg_ack_loop, @@ -453,21 +459,22 @@ def poll(self): return raise err # log.debug('start to poll') + log.debug('handle events num:%s, bind sock fileno:%s' % (len(events), self._bind_sock.fileno())) for fileno, event in events: # if it comes from the listen port, new conn - if fileno == self._bind_sock.fileno(): - newsock, addr = self._bind_sock.accept() - self._handle_new_conn(newsock, addr) - elif event & select.EPOLLIN: + #if fileno == self._bind_sock.fileno(): + # newsock, addr = self._bind_sock.accept() + # self._handle_new_conn(newsock, addr) + if event & select.EPOLLIN: try: self._handle_new_recv(self._fileno2context[fileno]) except KeyError: - log.info('socket already closed') + log.info('fd:%s, socket already closed' % fileno) elif event & select.EPOLLOUT: try: self._handle_new_send(self._fileno2context[fileno]) except KeyError: - log.info('socket already closed') + log.info('fd:%s, socket already closed' % fileno) elif (event & select.EPOLLHUP) or (event & select.EPOLLERR): # FIXME: consider if we need to release net msg resources if event & select.EPOLLHUP: @@ -505,11 +512,16 @@ def stop(self, force_stop=False): """ stop the connection manager """ + def interupt_accept(): + local_socket = socket.socket() + local_socket.connect((self._bind_ip, self._bind_port)) + local_socket.close() log.info('to stop the connection manager') self._stopsign = True self._async_stop(force_stop) + interupt_accept() log.info('connection manager stopped') - + def get_recv_msg_ind(self): """ get recv msg ind @@ -880,3 +892,4 @@ def do_check_msg_ack_loop(self): ) # vi:set tw=0 ts=4 sw=4 nowrap fdm=indent + diff --git a/cup/net/asyn/msgcenter.py b/cup/net/asyn/msgcenter.py index ddeff27..76bcbcf 100644 --- a/cup/net/asyn/msgcenter.py +++ b/cup/net/asyn/msgcenter.py @@ -11,12 +11,11 @@ import time import socket import threading - +import traceback from cup import log from cup.net.asyn import conn from cup.net.asyn import msg as async_msg - __all__ = ['IMessageCenter'] # CHECK_OFF=0 @@ -177,11 +176,11 @@ def stop(self, force_stop=False): stop the message center """ log.info('To stop the msgcenter') - self._conn_mgr.stop(force_stop) self._stop = True self._stat_cond.acquire() self._stat_cond.notify() self._stat_cond.release() + self._conn_mgr.stop(force_stop) log.info('msgcenter stopped') def run(self): @@ -189,6 +188,7 @@ def run(self): run the msgcenter """ if not self.setup(): + self._stop = True return False thd_conn_man = threading.Thread(target=self._run_conn_manager, args=()) thd_conn_man.start()