Merge pull request #1244 from SpiNNakerManchester/run_methods
Run methods
Christian-B authored Dec 30, 2024
2 parents 4996480 + 80deec2 commit 4c49c31
Showing 1 changed file with 88 additions and 94 deletions.
182 changes: 88 additions & 94 deletions spinn_front_end_common/interface/abstract_spinnaker_base.py
@@ -497,6 +497,12 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[
:rtype: tuple(int,float) or tuple(None,None)
"""
if run_time is None:
# TODO does this make sense?
# https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(0.0)
self._data_writer.set_plan_n_timesteps(get_config_int(
"Buffers", "minimum_auto_time_steps"))
return None, None
n_machine_time_steps = self.__timesteps(run_time)
total_run_timesteps = (
@@ -505,6 +511,15 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[
total_run_time = (
total_run_timesteps *
self._data_writer.get_hardware_time_step_ms())
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(
total_run_time)

if get_config_bool("Buffers", "use_auto_pause_and_resume"):
self._data_writer.set_plan_n_timesteps(
get_config_int("Buffers", "minimum_auto_time_steps"))
else:
self._data_writer.set_plan_n_timesteps(n_machine_time_steps)

logger.info(
f"Simulating for {n_machine_time_steps} "
@@ -537,17 +552,7 @@ def __is_main_thread() -> bool:
"""
return threading.get_ident() == threading.main_thread().ident

def __run(self, run_time: Optional[float], sync_time: float) -> None:
"""
The main internal run function.
:param int run_time: the run duration in milliseconds.
:param int sync_time:
the time in milliseconds between synchronisations, or 0 to disable.
"""
if not self._should_run():
return

def __run_verify(self) -> None:
# verify that we can keep doing auto pause and resume
if self._data_writer.is_ran_ever():
can_keep_running = all(
@@ -559,21 +564,6 @@ def __run(self, run_time: Optional[float], sync_time: float) -> None:
"Only binaries that use the simulation interface can be"
" run more than once")

# Install the Control-C handler
if self.__is_main_thread():
signal.signal(signal.SIGINT, self.__signal_handler)
self._raise_keyboard_interrupt = True
sys.excepthook = self.__sys_excepthook

logger.info("Starting execution process")

n_machine_time_steps, total_run_time = self._calc_run_time(run_time)
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(
total_run_time or 0.0)

n_sync_steps = self.__timesteps(sync_time)

# If we have never run before, or the graph has changed,
# start by performing mapping
if (self._data_writer.get_requires_mapping() and
@@ -583,79 +573,62 @@ def __run(self, run_time: Optional[float], sync_time: float) -> None:
"The network cannot be changed between runs without"
" resetting")

def __run_control_c_handler_on(self) -> None:
# Install the Control-C handler
if self.__is_main_thread():
signal.signal(signal.SIGINT, self.__signal_handler)
self._raise_keyboard_interrupt = True
sys.excepthook = self.__sys_excepthook

def __run_control_c_handler_off(self) -> None:
# Indicate that the signal handler needs to act
if self.__is_main_thread():
self._raise_keyboard_interrupt = False
sys.excepthook = self.exception_handler

def __run_reset_sync_signal(self) -> None:
# If we have reset and the graph has changed, stop any running
# application
if (self._data_writer.get_requires_data_generation() and
self._data_writer.has_transceiver()):
self._data_writer.get_transceiver().stop_application(
self._data_writer.get_app_id())
self._data_writer.reset_sync_signal()
# build the graphs to modify with system requirements
if self._data_writer.get_requires_mapping():
if self._data_writer.is_soft_reset():
# wipe out stuff associated with past mapping
self._hard_reset()
FecTimer.setup(self)

self._add_dependent_verts_and_edges_for_application_graph()
        if ((get_config_bool("Buffers", "use_auto_pause_and_resume"))
                or (run_time is None)):
            self._data_writer.set_plan_n_timesteps(get_config_int(
                "Buffers", "minimum_auto_time_steps"))
        else:
            self._data_writer.set_plan_n_timesteps(n_machine_time_steps)

    def __run(self, run_time: Optional[float], sync_time: float):
        """
        The main internal run function.

        :param int run_time: the run duration in milliseconds.
        :param int sync_time:
            the time in milliseconds between synchronisations, or 0 to disable.
        """
        if not self._should_run():
            return
logger.info("Starting execution process")
self.__run_verify()
self.__run_control_c_handler_on()
n_machine_time_steps, total_run_time = self._calc_run_time(run_time)
n_sync_steps = self.__timesteps(sync_time)
self.__run_reset_sync_signal()

self._do_mapping(total_run_time)
# build the graphs to modify with system requirements
if self._data_writer.get_requires_mapping():
self._do_mapping(total_run_time, n_machine_time_steps)

if not self._data_writer.is_ran_last():
self._do_write_metadata()

# Check if anything has per-timestep SDRAM usage
is_per_timestep_sdram = any(
placement.vertex.sdram_required.per_timestep
for placement in self._data_writer.iterate_placemements())

# Disable auto pause and resume if the binary can't do it
if not get_config_bool("Machine", "virtual_board"):
for executable_type in self._data_writer.get_executable_types():
if not executable_type.supports_auto_pause_and_resume:
set_config("Buffers", "use_auto_pause_and_resume", "False")
break

# Work out the maximum run duration given all recordings
if not self._data_writer.has_max_run_time_steps():
self._data_writer.set_max_run_time_steps(
self._deduce_data_n_timesteps())

# Work out an array of timesteps to perform
steps: Optional[Sequence[Optional[int]]] = None
if (not get_config_bool("Buffers", "use_auto_pause_and_resume")
or not is_per_timestep_sdram):
# Runs should only be in units of max_run_time_steps at most
if is_per_timestep_sdram and (
n_machine_time_steps is None
or (self._data_writer.get_max_run_time_steps()
< n_machine_time_steps)):
raise ConfigurationException(
"The SDRAM required by one or more vertices is based on "
"the run time, so the run time is limited to "
f"{self._data_writer.get_max_run_time_steps()} time steps")

steps = [n_machine_time_steps]
elif run_time is not None:
# With auto pause and resume, any time step is possible but run
# time more than the first will guarantee that run will be called
# more than once
steps = self._generate_steps(n_machine_time_steps)

# requires data_generation includes never run and requires_mapping
if self._data_writer.get_requires_data_generation():
self._do_load()

# Run for each of the given steps
if run_time is not None:
assert steps is not None
if n_machine_time_steps is not None:
if get_config_bool("Buffers", "use_auto_pause_and_resume"):
steps = self._generate_steps(n_machine_time_steps)
else:
steps = [n_machine_time_steps]
logger.info("Running for {} steps for a total of {}ms",
len(steps), run_time)
self._data_writer.set_n_run_steps(len(steps))
@@ -664,20 +637,17 @@ def __run(self, run_time: Optional[float], sync_time: float) -> None:
logger.info(f"Run {run_step} of {len(steps)}")
self._do_run(step, n_sync_steps)
self._data_writer.clear_run_steps()
elif run_time is None and self._run_until_complete:
elif self._run_until_complete:
logger.info("Running until complete")
self._do_run(None, n_sync_steps)
else:
if is_per_timestep_sdram:
if self._data_writer.get_max_run_time_steps() < sys.maxsize:
logger.warning("Due to recording this simulation "
"should not be run longer than {}ms",
self._data_writer.get_max_run_time_steps())
logger.info("Running until stop is called by another thread")
self._do_run(None, n_sync_steps)
# Indicate that the signal handler needs to act
if self.__is_main_thread():
self._raise_keyboard_interrupt = False
sys.excepthook = self.exception_handler
self.__run_control_c_handler_off()

@final
def _add_commands_to_command_sender(
@@ -714,14 +684,14 @@ def _add_dependent_verts_and_edges_for_application_graph(self) -> None:
ApplicationEdge(v, dpt_vtx), edge_identifier)

@final
def _deduce_data_n_timesteps(self) -> int:
def _deduce_data_n_timesteps(
self, n_machine_time_steps: Optional[int]) -> None:
"""
Operates the auto pause and resume functionality by figuring out
how many timer ticks a simulation can run before SDRAM runs out,
and breaks simulation into chunks of that long.
:return: max time a simulation can run.
:rtype: int
"""
# Go through the placements and find how much SDRAM is used
# on each chip
@@ -750,9 +720,17 @@ def _deduce_data_n_timesteps(self) -> int:
max_this_chip = int((size - sdram.fixed) // sdram.per_timestep)
max_time_steps = min(max_time_steps, max_this_chip)

return max_time_steps
if not get_config_bool("Buffers", "use_auto_pause_and_resume"):
if ((n_machine_time_steps is not None) and
(max_time_steps < n_machine_time_steps)):
raise ConfigurationException(
"The SDRAM required by one or more vertices is based on "
"the run time, so the run time is limited to "
f"{max_time_steps} time steps")

def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]:
self._data_writer.set_max_run_time_steps(max_time_steps)
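A worked example of the per-chip calculation above, with assumed sizes; in the real method the fixed and per-timestep costs are summed over the vertices placed on each chip, and the available size comes from the machine description:

# Sketch only: one chip's SDRAM budget with assumed sizes
sdram_size = 100 * 1024 * 1024       # assumed usable SDRAM on the chip (bytes)
sdram_fixed = 60 * 1024 * 1024       # assumed fixed cost of the placed vertices
sdram_per_timestep = 4 * 1024        # assumed recording cost per timestep

max_this_chip = int((sdram_size - sdram_fixed) // sdram_per_timestep)
# max_this_chip == 10240; the minimum over all chips becomes
# max_run_time_steps, or a ConfigurationException is raised when auto pause
# and resume is off and the requested run is longer than that.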

def _generate_steps(self, n_steps: int) -> Sequence[int]:
"""
Generates the list of "timer" runs. These are usually in terms of
time steps, but need not be.
@@ -761,7 +739,7 @@ def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]:
:return: list of time step lengths
:rtype: list(int)
"""
if n_steps is None or n_steps == 0:
if n_steps == 0:
return [0]
n_steps_per_segment = self._data_writer.get_max_run_time_steps()
n_full_iterations = int(math.floor(n_steps / n_steps_per_segment))
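A sketch of the segmentation this method produces, assuming a max_run_time_steps of 4000 (an illustrative value only):

import math

# Sketch only: break a 10000-step run into chunks of at most 4000 steps
n_steps = 10000
n_steps_per_segment = 4000           # assumed max_run_time_steps

n_full_iterations = int(math.floor(n_steps / n_steps_per_segment))
steps = [n_steps_per_segment] * n_full_iterations
left_over = n_steps - n_full_iterations * n_steps_per_segment
if left_over:
    steps.append(left_over)
# steps == [4000, 4000, 2000]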
@@ -1356,8 +1334,18 @@ def _execute_locate_executable_start_type(self) -> None:
May set the executable_types data.
"""
with FecTimer("Locate executable start type", TimerWork.OTHER):
self._data_writer.set_executable_types(
locate_executable_start_type())
binary_start_types = locate_executable_start_type()
self._data_writer.set_executable_types(binary_start_types)
if get_config_bool("Buffers", "use_auto_pause_and_resume"):
# Disable auto pause and resume if the binary can't do it
for executable_type in binary_start_types:
if not executable_type.supports_auto_pause_and_resume:
logger.warning(
"Disabling auto pause resume as graph includes {}",
executable_type)
set_config("Buffers",
"use_auto_pause_and_resume", "False")
break
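A single unsupported binary type therefore forces the whole run to be executed as one segment. A minimal sketch of the same check, using a hypothetical stand-in for an executable type:

# Sketch only: collapse the loop above into a single predicate
class _FakeExecutableType:           # hypothetical stand-in for an ExecutableType
    supports_auto_pause_and_resume = False

binary_start_types = [_FakeExecutableType()]
use_auto_pause_and_resume = all(
    t.supports_auto_pause_and_resume for t in binary_start_types)
# -> False: the configuration option is switched off for this run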

@final
def _execute_buffer_manager_creator(self) -> None:
@@ -1392,13 +1380,18 @@ def _execute_control_sync(self, do_sync: bool) -> None:
return
self._data_writer.get_transceiver().control_sync(do_sync)

def _do_mapping(self, total_run_time: Optional[float]) -> None:
def _do_mapping(self, total_run_time: Optional[float],
n_machine_time_steps: Optional[int]) -> None:
"""
Runs, times and logs all the algorithms in the mapping stage.
:param float total_run_time:
"""
FecTimer.start_category(TimerCategory.MAPPING)
if self._data_writer.is_soft_reset():
# wipe out stuff associated with past mapping
self._hard_reset()
self._add_dependent_verts_and_edges_for_application_graph()

self._setup_java_caller()
self._do_extra_mapping_algorithms()
@@ -1442,6 +1435,7 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None:
self._execute_locate_executable_start_type()
self._execute_buffer_manager_creator()

self._deduce_data_n_timesteps(n_machine_time_steps)
FecTimer.end_category(TimerCategory.MAPPING)

# Overridden by spy which adds placement_order
