diff --git a/ircstream/ircserver.py b/ircstream/ircserver.py index f98bdf3..b974d65 100644 --- a/ircstream/ircserver.py +++ b/ircstream/ircserver.py @@ -30,6 +30,8 @@ from ._version import __version__ +MESSAGE_QUEUE_SIZE = 1000 # maximum number of messages that will be queued for each client + logger = structlog.get_logger() @@ -216,6 +218,7 @@ def __init__(self, server: IRCServer, reader: asyncio.StreamReader, writer: asyn self.host: str = "" self.port: int = 0 self._background_tasks: set[asyncio.Task[Any]] = set() + self._bot_message_queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue(maxsize=MESSAGE_QUEUE_SIZE) def create_background_task(self, coro: Coroutine[Any, Any, None]) -> None: """Spawn a background attached to this client.""" @@ -236,6 +239,7 @@ async def connect(self) -> None: self.server.metrics["clients"].inc() self.create_background_task(self._periodic_ping()) + self.create_background_task(self._bot_message_queue_processor()) await self._handle_forever() async def terminate(self) -> None: @@ -264,6 +268,27 @@ async def _periodic_ping(self) -> None: if self.registered: await self.msg("PING", self.server.servername) + def enqueue_bot_message(self, target: str, msg: str) -> None: + """Add a message to the queue of pending bot messages. + + Will never block. May raise asyncio.QueueFull if this client's queue is full. + + Typically called from IRCServer.broadcast(). + """ + return self._bot_message_queue.put_nowait((target, msg)) + + async def _bot_message_queue_processor(self) -> None: + """Process the bot message queue continuously.""" + while True: + target, msg = await self._bot_message_queue.get() # blocks until there is a queued item + try: + await self.msg("PRIVMSG", [target, msg], from_bot=True) + except Exception: + self.server.metrics["errors"].labels("broadcast_send").inc() + self.log.debug("Unable to dequeue broadcast", exc_info=True) + finally: + self._bot_message_queue.task_done() + async def msg(self, command: str | IRCNumeric, params: list[str] | str, from_bot: bool = False) -> None: """Prepare and sends a response to the client. @@ -842,9 +867,13 @@ async def broadcast(self, target: str, msg: str) -> None: clients = self._channels.setdefault(target, set()) for client in clients: try: - await client.msg("PRIVMSG", [target, msg], from_bot=True) + client.enqueue_bot_message(target, msg) + except asyncio.QueueFull: + self.metrics["errors"].labels("queue_overrun").inc() + self.log.debug("Client queue overrun", client=client.internal_ident) + continue # client is losing messages, but other clients should not suffer except Exception: - self.metrics["errors"].labels("broadcast").inc() - self.log.debug("Unable to broadcast", exc_info=True) - continue # ignore exceptions, to catch corner cases + self.metrics["errors"].labels("broadcast_enqueue").inc() + self.log.warning("Unable to enqueue broadcast", client=client.internal_ident, exc_info=True) + continue # ignore all exceptions, to catch corner cases self.metrics["messages"].inc()