Skip to content

Commit

Permalink
Also ensure SHUTDOWN_WAIT happens for Python components
Browse files Browse the repository at this point in the history
  • Loading branch information
LourensVeen committed Dec 18, 2023
1 parent 891fe31 commit c8ee1e5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 28 deletions.
63 changes: 36 additions & 27 deletions libmuscle/python/libmuscle/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,38 +810,47 @@ def _decide_reuse_instance(self) -> bool:
elif self._first_run:
self._first_run = False

# resume from intermediate
do_reuse = False

# only actually happens if we don't reuse, start recording just in case
sw_event = ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, ProfileTimestamp())

f_init_connected = self._have_f_init_connections()
if self._first_run and self._snapshot_manager.resuming_from_intermediate():
# resume from intermediate
self._do_resume = True
self._do_init = False
return True

f_init_connected = self._have_f_init_connections()

# resume from final
if self._first_run and self._snapshot_manager.resuming_from_final():
do_reuse = True
elif self._first_run and self._snapshot_manager.resuming_from_final():
# resume from final
if f_init_connected:
got_f_init_messages = self._pre_receive()
no_closed_ports = self._pre_receive()
self._do_resume = True
self._do_init = True
return got_f_init_messages
do_reuse = no_closed_ports
else:
self._do_resume = False # unused
self._do_init = False # unused
return False
do_reuse = False
else:
# fresh start or resuming from implicit snapshot
self._do_resume = False

# fresh start or resuming from implicit snapshot
self._do_resume = False
no_closed_ports = self._pre_receive()

# simple straight single run without resuming
if not f_init_connected:
self._do_init = self._first_run
return self._first_run
if not f_init_connected:
# simple straight single run without resuming
self._do_init = self._first_run
do_reuse = self._first_run
else:
# not resuming and f_init connected, run while we get messages
self._do_init = no_closed_ports
do_reuse = no_closed_ports

# not resuming and f_init connected, run while we get messages
got_f_init_messages = self._pre_receive()
self._do_init = got_f_init_messages
return got_f_init_messages
if not do_reuse:
self._profiler.record_event(sw_event)

return do_reuse

def _save_snapshot(
self, message: Optional[Message], final: bool,
Expand Down Expand Up @@ -1005,17 +1014,12 @@ def _pre_receive(self) -> bool:
Returns:
True iff no ClosePort messages were received.
"""
sw_event = ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, ProfileTimestamp())

all_ports_open = self.__receive_settings()
self.__pre_receive_f_init()
for message in self._f_init_cache.values():
if isinstance(message.data, ClosePort):
all_ports_open = False

if not all_ports_open:
self._profiler.record_event(sw_event)

return all_ports_open

def __receive_settings(self) -> bool:
Expand All @@ -1024,11 +1028,16 @@ def __receive_settings(self) -> bool:
Returns:
False iff the port is connnected and ClosePort was received.
"""
default_message = Message(0.0, None, Settings(), Settings())
if not self._communicator.settings_in_connected():
self._settings_manager.overlay = Settings()
return True

message, saved_until = self._communicator.receive_message(
'muscle_settings_in', None, default_message)
'muscle_settings_in', None)

if isinstance(message.data, ClosePort):
return False

if not isinstance(message.data, Settings):
err_msg = ('"{}" received a message on'
' muscle_settings_in that is not a'
Expand Down
3 changes: 2 additions & 1 deletion libmuscle/python/libmuscle/test/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ def get_port(name):
msg = Message(0.0, 1.0, 'message')
msg_with_settings = Message(0.0, 1.0, 'message', overlay_settings)

def receive_message(name, slot, default):
def receive_message(name, slot, default=None):
if 'not_connected' in name:
assert default is not None
return default, 10.0
if 'settings' in name:
return msg_with_settings, 10.0
Expand Down

0 comments on commit c8ee1e5

Please sign in to comment.