From 8ef40609e3ddf0484efa041f44b43de291bd15b2 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Wed, 13 Nov 2024 16:34:41 +0000 Subject: [PATCH] run sub methods --- .../interface/abstract_spinnaker_base.py | 173 ++++++++---------- 1 file changed, 81 insertions(+), 92 deletions(-) diff --git a/spinn_front_end_common/interface/abstract_spinnaker_base.py b/spinn_front_end_common/interface/abstract_spinnaker_base.py index 3ea9266fd..6d6ffe9af 100644 --- a/spinn_front_end_common/interface/abstract_spinnaker_base.py +++ b/spinn_front_end_common/interface/abstract_spinnaker_base.py @@ -495,6 +495,12 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[ :rtype: tuple(int,float) or tuple(None,None) """ if run_time is None: + # TODO does this make sense? + # see https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243 + if FecDataView.has_allocation_controller(): + FecDataView.get_allocation_controller().extend_allocation(0.0) + self._data_writer.set_plan_n_timesteps(get_config_int( + "Buffers", "minimum_auto_time_steps")) return None, None n_machine_time_steps = self.__timesteps(run_time) total_run_timesteps = ( @@ -503,6 +509,23 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[ total_run_time = ( total_run_timesteps * self._data_writer.get_hardware_time_step_ms()) + if FecDataView.has_allocation_controller(): + FecDataView.get_allocation_controller().extend_allocation( + total_run_time) + + if get_config_bool("Buffers", "use_auto_pause_and_resume"): + self._data_writer.set_plan_n_timesteps( + get_config_int("Buffers", "minimum_auto_time_steps")) + else: + self._data_writer.set_plan_n_timesteps(n_machine_time_steps) + + if not get_config_bool("Buffers", "use_auto_pause_and_resume"): + if (self._data_writer.get_max_run_time_steps() < + n_machine_time_steps): + raise ConfigurationException( + "The SDRAM required by one or more vertices is based on " + "the run time, so the run time is limited to " + f"{self._data_writer.get_max_run_time_steps()} time steps") logger.info( f"Simulating for {n_machine_time_steps} " @@ -535,7 +558,7 @@ def __is_main_thread() -> bool: """ return threading.get_ident() == threading.main_thread().ident - def __run_verify(self): + def __run_verify(self) -> None: # verify that we can keep doing auto pause and resume if self._data_writer.is_ran_ever(): can_keep_running = all( @@ -547,34 +570,6 @@ def __run_verify(self): "Only binaries that use the simulation interface can be" " run more than once") - def __run(self, run_time: Optional[float], sync_time: float): - """ - The main internal run function. - - :param int run_time: the run duration in milliseconds. - :param int sync_time: - the time in milliseconds between synchronisations, or 0 to disable. - """ - if not self._should_run(): - return - - self.__run_verify() - - # Install the Control-C handler - if self.__is_main_thread(): - signal.signal(signal.SIGINT, self.__signal_handler) - self._raise_keyboard_interrupt = True - sys.excepthook = self.__sys_excepthook - - logger.info("Starting execution process") - - n_machine_time_steps, total_run_time = self._calc_run_time(run_time) - if FecDataView.has_allocation_controller(): - FecDataView.get_allocation_controller().extend_allocation( - total_run_time or 0.0) - - n_sync_steps = self.__timesteps(sync_time) - # If we have never run before, or the graph has changed, # start by performing mapping if (self._data_writer.get_requires_mapping() and @@ -584,6 +579,20 @@ def __run(self, run_time: Optional[float], sync_time: float): "The network cannot be changed between runs without" " resetting") + def __run_control_c_handler_on(self) -> None: + # Install the Control-C handler + if self.__is_main_thread(): + signal.signal(signal.SIGINT, self.__signal_handler) + self._raise_keyboard_interrupt = True + sys.excepthook = self.__sys_excepthook + + def __run_control_c_handler_off(self) -> None: + # Indicate that the signal handler needs to act + if self.__is_main_thread(): + self._raise_keyboard_interrupt = False + sys.excepthook = self.exception_handler + + def __run_reset_sync_signal(self) -> None: # If we have reset and the graph has changed, stop any running # application if (self._data_writer.get_requires_data_generation() and @@ -591,72 +600,41 @@ def __run(self, run_time: Optional[float], sync_time: float): self._data_writer.get_transceiver().stop_application( self._data_writer.get_app_id()) self._data_writer.reset_sync_signal() - # build the graphs to modify with system requirements - if self._data_writer.get_requires_mapping(): - if self._data_writer.is_soft_reset(): - # wipe out stuff associated with past mapping - self._hard_reset() - FecTimer.setup(self) - self._add_dependent_verts_and_edges_for_application_graph() + def __run(self, run_time: Optional[float], sync_time: float): + """ + The main internal run function. - if ((get_config_bool("Buffers", "use_auto_pause_and_resume")) - or (run_time is None)): - self._data_writer.set_plan_n_timesteps(get_config_int( - "Buffers", "minimum_auto_time_steps")) - else: - self._data_writer.set_plan_n_timesteps(n_machine_time_steps) + :param int run_time: the run duration in milliseconds. + :param int sync_time: + the time in milliseconds between synchronisations, or 0 to disable. + """ + if not self._should_run(): + return + logger.info("Starting execution process") + self.__run_verify() + self.__run_control_c_handler_on() + n_machine_time_steps, total_run_time = self._calc_run_time(run_time) + n_sync_steps = self.__timesteps(sync_time) + self.__run_reset_sync_signal() + # build the graphs to modify with system requirements + if self._data_writer.get_requires_mapping(): self._do_mapping(total_run_time) if not self._data_writer.is_ran_last(): self._do_write_metadata() - # Check if anything has per-timestep SDRAM usage - is_per_timestep_sdram = any( - placement.vertex.sdram_required.per_timestep - for placement in self._data_writer.iterate_placemements()) - - # Disable auto pause and resume if the binary can't do it - if not get_config_bool("Machine", "virtual_board"): - for executable_type in self._data_writer.get_executable_types(): - if not executable_type.supports_auto_pause_and_resume: - set_config("Buffers", "use_auto_pause_and_resume", "False") - break - - # Work out the maximum run duration given all recordings - if not self._data_writer.has_max_run_time_steps(): - self._data_writer.set_max_run_time_steps( - self._deduce_data_n_timesteps()) - - # Work out an array of timesteps to perform - steps: Optional[Sequence[Optional[int]]] = None - if (not get_config_bool("Buffers", "use_auto_pause_and_resume") - or not is_per_timestep_sdram): - # Runs should only be in units of max_run_time_steps at most - if is_per_timestep_sdram and ( - n_machine_time_steps is None - or (self._data_writer.get_max_run_time_steps() - < n_machine_time_steps)): - raise ConfigurationException( - "The SDRAM required by one or more vertices is based on " - "the run time, so the run time is limited to " - f"{self._data_writer.get_max_run_time_steps()} time steps") - - steps = [n_machine_time_steps] - elif run_time is not None: - # With auto pause and resume, any time step is possible but run - # time more than the first will guarantee that run will be called - # more than once - steps = self._generate_steps(n_machine_time_steps) - # requires data_generation includes never run and requires_mapping if self._data_writer.get_requires_data_generation(): self._do_load() # Run for each of the given steps if run_time is not None: - assert steps is not None + if get_config_bool("Buffers", "use_auto_pause_and_resume"): + steps = self._generate_steps(n_machine_time_steps) + else: + steps = [n_machine_time_steps] logger.info("Running for {} steps for a total of {}ms", len(steps), run_time) self._data_writer.set_n_run_steps(len(steps)) @@ -665,20 +643,17 @@ def __run(self, run_time: Optional[float], sync_time: float): logger.info(f"Run {run_step} of {len(steps)}") self._do_run(step, n_sync_steps) self._data_writer.clear_run_steps() - elif run_time is None and self._run_until_complete: + elif self._run_until_complete: logger.info("Running until complete") self._do_run(None, n_sync_steps) else: - if is_per_timestep_sdram: + if self._data_writer.get_max_run_time_steps() < sys.maxsize: logger.warning("Due to recording this simulation " "should not be run longer than {}ms", self._data_writer.get_max_run_time_steps()) logger.info("Running until stop is called by another thread") self._do_run(None, n_sync_steps) - # Indicate that the signal handler needs to act - if self.__is_main_thread(): - self._raise_keyboard_interrupt = False - sys.excepthook = self.exception_handler + self.__run_control_c_handler_off() @final def _add_commands_to_command_sender(self, system_placements: Placements): @@ -714,14 +689,13 @@ def _add_dependent_verts_and_edges_for_application_graph(self) -> None: ApplicationEdge(v, dpt_vtx), edge_identifier) @final - def _deduce_data_n_timesteps(self) -> int: + def _deduce_data_n_timesteps(self) -> None: """ Operates the auto pause and resume functionality by figuring out how many timer ticks a simulation can run before SDRAM runs out, and breaks simulation into chunks of that long. :return: max time a simulation can run. - :rtype: int """ # Go through the placements and find how much SDRAM is used # on each chip @@ -750,7 +724,7 @@ def _deduce_data_n_timesteps(self) -> int: max_this_chip = int((size - sdram.fixed) // sdram.per_timestep) max_time_steps = min(max_time_steps, max_this_chip) - return max_time_steps + self._data_writer.set_max_run_time_steps(max_time_steps) def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]: """ @@ -1352,8 +1326,18 @@ def _execute_locate_executable_start_type(self) -> None: May set the executable_types data. """ with FecTimer("Locate executable start type", TimerWork.OTHER): - self._data_writer.set_executable_types( - locate_executable_start_type()) + binary_start_types = locate_executable_start_type() + self._data_writer.set_executable_types(binary_start_types) + if get_config_bool("Buffers", "use_auto_pause_and_resume"): + # Disable auto pause and resume if the binary can't do it + for executable_type in binary_start_types: + if not executable_type.supports_auto_pause_and_resume: + logger.warning( + "Disabling auto pause resume as graph includes {}", + executable_type) + set_config("Buffers", + "use_auto_pause_and_resume", "False") + break @final def _execute_buffer_manager_creator(self) -> None: @@ -1395,6 +1379,10 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None: :param float total_run_time: """ FecTimer.start_category(TimerCategory.MAPPING) + if self._data_writer.is_soft_reset(): + # wipe out stuff associated with past mapping + self._hard_reset() + self._add_dependent_verts_and_edges_for_application_graph() self._setup_java_caller() self._do_extra_mapping_algorithms() @@ -1438,6 +1426,7 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None: self._execute_locate_executable_start_type() self._execute_buffer_manager_creator() + self._deduce_data_n_timesteps() FecTimer.end_category(TimerCategory.MAPPING) # Overridden by spy which adds placement_order