Merge pull request #514 from opensafely-core/kill-job-preserve-log
fix: kill_job preserves job logs.
bloodearnest authored Dec 2, 2022
2 parents c6b20c9 + 4e2974b commit 0c11aab
Showing 3 changed files with 94 additions and 31 deletions.
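For orientation, here is roughly how the fixed command is driven from Python. The main(partial_job_ids, cleanup=False) signature is taken from the diff below; passing a list of partial job ids and the example id itself are assumptions for illustration, not part of this commit.

# Sketch only: main()'s signature comes from the diff below; the
# partial id "3ef9ab" is a hypothetical placeholder.
from jobrunner.cli import kill_job

# Kill matching jobs but keep containers/volumes for debugging;
# with this fix, the job logs are written out in both cases.
kill_job.main(["3ef9ab"], cleanup=False)

# Kill and also delete each job's container and volume, now done
# only after the logs have been saved.
kill_job.main(["3ef9ab"], cleanup=True)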
27 changes: 20 additions & 7 deletions jobrunner/cli/kill_job.py
@@ -3,35 +3,48 @@
 """
 import argparse
 
-from jobrunner.executors.local import container_name, docker, volume_api
-from jobrunner.lib.database import find_where
+from jobrunner.executors import local
+from jobrunner.lib import database, docker
 from jobrunner.models import Job, State, StatusCode
-from jobrunner.run import mark_job_as_failed
+from jobrunner.run import job_to_job_definition, mark_job_as_failed
 
 
 def main(partial_job_ids, cleanup=False):
     jobs = get_jobs(partial_job_ids)
     for job in jobs:
         # If the job has been previously killed we don't want to overwrite the
         # timestamps here
+        container = local.container_name(job)
         if job.state in (State.PENDING, State.RUNNING):
             mark_job_as_failed(
                 job,
                 StatusCode.KILLED_BY_ADMIN,
                 "An OpenSAFELY admin manually killed this job",
             )
         # All these docker commands are idempotent
-        docker.kill(container_name(job))
+        docker.kill(container)
+
+        # save the logs
+        container_metadata = docker.container_inspect(
+            container, none_if_not_exists=True
+        )
+        if container_metadata:
+            job = job_to_job_definition(job)
+            metadata = local.get_job_metadata(job, {}, container_metadata)
+            local.write_job_logs(job, metadata, copy_log_to_workspace=False)
 
         if cleanup:
-            docker.delete_container(container_name(job))
-            volume_api.delete_volume(job)
+            docker.delete_container(container)
+            local.volume_api.delete_volume(job)
 
 
 def get_jobs(partial_job_ids):
     jobs = []
     need_confirmation = False
     for partial_job_id in partial_job_ids:
-        matches = find_where(Job, id__like=f"%{partial_job_id}%")
+        matches = database.find_where(Job, id__like=f"%{partial_job_id}%")
         if len(matches) == 0:
            raise RuntimeError(f"No jobs found matching '{partial_job_id}'")
         elif len(matches) > 1:
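The ordering above is the substance of the fix: inspect the container and save its logs before any destructive cleanup. A condensed sketch of that sequence, using only names from the diff (error handling and the mark-as-failed bookkeeping elided):

container = local.container_name(job)
docker.kill(container)  # idempotent: safe if the container already stopped

# Inspect *before* deleting anything; container_inspect returns None
# (rather than raising) when the container no longer exists.
container_metadata = docker.container_inspect(container, none_if_not_exists=True)
if container_metadata:
    job_definition = job_to_job_definition(job)
    metadata = local.get_job_metadata(job_definition, {}, container_metadata)
    # Log dir only: killed jobs do not copy logs into the workspace.
    local.write_job_logs(job_definition, metadata, copy_log_to_workspace=False)

if cleanup:  # destructive steps run last, once the logs are safe
    docker.delete_container(container)
    local.volume_api.delete_volume(job)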
42 changes: 28 additions & 14 deletions jobrunner/executors/local.py
@@ -301,15 +301,16 @@ def finalize_job(job):
         image_id=container_metadata["Image"],
         message=message,
     )
-    persist_outputs(job, results.outputs, container_metadata)
+    job.completed_at = int(time.time())
+    job_metadata = get_job_metadata(job, outputs, container_metadata)
+    write_job_logs(job, job_metadata)
+    persist_outputs(job, results.outputs, job_metadata)
     RESULTS[job.id] = results
 
 
-def persist_outputs(job, outputs, container_metadata):
-    """Copy logs and generated outputs to persistant storage."""
+def get_job_metadata(job, outputs, container_metadata):
     # job_metadata is a big dict capturing everything we know about the state
     # of the job
-    job.completed_at = int(time.time())
     job_metadata = dict()
     job_metadata["job_id"] = job.id
     job_metadata["job_request_id"] = job.job_request_id
@@ -321,31 +322,43 @@ def persist_outputs(job, outputs, container_metadata):
     job_metadata["container_metadata"] = container_metadata
     job_metadata["outputs"] = outputs
     job_metadata["commit"] = job.study.commit
+    return job_metadata
+
+
+def write_job_logs(job, job_metadata, copy_log_to_workspace=True):
+    """Copy logs to log dir and workspace."""
     # Dump useful info in log directory
     log_dir = get_log_dir(job)
     write_log_file(job, job_metadata, log_dir / "logs.txt")
     with open(log_dir / "metadata.json", "w") as f:
         json.dump(job_metadata, f, indent=2)
 
-    # Copy logs to workspace
-    workspace_dir = get_high_privacy_workspace(job.workspace)
-    metadata_log_file = workspace_dir / METADATA_DIR / f"{job.action}.log"
-    copy_file(log_dir / "logs.txt", metadata_log_file)
-    log.info(f"Logs written to: {metadata_log_file}")
+    if copy_log_to_workspace:
+        workspace_dir = get_high_privacy_workspace(job.workspace)
+        workspace_log_file = workspace_dir / METADATA_DIR / f"{job.action}.log"
+        copy_file(log_dir / "logs.txt", workspace_log_file)
+        log.info(f"Logs written to: {workspace_log_file}")
+
+        medium_privacy_dir = get_medium_privacy_workspace(job.workspace)
+        if medium_privacy_dir:
+            copy_file(
+                workspace_log_file,
+                medium_privacy_dir / METADATA_DIR / f"{job.action}.log",
+            )
+
+
+def persist_outputs(job, outputs, job_metadata):
+    """Copy generated outputs to persistant storage."""
     # Extract outputs to workspace
+    workspace_dir = get_high_privacy_workspace(job.workspace)
 
     for filename in outputs.keys():
         log.info(f"Extracting output file: {filename}")
         volume_api.copy_from_volume(job, filename, workspace_dir / filename)
 
-    # Copy out logs and medium privacy files
+    # Copy out medium privacy files
     medium_privacy_dir = get_medium_privacy_workspace(job.workspace)
     if medium_privacy_dir:
-        copy_file(
-            workspace_dir / METADATA_DIR / f"{job.action}.log",
-            medium_privacy_dir / METADATA_DIR / f"{job.action}.log",
-        )
         for filename, privacy_level in outputs.items():
             if privacy_level == "moderately_sensitive":
                 copy_file(workspace_dir / filename, medium_privacy_dir / filename)
@@ -519,6 +532,7 @@ def redact_environment_variables(container_metadata):
 
 def write_manifest_file(workspace_dir, manifest):
     manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
+    manifest_file.parent.mkdir(exist_ok=True)
     manifest_file_tmp = manifest_file.with_suffix(".tmp")
     manifest_file_tmp.write_text(json.dumps(manifest, indent=2))
     manifest_file_tmp.replace(manifest_file)
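The net effect of the local.py changes is that the old persist_outputs is split into three composable steps, so the normal-completion and admin-kill paths can share the log-writing code. Roughly, condensing the call sites from the two files in this commit (the rationale in the final comment is an inference, not stated in the diff):

# Normal completion (finalize_job): logs go to the log dir AND the
# workspace, then outputs are persisted.
job_metadata = get_job_metadata(job, outputs, container_metadata)
write_job_logs(job, job_metadata)  # copy_log_to_workspace defaults to True
persist_outputs(job, results.outputs, job_metadata)

# Admin kill (kill_job): logs go to the log dir only, and
# persist_outputs is never called, presumably because a killed job's
# outputs are incomplete and should not land in the workspace.
write_job_logs(job, job_metadata, copy_log_to_workspace=False)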
56 changes: 46 additions & 10 deletions tests/cli/test_kill_job.py
@@ -1,28 +1,64 @@
 from unittest import mock
 
+import pytest
+
 from jobrunner.cli import kill_job
-from jobrunner.executors.local import volume_api
-from jobrunner.lib import database, docker
+from jobrunner.executors import local
+from jobrunner.lib import database
 from jobrunner.models import Job, State, StatusCode
 from tests.factories import job_factory
 
 
-def test_kill_job(db, monkeypatch):
+@pytest.mark.parametrize("cleanup", [False, True])
+def test_kill_job(cleanup, tmp_work_dir, db, monkeypatch):
 
-    j1 = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
+    job = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
 
-    mocker = mock.MagicMock(spec=docker)
-    mockume_api = mock.MagicMock(spec=volume_api)
+    mocker = mock.MagicMock(spec=local.docker)
+    mockume_api = mock.MagicMock(spec=local.volume_api)
 
     def mock_get_jobs(partial_job_ids):
-        return [j1]
+        return [job]
 
+    # persist_outputs needs this
+    mocker.container_inspect.return_value = {
+        "Image": "image",
+        "State": {"ExitCode": 137},
+    }
+
+    # set both the docker module names used to the mocker version
     monkeypatch.setattr(kill_job, "docker", mocker)
-    monkeypatch.setattr(kill_job, "volume_api", mockume_api)
+    monkeypatch.setattr(local, "docker", mocker)
+    monkeypatch.setattr(kill_job.local, "volume_api", mockume_api)
     monkeypatch.setattr(kill_job, "get_jobs", mock_get_jobs)
 
-    kill_job.main(j1.id)
+    kill_job.main(job.id, cleanup=cleanup)
 
-    job1 = database.find_one(Job, id=j1.id)
+    job1 = database.find_one(Job, id=job.id)
     assert job1.state == State.FAILED
     assert job1.status_code == StatusCode.KILLED_BY_ADMIN
+
+    container = local.container_name(job)
+    assert mocker.kill.call_args[0] == (container,)
+    assert mocker.write_logs_to_file.call_args[0][0] == container
+
+    log_dir = local.get_log_dir(job)
+    log_file = log_dir / "logs.txt"
+    metadata_file = log_dir / "metadata.json"
+
+    assert log_file.exists()
+    assert metadata_file.exists()
+
+    workspace_log_file = (
+        local.get_high_privacy_workspace(job.workspace)
+        / local.METADATA_DIR
+        / f"{job.action}.log"
+    )
+    assert not workspace_log_file.exists()
+
+    if cleanup:
+        assert mocker.delete_container.called
+        assert mockume_api.delete_volume.called
+    else:
+        assert not mocker.delete_container.called
+        assert not mockume_api.delete_volume.called

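A side note on the test mechanics: the pytest.mark.parametrize decorator runs the test once per cleanup value, covering both the preserve and delete branches, and mock.MagicMock(spec=local.docker) constrains the mock to the real docker module's attribute set, so a mistyped call fails loudly instead of passing silently. A minimal illustration (the container name string is a hypothetical placeholder):

from unittest import mock

from jobrunner.executors import local

mocker = mock.MagicMock(spec=local.docker)
mocker.kill("os-job-abc123")    # fine: docker.kill exists on the real module
# mocker.kil("os-job-abc123")   # would raise AttributeError: not on the spec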