Commit

Merge pull request #527 from opensafely-core/docker-timeouts
fix: retry on docker inspect timeouts
bloodearnest authored Dec 5, 2022
2 parents 0c11aab + 1925c26 commit e8b4fd0
Showing 7 changed files with 126 additions and 5 deletions.
1 change: 1 addition & 0 deletions jobrunner/config.py
@@ -96,6 +96,7 @@ def _is_valid_backend_name(name):

MAX_WORKERS = int(os.environ.get("MAX_WORKERS") or max(cpu_count() - 1, 1))
MAX_DB_WORKERS = int(os.environ.get("MAX_DB_WORKERS") or MAX_WORKERS)
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 0))

# This is a crude mechanism for preventing a single large JobRequest with lots
# of associated Jobs from hogging all the resources. We want this configurable
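The new setting follows the same environment-variable pattern as the other limits in config.py. Note that the default of 0 means retries are effectively disabled until a backend opts in: the first ExecutorRetry from an executor is already treated as "too many" (see the run.py change below). A minimal sketch of how a deployment might enable retries (the value 5 is purely illustrative):

    import os

    # A deployment opts in by setting MAX_RETRIES before the job-runner
    # process starts.
    os.environ.setdefault("MAX_RETRIES", "5")

    # Same parsing as config.py: if the variable is unset, the default is 0,
    # i.e. retries disabled.
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 0))
    print(MAX_RETRIES)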
13 changes: 10 additions & 3 deletions jobrunner/executors/local.py
@@ -12,6 +12,7 @@
from jobrunner.executors.volumes import copy_file, get_volume_api
from jobrunner.job_executor import (
ExecutorAPI,
ExecutorRetry,
ExecutorState,
JobDefinition,
JobResults,
@@ -187,9 +188,15 @@ def cleanup(self, job):

def get_status(self, job):
name = container_name(job)
job_running = docker.container_inspect(
name, "State.Running", none_if_not_exists=True
)
try:
job_running = docker.container_inspect(
name,
"State.Running",
none_if_not_exists=True,
timeout=10,
)
except docker.DockerTimeoutError:
            raise ExecutorRetry(f"timed out inspecting container {name}")

if job_running is None:
# no volume for this job found
6 changes: 6 additions & 0 deletions jobrunner/job_executor.py
@@ -61,6 +61,12 @@ class JobResults:
message: str = None


class ExecutorRetry(Exception):
"""Indicates to the job scheduler that there's a temporary issue and to try again later."""

pass


class ExecutorAPI:
"""
API for managing job execution.
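For executor implementations, the contract is simply to raise ExecutorRetry from a polling call when the underlying check fails transiently; the scheduler in run.py then decides whether to retry or give up. A minimal sketch of the pattern, assuming a hypothetical executor whose backend probe can time out (FlakyExecutor and _probe_backend are illustrative names, not part of this codebase):

    from jobrunner.job_executor import ExecutorAPI, ExecutorRetry

    class FlakyExecutor(ExecutorAPI):
        def get_status(self, job):
            try:
                # _probe_backend is a stand-in for whatever remote call the
                # executor needs to make (docker inspect, an HTTP API, ...).
                return self._probe_backend(job)
            except TimeoutError as exc:
                # Temporary failure: ask the scheduler to call get_status()
                # again on a later loop, instead of failing the job outright.
                raise ExecutorRetry(f"backend timed out for job {job.id}") from exc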
5 changes: 4 additions & 1 deletion jobrunner/lib/docker.py
@@ -301,7 +301,7 @@ def container_is_running(name):
return container_inspect(name, "State.Running", none_if_not_exists=True) or False


def container_inspect(name, key="", none_if_not_exists=False):
def container_inspect(name, key="", none_if_not_exists=False, timeout=None):
"""
Retrieves metadata about the named container. By default will return
everything but `key` can be a dotted path to a specific piece of metadata.
@@ -315,7 +315,10 @@ def container_inspect(name, key="", none_if_not_exists=False):
["container", "inspect", "--format", "{{json .%s}}" % key, name],
check=True,
capture_output=True,
timeout=timeout,
)
except subprocess.TimeoutExpired:
raise DockerTimeoutError(f"container_inspect timeout for {name}")
except subprocess.CalledProcessError as e:
if (
none_if_not_exists
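Callers that pass the new timeout argument must now be prepared for DockerTimeoutError rather than a subprocess that hangs indefinitely. A short usage sketch mirroring the local executor change above (the container name is made up):

    from jobrunner.lib import docker

    try:
        running = docker.container_inspect(
            "os-job-example", "State.Running", none_if_not_exists=True, timeout=10
        )
    except docker.DockerTimeoutError:
        # A slow docker daemon is treated as "status unknown", not as fatal.
        running = None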
22 changes: 21 additions & 1 deletion jobrunner/run.py
@@ -14,6 +14,7 @@
from jobrunner.executors import get_executor_api
from jobrunner.job_executor import (
ExecutorAPI,
ExecutorRetry,
ExecutorState,
JobDefinition,
Privacy,
@@ -27,6 +28,13 @@

log = logging.getLogger(__name__)

# used to track the number of times an executor has asked to retry a job
EXECUTOR_RETRIES = {}


class RetriesExceeded(Exception):
pass


class InvalidTransition(Exception):
pass
@@ -188,7 +196,19 @@ def handle_job(job, api, mode=None, paused=None):
)
return

initial_status = api.get_status(definition)
try:
initial_status = api.get_status(definition)
except ExecutorRetry:
retries = EXECUTOR_RETRIES.get(job.id, 0)
if retries >= config.MAX_RETRIES:
raise RetriesExceeded(
f"Too many retries for job {job.id} from executor"
) from ExecutorRetry
else:
EXECUTOR_RETRIES[job.id] = retries + 1
return
else:
EXECUTOR_RETRIES.pop(job.id, None)

# handle the simple no change needed states.
if initial_status.state in STABLE_STATES:
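One subtlety worth spelling out: MAX_RETRIES bounds the number of consecutive ExecutorRetry results tolerated per job, so with the default of 0 the very first one is escalated, and with MAX_RETRIES = 3 it is the fourth consecutive one that raises RetriesExceeded, which is exactly what the tests below exercise. A tiny standalone sketch of that counting:

    MAX_RETRIES = 3
    retries = 0  # per-job counter, as kept in EXECUTOR_RETRIES[job.id]

    for attempt in range(1, 6):
        # pretend every get_status() call raised ExecutorRetry
        if retries >= MAX_RETRIES:
            print(f"attempt {attempt}: RetriesExceeded")  # fires on attempt 4
            break
        retries += 1
        print(f"attempt {attempt}: deferred, retry count now {retries}")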
27 changes: 27 additions & 0 deletions tests/test_local_executor.py
@@ -622,3 +622,30 @@ def test_delete_files_bad_privacy(tmp_work_dir):
populate_workspace("test", "file.txt")
with pytest.raises(Exception):
api.delete_files("test", None, ["file.txt"])


def test_get_status_timeout(tmp_work_dir, monkeypatch):

job = JobDefinition(
id="test_get_status_timeout",
job_request_id="test_request_id",
study=None,
workspace="test",
action="action",
created_at=int(time.time()),
image="ghcr.io/opensafely-core/busybox",
args=["sleep", "1"],
env={},
inputs=[],
output_spec={},
allow_database_access=False,
)

def inspect(*args, **kwargs):
raise docker.DockerTimeoutError("timeout")

monkeypatch.setattr(local.docker, "container_inspect", inspect)
api = local.LocalDockerAPI()

with pytest.raises(local.ExecutorRetry):
api.get_status(job)
57 changes: 57 additions & 0 deletions tests/test_run.py
@@ -575,6 +575,63 @@ def error(*args, **kwargs):
assert spans[-1].name == "JOB"


def test_handle_single_job_retries_exceeded(db, monkeypatch):
api = StubExecutorAPI()
job = api.add_test_job(ExecutorState.EXECUTED, State.RUNNING, StatusCode.EXECUTED)

def retry(*args, **kwargs):
raise run.ExecutorRetry("retry")

monkeypatch.setattr(api, "get_status", retry)
monkeypatch.setattr(config, "MAX_RETRIES", 3)

run.handle_single_job(job, api)
run.handle_single_job(job, api)
run.handle_single_job(job, api)

with pytest.raises(run.RetriesExceeded):
run.handle_single_job(job, api)

assert job.state is State.FAILED

spans = get_trace()
assert spans[-3].name == "EXECUTED"
error_span = spans[-2]
assert error_span.name == "INTERNAL_ERROR"
assert error_span.status.status_code == trace.StatusCode.ERROR
assert error_span.events[0].name == "exception"
assert (
error_span.events[0].attributes["exception.message"]
== f"Too many retries for job {job.id} from executor"
)
assert spans[-1].name == "JOB"


def test_handle_single_job_retries_not_exceeded(db, monkeypatch):
api = StubExecutorAPI()
job = api.add_test_job(ExecutorState.EXECUTED, State.RUNNING, StatusCode.EXECUTED)

def retry(*args, **kwargs):
raise run.ExecutorRetry("retry")

orig_get_status = api.get_status

monkeypatch.setattr(api, "get_status", retry)
monkeypatch.setattr(config, "MAX_RETRIES", 3)

run.handle_single_job(job, api)
run.handle_single_job(job, api)
run.handle_single_job(job, api)

monkeypatch.setattr(api, "get_status", orig_get_status)

# do *not* blow up this time
run.handle_single_job(job, api)

assert job.state is State.RUNNING
assert job.id not in run.EXECUTOR_RETRIES


def test_handle_single_job_shortcuts_synchronous(db):
api = StubExecutorAPI()
job = api.add_test_job(ExecutorState.UNKNOWN, State.PENDING, StatusCode.CREATED)