Skip to content

Commit

Permalink
Process: Catch ChannelInvalidStateError during init and cleanup
Browse files Browse the repository at this point in the history
During `Process.init`, the process registers itself as an RPC and
broadcast subscriber. These calls can fail with a `ConnectionClosed` or
`ChannelInvalidStateError` exception if the connection to RabbitMQ fails.
The success of these calls is not crucial for the process running to
completion and we don't want them excepting to cause the entire process
to except. Therefore the exceptions are caught and simply logged as
warnings.

In the `Process.init` method, there are also cleanups registered to
remove the process as an RPC and broadcast subscriber. These can
similarly except and so the cleanup callbacks are likewise updated to
catch and log these exceptions.
  • Loading branch information
sphuber committed Dec 14, 2023
1 parent ff5770f commit 009b292
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ def init(self) -> None:
try:
identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
except (ConnectionClosed, ChannelInvalidStateError):
self.logger.warning('Process<%s>: no connection available to register as an RPC subscriber.', self.pid)
except kiwipy.TimeoutError:
self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

Expand All @@ -316,6 +318,10 @@ def init(self) -> None:
subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*'))
identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid))
self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
except (ConnectionClosed, ChannelInvalidStateError):
self.logger.warning(
'Process<%s>: no connection available to register as a broadcast subscriber.', self.pid
)
except kiwipy.TimeoutError:
self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)

Expand Down Expand Up @@ -886,6 +892,8 @@ def on_close(self) -> None:
for cleanup in self._cleanups or []:
try:
cleanup()
except (ConnectionClosed, ChannelInvalidStateError):
self.logger.warning('Process<%s>: No connection when calling cleanup method %s.', self.pid, cleanup)
except Exception: # pylint: disable=broad-except
self.logger.exception('Process<%s>: Exception calling cleanup method %s', self.pid, cleanup)
self._cleanups = None
Expand Down

0 comments on commit 009b292

Please sign in to comment.