Skip to content

Commit

Permalink
fix multi new connect coming at same time, epoll lost event
Browse files Browse the repository at this point in the history
  • Loading branch information
lx169075 committed May 24, 2021
1 parent 94398c7 commit 955fcb7
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 36 deletions.
79 changes: 46 additions & 33 deletions cup/net/asyn/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
"""
:descrition:
connection related module
1. There's only 1 thread reading/receiving data from the interface.
2. There might have more than 1 thred writing data into the network
queue. 1 thread per context(ip, port).
Notice that _do_write will only TRY to send out some data. It might
encounter TCP/IP stack full of data in the SEND buffer-queue of
the network interface
Expand Down Expand Up @@ -121,9 +118,7 @@ def global_sock_keepalive(self,
It activates after 1 second (after_idle_sec) of idleness,
then sends a keepalive ping once every 3 seconds (interval_sec),
and closes the connection after 5 failed ping (max_fails), or 15 sec
Notice, this will set all sockets this way.
:param sock:
socket
:param after_idle_sec:
Expand Down Expand Up @@ -187,16 +182,17 @@ def bind(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._set_sock_params(sock)
sock.bind((self._bind_ip, self._bind_port))
self._set_sock_nonblocking(sock)

##self._set_sock_nonblocking(sock)
log.info(
'bind port info:(ip:%s, port:%s)' % (
self._bind_ip, self._bind_port
)
)
self._epoll.register(
sock.fileno(),
select.EPOLLIN | select.EPOLLET | select.EPOLLOUT | select.EPOLLERR
)
#self._epoll.register(
# sock.fileno(),
# select.EPOLLIN | select.EPOLLET | select.EPOLLOUT | select.EPOLLERR
#)
self._bind_sock = sock

def push_msg2sendqueue(self, msg):
Expand Down Expand Up @@ -331,15 +327,15 @@ def _handle_new_conn(self, newsock, peer):
context.set_sock(newsock)
context.set_conn_man(self)
context.set_peerinfo(peer)
self._epoll.register(
newsock.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR
)
self._rwlock.acquire_writelock()
self._fileno2context[newsock.fileno()] = context
self._peer2context[peer] = context
self._context2fileno_peer[context] = (newsock.fileno(), peer)
self._rwlock.release_writelock()
log.info('a new connection: {0}'.format(peer))
self._epoll.register(
newsock.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR
)
log.info('a new connection: {0}, register new fd:{1}'.format(peer, newsock.fileno()))

def cleanup_error_context(self, context):
"""clean up error context"""
Expand Down Expand Up @@ -371,16 +367,6 @@ def _cleanup_context(send_queue, peerinfo):
self._rwlock.release_readlock()
if fileno_peer is None:
return
try:
sock = context.get_sock()
sock.close()
context.set_sock(None)
except socket.error as error:
log.info(
'failed to close the socket, err_msg:%s' % str(error)
)
except Exception as error:
log.warn('failed to close socket:{0}'.format(error))
try:
self._epoll.unregister(fileno_peer[0])
except Exception as error: # pylint: disable=W0703
Expand All @@ -394,10 +380,21 @@ def _cleanup_context(send_queue, peerinfo):
del self._peer2context[fileno_peer[1]]
del self._context2fileno_peer[context]
except KeyError:
log.warn(traceback.format_exc())
pass
self._rwlock.release_writelock()
log.info('socket {0} closed successfully'.format(peerinfo))
try:
sock = context.get_sock()
sock.close()
context.set_sock(None)
log.info('socket {0} closed successfully, fd:{1}'.format(peerinfo, fileno_peer[0]))
except socket.error as error:
log.info(
'failed to close the socket, err_msg:%s' % str(error)
)

except Exception as error:
log.warn(traceback.format_exc())
pass
# pylint: disable=W0212
self._thdpool.add_1job(_cleanup_context, context._send_queue, peerinfo)
Expand Down Expand Up @@ -431,6 +428,12 @@ def close_socket(self, msg, recv_socket):
)
return

def listen_new_connect(self):
while not self._stopsign:
newsock, addr = self._bind_sock.accept()
self._handle_new_conn(newsock, addr)
log.info('listen new connect thread exit')

def poll(self):
"""
start to poll
Expand All @@ -439,7 +442,10 @@ def poll(self):
self._executor.run()
log.info('thdpool and executor start')
misc.check_not_none(self._bind_sock)
self._bind_sock.listen(10)
self._bind_sock.listen(128)
self._executor.queue_exec(
self.listen_new_connect,
urgency=executor.URGENCY_HIGH)
self._executor.delay_exec(
2, # todo set the check_time to ?
self.do_check_msg_ack_loop,
Expand All @@ -453,21 +459,22 @@ def poll(self):
return
raise err
# log.debug('start to poll')
log.debug('handle events num:%s, bind sock fileno:%s' % (len(events), self._bind_sock.fileno()))
for fileno, event in events:
# if it comes from the listen port, new conn
if fileno == self._bind_sock.fileno():
newsock, addr = self._bind_sock.accept()
self._handle_new_conn(newsock, addr)
elif event & select.EPOLLIN:
#if fileno == self._bind_sock.fileno():
# newsock, addr = self._bind_sock.accept()
# self._handle_new_conn(newsock, addr)
if event & select.EPOLLIN:
try:
self._handle_new_recv(self._fileno2context[fileno])
except KeyError:
log.info('socket already closed')
log.info('fd:%s, socket already closed' % fileno)
elif event & select.EPOLLOUT:
try:
self._handle_new_send(self._fileno2context[fileno])
except KeyError:
log.info('socket already closed')
log.info('fd:%s, socket already closed' % fileno)
elif (event & select.EPOLLHUP) or (event & select.EPOLLERR):
# FIXME: consider if we need to release net msg resources
if event & select.EPOLLHUP:
Expand Down Expand Up @@ -505,11 +512,16 @@ def stop(self, force_stop=False):
"""
stop the connection manager
"""
def interupt_accept():
local_socket = socket.socket()
local_socket.connect((self._bind_ip, self._bind_port))
local_socket.close()
log.info('to stop the connection manager')
self._stopsign = True
self._async_stop(force_stop)
interupt_accept()
log.info('connection manager stopped')

def get_recv_msg_ind(self):
"""
get recv msg ind
Expand Down Expand Up @@ -880,3 +892,4 @@ def do_check_msg_ack_loop(self):
)

# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent

6 changes: 3 additions & 3 deletions cup/net/asyn/msgcenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
import time
import socket
import threading

import traceback
from cup import log
from cup.net.asyn import conn
from cup.net.asyn import msg as async_msg


__all__ = ['IMessageCenter']

# CHECK_OFF=0
Expand Down Expand Up @@ -177,18 +176,19 @@ def stop(self, force_stop=False):
stop the message center
"""
log.info('To stop the msgcenter')
self._conn_mgr.stop(force_stop)
self._stop = True
self._stat_cond.acquire()
self._stat_cond.notify()
self._stat_cond.release()
self._conn_mgr.stop(force_stop)
log.info('msgcenter stopped')

def run(self):
"""
run the msgcenter
"""
if not self.setup():
self._stop = True
return False
thd_conn_man = threading.Thread(target=self._run_conn_manager, args=())
thd_conn_man.start()
Expand Down

0 comments on commit 955fcb7

Please sign in to comment.