Skip to content

Commit

Permalink
Add threads for alias and output updates on startup (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
Atticus1806 authored Nov 27, 2024
1 parent 8d4e9fd commit cf747fc
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions sisyphus/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def __init__(
self.ui = ui
self.interactive = interative
self.interactive_always_skip = set()
self.update_out_lock = threading.Lock()

self.stop_if_done = True
self._stop_loop = False
Expand Down Expand Up @@ -436,12 +437,13 @@ def f(job):
self.thread_pool.map(f, self.jobs.get(gs.STATE_RUNNABLE, []))

def check_output(self, write_output=False, update_all_outputs=False, force_update=False):
targets = self.sis_graph.targets if update_all_outputs else self.sis_graph.active_targets
for target in targets:
target.update_requirements(write_output=write_output, force=force_update)
if target.is_done():
target.run_when_done(write_output=write_output)
self.sis_graph.remove_from_active_targets(target)
with self.update_out_lock:
targets = self.sis_graph.targets if update_all_outputs else self.sis_graph.active_targets
for target in targets:
target.update_requirements(write_output=write_output, force=force_update)
if target.is_done():
target.run_when_done(write_output=write_output)
self.sis_graph.remove_from_active_targets(target)

def continue_manager_loop(self):
# Stop loop flag is set
Expand Down Expand Up @@ -482,7 +484,6 @@ def startup(self):
config_manager.continue_readers()

self.job_engine.reset_cache()
self.check_output(write_output=False, update_all_outputs=True)
self.update_jobs()

# Ensure at least one async reader head the chance to continue until he added his jobs to the list
Expand Down Expand Up @@ -548,8 +549,11 @@ def maybe_clear_state(state, always_clear, action):
self.print_state_overview(verbose=True)
elif answer.lower() == "y":
self.link_outputs = True
create_aliases(self.sis_graph.jobs())
self.check_output(write_output=self.link_outputs, update_all_outputs=True, force_update=True)
self.thread_pool.apply_async(create_aliases, self.sis_graph.jobs())
self.thread_pool.apply_async(
self.check_output,
kwds={"write_output": self.link_outputs, "update_all_outputs": True, "force_update": True},
)
break
elif answer.lower() == "u":
self.link_outputs = True
Expand Down Expand Up @@ -590,7 +594,6 @@ def run(self):
if self.mem_profile:
self.mem_profile.snapshot()
self.job_engine.reset_cache()
self.check_output(write_output=self.link_outputs)

config_manager.continue_readers()
self.update_jobs()
Expand Down Expand Up @@ -623,6 +626,8 @@ def run(self):
for job in self.jobs.get(gs.STATE_ERROR, []):
gs.on_job_failure(job)

self.check_output(write_output=self.link_outputs)

# Stop config reader
config_manager.cancel_all_reader()

Expand Down

0 comments on commit cf747fc

Please sign in to comment.