From aaeb93e0dc9a30197567a5b461c5563c45422de0 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 13 Nov 2024 16:26:36 +0000 Subject: [PATCH 1/7] run_verify --- .../interface/abstract_spinnaker_base.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index da3f9b405..3ea9266fd 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -535,6 +535,18 @@ def __is_main_thread() -> bool: """ return threading.get_ident() == threading.main_thread().ident + def __run_verify(self): + # verify that we can keep doing auto pause and resume + if self._data_writer.is_ran_ever(): + can_keep_running = all( + executable_type.supports_auto_pause_and_resume + for executable_type in + self._data_writer.get_executable_types()) + if not can_keep_running: + raise NotImplementedError( + "Only binaries that use the simulation interface can be" + " run more than once") + def __run(self, run_time: Optional[float], sync_time: float): """ The main internal run function. @@ -546,16 +558,7 @@ def __run(self, run_time: Optional[float], sync_time: float): if not self._should_run(): return - # verify that we can keep doing auto pause and resume - if self._data_writer.is_ran_ever(): - can_keep_running = all( - executable_type.supports_auto_pause_and_resume - for executable_type in - self._data_writer.get_executable_types()) - if not can_keep_running: - raise NotImplementedError( - "Only binaries that use the simulation interface can be" - " run more than once") + self.__run_verify() # Install the Control-C handler if self.__is_main_thread(): From 8ef40609e3ddf0484efa041f44b43de291bd15b2 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 13 Nov 2024 16:34:41 +0000 Subject: [PATCH 2/7] run sub methods --- .../interface/abstract_spinnaker_base.py | 173 ++++++++---------- 1 file changed, 81 insertions(+), 92 deletions(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index 3ea9266fd..6d6ffe9af 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -495,6 +495,12 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[ :rtype: tuple(int,float) or tuple(None,None) """ if run_time is None: + # TODO does this make sense? + # see https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243 + if FecDataView.has_allocation_controller(): + FecDataView.get_allocation_controller().extend_allocation(0.0) + self._data_writer.set_plan_n_timesteps(get_config_int( + "Buffers", "minimum_auto_time_steps")) return None, None n_machine_time_steps = self.__timesteps(run_time) total_run_timesteps = ( @@ -503,6 +509,23 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[ total_run_time = ( total_run_timesteps * self._data_writer.get_hardware_time_step_ms()) + if FecDataView.has_allocation_controller(): + FecDataView.get_allocation_controller().extend_allocation( + total_run_time) + + if get_config_bool("Buffers", "use_auto_pause_and_resume"): + self._data_writer.set_plan_n_timesteps( + get_config_int("Buffers", "minimum_auto_time_steps")) + else: + self._data_writer.set_plan_n_timesteps(n_machine_time_steps) + + if not get_config_bool("Buffers", "use_auto_pause_and_resume"): + if (self._data_writer.get_max_run_time_steps() < + n_machine_time_steps): + raise ConfigurationException( + "The SDRAM required by one or more vertices is based on " + "the run time, so the run time is limited to " + f"{self._data_writer.get_max_run_time_steps()} time steps") logger.info( f"Simulating for {n_machine_time_steps} " @@ -535,7 +558,7 @@ def __is_main_thread() -> bool: """ return threading.get_ident() == threading.main_thread().ident - def __run_verify(self): + def __run_verify(self) -> None: # verify that we can keep doing auto pause and resume if self._data_writer.is_ran_ever(): can_keep_running = all( @@ -547,34 +570,6 @@ def __run_verify(self): "Only binaries that use the simulation interface can be" " run more than once") - def __run(self, run_time: Optional[float], sync_time: float): - """ - The main internal run function. - - :param int run_time: the run duration in milliseconds. - :param int sync_time: - the time in milliseconds between synchronisations, or 0 to disable. - """ - if not self._should_run(): - return - - self.__run_verify() - - # Install the Control-C handler - if self.__is_main_thread(): - signal.signal(signal.SIGINT, self.__signal_handler) - self._raise_keyboard_interrupt = True - sys.excepthook = self.__sys_excepthook - - logger.info("Starting execution process") - - n_machine_time_steps, total_run_time = self._calc_run_time(run_time) - if FecDataView.has_allocation_controller(): - FecDataView.get_allocation_controller().extend_allocation( - total_run_time or 0.0) - - n_sync_steps = self.__timesteps(sync_time) - # If we have never run before, or the graph has changed, # start by performing mapping if (self._data_writer.get_requires_mapping() and @@ -584,6 +579,20 @@ def __run(self, run_time: Optional[float], sync_time: float): "The network cannot be changed between runs without" " resetting") + def __run_control_c_handler_on(self) -> None: + # Install the Control-C handler + if self.__is_main_thread(): + signal.signal(signal.SIGINT, self.__signal_handler) + self._raise_keyboard_interrupt = True + sys.excepthook = self.__sys_excepthook + + def __run_control_c_handler_off(self) -> None: + # Indicate that the signal handler needs to act + if self.__is_main_thread(): + self._raise_keyboard_interrupt = False + sys.excepthook = self.exception_handler + + def __run_reset_sync_signal(self) -> None: # If we have reset and the graph has changed, stop any running # application if (self._data_writer.get_requires_data_generation() and @@ -591,72 +600,41 @@ def __run(self, run_time: Optional[float], sync_time: float): self._data_writer.get_transceiver().stop_application( self._data_writer.get_app_id()) self._data_writer.reset_sync_signal() - # build the graphs to modify with system requirements - if self._data_writer.get_requires_mapping(): - if self._data_writer.is_soft_reset(): - # wipe out stuff associated with past mapping - self._hard_reset() - FecTimer.setup(self) - self._add_dependent_verts_and_edges_for_application_graph() + def __run(self, run_time: Optional[float], sync_time: float): + """ + The main internal run function. - if ((get_config_bool("Buffers", "use_auto_pause_and_resume")) - or (run_time is None)): - self._data_writer.set_plan_n_timesteps(get_config_int( - "Buffers", "minimum_auto_time_steps")) - else: - self._data_writer.set_plan_n_timesteps(n_machine_time_steps) + :param int run_time: the run duration in milliseconds. + :param int sync_time: + the time in milliseconds between synchronisations, or 0 to disable. + """ + if not self._should_run(): + return + logger.info("Starting execution process") + self.__run_verify() + self.__run_control_c_handler_on() + n_machine_time_steps, total_run_time = self._calc_run_time(run_time) + n_sync_steps = self.__timesteps(sync_time) + self.__run_reset_sync_signal() + # build the graphs to modify with system requirements + if self._data_writer.get_requires_mapping(): self._do_mapping(total_run_time) if not self._data_writer.is_ran_last(): self._do_write_metadata() - # Check if anything has per-timestep SDRAM usage - is_per_timestep_sdram = any( - placement.vertex.sdram_required.per_timestep - for placement in self._data_writer.iterate_placemements()) - - # Disable auto pause and resume if the binary can't do it - if not get_config_bool("Machine", "virtual_board"): - for executable_type in self._data_writer.get_executable_types(): - if not executable_type.supports_auto_pause_and_resume: - set_config("Buffers", "use_auto_pause_and_resume", "False") - break - - # Work out the maximum run duration given all recordings - if not self._data_writer.has_max_run_time_steps(): - self._data_writer.set_max_run_time_steps( - self._deduce_data_n_timesteps()) - - # Work out an array of timesteps to perform - steps: Optional[Sequence[Optional[int]]] = None - if (not get_config_bool("Buffers", "use_auto_pause_and_resume") - or not is_per_timestep_sdram): - # Runs should only be in units of max_run_time_steps at most - if is_per_timestep_sdram and ( - n_machine_time_steps is None - or (self._data_writer.get_max_run_time_steps() - < n_machine_time_steps)): - raise ConfigurationException( - "The SDRAM required by one or more vertices is based on " - "the run time, so the run time is limited to " - f"{self._data_writer.get_max_run_time_steps()} time steps") - - steps = [n_machine_time_steps] - elif run_time is not None: - # With auto pause and resume, any time step is possible but run - # time more than the first will guarantee that run will be called - # more than once - steps = self._generate_steps(n_machine_time_steps) - # requires data_generation includes never run and requires_mapping if self._data_writer.get_requires_data_generation(): self._do_load() # Run for each of the given steps if run_time is not None: - assert steps is not None + if get_config_bool("Buffers", "use_auto_pause_and_resume"): + steps = self._generate_steps(n_machine_time_steps) + else: + steps = [n_machine_time_steps] logger.info("Running for {} steps for a total of {}ms", len(steps), run_time) self._data_writer.set_n_run_steps(len(steps)) @@ -665,20 +643,17 @@ def __run(self, run_time: Optional[float], sync_time: float): logger.info(f"Run {run_step} of {len(steps)}") self._do_run(step, n_sync_steps) self._data_writer.clear_run_steps() - elif run_time is None and self._run_until_complete: + elif self._run_until_complete: logger.info("Running until complete") self._do_run(None, n_sync_steps) else: - if is_per_timestep_sdram: + if self._data_writer.get_max_run_time_steps() < sys.maxsize: logger.warning("Due to recording this simulation " "should not be run longer than {}ms", self._data_writer.get_max_run_time_steps()) logger.info("Running until stop is called by another thread") self._do_run(None, n_sync_steps) - # Indicate that the signal handler needs to act - if self.__is_main_thread(): - self._raise_keyboard_interrupt = False - sys.excepthook = self.exception_handler + self.__run_control_c_handler_off() @final def _add_commands_to_command_sender(self, system_placements: Placements): @@ -714,14 +689,13 @@ def _add_dependent_verts_and_edges_for_application_graph(self) -> None: ApplicationEdge(v, dpt_vtx), edge_identifier) @final - def _deduce_data_n_timesteps(self) -> int: + def _deduce_data_n_timesteps(self) -> None: """ Operates the auto pause and resume functionality by figuring out how many timer ticks a simulation can run before SDRAM runs out, and breaks simulation into chunks of that long. :return: max time a simulation can run. - :rtype: int """ # Go through the placements and find how much SDRAM is used # on each chip @@ -750,7 +724,7 @@ def _deduce_data_n_timesteps(self) -> int: max_this_chip = int((size - sdram.fixed) // sdram.per_timestep) max_time_steps = min(max_time_steps, max_this_chip) - return max_time_steps + self._data_writer.set_max_run_time_steps(max_time_steps) def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]: """ @@ -1352,8 +1326,18 @@ def _execute_locate_executable_start_type(self) -> None: May set the executable_types data. """ with FecTimer("Locate executable start type", TimerWork.OTHER): - self._data_writer.set_executable_types( - locate_executable_start_type()) + binary_start_types = locate_executable_start_type() + self._data_writer.set_executable_types(binary_start_types) + if get_config_bool("Buffers", "use_auto_pause_and_resume"): + # Disable auto pause and resume if the binary can't do it + for executable_type in binary_start_types: + if not executable_type.supports_auto_pause_and_resume: + logger.warning( + "Disabling auto pause resume as graph includes {}", + executable_type) + set_config("Buffers", + "use_auto_pause_and_resume", "False") + break @final def _execute_buffer_manager_creator(self) -> None: @@ -1395,6 +1379,10 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None: :param float total_run_time: """ FecTimer.start_category(TimerCategory.MAPPING) + if self._data_writer.is_soft_reset(): + # wipe out stuff associated with past mapping + self._hard_reset() + self._add_dependent_verts_and_edges_for_application_graph() self._setup_java_caller() self._do_extra_mapping_algorithms() @@ -1438,6 +1426,7 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None: self._execute_locate_executable_start_type() self._execute_buffer_manager_creator() + self._deduce_data_n_timesteps() FecTimer.end_category(TimerCategory.MAPPING) # Overridden by spy which adds placement_order From d2f046bf1a822d40fa2e55d07fef35c509c3500c Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Thu, 14 Nov 2024 09:49:25 +0000 Subject: [PATCH 3/7] flake8 --- spinn_front_end_common/interface/abstract_spinnaker_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index 6d6ffe9af..d88057ebd 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -496,7 +496,7 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[ """ if run_time is None: # TODO does this make sense? - # see https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243 + # https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243 if FecDataView.has_allocation_controller(): FecDataView.get_allocation_controller().extend_allocation(0.0) self._data_writer.set_plan_n_timesteps(get_config_int( From b173ad1a9d3e6b48ed9eae4fceb90e046cd87d56 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Thu, 14 Nov 2024 10:00:30 +0000 Subject: [PATCH 4/7] change for better typing --- spinn_front_end_common/interface/abstract_spinnaker_base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index d88057ebd..14aecce55 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -630,7 +630,7 @@ def __run(self, run_time: Optional[float], sync_time: float): self._do_load() # Run for each of the given steps - if run_time is not None: + if n_machine_time_steps is not None: if get_config_bool("Buffers", "use_auto_pause_and_resume"): steps = self._generate_steps(n_machine_time_steps) else: @@ -726,7 +726,7 @@ def _deduce_data_n_timesteps(self) -> None: self._data_writer.set_max_run_time_steps(max_time_steps) - def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]: + def _generate_steps(self, n_steps: int) -> Sequence[int]: """ Generates the list of "timer" runs. These are usually in terms of time steps, but need not be. @@ -735,7 +735,7 @@ def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]: :return: list of time step lengths :rtype: list(int) """ - if n_steps is None or n_steps == 0: + if n_steps == 0: return [0] n_steps_per_segment = self._data_writer.get_max_run_time_steps() n_full_iterations = int(math.floor(n_steps / n_steps_per_segment)) From 53eb9f4f74d67db999c26c53a41f51c6f3d831e0 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 18 Dec 2024 08:04:19 +0000 Subject: [PATCH 5/7] delay safetly check until data is available --- .../interface/abstract_spinnaker_base.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index 14aecce55..c2aca4735 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -519,14 +519,6 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[ else: self._data_writer.set_plan_n_timesteps(n_machine_time_steps) - if not get_config_bool("Buffers", "use_auto_pause_and_resume"): - if (self._data_writer.get_max_run_time_steps() < - n_machine_time_steps): - raise ConfigurationException( - "The SDRAM required by one or more vertices is based on " - "the run time, so the run time is limited to " - f"{self._data_writer.get_max_run_time_steps()} time steps") - logger.info( f"Simulating for {n_machine_time_steps} " f"{self._data_writer.get_simulation_time_step_ms()} ms timesteps " @@ -620,7 +612,7 @@ def __run(self, run_time: Optional[float], sync_time: float): # build the graphs to modify with system requirements if self._data_writer.get_requires_mapping(): - self._do_mapping(total_run_time) + self._do_mapping(total_run_time, n_machine_time_steps) if not self._data_writer.is_ran_last(): self._do_write_metadata() @@ -689,7 +681,8 @@ def _add_dependent_verts_and_edges_for_application_graph(self) -> None: ApplicationEdge(v, dpt_vtx), edge_identifier) @final - def _deduce_data_n_timesteps(self) -> None: + def _deduce_data_n_timesteps( + self, n_machine_time_steps: Optional[int]) -> None: """ Operates the auto pause and resume functionality by figuring out how many timer ticks a simulation can run before SDRAM runs out, @@ -724,6 +717,13 @@ def _deduce_data_n_timesteps(self) -> None: max_this_chip = int((size - sdram.fixed) // sdram.per_timestep) max_time_steps = min(max_time_steps, max_this_chip) + if not get_config_bool("Buffers", "use_auto_pause_and_resume"): + if (max_time_steps < n_machine_time_steps): + raise ConfigurationException( + "The SDRAM required by one or more vertices is based on " + "the run time, so the run time is limited to " + f"{max_time_steps} time steps") + self._data_writer.set_max_run_time_steps(max_time_steps) def _generate_steps(self, n_steps: int) -> Sequence[int]: @@ -1372,7 +1372,8 @@ def _execute_control_sync(self, do_sync: bool) -> None: return self._data_writer.get_transceiver().control_sync(do_sync) - def _do_mapping(self, total_run_time: Optional[float]) -> None: + def _do_mapping(self, total_run_time: Optional[float], + n_machine_time_steps: Optional[int]) -> None: """ Runs, times and logs all the algorithms in the mapping stage. @@ -1426,7 +1427,7 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None: self._execute_locate_executable_start_type() self._execute_buffer_manager_creator() - self._deduce_data_n_timesteps() + self._deduce_data_n_timesteps(n_machine_time_steps) FecTimer.end_category(TimerCategory.MAPPING) # Overridden by spy which adds placement_order From f1eca1f76a8b643f02b55ccbb5ba704452b8de1b Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 18 Dec 2024 09:26:54 +0000 Subject: [PATCH 6/7] handle None --- spinn_front_end_common/interface/abstract_spinnaker_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index c2aca4735..c128bbcfe 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -718,7 +718,8 @@ def _deduce_data_n_timesteps( max_time_steps = min(max_time_steps, max_this_chip) if not get_config_bool("Buffers", "use_auto_pause_and_resume"): - if (max_time_steps < n_machine_time_steps): + if ((n_machine_time_steps is not None and) + (max_time_steps < n_machine_time_steps)): raise ConfigurationException( "The SDRAM required by one or more vertices is based on " "the run time, so the run time is limited to " From 3d3f6fd2ea7b4ea548243b677d5ff27cc40b78ea Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 18 Dec 2024 09:34:57 +0000 Subject: [PATCH 7/7] fix bracket --- spinn_front_end_common/interface/abstract_spinnaker_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index c128bbcfe..6049068ca 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -718,7 +718,7 @@ def _deduce_data_n_timesteps( max_time_steps = min(max_time_steps, max_this_chip) if not get_config_bool("Buffers", "use_auto_pause_and_resume"): - if ((n_machine_time_steps is not None and) + if ((n_machine_time_steps is not None) and (max_time_steps < n_machine_time_steps)): raise ConfigurationException( "The SDRAM required by one or more vertices is based on "