Skip to content

Commit

Permalink
run sub methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian-B committed Nov 14, 2024
1 parent aaeb93e commit 8ef4060
Showing 1 changed file with 81 additions and 92 deletions.
173 changes: 81 additions & 92 deletions spinn_front_end_common/interface/abstract_spinnaker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,12 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[
:rtype: tuple(int,float) or tuple(None,None)
"""
if run_time is None:
# TODO does this make sense?
# see https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(0.0)
self._data_writer.set_plan_n_timesteps(get_config_int(
"Buffers", "minimum_auto_time_steps"))
return None, None
n_machine_time_steps = self.__timesteps(run_time)
total_run_timesteps = (
Expand All @@ -503,6 +509,23 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[
total_run_time = (
total_run_timesteps *
self._data_writer.get_hardware_time_step_ms())
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(
total_run_time)

if get_config_bool("Buffers", "use_auto_pause_and_resume"):
self._data_writer.set_plan_n_timesteps(
get_config_int("Buffers", "minimum_auto_time_steps"))
else:
self._data_writer.set_plan_n_timesteps(n_machine_time_steps)

if not get_config_bool("Buffers", "use_auto_pause_and_resume"):
if (self._data_writer.get_max_run_time_steps() <
n_machine_time_steps):
raise ConfigurationException(
"The SDRAM required by one or more vertices is based on "
"the run time, so the run time is limited to "
f"{self._data_writer.get_max_run_time_steps()} time steps")

logger.info(
f"Simulating for {n_machine_time_steps} "
Expand Down Expand Up @@ -535,7 +558,7 @@ def __is_main_thread() -> bool:
"""
return threading.get_ident() == threading.main_thread().ident

def __run_verify(self):
def __run_verify(self) -> None:
# verify that we can keep doing auto pause and resume
if self._data_writer.is_ran_ever():
can_keep_running = all(
Expand All @@ -547,34 +570,6 @@ def __run_verify(self):
"Only binaries that use the simulation interface can be"
" run more than once")

def __run(self, run_time: Optional[float], sync_time: float):
"""
The main internal run function.
:param int run_time: the run duration in milliseconds.
:param int sync_time:
the time in milliseconds between synchronisations, or 0 to disable.
"""
if not self._should_run():
return

self.__run_verify()

# Install the Control-C handler
if self.__is_main_thread():
signal.signal(signal.SIGINT, self.__signal_handler)
self._raise_keyboard_interrupt = True
sys.excepthook = self.__sys_excepthook

logger.info("Starting execution process")

n_machine_time_steps, total_run_time = self._calc_run_time(run_time)
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(
total_run_time or 0.0)

n_sync_steps = self.__timesteps(sync_time)

# If we have never run before, or the graph has changed,
# start by performing mapping
if (self._data_writer.get_requires_mapping() and
Expand All @@ -584,79 +579,62 @@ def __run(self, run_time: Optional[float], sync_time: float):
"The network cannot be changed between runs without"
" resetting")

def __run_control_c_handler_on(self) -> None:
# Install the Control-C handler
if self.__is_main_thread():
signal.signal(signal.SIGINT, self.__signal_handler)
self._raise_keyboard_interrupt = True
sys.excepthook = self.__sys_excepthook

def __run_control_c_handler_off(self) -> None:
# Indicate that the signal handler needs to act
if self.__is_main_thread():
self._raise_keyboard_interrupt = False
sys.excepthook = self.exception_handler

def __run_reset_sync_signal(self) -> None:
# If we have reset and the graph has changed, stop any running
# application
if (self._data_writer.get_requires_data_generation() and
self._data_writer.has_transceiver()):
self._data_writer.get_transceiver().stop_application(
self._data_writer.get_app_id())
self._data_writer.reset_sync_signal()
# build the graphs to modify with system requirements
if self._data_writer.get_requires_mapping():
if self._data_writer.is_soft_reset():
# wipe out stuff associated with past mapping
self._hard_reset()
FecTimer.setup(self)

self._add_dependent_verts_and_edges_for_application_graph()
def __run(self, run_time: Optional[float], sync_time: float):
"""
The main internal run function.
if ((get_config_bool("Buffers", "use_auto_pause_and_resume"))
or (run_time is None)):
self._data_writer.set_plan_n_timesteps(get_config_int(
"Buffers", "minimum_auto_time_steps"))
else:
self._data_writer.set_plan_n_timesteps(n_machine_time_steps)
:param int run_time: the run duration in milliseconds.
:param int sync_time:
the time in milliseconds between synchronisations, or 0 to disable.
"""
if not self._should_run():
return
logger.info("Starting execution process")
self.__run_verify()
self.__run_control_c_handler_on()
n_machine_time_steps, total_run_time = self._calc_run_time(run_time)
n_sync_steps = self.__timesteps(sync_time)
self.__run_reset_sync_signal()

# build the graphs to modify with system requirements
if self._data_writer.get_requires_mapping():
self._do_mapping(total_run_time)

if not self._data_writer.is_ran_last():
self._do_write_metadata()

# Check if anything has per-timestep SDRAM usage
is_per_timestep_sdram = any(
placement.vertex.sdram_required.per_timestep
for placement in self._data_writer.iterate_placemements())

# Disable auto pause and resume if the binary can't do it
if not get_config_bool("Machine", "virtual_board"):
for executable_type in self._data_writer.get_executable_types():
if not executable_type.supports_auto_pause_and_resume:
set_config("Buffers", "use_auto_pause_and_resume", "False")
break

# Work out the maximum run duration given all recordings
if not self._data_writer.has_max_run_time_steps():
self._data_writer.set_max_run_time_steps(
self._deduce_data_n_timesteps())

# Work out an array of timesteps to perform
steps: Optional[Sequence[Optional[int]]] = None
if (not get_config_bool("Buffers", "use_auto_pause_and_resume")
or not is_per_timestep_sdram):
# Runs should only be in units of max_run_time_steps at most
if is_per_timestep_sdram and (
n_machine_time_steps is None
or (self._data_writer.get_max_run_time_steps()
< n_machine_time_steps)):
raise ConfigurationException(
"The SDRAM required by one or more vertices is based on "
"the run time, so the run time is limited to "
f"{self._data_writer.get_max_run_time_steps()} time steps")

steps = [n_machine_time_steps]
elif run_time is not None:
# With auto pause and resume, any time step is possible but run
# time more than the first will guarantee that run will be called
# more than once
steps = self._generate_steps(n_machine_time_steps)

# requires data_generation includes never run and requires_mapping
if self._data_writer.get_requires_data_generation():
self._do_load()

# Run for each of the given steps
if run_time is not None:
assert steps is not None
if get_config_bool("Buffers", "use_auto_pause_and_resume"):
steps = self._generate_steps(n_machine_time_steps)
else:
steps = [n_machine_time_steps]
logger.info("Running for {} steps for a total of {}ms",
len(steps), run_time)
self._data_writer.set_n_run_steps(len(steps))
Expand All @@ -665,20 +643,17 @@ def __run(self, run_time: Optional[float], sync_time: float):
logger.info(f"Run {run_step} of {len(steps)}")
self._do_run(step, n_sync_steps)
self._data_writer.clear_run_steps()
elif run_time is None and self._run_until_complete:
elif self._run_until_complete:
logger.info("Running until complete")
self._do_run(None, n_sync_steps)
else:
if is_per_timestep_sdram:
if self._data_writer.get_max_run_time_steps() < sys.maxsize:
logger.warning("Due to recording this simulation "
"should not be run longer than {}ms",
self._data_writer.get_max_run_time_steps())
logger.info("Running until stop is called by another thread")
self._do_run(None, n_sync_steps)
# Indicate that the signal handler needs to act
if self.__is_main_thread():
self._raise_keyboard_interrupt = False
sys.excepthook = self.exception_handler
self.__run_control_c_handler_off()

@final
def _add_commands_to_command_sender(self, system_placements: Placements):
Expand Down Expand Up @@ -714,14 +689,13 @@ def _add_dependent_verts_and_edges_for_application_graph(self) -> None:
ApplicationEdge(v, dpt_vtx), edge_identifier)

@final
def _deduce_data_n_timesteps(self) -> int:
def _deduce_data_n_timesteps(self) -> None:
"""
Operates the auto pause and resume functionality by figuring out
how many timer ticks a simulation can run before SDRAM runs out,
and breaks simulation into chunks of that long.
:return: max time a simulation can run.
:rtype: int
"""
# Go through the placements and find how much SDRAM is used
# on each chip
Expand Down Expand Up @@ -750,7 +724,7 @@ def _deduce_data_n_timesteps(self) -> int:
max_this_chip = int((size - sdram.fixed) // sdram.per_timestep)
max_time_steps = min(max_time_steps, max_this_chip)

return max_time_steps
self._data_writer.set_max_run_time_steps(max_time_steps)

def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]:
"""
Expand Down Expand Up @@ -1352,8 +1326,18 @@ def _execute_locate_executable_start_type(self) -> None:
May set the executable_types data.
"""
with FecTimer("Locate executable start type", TimerWork.OTHER):
self._data_writer.set_executable_types(
locate_executable_start_type())
binary_start_types = locate_executable_start_type()
self._data_writer.set_executable_types(binary_start_types)
if get_config_bool("Buffers", "use_auto_pause_and_resume"):
# Disable auto pause and resume if the binary can't do it
for executable_type in binary_start_types:
if not executable_type.supports_auto_pause_and_resume:
logger.warning(
"Disabling auto pause resume as graph includes {}",
executable_type)
set_config("Buffers",
"use_auto_pause_and_resume", "False")
break

@final
def _execute_buffer_manager_creator(self) -> None:
Expand Down Expand Up @@ -1395,6 +1379,10 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None:
:param float total_run_time:
"""
FecTimer.start_category(TimerCategory.MAPPING)
if self._data_writer.is_soft_reset():
# wipe out stuff associated with past mapping
self._hard_reset()
self._add_dependent_verts_and_edges_for_application_graph()

self._setup_java_caller()
self._do_extra_mapping_algorithms()
Expand Down Expand Up @@ -1438,6 +1426,7 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None:
self._execute_locate_executable_start_type()
self._execute_buffer_manager_creator()

self._deduce_data_n_timesteps()
FecTimer.end_category(TimerCategory.MAPPING)

# Overridden by spy which adds placement_order
Expand Down

0 comments on commit 8ef4060

Please sign in to comment.