Skip to content

Commit

Permalink
Queue message broadcasts locally in each IRCClient
Browse files Browse the repository at this point in the history
Right now the trace of broadcasts is:
  RC2UDPHandler
    -> Broadcast task
       -> IRCServer.broadcast()
          -> for each IRCClient:
             -> IRCClient.msg()
                 -> IRCClient.send()
                    -> asyncio.StreamWriter.write()
                       asyncio.StreamWriter.drain()

So effectively, a task is spawned for every received message, but each
of those tasks ends up awaiting drain() for every client. This means
backpressure from even a single client slows down every task,
ultimately slowing down processing, as well as creating a thundering
herd problem.

This could be easily fixed by just removing the "await ...drain()" from
IRCClient.send(), or creating a no_drain=True variant. However, this
means that super slow clients may potentially accumulate large buffers
of messages to be sent.

Instead, create an asyncio.Queue() in each IRCClient, holding each
client's backlog, and a single task per client to consume from this
queue. The queue size is capped at a certain (arbitrary) size, after
which the queue just drops all new messages. Metrics have been added to
monitor overruns and potentially adjust this size.
  • Loading branch information
paravoid committed Oct 11, 2024
1 parent 224d51c commit 6d4dd3f
Showing 1 changed file with 33 additions and 4 deletions.
37 changes: 33 additions & 4 deletions ircstream/ircserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

from ._version import __version__

# Upper bound on the number of messages that will be queued for each client.
MESSAGE_QUEUE_SIZE = 1000

logger = structlog.get_logger()


Expand Down Expand Up @@ -216,6 +218,7 @@ def __init__(self, server: IRCServer, reader: asyncio.StreamReader, writer: asyn
self.host: str = ""
self.port: int = 0
self._background_tasks: set[asyncio.Task[Any]] = set()
self._bot_message_queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue(maxsize=MESSAGE_QUEUE_SIZE)

def create_background_task(self, coro: Coroutine[Any, Any, None]) -> None:
"""Spawn a background attached to this client."""
Expand All @@ -236,6 +239,7 @@ async def connect(self) -> None:
self.server.metrics["clients"].inc()

self.create_background_task(self._periodic_ping())
self.create_background_task(self._bot_message_queue_processor())
await self._handle_forever()

async def terminate(self) -> None:
Expand Down Expand Up @@ -264,6 +268,27 @@ async def _periodic_ping(self) -> None:
if self.registered:
await self.msg("PING", self.server.servername)

def enqueue_bot_message(self, target: str, msg: str) -> None:
    """Add a message to this client's queue of pending bot messages.

    Never blocks: the (target, msg) pair is either queued immediately or
    rejected. Typically called from IRCServer.broadcast().

    Raises:
        asyncio.QueueFull: if this client's queue is already full.
    """
    # put_nowait() has no return value, so there is nothing to hand back to
    # the caller; a full queue is signalled by the exception alone.
    self._bot_message_queue.put_nowait((target, msg))

async def _bot_message_queue_processor(self) -> None:
    """Process the bot message queue continuously.

    Waits for (target, msg) pairs on this client's queue and sends each one
    to the client as a PRIVMSG from the bot. The loop has no exit condition
    of its own, so it runs until the task is cancelled.

    A failure to send one message is counted and logged, then the loop moves
    on to the next queued message.
    """
    while True:
        # Blocks until there is a queued item. This await sits outside the
        # try block, so cancellation while idle propagates normally.
        target, msg = await self._bot_message_queue.get()
        try:
            await self.msg("PRIVMSG", [target, msg], from_bot=True)
        except Exception:
            # The item was dequeued successfully; what failed is the send.
            self.server.metrics["errors"].labels("broadcast_send").inc()
            self.log.debug("Unable to send queued broadcast", exc_info=True)
        finally:
            # Mark the item as handled whether or not the send succeeded,
            # keeping the queue's unfinished-task accounting balanced.
            self._bot_message_queue.task_done()

async def msg(self, command: str | IRCNumeric, params: list[str] | str, from_bot: bool = False) -> None:
"""Prepare and sends a response to the client.
Expand Down Expand Up @@ -842,9 +867,13 @@ async def broadcast(self, target: str, msg: str) -> None:
clients = self._channels.setdefault(target, set())
for client in clients:
try:
await client.msg("PRIVMSG", [target, msg], from_bot=True)
client.enqueue_bot_message(target, msg)
except asyncio.QueueFull:
self.metrics["errors"].labels("queue_overrun").inc()
self.log.debug("Client queue overrun", client=client.internal_ident)
continue # client is losing messages, but other clients should not suffer
except Exception:
self.metrics["errors"].labels("broadcast").inc()
self.log.debug("Unable to broadcast", exc_info=True)
continue # ignore exceptions, to catch corner cases
self.metrics["errors"].labels("broadcast_enqueue").inc()
self.log.warning("Unable to enqueue broadcast", client=client.internal_ident, exc_info=True)
continue # ignore all exceptions, to catch corner cases
self.metrics["messages"].inc()

0 comments on commit 6d4dd3f

Please sign in to comment.