Skip to content

Commit

Permalink
Merge pull request #490 from opensafely-core/sync-transition-optimisa…
Browse files Browse the repository at this point in the history
…tion

fix: optimise loop for synchronous prepare/finalize
  • Loading branch information
bloodearnest authored Oct 13, 2022
2 parents e72e296 + 9ede3ec commit 0fc3aa4
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 1 deletion.
2 changes: 2 additions & 0 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def workspace_is_archived(workspace):
class LocalDockerAPI(ExecutorAPI):
"""ExecutorAPI implementation using local docker service."""

synchronous_transitions = [ExecutorState.PREPARING, ExecutorState.FINALIZING]

def prepare(self, job):
current = self.get_status(job)
if current.state != ExecutorState.UNKNOWN:
Expand Down
4 changes: 4 additions & 0 deletions jobrunner/executors/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def get_results(self, job: JobDefinition) -> JobResults:
def delete_files(self, workspace: str, privacy: Privacy, paths: [str]) -> List[str]:
return self._wrapped.delete_files(workspace, privacy, paths)

@property
def synchronous_transitions(self):
return getattr(self._wrapped, "synchronous_transitions", [])

def _add_logging(self, method: Callable[[JobDefinition], JobStatus]):
def wrapper(job: JobDefinition) -> JobStatus:
status = method(job)
Expand Down
18 changes: 17 additions & 1 deletion jobrunner/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,14 @@ def handle_single_job(job, api):
mode = get_flag_value("mode")
paused = str(get_flag_value("paused", "False")).lower() == "true"
try:
handle_job(job, api, mode, paused)
synchronous_transition = handle_job(job, api, mode, paused)

# provide a way to shortcut moving a job on to the next state right away
# this is intended to support executors where some state transitions
# are synchronous, particularly the local executor where prepare is
# synchronous and can be time consuming.
if synchronous_transition:
handle_job(job, api, mode, paused)
except Exception as exc:
mark_job_as_failed(
job,
Expand Down Expand Up @@ -293,6 +300,15 @@ def handle_job(job, api, mode=None, paused=None):
)
set_code(job, code, message)

# does this api have synchronous_transitions?
synchronous_transitions = getattr(api, "synchronous_transitions", [])

if new_status.state in synchronous_transitions:
# we want to immediately run this function for this job again to
# avoid blocking it as we know the state transition has already
# completed.
return True

elif new_status.state == ExecutorState.ERROR:
# all transitions can go straight to error
raise ExecutorError(new_status.message)
Expand Down
7 changes: 7 additions & 0 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ def add_test_job(

def set_job_state(self, definition, state, message="message"):
"""Directly set a job state."""
# handle the synchronous state meaning the state has completed
synchronous = getattr(self, "synchronous_transitions", [])
if state in synchronous:
if state == ExecutorState.PREPARING:
state = ExecutorState.PREPARED
if state == ExecutorState.FINALIZING:
state = ExecutorState.FINALIZED
self.state[definition.id] = JobStatus(state, message)

def set_job_transition(self, definition, state, message="executor message"):
Expand Down
26 changes: 26 additions & 0 deletions tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,32 @@ def error(*args, **kwargs):
assert spans[-1].name == "JOB"


def test_handle_single_job_shortcuts_synchronous(db):
api = StubExecutorAPI()
job = api.add_test_job(ExecutorState.UNKNOWN, State.PENDING, StatusCode.CREATED)

api.synchronous_transitions = [ExecutorState.PREPARING]

run.handle_single_job(job, api)

# executor state
assert job.id in api.tracker["prepare"]
assert job.id in api.tracker["execute"]
assert api.get_status(job).state == ExecutorState.EXECUTING

# our state
assert job.status_message == "Executing job on the backend"
assert job.state == State.RUNNING
assert job.started_at

# tracing
spans = get_trace()
assert spans[-4].name == "CREATED"
assert spans[-3].name == "ENTER PREPARING"
assert spans[-2].name == "PREPARING"
assert spans[-1].name == "ENTER EXECUTING"


def test_ignores_cancelled_jobs_when_calculating_dependencies(db):
job_factory(
id="1",
Expand Down

0 comments on commit 0fc3aa4

Please sign in to comment.