From 4e2974b6077ff230716dc7ed4fef40e8497a4720 Mon Sep 17 00:00:00 2001 From: bloodearnest Date: Wed, 9 Nov 2022 13:35:23 +0000 Subject: [PATCH] fix: kill_job preserves job logs. It does not copy outputs, or copy the log to the workspace. It's primarily intended for staff debugging. Fixes #511 --- jobrunner/cli/kill_job.py | 27 ++++++++++++----- jobrunner/executors/local.py | 42 ++++++++++++++++++--------- tests/cli/test_kill_job.py | 56 +++++++++++++++++++++++++++++------- 3 files changed, 94 insertions(+), 31 deletions(-) diff --git a/jobrunner/cli/kill_job.py b/jobrunner/cli/kill_job.py index 47184f96..0d12406d 100644 --- a/jobrunner/cli/kill_job.py +++ b/jobrunner/cli/kill_job.py @@ -3,17 +3,20 @@ """ import argparse -from jobrunner.executors.local import container_name, docker, volume_api -from jobrunner.lib.database import find_where +from jobrunner.executors import local +from jobrunner.lib import database, docker from jobrunner.models import Job, State, StatusCode -from jobrunner.run import mark_job_as_failed +from jobrunner.run import job_to_job_definition, mark_job_as_failed def main(partial_job_ids, cleanup=False): jobs = get_jobs(partial_job_ids) + # + # for job in jobs: # If the job has been previously killed we don't want to overwrite the # timestamps here + container = local.container_name(job) if job.state in (State.PENDING, State.RUNNING): mark_job_as_failed( job, @@ -21,17 +24,27 @@ def main(partial_job_ids, cleanup=False): "An OpenSAFELY admin manually killed this job", ) # All these docker commands are idempotent - docker.kill(container_name(job)) + docker.kill(container) + + # save the logs + container_metadata = docker.container_inspect( + container, none_if_not_exists=True + ) + if container_metadata: + job = job_to_job_definition(job) + metadata = local.get_job_metadata(job, {}, container_metadata) + local.write_job_logs(job, metadata, copy_log_to_workspace=False) + if cleanup: - docker.delete_container(container_name(job)) - 
volume_api.delete_volume(job) + docker.delete_container(container) + local.volume_api.delete_volume(job) def get_jobs(partial_job_ids): jobs = [] need_confirmation = False for partial_job_id in partial_job_ids: - matches = find_where(Job, id__like=f"%{partial_job_id}%") + matches = database.find_where(Job, id__like=f"%{partial_job_id}%") if len(matches) == 0: raise RuntimeError(f"No jobs found matching '{partial_job_id}'") elif len(matches) > 1: diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py index 54da6d16..2b670ec8 100644 --- a/jobrunner/executors/local.py +++ b/jobrunner/executors/local.py @@ -301,15 +301,16 @@ def finalize_job(job): image_id=container_metadata["Image"], message=message, ) - persist_outputs(job, results.outputs, container_metadata) + job.completed_at = int(time.time()) + job_metadata = get_job_metadata(job, outputs, container_metadata) + write_job_logs(job, job_metadata) + persist_outputs(job, results.outputs, job_metadata) RESULTS[job.id] = results -def persist_outputs(job, outputs, container_metadata): - """Copy logs and generated outputs to persistant storage.""" +def get_job_metadata(job, outputs, container_metadata): # job_metadata is a big dict capturing everything we know about the state # of the job - job.completed_at = int(time.time()) job_metadata = dict() job_metadata["job_id"] = job.id job_metadata["job_request_id"] = job.job_request_id @@ -321,31 +322,43 @@ def persist_outputs(job, outputs, container_metadata): job_metadata["container_metadata"] = container_metadata job_metadata["outputs"] = outputs job_metadata["commit"] = job.study.commit + return job_metadata + +def write_job_logs(job, job_metadata, copy_log_to_workspace=True): + """Copy logs to log dir and workspace.""" # Dump useful info in log directory log_dir = get_log_dir(job) write_log_file(job, job_metadata, log_dir / "logs.txt") with open(log_dir / "metadata.json", "w") as f: json.dump(job_metadata, f, indent=2) - # Copy logs to workspace - 
workspace_dir = get_high_privacy_workspace(job.workspace) - metadata_log_file = workspace_dir / METADATA_DIR / f"{job.action}.log" - copy_file(log_dir / "logs.txt", metadata_log_file) - log.info(f"Logs written to: {metadata_log_file}") + if copy_log_to_workspace: + workspace_dir = get_high_privacy_workspace(job.workspace) + workspace_log_file = workspace_dir / METADATA_DIR / f"{job.action}.log" + copy_file(log_dir / "logs.txt", workspace_log_file) + log.info(f"Logs written to: {workspace_log_file}") + + medium_privacy_dir = get_medium_privacy_workspace(job.workspace) + if medium_privacy_dir: + copy_file( + workspace_log_file, + medium_privacy_dir / METADATA_DIR / f"{job.action}.log", + ) + + +def persist_outputs(job, outputs, job_metadata): + """Copy generated outputs to persistent storage.""" # Extract outputs to workspace + workspace_dir = get_high_privacy_workspace(job.workspace) + for filename in outputs.keys(): log.info(f"Extracting output file: {filename}") volume_api.copy_from_volume(job, filename, workspace_dir / filename) - # Copy out logs and medium privacy files + # Copy out medium privacy files medium_privacy_dir = get_medium_privacy_workspace(job.workspace) if medium_privacy_dir: - copy_file( - workspace_dir / METADATA_DIR / f"{job.action}.log", - medium_privacy_dir / METADATA_DIR / f"{job.action}.log", - ) for filename, privacy_level in outputs.items(): if privacy_level == "moderately_sensitive": copy_file(workspace_dir / filename, medium_privacy_dir / filename) @@ -519,6 +532,7 @@ def redact_environment_variables(container_metadata): def write_manifest_file(workspace_dir, manifest): manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE + manifest_file.parent.mkdir(exist_ok=True) manifest_file_tmp = manifest_file.with_suffix(".tmp") manifest_file_tmp.write_text(json.dumps(manifest, indent=2)) manifest_file_tmp.replace(manifest_file) diff --git a/tests/cli/test_kill_job.py b/tests/cli/test_kill_job.py index a7671e5d..a8fdd3eb 100644 --- 
a/tests/cli/test_kill_job.py +++ b/tests/cli/test_kill_job.py @@ -1,28 +1,64 @@ from unittest import mock +import pytest + from jobrunner.cli import kill_job -from jobrunner.executors.local import volume_api -from jobrunner.lib import database, docker +from jobrunner.executors import local +from jobrunner.lib import database from jobrunner.models import Job, State, StatusCode from tests.factories import job_factory -def test_kill_job(db, monkeypatch): +@pytest.mark.parametrize("cleanup", [False, True]) +def test_kill_job(cleanup, tmp_work_dir, db, monkeypatch): - j1 = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING) + job = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING) - mocker = mock.MagicMock(spec=docker) - mockume_api = mock.MagicMock(spec=volume_api) + mocker = mock.MagicMock(spec=local.docker) + mockume_api = mock.MagicMock(spec=local.volume_api) def mock_get_jobs(partial_job_ids): - return [j1] + return [job] + + # persist_outputs needs this + mocker.container_inspect.return_value = { + "Image": "image", + "State": {"ExitCode": 137}, + } + # set both the docker module names used to the mocker version monkeypatch.setattr(kill_job, "docker", mocker) - monkeypatch.setattr(kill_job, "volume_api", mockume_api) + monkeypatch.setattr(local, "docker", mocker) + monkeypatch.setattr(kill_job.local, "volume_api", mockume_api) monkeypatch.setattr(kill_job, "get_jobs", mock_get_jobs) - kill_job.main(j1.id) + kill_job.main(job.id, cleanup=cleanup) - job1 = database.find_one(Job, id=j1.id) + job1 = database.find_one(Job, id=job.id) assert job1.state == State.FAILED assert job1.status_code == StatusCode.KILLED_BY_ADMIN + + container = local.container_name(job) + assert mocker.kill.call_args[0] == (container,) + assert mocker.write_logs_to_file.call_args[0][0] == container + + log_dir = local.get_log_dir(job) + log_file = log_dir / "logs.txt" + metadata_file = log_dir / "metadata.json" + + assert log_file.exists() + assert 
metadata_file.exists() + + workspace_log_file = ( + local.get_high_privacy_workspace(job.workspace) + / local.METADATA_DIR + / f"{job.action}.log" + ) + assert not workspace_log_file.exists() + + if cleanup: + assert mocker.delete_container.called + assert mockume_api.delete_volume.called + else: + assert not mocker.delete_container.called + assert not mockume_api.delete_volume.called