diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py index fab1736c..54da6d16 100644 --- a/jobrunner/executors/local.py +++ b/jobrunner/executors/local.py @@ -87,6 +87,8 @@ def workspace_is_archived(workspace): class LocalDockerAPI(ExecutorAPI): """ExecutorAPI implementation using local docker service.""" + synchronous_transitions = [ExecutorState.PREPARING, ExecutorState.FINALIZING] + def prepare(self, job): current = self.get_status(job) if current.state != ExecutorState.UNKNOWN: diff --git a/jobrunner/executors/logging.py b/jobrunner/executors/logging.py index ff44e393..72ddb9a4 100644 --- a/jobrunner/executors/logging.py +++ b/jobrunner/executors/logging.py @@ -32,6 +32,10 @@ def get_results(self, job: JobDefinition) -> JobResults: def delete_files(self, workspace: str, privacy: Privacy, paths: [str]) -> List[str]: return self._wrapped.delete_files(workspace, privacy, paths) + @property + def synchronous_transitions(self): + return getattr(self._wrapped, "synchronous_transitions", []) + def _add_logging(self, method: Callable[[JobDefinition], JobStatus]): def wrapper(job: JobDefinition) -> JobStatus: status = method(job) diff --git a/jobrunner/run.py b/jobrunner/run.py index c28e18ff..538d6121 100644 --- a/jobrunner/run.py +++ b/jobrunner/run.py @@ -117,7 +117,14 @@ def handle_single_job(job, api): mode = get_flag_value("mode") paused = str(get_flag_value("paused", "False")).lower() == "true" try: - handle_job(job, api, mode, paused) + synchronous_transition = handle_job(job, api, mode, paused) + + # provide a way to shortcut moving a job on to the next state right away + # this is intended to support executors where some state transitions + # are synchronous, particularly the local executor where prepare is + # synchronous and can be time consuming. + if synchronous_transition: + handle_job(job, api, mode, paused) except Exception as exc: mark_job_as_failed( job, @@ -293,6 +300,15 @@ def handle_job(job, api, mode=None, paused=None): ) set_code(job, code, message) + # does this api have synchronous_transitions? + synchronous_transitions = getattr(api, "synchronous_transitions", []) + + if new_status.state in synchronous_transitions: + # we want to immediately run this function for this job again to + # avoid blocking it as we know the state transition has already + # completed. + return True + elif new_status.state == ExecutorState.ERROR: # all transitions can go straight to error raise ExecutorError(new_status.message) diff --git a/tests/factories.py b/tests/factories.py index ac5e59ac..47a803b3 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -133,6 +133,13 @@ def add_test_job( def set_job_state(self, definition, state, message="message"): """Directly set a job state.""" + # handle the synchronous state meaning the state has completed + synchronous = getattr(self, "synchronous_transitions", []) + if state in synchronous: + if state == ExecutorState.PREPARING: + state = ExecutorState.PREPARED + if state == ExecutorState.FINALIZING: + state = ExecutorState.FINALIZED self.state[definition.id] = JobStatus(state, message) def set_job_transition(self, definition, state, message="executor message"): diff --git a/tests/test_run.py b/tests/test_run.py index 4b1f2f88..e246f055 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -549,6 +549,32 @@ def error(*args, **kwargs): assert spans[-1].name == "JOB" +def test_handle_single_job_shortcuts_synchronous(db): + api = StubExecutorAPI() + job = api.add_test_job(ExecutorState.UNKNOWN, State.PENDING, StatusCode.CREATED) + + api.synchronous_transitions = [ExecutorState.PREPARING] + + run.handle_single_job(job, api) + + # executor state + assert job.id in api.tracker["prepare"] + assert job.id in api.tracker["execute"] + assert api.get_status(job).state == ExecutorState.EXECUTING + + # our state + assert job.status_message == "Executing job on the backend" + assert job.state == State.RUNNING + assert job.started_at + + # tracing + spans = get_trace() + assert spans[-4].name == "CREATED" + assert spans[-3].name == "ENTER PREPARING" + assert spans[-2].name == "PREPARING" + assert spans[-1].name == "ENTER EXECUTING" + + def test_ignores_cancelled_jobs_when_calculating_dependencies(db): job_factory( id="1",