diff --git a/libmuscle/python/libmuscle/instance.py b/libmuscle/python/libmuscle/instance.py index f4aa85fb..7a033dd4 100644 --- a/libmuscle/python/libmuscle/instance.py +++ b/libmuscle/python/libmuscle/instance.py @@ -810,38 +810,47 @@ def _decide_reuse_instance(self) -> bool: elif self._first_run: self._first_run = False - # resume from intermediate + do_reuse = False + + # only actually happens if we don't reuse, start recording just in case + sw_event = ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, ProfileTimestamp()) + + f_init_connected = self._have_f_init_connections() if self._first_run and self._snapshot_manager.resuming_from_intermediate(): + # resume from intermediate self._do_resume = True self._do_init = False - return True - - f_init_connected = self._have_f_init_connections() - - # resume from final - if self._first_run and self._snapshot_manager.resuming_from_final(): + do_reuse = True + elif self._first_run and self._snapshot_manager.resuming_from_final(): + # resume from final if f_init_connected: - got_f_init_messages = self._pre_receive() + no_closed_ports = self._pre_receive() self._do_resume = True self._do_init = True - return got_f_init_messages + do_reuse = no_closed_ports else: self._do_resume = False # unused self._do_init = False # unused - return False + do_reuse = False + else: + # fresh start or resuming from implicit snapshot + self._do_resume = False - # fresh start or resuming from implicit snapshot - self._do_resume = False + no_closed_ports = self._pre_receive() - # simple straight single run without resuming - if not f_init_connected: - self._do_init = self._first_run - return self._first_run + if not f_init_connected: + # simple straight single run without resuming + self._do_init = self._first_run + do_reuse = self._first_run + else: + # not resuming and f_init connected, run while we get messages + self._do_init = no_closed_ports + do_reuse = no_closed_ports - # not resuming and f_init connected, run while we get messages - got_f_init_messages = self._pre_receive() - self._do_init = got_f_init_messages - return got_f_init_messages + if not do_reuse: + self._profiler.record_event(sw_event) + + return do_reuse def _save_snapshot( self, message: Optional[Message], final: bool, @@ -1005,17 +1014,12 @@ def _pre_receive(self) -> bool: Returns: True iff no ClosePort messages were received. """ - sw_event = ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, ProfileTimestamp()) - all_ports_open = self.__receive_settings() self.__pre_receive_f_init() for message in self._f_init_cache.values(): if isinstance(message.data, ClosePort): all_ports_open = False - if not all_ports_open: - self._profiler.record_event(sw_event) - return all_ports_open def __receive_settings(self) -> bool: @@ -1024,11 +1028,16 @@ def __receive_settings(self) -> bool: Returns: False iff the port is connnected and ClosePort was received. """ - default_message = Message(0.0, None, Settings(), Settings()) + if not self._communicator.settings_in_connected(): + self._settings_manager.overlay = Settings() + return True + message, saved_until = self._communicator.receive_message( - 'muscle_settings_in', None, default_message) + 'muscle_settings_in', None) + if isinstance(message.data, ClosePort): return False + if not isinstance(message.data, Settings): err_msg = ('"{}" received a message on' ' muscle_settings_in that is not a' diff --git a/libmuscle/python/libmuscle/test/test_instance.py b/libmuscle/python/libmuscle/test/test_instance.py index f500dd64..1a15c90d 100644 --- a/libmuscle/python/libmuscle/test/test_instance.py +++ b/libmuscle/python/libmuscle/test/test_instance.py @@ -78,8 +78,9 @@ def get_port(name): msg = Message(0.0, 1.0, 'message') msg_with_settings = Message(0.0, 1.0, 'message', overlay_settings) - def receive_message(name, slot, default): + def receive_message(name, slot, default=None): if 'not_connected' in name: + assert default is not None return default, 10.0 if 'settings' in name: return msg_with_settings, 10.0