Skip to content

Commit

Permalink
[transport.tcpv4] Make getting connections more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
nhairs committed Nov 22, 2024
1 parent 0c41d3f commit 37bcb13
Showing 1 changed file with 36 additions and 9 deletions.
45 changes: 36 additions & 9 deletions src/nserver/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,41 @@ def stop_server(self) -> None:
def __repr__(self):
return f"{self.__class__.__name__}(address={self.address!r}, port={self.port!r})"

def _get_next_connection(self) -> tuple[socket.socket, tuple[str, int]]:
"""Get the next connection that is ready to receive data on."""
def _get_next_connection(self) -> Tuple[socket.socket, Tuple[str, int]]:
"""Get the next connection that is ready to receive data on.
Blocks until a good connection is found
"""
while True:
if not self.connection_queue:
self._populate_connection_queue()

# There is something in the queue - attempt to get it
connection = self.connection_queue.popleft()

if not self._connection_viable(connection):
self._remove_connection(connection)
continue

# Connection is probably viable
try:
remote_address = connection.getpeername()
except OSError as e:
if e.errno == 107: # Transport endpoint is not connected
self._remove_connection(connection)
continue

raise # Unknown OSError - raise it.

break # we have a valid connection

return connection, remote_address

def _populate_connection_queue(self) -> None:
"""Populate self.connection_queue
Blocks until there is at least on connection
"""
while not self.connection_queue:
# loop until connection is ready for execution
events = self.selector.select(self.SELECT_TIMEOUT)
Expand Down Expand Up @@ -410,13 +443,7 @@ def _get_next_connection(self) -> tuple[socket.socket, tuple[str, int]]:
# No connections ready, take advantage to do cleanup
elif time.time() - self.last_cache_clean > self.CONNECTION_CACHE_CLEAN_INTERVAL:
self._cleanup_cached_connections()

# We have a connection in the queue
# print(f"connection_queue: {self.connection_queue}")
connection = self.connection_queue.popleft()
remote_address = connection.getpeername()

return connection, remote_address
return

def _accept_connection(self) -> socket.socket:
"""Accept a connection, cache it, and add it to the selector"""
Expand Down

0 comments on commit 37bcb13

Please sign in to comment.