Skip to content

Commit

Permalink
feat: instrument assert_new_jobs_created (#556)
Browse files Browse the repository at this point in the history
* feat: instrument assert_new_jobs_created

Co-authored-by: Simon Davy <[email protected]>
  • Loading branch information
madwort and bloodearnest authored Jan 20, 2023
1 parent 5e3e981 commit 82a655e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 25 deletions.
77 changes: 52 additions & 25 deletions jobrunner/create_or_update_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re
import time

from opentelemetry import trace
from pipeline import RUN_ALL_COMMAND, ProjectValidationError, load_pipeline

from jobrunner import config, tracing
Expand Down Expand Up @@ -279,33 +280,59 @@ def job_should_be_rerun(job_request, job):


def assert_new_jobs_created(new_jobs, current_jobs):
    """Verify that scheduling produced new jobs, tracing the outcome.

    The whole check runs inside an ``assert_new_jobs_created`` span obtained
    from the ``new_jobs`` tracer. The span is annotated with what was found so
    that the reason for "no new jobs" can be inspected afterwards.

    Args:
        new_jobs: the jobs that were newly created; each is read for
            ``.action`` and ``.id``.
        current_jobs: the jobs already known for the requested actions; each
            is read for ``.action`` and ``.state``.

    Returns:
        None when ``new_jobs`` is non-empty.

    Raises:
        NothingToDoError: no new jobs, and every job in ``current_jobs`` has
            state ``SUCCEEDED`` (this includes an empty ``current_jobs``).
        JobRequestError: no new jobs, and every job in ``current_jobs`` is
            ``PENDING`` or ``RUNNING``.
        Exception: no new jobs and neither of the above holds, which is
            treated as a bug in scheduling.
    """
    tracer = trace.get_tracer("new_jobs")
    with tracer.start_as_current_span("assert_new_jobs_created") as span:
        if new_jobs:
            span.set_attribute("new_jobs", True)
            # TODO: also record the id of whatever caused these new jobs to be
            # created, once that is available in this function.
            span.set_attribute(
                "new_job_actions", ", ".join(str(job.action) for job in new_jobs)
            )
            span.set_attribute(
                "new_job_ids", ", ".join(str(job.id) for job in new_jobs)
            )
            return

        span.set_attribute("new_jobs", False)

        # Bucket the existing jobs' action names by state.
        pending = [job.action for job in current_jobs if job.state == State.PENDING]
        running = [job.action for job in current_jobs if job.state == State.RUNNING]
        succeeded = [
            job.action for job in current_jobs if job.state == State.SUCCEEDED
        ]
        failed = [job.action for job in current_jobs if job.state == State.FAILED]

        span.set_attribute("pending", ", ".join(pending))
        span.set_attribute("running", ", ".join(running))
        span.set_attribute("succeeded", ", ".join(succeeded))
        span.set_attribute("failed", ", ".join(failed))
        # TODO: record something more informative than a bare count here.
        span.set_attribute("current_jobs", len(current_jobs))

        # There are two legitimate reasons we can end up with no new jobs to run...
        if len(succeeded) == len(current_jobs):
            # ...one is that the "run all" action was requested but everything
            # has already run successfully
            log.info(
                f"run_all requested, but all jobs are already successful: {', '.join(succeeded)}"
            )
            span.set_attribute("status", "nothing_to_do")
            raise NothingToDoError()

        if len(pending) + len(running) == len(current_jobs):
            # ...the other is that every requested action is already running
            # or pending, this is considered a user error
            statuses = ", ".join(
                f"{job.action}({job.state.name})" for job in current_jobs
            )
            log.info(f"All requested actions were already scheduled to run: {statuses}")
            span.set_attribute("status", "requested_actions_scheduled")
            raise JobRequestError("All requested actions were already scheduled to run")

        # But if we get here then we've somehow failed to schedule new jobs
        # despite the fact that some of the actions we depend on have failed,
        # which is a bug.
        span.set_attribute("status", "unexpected_failed_jobs_error")
        raise Exception(
            f"Unexpected failed jobs in dependency graph after scheduling: {', '.join(failed)}"
        )


def create_failed_job(job_request, exception):
Expand Down
28 changes: 28 additions & 0 deletions tests/test_create_or_update_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def test_create_or_update_jobs(tmp_work_dir, db):
new_job = find_one(Job)
assert old_job == new_job

spans = get_trace("new_jobs")

assert spans[0].name == "assert_new_jobs_created"
assert spans[0].attributes["new_jobs"] is True
assert spans[0].attributes["new_job_actions"] == "generate_cohort"


# Basic smoketest to test the error path
def test_create_or_update_jobs_with_git_error(tmp_work_dir):
Expand Down Expand Up @@ -203,12 +209,34 @@ def test_run_all_ignores_failed_actions_that_have_been_removed(tmp_work_dir):
actions=["generate_cohort", "prepare_data_1", "prepare_data_2", "analyse_data"]
)
create_jobs_with_project_file(request, TEST_PROJECT)

update_where(Job, {"state": State.SUCCEEDED}, job_request_id=request.id)

with pytest.raises(NothingToDoError):
# Now this should be a no-op because all the actions that are still part of the study have succeeded
create_jobs_with_project_file(make_job_request(action="run_all"), TEST_PROJECT)

spans = get_trace("new_jobs")
assert spans[0].name == "assert_new_jobs_created"
assert spans[0].attributes["new_jobs"] is True
assert spans[0].attributes["new_job_actions"] == "obsolete_action"

assert spans[1].name == "assert_new_jobs_created"
assert spans[1].attributes["new_jobs"] is True
assert (
spans[1].attributes["new_job_actions"]
== "generate_cohort, prepare_data_1, prepare_data_2, analyse_data"
)

assert spans[2].name == "assert_new_jobs_created"
assert spans[2].attributes["new_jobs"] is False
assert (
spans[2].attributes["succeeded"]
== "analyse_data, generate_cohort, prepare_data_1, prepare_data_2"
)
assert spans[2].attributes["failed"] == ""
assert spans[2].attributes["current_jobs"] == 4


def test_cancelled_jobs_are_flagged(tmp_work_dir):
job_request = make_job_request(action="analyse_data")
Expand Down

0 comments on commit 82a655e

Please sign in to comment.