Skip to content

Commit

Permalink
Merge pull request #529 from opensafely-core/more-trace-metadata
Browse files Browse the repository at this point in the history
more trace metadata
  • Loading branch information
bloodearnest authored Dec 8, 2022
2 parents e8b4fd0 + 95d2b40 commit 6519e11
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 77 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ env:

jobs:
check:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04

steps:
- uses: actions/checkout@v2
Expand All @@ -26,7 +26,7 @@ jobs:
test-job:
strategy:
matrix:
os: [ubuntu-latest, windows-2019]
os: [ubuntu-20.04, windows-2019]
# Python 3.8 is what we currently support for running cohortextractor
# locally, and 3.9 is what we required for databuilder so we need to make
# sure we can run with those
Expand All @@ -49,15 +49,15 @@ jobs:
- uses: extractions/setup-just@aa5d15c144db4585980a44ebfdd2cf337c4f14cb # 1.4

- name: Run actual tests on ${{ matrix.os }}
if: ${{ matrix.os == 'ubuntu-latest' }}
run: just test
if: ${{ matrix.os == 'ubuntu-20.04' }}
run: just test -vvv

- name: Run actual tests on ${{ matrix.os }}
if: ${{ matrix.os == 'windows-2019' }}
run: just test-no-docker
run: just test-no-docker -vvv

test-package-build:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
name: Test we can build PyPI package
steps:
- name: Checkout
Expand All @@ -84,7 +84,7 @@ jobs:
run: just package-test sdist

test-docker:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
name: Test docker image
steps:
- name: Checkout
Expand All @@ -102,7 +102,7 @@ jobs:
run: just docker/test

test-github-workflow-output:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
name: Inspect test runner output in the context of a Github Workflow
steps:
- name: Checkout
Expand Down
21 changes: 21 additions & 0 deletions jobrunner/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import configparser
import os
import re
import subprocess
from multiprocessing import cpu_count
from pathlib import Path

Expand All @@ -16,6 +17,26 @@ def _is_valid_backend_name(name):
default_work_dir = Path(__file__) / "../../workdir"


def version_from_env_or_git(env_var, git_args):
    """Internal helper: read *env_var* from the environment, falling back to git.

    Returns the environment value when set; otherwise tries running
    *git_args* and returns its stripped stdout. Returns "unknown" when the
    variable is unset and git is unavailable (FileNotFoundError) or the
    command fails (CalledProcessError, e.g. not inside a git checkout).
    """
    value = os.environ.get(env_var, "unknown")
    if value == "unknown":
        try:
            # NOTE(review): runs git in the process CWD, as the original
            # code did; the result depends on where job-runner was launched.
            value = subprocess.check_output(git_args, text=True).strip()
        except (FileNotFoundError, subprocess.CalledProcessError):
            pass
    return value


# Version metadata; attached to every trace span (see jobrunner.tracing).
VERSION = version_from_env_or_git("VERSION", ["git", "describe", "--tags"])
GIT_SHA = version_from_env_or_git("GIT_SHA", ["git", "rev-parse", "--short", "HEAD"])


WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
DATABASE_FILE = WORKDIR / "db.sqlite"
GIT_REPO_DIR = WORKDIR / "repos"
Expand Down
6 changes: 6 additions & 0 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ def finalize_job(job):
outputs, unmatched_patterns = find_matching_outputs(job)
unmatched_outputs = get_unmatched_outputs(job, outputs)
exit_code = container_metadata["State"]["ExitCode"]
labels = container_metadata.get("Config", {}).get("Labels", {})

# First get the user-friendly message for known database exit codes, for jobs
# that have db access
Expand All @@ -307,6 +308,11 @@ def finalize_job(job):
exit_code=container_metadata["State"]["ExitCode"],
image_id=container_metadata["Image"],
message=message,
action_version=labels.get("org.opencontainers.image.version", "unknown"),
action_revision=labels.get("org.opencontainers.image.revision", "unknown"),
action_created=labels.get("org.opencontainers.image.created", "unknown"),
base_revision=labels.get("org.opensafely.base.vcs-ref", "unknown"),
base_created=labels.get("org.opencontainers.base.build-date", "unknown"),
)
job.completed_at = int(time.time())
job_metadata = get_job_metadata(job, outputs, container_metadata)
Expand Down
7 changes: 7 additions & 0 deletions jobrunner/job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ class JobResults:
image_id: str
message: str = None

# to be extracted from the image labels
action_version: str = "unknown"
action_revision: str = "unknown"
action_created: str = "unknown"
base_revision: str = "unknown"
base_created: str = "unknown"


class ExecutorRetry(Exception):
"""Indicates to the job scheduler that there's a temporary issue and to try again later."""
Expand Down
25 changes: 11 additions & 14 deletions jobrunner/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,7 @@ def save_results(job, definition, results):
code = StatusCode.SUCCEEDED
message = "Completed successfully"

trace_attrs = dict(
exit_code=results.exit_code,
image_id=results.image_id,
outputs=len(results.outputs),
unmatched_patterns=len(results.unmatched_patterns),
unmatched_outputs=len(results.unmatched_outputs),
)

set_state(job, state, code, message, error=state == State.FAILED, **trace_attrs)
set_state(job, state, code, message, error=state == State.FAILED, results=results)


def get_obsolete_files(definition, outputs):
Expand Down Expand Up @@ -479,7 +471,7 @@ def mark_job_as_failed(job, code, message, error=None, **attrs):
set_state(job, State.FAILED, code, message, error=error, attrs=attrs)


def set_state(job, state, code, message, error=None, **attrs):
def set_state(job, state, code, message, error=None, results=None, **attrs):
"""Update the high level state transitions.
Error can be an exception or a string message.
Expand All @@ -502,10 +494,10 @@ def set_state(job, state, code, message, error=None, **attrs):
elif state == State.PENDING: # restarting the job gracefully
job.started_at = None

set_code(job, code, message, error=error, **attrs)
set_code(job, code, message, error=error, results=results, **attrs)


def set_code(job, code, message, error=None, **attrs):
def set_code(job, code, message, error=None, results=None, **attrs):
"""Set the granular status code state.
We also trace this transition with OpenTelemetry traces.
Expand All @@ -525,7 +517,7 @@ def set_code(job, code, message, error=None, **attrs):

# trace we finished the previous state
tracing.finish_current_state(
job, timestamp_ns, error=error, message=message, **attrs
job, timestamp_ns, error=error, message=message, results=results, **attrs
)

# update db object
Expand All @@ -540,7 +532,12 @@ def set_code(job, code, message, error=None, **attrs):
if code in FINAL_STATUS_CODES:
# transitioning to a final state, so just record that state
tracing.record_final_state(
job, timestamp_ns, error=error, message=message, **attrs
job,
timestamp_ns,
error=error,
message=message,
results=results,
**attrs,
)
else:
# trace that we've started the next state
Expand Down
41 changes: 28 additions & 13 deletions jobrunner/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _traceable(job):
return True


def finish_current_state(job, timestamp_ns, error=None, **attrs):
def finish_current_state(job, timestamp_ns, error=None, results=None, **attrs):
"""Record a span representing the state we've just exited."""
if not _traceable(job):
return
Expand All @@ -101,13 +101,13 @@ def finish_current_state(job, timestamp_ns, error=None, **attrs):
try:
name = job.status_code.name
start_time = job.status_code_updated_at
record_job_span(job, name, start_time, timestamp_ns, error, **attrs)
record_job_span(job, name, start_time, timestamp_ns, error, results, **attrs)
except Exception:
# make sure trace failures do not error the job
logger.exception(f"failed to trace state for {job.id}")


def record_final_state(job, timestamp_ns, error=None, **attrs):
def record_final_state(job, timestamp_ns, error=None, results=None, **attrs):
"""Record a span representing the state we've just exited."""
if not _traceable(job):
return
Expand All @@ -120,9 +120,9 @@ def record_final_state(job, timestamp_ns, error=None, **attrs):
# final states have no duration, so make last for 1 sec, just act
# as a marker
end_time = int(timestamp_ns + 1e9)
record_job_span(job, name, start_time, end_time, error, **attrs)
record_job_span(job, name, start_time, end_time, error, results, **attrs)

complete_job(job, timestamp_ns, error, **attrs)
complete_job(job, timestamp_ns, error, results, **attrs)
except Exception:
# make sure trace failures do not error the job
logger.exception(f"failed to trace state for {job.id}")
Expand All @@ -143,7 +143,7 @@ def start_new_state(job, timestamp_ns, error=None, **attrs):
start_time = timestamp_ns
# fix the time for these synthetic marker events at one second
end_time = int(start_time + 1e9)
record_job_span(job, name, start_time, end_time, error, **attrs)
record_job_span(job, name, start_time, end_time, error, results=None, **attrs)
except Exception:
# make sure trace failures do not error the job
logger.exception(f"failed to trace state for {job.id}")
Expand Down Expand Up @@ -181,19 +181,19 @@ def load_trace_context(job):
return propagation.set_span_in_context(trace.NonRecordingSpan(span_context), {})


def record_job_span(job, name, start_time, end_time, error, results, **attrs):
    """Emit one span named *name* for this job, spanning start_time..end_time.

    The span is attached to the job's stored trace context and annotated
    with the job/results metadata before being ended. No-op for jobs that
    cannot be traced.
    """
    if not _traceable(job):
        return

    span = trace.get_tracer("jobs").start_span(
        name,
        context=load_trace_context(job),
        start_time=start_time,
    )
    set_span_metadata(span, job, error, results, **attrs)
    span.end(end_time)


def complete_job(job, timestamp_ns, error=None, **attrs):
def complete_job(job, timestamp_ns, error=None, results=None, **attrs):
"""Send the root span to record the full duration for this job."""

root_ctx = load_root_span(job)
Expand All @@ -215,20 +215,20 @@ def complete_job(job, timestamp_ns, error=None, **attrs):
root_span._context = root_ctx

# annotate and send
set_span_metadata(root_span, job, error, **attrs)
set_span_metadata(root_span, job, error, results, **attrs)
root_span.end(timestamp_ns)


OTEL_ATTR_TYPES = (bool, str, bytes, int, float)


def set_span_metadata(span, job, error=None, **attrs):
def set_span_metadata(span, job, error=None, results=None, **attrs):
"""Set span metadata with everthing we know about a job."""
attributes = {}

if attrs:
attributes.update(attrs)
attributes.update(trace_attributes(job))
attributes.update(trace_attributes(job, results))

# opentelemetry can only handle serializing certain attribute types
clean_attrs = {}
Expand All @@ -250,7 +250,7 @@ def set_span_metadata(span, job, error=None, **attrs):
span.record_exception(error)


def trace_attributes(job):
def trace_attributes(job, results=None):
"""These attributes are added to every span in order to slice and dice by
each as needed.
"""
Expand All @@ -277,6 +277,8 @@ def trace_attributes(job):
state=job.state.name,
message=job.status_message,
requires_db=job.requires_db,
jobrunner_version=config.VERSION,
jobrunner_sha=config.GIT_SHA,
)

# local_run jobs don't have a commit
Expand All @@ -288,6 +290,19 @@ def trace_attributes(job):
if job.action_commit:
attrs["reusable_action"] += ":" + job.action_commit

if results:
attrs["exit_code"] = results.exit_code
attrs["image_id"] = results.image_id
attrs["outputs"] = len(results.outputs)
attrs["unmatched_patterns"] = len(results.unmatched_patterns)
attrs["unmatched_outputs"] = len(results.unmatched_outputs)
attrs["executor_message"] = results.message
attrs["action_version"] = results.action_version
attrs["action_revision"] = results.action_revision
attrs["action_created"] = results.action_created
attrs["base_revision"] = results.base_revision
attrs["base_created"] = results.base_created

return attrs


Expand Down
16 changes: 16 additions & 0 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@
}


# Baseline field values for building a JobResults in tests; individual tests
# override specific fields through job_results_factory(**kwargs).
JOB_RESULTS_DEFAULTS = dict(
    outputs=["output1", "output2"],
    unmatched_patterns=[],
    unmatched_outputs=[],
    exit_code=0,
    image_id="image_id",
    message="message",
)


def job_request_factory_raw(**kwargs):
if "id" not in kwargs:
kwargs["id"] = base64.b32encode(secrets.token_bytes(10)).decode("ascii").lower()
Expand Down Expand Up @@ -84,6 +94,12 @@ def job_factory(job_request=None, **kwargs):
return job


def job_results_factory(**kwargs):
    """Build a JobResults from JOB_RESULTS_DEFAULTS, with kwargs overriding.

    The defaults are deep-copied so a test that mutates a list field (e.g.
    outputs) cannot bleed into the shared defaults dict.
    """
    return JobResults(**{**deepcopy(JOB_RESULTS_DEFAULTS), **kwargs})


class StubExecutorAPI:
"""Dummy implementation of the ExecutorAPI, for use in tests.
Expand Down
Loading

0 comments on commit 6519e11

Please sign in to comment.