fix: kill_job preserves job logs.
It does not copy outputs or copy the log to the workspace; the preserved
logs are primarily intended for staff debugging.

Fixes #511
bloodearnest committed Dec 2, 2022
1 parent adf78e0 · commit 4e2974b
Showing 3 changed files with 94 additions and 31 deletions.
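
For orientation: kill_job is the operator CLI for forcibly stopping jobs, and after this commit it also captures a killed container's logs before any cleanup. A minimal sketch of driving it from Python, based on the main() signature in the diff below (the partial job IDs are hypothetical):

    from jobrunner.cli import kill_job

    # Marks each matching pending/running job as KILLED_BY_ADMIN, kills its
    # container, and saves its logs to the log directory; cleanup=True would
    # also delete the container and its volume.
    kill_job.main(["1a2b3c", "4d5e6f"], cleanup=False)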
27 changes: 20 additions & 7 deletions jobrunner/cli/kill_job.py
@@ -3,35 +3,48 @@
 """
 import argparse
 
-from jobrunner.executors.local import container_name, docker, volume_api
-from jobrunner.lib.database import find_where
+from jobrunner.executors import local
+from jobrunner.lib import database, docker
 from jobrunner.models import Job, State, StatusCode
-from jobrunner.run import mark_job_as_failed
+from jobrunner.run import job_to_job_definition, mark_job_as_failed
 
 
 def main(partial_job_ids, cleanup=False):
     jobs = get_jobs(partial_job_ids)
+    #
+    #
     for job in jobs:
         # If the job has been previously killed we don't want to overwrite the
         # timestamps here
+        container = local.container_name(job)
         if job.state in (State.PENDING, State.RUNNING):
             mark_job_as_failed(
                 job,
                 StatusCode.KILLED_BY_ADMIN,
                 "An OpenSAFELY admin manually killed this job",
             )
         # All these docker commands are idempotent
-        docker.kill(container_name(job))
+        docker.kill(container)
+
+        # save the logs
+        container_metadata = docker.container_inspect(
+            container, none_if_not_exists=True
+        )
+        if container_metadata:
+            job = job_to_job_definition(job)
+            metadata = local.get_job_metadata(job, {}, container_metadata)
+            local.write_job_logs(job, metadata, copy_log_to_workspace=False)
+
         if cleanup:
-            docker.delete_container(container_name(job))
-            volume_api.delete_volume(job)
+            docker.delete_container(container)
+            local.volume_api.delete_volume(job)
 
 
 def get_jobs(partial_job_ids):
     jobs = []
     need_confirmation = False
     for partial_job_id in partial_job_ids:
-        matches = find_where(Job, id__like=f"%{partial_job_id}%")
+        matches = database.find_where(Job, id__like=f"%{partial_job_id}%")
         if len(matches) == 0:
             raise RuntimeError(f"No jobs found matching '{partial_job_id}'")
         elif len(matches) > 1:
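
The log-preservation path reuses the executor's own helpers rather than duplicating them. Condensed from the hunk above, with explanatory comments added (the job-definition detail is an assumption about job_to_job_definition's return type):

    container = local.container_name(job)
    docker.kill(container)  # idempotent: safe if the container already exited

    # container_inspect returns None for a missing container, so jobs whose
    # containers were already removed are skipped rather than crashing
    container_metadata = docker.container_inspect(container, none_if_not_exists=True)
    if container_metadata:
        job = job_to_job_definition(job)  # Job row -> executor job definition
        metadata = local.get_job_metadata(job, {}, container_metadata)  # {}: no outputs
        local.write_job_logs(job, metadata, copy_log_to_workspace=False)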
42 changes: 28 additions & 14 deletions jobrunner/executors/local.py
@@ -301,15 +301,16 @@ def finalize_job(job):
         image_id=container_metadata["Image"],
         message=message,
     )
-    persist_outputs(job, results.outputs, container_metadata)
+    job.completed_at = int(time.time())
+    job_metadata = get_job_metadata(job, outputs, container_metadata)
+    write_job_logs(job, job_metadata)
+    persist_outputs(job, results.outputs, job_metadata)
     RESULTS[job.id] = results
 
 
-def persist_outputs(job, outputs, container_metadata):
-    """Copy logs and generated outputs to persistant storage."""
+def get_job_metadata(job, outputs, container_metadata):
     # job_metadata is a big dict capturing everything we know about the state
     # of the job
-    job.completed_at = int(time.time())
     job_metadata = dict()
     job_metadata["job_id"] = job.id
     job_metadata["job_request_id"] = job.job_request_id
@@ -321,31 +322,43 @@ def persist_outputs(job, outputs, container_metadata):
     job_metadata["container_metadata"] = container_metadata
     job_metadata["outputs"] = outputs
     job_metadata["commit"] = job.study.commit
+    return job_metadata
+
 
+def write_job_logs(job, job_metadata, copy_log_to_workspace=True):
+    """Copy logs to log dir and workspace."""
     # Dump useful info in log directory
     log_dir = get_log_dir(job)
     write_log_file(job, job_metadata, log_dir / "logs.txt")
     with open(log_dir / "metadata.json", "w") as f:
         json.dump(job_metadata, f, indent=2)
 
-    # Copy logs to workspace
-    workspace_dir = get_high_privacy_workspace(job.workspace)
-    metadata_log_file = workspace_dir / METADATA_DIR / f"{job.action}.log"
-    copy_file(log_dir / "logs.txt", metadata_log_file)
-    log.info(f"Logs written to: {metadata_log_file}")
+    if copy_log_to_workspace:
+        workspace_dir = get_high_privacy_workspace(job.workspace)
+        workspace_log_file = workspace_dir / METADATA_DIR / f"{job.action}.log"
+        copy_file(log_dir / "logs.txt", workspace_log_file)
+        log.info(f"Logs written to: {workspace_log_file}")
+
+        medium_privacy_dir = get_medium_privacy_workspace(job.workspace)
+        if medium_privacy_dir:
+            copy_file(
+                workspace_log_file,
+                medium_privacy_dir / METADATA_DIR / f"{job.action}.log",
+            )
+
 
+def persist_outputs(job, outputs, job_metadata):
+    """Copy generated outputs to persistant storage."""
     # Extract outputs to workspace
+    workspace_dir = get_high_privacy_workspace(job.workspace)
+
     for filename in outputs.keys():
         log.info(f"Extracting output file: {filename}")
         volume_api.copy_from_volume(job, filename, workspace_dir / filename)
 
-    # Copy out logs and medium privacy files
+    # Copy out medium privacy files
     medium_privacy_dir = get_medium_privacy_workspace(job.workspace)
     if medium_privacy_dir:
-        copy_file(
-            workspace_dir / METADATA_DIR / f"{job.action}.log",
-            medium_privacy_dir / METADATA_DIR / f"{job.action}.log",
-        )
         for filename, privacy_level in outputs.items():
             if privacy_level == "moderately_sensitive":
                 copy_file(workspace_dir / filename, medium_privacy_dir / filename)
@@ -519,6 +532,7 @@ def redact_environment_variables(container_metadata):
 
 def write_manifest_file(workspace_dir, manifest):
     manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
+    manifest_file.parent.mkdir(exist_ok=True)
     manifest_file_tmp = manifest_file.with_suffix(".tmp")
     manifest_file_tmp.write_text(json.dumps(manifest, indent=2))
     manifest_file_tmp.replace(manifest_file)
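
Taken together, these local.py changes split the old persist_outputs() into three reusable steps, which is what lets kill_job save logs without touching workspace outputs. The two call paths after this commit (the finalize_job lines are from the first hunk; the kill path is condensed from kill_job.py):

    # Normal completion, in finalize_job():
    job_metadata = get_job_metadata(job, outputs, container_metadata)
    write_job_logs(job, job_metadata)          # log dir, workspace, medium privacy
    persist_outputs(job, results.outputs, job_metadata)

    # Admin kill, in kill_job.main():
    local.write_job_logs(job, metadata, copy_log_to_workspace=False)  # log dir only

The one-line write_manifest_file() change is a guard rather than part of the refactor: mkdir(exist_ok=True) stops the manifest write from failing when METADATA_DIR does not exist yet, presumably the case hit by the new test's temporary working directory.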
56 changes: 46 additions & 10 deletions tests/cli/test_kill_job.py
@@ -1,28 +1,64 @@
 from unittest import mock
 
+import pytest
+
 from jobrunner.cli import kill_job
-from jobrunner.executors.local import volume_api
-from jobrunner.lib import database, docker
+from jobrunner.executors import local
+from jobrunner.lib import database
 from jobrunner.models import Job, State, StatusCode
 from tests.factories import job_factory
 
 
-def test_kill_job(db, monkeypatch):
+@pytest.mark.parametrize("cleanup", [False, True])
+def test_kill_job(cleanup, tmp_work_dir, db, monkeypatch):
 
-    j1 = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
+    job = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
 
-    mocker = mock.MagicMock(spec=docker)
-    mockume_api = mock.MagicMock(spec=volume_api)
+    mocker = mock.MagicMock(spec=local.docker)
+    mockume_api = mock.MagicMock(spec=local.volume_api)
 
     def mock_get_jobs(partial_job_ids):
-        return [j1]
+        return [job]
 
+    # persist_outputs needs this
+    mocker.container_inspect.return_value = {
+        "Image": "image",
+        "State": {"ExitCode": 137},
+    }
+
+    # set both the docker module names used to the mocker version
     monkeypatch.setattr(kill_job, "docker", mocker)
-    monkeypatch.setattr(kill_job, "volume_api", mockume_api)
+    monkeypatch.setattr(local, "docker", mocker)
+    monkeypatch.setattr(kill_job.local, "volume_api", mockume_api)
     monkeypatch.setattr(kill_job, "get_jobs", mock_get_jobs)
 
-    kill_job.main(j1.id)
+    kill_job.main(job.id, cleanup=cleanup)
 
-    job1 = database.find_one(Job, id=j1.id)
+    job1 = database.find_one(Job, id=job.id)
     assert job1.state == State.FAILED
     assert job1.status_code == StatusCode.KILLED_BY_ADMIN
+
+    container = local.container_name(job)
+    assert mocker.kill.call_args[0] == (container,)
+    assert mocker.write_logs_to_file.call_args[0][0] == container
+
+    log_dir = local.get_log_dir(job)
+    log_file = log_dir / "logs.txt"
+    metadata_file = log_dir / "metadata.json"
+
+    assert log_file.exists()
+    assert metadata_file.exists()
+
+    workspace_log_file = (
+        local.get_high_privacy_workspace(job.workspace)
+        / local.METADATA_DIR
+        / f"{job.action}.log"
+    )
+    assert not workspace_log_file.exists()
+
+    if cleanup:
+        assert mocker.delete_container.called
+        assert mockume_api.delete_volume.called
+    else:
+        assert not mocker.delete_container.called
+        assert not mockume_api.delete_volume.called
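
The parametrize decorator runs the test once with cleanup=False and once with cleanup=True, and the tmp_work_dir fixture (presumably pointing the log and workspace directories at a temporary filesystem) lets the .exists() assertions exercise the real write path. Both cases run with standard pytest selection:

    pytest tests/cli/test_kill_job.py -k test_kill_job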
