Skip to content

Commit

Permalink
Merge pull request #552 from opensafely-core/fix-timings
Browse files Browse the repository at this point in the history
fix timings
  • Loading branch information
bloodearnest authored Jan 13, 2023
2 parents b008c30 + 5b2abda commit a419ba8
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 108 deletions.
5 changes: 2 additions & 3 deletions jobrunner/cli/prepare_for_reboot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from jobrunner.executors.local import container_name, docker, volume_api
from jobrunner.lib.database import find_where
from jobrunner.models import Job, State, StatusCode
from jobrunner.run import set_state
from jobrunner.run import set_code


def main(pause=True):
Expand All @@ -26,9 +26,8 @@ def main(pause=True):

for job in find_where(Job, state=State.RUNNING):
print(f"reseting job {job.id} to PENDING")
set_state(
set_code(
job,
State.PENDING,
StatusCode.WAITING_ON_REBOOT,
"Job restarted - waiting for server to reboot",
)
Expand Down
9 changes: 4 additions & 5 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ def prepare(self, job):
ExecutorState.ERROR, "Out of disk space, please try again later"
)

# technically, we're actually PREPARED, as we did it synchronously, but
# the loop code is expecting PREPARING, so return that. The next time
# around the loop, it will pick up that it is PREPARED, and move on.
return JobStatus(ExecutorState.PREPARING)
# this API is synchronous, so we are PREPARED now
return JobStatus(ExecutorState.PREPARED)

def execute(self, job):
current = self.get_status(job)
Expand Down Expand Up @@ -169,7 +167,8 @@ def finalize(self, job):
except LocalDockerError as exc:
return JobStatus(ExecutorState.ERROR, f"failed to finalize job: {exc}")

return JobStatus(ExecutorState.FINALIZING)
# this api is synchronous, so we are now FINALIZED
return JobStatus(ExecutorState.FINALIZED)

def terminate(self, job):
docker.kill(container_name(job))
Expand Down
9 changes: 9 additions & 0 deletions jobrunner/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ def datestr_to_ns_timestamp(datestr):
return ts


def ns_timestamp_to_datetime(timestamp_ns):
    """Convert a nanosecond timestamp into a datetime for human inspection.

    Debugging aid only: datetime resolves microseconds at best, so the last
    three digits of precision are dropped, which does not matter when a human
    is eyeballing and comparing timestamps.
    """
    seconds = timestamp_ns / 1e9
    return datetime.fromtimestamp(seconds)


def warn_assertions(f):
"""Helper decorator to catch assertions errors and emit as warnings.
Expand Down
8 changes: 7 additions & 1 deletion jobrunner/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class State(Enum):
# current low-level state of a job. They are simply machine readable versions
# of the human readable status_message which allow us to provide certain UX
# affordances in the web, cli and telemetry.


class StatusCode(Enum):

# PENDING states
Expand Down Expand Up @@ -72,9 +74,13 @@ class StatusCode(Enum):
INTERNAL_ERROR = "internal_error"
KILLED_BY_ADMIN = "killed_by_admin"

@property
def is_final_code(self):
    # True when this status code marks a terminal job state; used by the
    # tracing code to decide whether to record the final state span.
    return self in StatusCode._FINAL_STATUS_CODES


# used for tracing to know if a state is final or not
FINAL_STATUS_CODES = [
StatusCode._FINAL_STATUS_CODES = [
StatusCode.SUCCEEDED,
StatusCode.DEPENDENCY_FAILED,
StatusCode.NONZERO_EXIT,
Expand Down
141 changes: 73 additions & 68 deletions jobrunner/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
Privacy,
Study,
)
from jobrunner.lib import ns_timestamp_to_datetime
from jobrunner.lib.database import find_where, select_values, update
from jobrunner.lib.log_utils import configure_logging, set_log_context
from jobrunner.models import FINAL_STATUS_CODES, Job, State, StatusCode
from jobrunner.models import Job, State, StatusCode
from jobrunner.queries import calculate_workspace_state, get_flag_value


Expand Down Expand Up @@ -186,6 +187,10 @@ def handle_job(job, api, mode=None, paused=None):
assert job.state in (State.PENDING, State.RUNNING)
definition = job_to_job_definition(job)

# does this api have synchronous_transitions?
synchronous_transitions = getattr(api, "synchronous_transitions", [])
is_synchronous = False

if job.cancelled:
# cancelled is driven by user request, so is handled explicitly first
# regardless of executor state.
Expand Down Expand Up @@ -213,9 +218,8 @@ def handle_job(job, api, mode=None, paused=None):
api.cleanup(definition)

# reset state to pending and exit
set_state(
set_code(
job,
State.PENDING,
StatusCode.WAITING_DB_MAINTENANCE,
"Waiting for database to finish maintenance",
)
Expand Down Expand Up @@ -297,15 +301,36 @@ def handle_job(job, api, mode=None, paused=None):
set_code(job, code, message)
return

expected_state = ExecutorState.PREPARING
if ExecutorState.PREPARING in synchronous_transitions:
# prepare is synchronous, which means set our code to PREPARING
# before calling api.prepare(), and we expect it to be PREPARED
# when finished
code, message = STATE_MAP[ExecutorState.PREPARING]
set_code(job, code, message)
expected_state = ExecutorState.PREPARED
is_synchronous = True
else:
expected_state = ExecutorState.PREPARING

new_status = api.prepare(definition)

elif initial_status.state == ExecutorState.PREPARED:
expected_state = ExecutorState.EXECUTING
new_status = api.execute(definition)

elif initial_status.state == ExecutorState.EXECUTED:
expected_state = ExecutorState.FINALIZING

if ExecutorState.FINALIZING in synchronous_transitions:
# finalize is synchronous, which means set our code to FINALIZING
# before calling api.finalize(), and we expect it to be FINALIZED
# when finished
code, message = STATE_MAP[ExecutorState.FINALIZING]
set_code(job, code, message)
expected_state = ExecutorState.FINALIZED
is_synchronous = True
else:
expected_state = ExecutorState.FINALIZING

new_status = api.finalize(definition)

elif initial_status.state == ExecutorState.FINALIZED:
Expand Down Expand Up @@ -341,29 +366,13 @@ def handle_job(job, api, mode=None, paused=None):

elif new_status.state == expected_state:
# successful state change to the expected next state

code, message = STATE_MAP[new_status.state]
set_code(job, code, message)

# special case PENDING -> RUNNING transition
# allow both states to do the transition to RUNNING, due to synchronous transitions
if new_status.state in [ExecutorState.PREPARING, ExecutorState.PREPARED]:
set_state(job, State.RUNNING, code, message)
else:
if job.state != State.RUNNING:
# got an ExecutorState that should mean the job.state is RUNNING, but it is not
log.warning(
f"state error: got {new_status.state} for job we thought was {job.state}"
)
set_code(job, code, message)

# does this api have synchronous_transitions?
synchronous_transitions = getattr(api, "synchronous_transitions", [])

if new_status.state in synchronous_transitions:
# we want to immediately run this function for this job again to
# avoid blocking it as we know the state transition has already
# completed.
return True
# we want to immediately run this function for this job again to
# avoid blocking it as we know the state transition has already
# completed.
return is_synchronous

elif new_status.state == ExecutorState.ERROR:
# all transitions can go straight to error
Expand All @@ -381,10 +390,11 @@ def save_results(job, definition, results):
job.outputs = results.outputs

message = None
error = False

if results.exit_code != 0:
state = State.FAILED
code = StatusCode.NONZERO_EXIT
error = True
message = "Job exited with an error"
if results.message:
message += f": {results.message}"
Expand All @@ -395,8 +405,8 @@ def save_results(job, definition, results):

elif results.unmatched_patterns:
job.unmatched_outputs = results.unmatched_outputs
state = State.FAILED
code = StatusCode.UNMATCHED_PATTERNS
error = True
# If the job fails because an output was missing its very useful to
# show the user what files were created as often the issue is just a
# typo
Expand All @@ -405,11 +415,10 @@ def save_results(job, definition, results):
)

else:
state = State.SUCCEEDED
code = StatusCode.SUCCEEDED
message = "Completed successfully"

set_state(job, state, code, message, error=state == State.FAILED, results=results)
set_code(job, code, message, error=error, results=results)


def get_obsolete_files(definition, outputs):
Expand Down Expand Up @@ -505,33 +514,7 @@ def mark_job_as_failed(job, code, message, error=None, **attrs):
if error is None:
error = True

set_state(job, State.FAILED, code, message, error=error, attrs=attrs)


def set_state(job, state, code, message, error=None, results=None, **attrs):
"""Update the high level state transitions.
Error can be an exception or a string message.
This sets the high level state and records the timestamps we currently use
for job-server metrics, then sets the granular code state via set_code.
Argubly these higher level states are not strictly required now we're
tracking the full status code progression. But that would be a big change,
so we'll leave for a later refactor.
"""

timestamp = int(time.time())
job.state = state
if state == State.RUNNING:
job.started_at = timestamp
elif state == State.FAILED or state == State.SUCCEEDED:
job.completed_at = timestamp
elif state == State.PENDING: # restarting the job gracefully
job.started_at = None

set_code(job, code, message, error=error, results=results, **attrs)
set_code(job, code, message, error=error, attrs=attrs)


def set_code(job, code, message, error=None, results=None, timestamp_ns=None, **attrs):
Expand All @@ -550,20 +533,42 @@ def set_code(job, code, message, error=None, results=None, timestamp_ns=None, **
timestamp_s = int(t)
timestamp_ns = int(t * 1e9)
else:
timestamp_s = int(timestamp_ns // 1e9)

if job.status_code_updated_at > timestamp_ns:
# we somehow have a negative duration, which honeycomb does funny things with.
# This can happen in tests, where things are fast, but we've seen it in production too.
log.warning(
f"negative state duration, clamping to 1ms ({job.status_code_updated_at} > {timestamp_ns})"
)
timestamp_ns = job.status_code_updated_at + 1e6 # set duration to 1ms
timestamp_s = int(timestamp_ns // 1e9)
timestamp_s = int(timestamp_ns / 1e9)

# if code has changed then trace it and update
if job.status_code != code:

# handle timer measurement errors
if job.status_code_updated_at > timestamp_ns:
# we somehow have a negative duration, which honeycomb does funny things with.
# This can happen in tests, where things are fast, but we've seen it in production too.
duration = datetime.timedelta(
microseconds=int((timestamp_ns - job.status_code_updated_at) / 1e3)
)
log.warning(
f"negative state duration of {duration}, clamping to 1ms\n"
f"before: {job.status_code:<24} at {ns_timestamp_to_datetime(job.status_code_updated_at)}\n"
f"after : {code:<24} at {ns_timestamp_to_datetime(timestamp_ns)}\n"
)
timestamp_ns = int(job.status_code_updated_at + 1e6) # set duration to 1ms
timestamp_s = int(timestamp_ns // 1e9)

# update coarse state and timings for user
if code in [StatusCode.PREPARED, StatusCode.PREPARING]:
# we've started running
job.state = State.RUNNING
job.started_at = timestamp_s
elif code.is_final_code:
job.completed_at = timestamp_s
if code == StatusCode.SUCCEEDED:
job.state = State.SUCCEEDED
else:
job.state = State.FAILED
# we sometimes reset the job back to pending
elif code in [StatusCode.WAITING_ON_REBOOT, StatusCode.WAITING_DB_MAINTENANCE]:
job.state = State.PENDING
job.started_at = None

# job trace: we finished the previous state
tracing.finish_current_state(
job, timestamp_ns, error=error, message=message, results=results, **attrs
Expand All @@ -578,7 +583,7 @@ def set_code(job, code, message, error=None, results=None, timestamp_ns=None, **
job.status_code_updated_at = timestamp_ns
update_job(job)

if code in FINAL_STATUS_CODES:
if code.is_final_code:
# transitioning to a final state, so just record that state
tracing.record_final_state(
job,
Expand Down
29 changes: 19 additions & 10 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class StubExecutorAPI:
"""

synchronous_transitions = []

def __init__(self):

self.tracker = {
Expand Down Expand Up @@ -165,10 +167,10 @@ def set_job_state(self, definition, state, message="message", timestamp_ns=None)
self.state[definition.id] = JobStatus(state, message, timestamp_ns)

def set_job_transition(
self, definition, state, message="executor message", timestamp_ns=None
self, definition, state, message="executor message", hook=None
):
"""Set the next transition for this job when called"""
self.transitions[definition.id] = (state, message, timestamp_ns)
self.transitions[definition.id] = (state, message, hook)

def set_job_result(self, definition, timestamp_ns=None, **kwargs):
if timestamp_ns is None:
Expand All @@ -186,24 +188,29 @@ def set_job_result(self, definition, timestamp_ns=None, **kwargs):

def do_transition(self, definition, expected, next_state):
current = self.get_status(definition)
timestamp_ns = time.time_ns()
if current.state != expected:
state = current.state
message = f"Invalid transition to {next_state}, currently state is {current.state}"
elif definition.id in self.transitions:
state, message, timestamp_ns = self.transitions[definition.id]
state, message, hook = self.transitions.pop(definition.id)
if hook:
hook(definition)
else:
state = next_state
message = "executor message"

timestamp_ns = time.time_ns()
self.set_job_state(definition, state, message, timestamp_ns)
return JobStatus(state, message, timestamp_ns)

def prepare(self, definition):
self.tracker["prepare"].add(definition.id)
return self.do_transition(
definition, ExecutorState.UNKNOWN, ExecutorState.PREPARING
)
if ExecutorState.PREPARING in self.synchronous_transitions:
expected = ExecutorState.PREPARED
else:
expected = ExecutorState.PREPARING

return self.do_transition(definition, ExecutorState.UNKNOWN, expected)

def execute(self, definition):
self.tracker["execute"].add(definition.id)
Expand All @@ -213,9 +220,11 @@ def execute(self, definition):

def finalize(self, definition):
self.tracker["finalize"].add(definition.id)
return self.do_transition(
definition, ExecutorState.EXECUTED, ExecutorState.FINALIZING
)
if ExecutorState.FINALIZING in self.synchronous_transitions:
expected = ExecutorState.FINALIZED
else:
expected = ExecutorState.FINALIZING
return self.do_transition(definition, ExecutorState.EXECUTED, expected)

def terminate(self, definition):
self.tracker["terminate"].add(definition.id)
Expand Down
Loading

0 comments on commit a419ba8

Please sign in to comment.