diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index c9c1a118..b84fa1f8 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -308,6 +308,8 @@ def init(self) -> None: try: identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid)) self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier)) + except (ConnectionClosed, ChannelInvalidStateError): + self.logger.warning('Process<%s>: no connection available to register as an RPC subscriber.', self.pid) except kiwipy.TimeoutError: self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid) @@ -316,6 +318,10 @@ def init(self) -> None: subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*')) identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid)) self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier)) + except (ConnectionClosed, ChannelInvalidStateError): + self.logger.warning( + 'Process<%s>: no connection available to register as a broadcast subscriber.', self.pid + ) except kiwipy.TimeoutError: self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid) @@ -886,6 +892,8 @@ def on_close(self) -> None: for cleanup in self._cleanups or []: try: cleanup() + except (ConnectionClosed, ChannelInvalidStateError): + self.logger.warning('Process<%s>: No connection when calling cleanup method %s.', self.pid, cleanup) except Exception: # pylint: disable=broad-except self.logger.exception('Process<%s>: Exception calling cleanup method %s', self.pid, cleanup) self._cleanups = None