From 009b292e291b9a9ab1fd6326bc58181e095074b1 Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Thu, 14 Dec 2023 16:18:46 +0100 Subject: [PATCH] Process: Catch `ChannelInvalidStateError` during init and cleanup During `Process.init`, the process registers itself as an RPC and broadcast subscriber. These calls can fail with a `ConnectionClosed` or `ChannelInvalidStateError` exception if the connection to RabbitMQ fails. The success of these calls is not crucial for the process running to completion and we don't want them excepting to cause the entire process to except. Therefore the exceptions are caught and simply logged as warnings. In the `Process.init` method, there are also cleanups registered to remove the process as an RPC and broadcast subscriber. These can similarly except and so the cleanup callbacks are likewise updated to catch and log these exceptions. --- src/plumpy/processes.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index c9c1a118..b84fa1f8 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -308,6 +308,8 @@ def init(self) -> None: try: identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid)) self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier)) + except (ConnectionClosed, ChannelInvalidStateError): + self.logger.warning('Process<%s>: no connection available to register as an RPC subscriber.', self.pid) except kiwipy.TimeoutError: self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid) @@ -316,6 +318,10 @@ def init(self) -> None: subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*')) identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid)) self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier)) + except (ConnectionClosed, ChannelInvalidStateError): + self.logger.warning( + 'Process<%s>: no connection available to register as a broadcast subscriber.', self.pid + ) except kiwipy.TimeoutError: self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid) @@ -886,6 +892,8 @@ def on_close(self) -> None: for cleanup in self._cleanups or []: try: cleanup() + except (ConnectionClosed, ChannelInvalidStateError): + self.logger.warning('Process<%s>: No connection when calling cleanup method %s.', self.pid, cleanup) except Exception: # pylint: disable=broad-except self.logger.exception('Process<%s>: Exception calling cleanup method %s', self.pid, cleanup) self._cleanups = None