Merge pull request #549 from opensafely-core/no-fail-on-retry
fix: don't error the job on ExecutorRetry
bloodearnest authored Jan 6, 2023
2 parents bb1374f + 3387027 commit f14c761
Showing 4 changed files with 36 additions and 69 deletions.
16 changes: 9 additions & 7 deletions jobrunner/executors/local.py
@@ -99,10 +99,6 @@ class LocalDockerAPI(ExecutorAPI):
synchronous_transitions = [ExecutorState.PREPARING, ExecutorState.FINALIZING]

def prepare(self, job):
current = self.get_status(job)
if current.state != ExecutorState.UNKNOWN:
return current

# Check the workspace is not archived
workspace_dir = get_high_privacy_workspace(job.workspace)
if not workspace_dir.exists():
@@ -124,6 +120,10 @@ def prepare(self, job):
f"Docker image {job.image} is not currently available",
)

current = self.get_status(job)
if current.state != ExecutorState.UNKNOWN:
return current

try:
prepare_job(job)
except docker.DockerDiskSpaceError as e:
@@ -194,16 +194,18 @@ def cleanup(self, job):
RESULTS.pop(job.id, None)
return JobStatus(ExecutorState.UNKNOWN)

def get_status(self, job):
def get_status(self, job, timeout=15):
name = container_name(job)
try:
container = docker.container_inspect(
name,
none_if_not_exists=True,
timeout=10,
timeout=timeout,
)
except docker.DockerTimeoutError:
raise ExecutorRetry("timed out inspecting container {name}")
raise ExecutorRetry(
f"docker timed out after {timeout}s inspecting container {name}"
)

if container is None: # container doesn't exist
# timestamp file presence means we have finished preparing
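
For reference, a minimal self-contained sketch of the pattern in the hunk above: a Docker timeout while inspecting the container is surfaced as ExecutorRetry, with the now-configurable timeout included in the message, instead of propagating as an error. DockerTimeoutError, container_inspect and the standalone get_status below are simplified stand-ins for the repository's docker wrapper, not its real API.

class DockerTimeoutError(Exception):
    """Stand-in for the docker wrapper's timeout error."""


class ExecutorRetry(Exception):
    """Tells the run loop to check this job again on a later iteration."""


def container_inspect(name, timeout):
    # Stand-in for the real wrapper: simulate an unresponsive Docker daemon.
    raise DockerTimeoutError()


def get_status(container, timeout=15):
    try:
        return container_inspect(container, timeout=timeout)
    except DockerTimeoutError:
        raise ExecutorRetry(
            f"docker timed out after {timeout}s inspecting container {container}"
        )


try:
    get_status("os-job-example", timeout=11)
except ExecutorRetry as exc:
    print(exc)  # docker timed out after 11s inspecting container os-job-example
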
19 changes: 9 additions & 10 deletions jobrunner/run.py
@@ -31,7 +31,6 @@
log = logging.getLogger(__name__)
tracer = trace.get_tracer("loop")

# used to track the number of times an executor has asked to retry a job
EXECUTOR_RETRIES = {}


@@ -224,15 +223,15 @@ def handle_job(job, api, mode=None, paused=None):

try:
initial_status = api.get_status(definition)
except ExecutorRetry:
retries = EXECUTOR_RETRIES.get(job.id, 0)
if retries >= config.MAX_RETRIES:
raise RetriesExceeded(
f"Too many retries for job {job.id} from executor"
) from ExecutorRetry
else:
EXECUTOR_RETRIES[job.id] = retries + 1
return
except ExecutorRetry as retry:
job_retries = EXECUTOR_RETRIES.get(job.id, 0) + 1
EXECUTOR_RETRIES[job.id] = job_retries
span = trace.get_current_span()
span.set_attribute("executor_retry", True)
span.set_attribute("executor_retry_message", str(retry))
span.set_attribute("executor_retry_count", job_retries)
log.info(f"ExecutorRetry: {retry}")
return
else:
EXECUTOR_RETRIES.pop(job.id, None)

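The loop-side change above can be summarised as: count the retry, record it on the current OpenTelemetry span, log it, and return so the job is simply picked up again on the next loop iteration, rather than raising RetriesExceeded once config.MAX_RETRIES is hit. Below is a minimal standalone sketch of that pattern; record_executor_retry is a hypothetical helper name used here for illustration, not a function in the repository.

import logging

from opentelemetry import trace

log = logging.getLogger(__name__)

# job.id -> number of times the executor has asked us to retry this job
EXECUTOR_RETRIES = {}


class ExecutorRetry(Exception):
    """Raised by an executor when a status check should be retried later."""


def record_executor_retry(job_id, retry):
    """Note the retry on the current span and carry on, instead of failing the job."""
    job_retries = EXECUTOR_RETRIES.get(job_id, 0) + 1
    EXECUTOR_RETRIES[job_id] = job_retries

    span = trace.get_current_span()
    span.set_attribute("executor_retry", True)
    span.set_attribute("executor_retry_message", str(retry))
    span.set_attribute("executor_retry_count", job_retries)

    log.info(f"ExecutorRetry: {retry}")


try:
    raise ExecutorRetry("timed out inspecting container os-job-example")
except ExecutorRetry as retry:
    record_executor_retry("job-123", retry)
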
9 changes: 7 additions & 2 deletions tests/test_local_executor.py
@@ -646,5 +646,10 @@ def inspect(*args, **kwargs):
monkeypatch.setattr(local.docker, "container_inspect", inspect)
api = local.LocalDockerAPI()

with pytest.raises(local.ExecutorRetry):
api.get_status(job)
with pytest.raises(local.ExecutorRetry) as exc:
api.get_status(job, timeout=11)

assert (
str(exc.value)
== "docker timed out after 11s inspecting container os-job-test_get_status_timeout"
)
61 changes: 11 additions & 50 deletions tests/test_run.py
@@ -575,69 +575,30 @@ def error(*args, **kwargs):
assert "final_state" not in spans[0].attributes


def test_handle_single_job_retries_exceeded(db, monkeypatch):
def test_handle_single_job_with_executor_retry(db, monkeypatch):
api = StubExecutorAPI()
job = api.add_test_job(ExecutorState.EXECUTED, State.RUNNING, StatusCode.EXECUTED)

def retry(*args, **kwargs):
raise run.ExecutorRetry("retry")
raise run.ExecutorRetry("retry message")

monkeypatch.setattr(api, "get_status", retry)
monkeypatch.setattr(config, "MAX_RETRIES", 3)

run.handle_single_job(job, api)
run.handle_single_job(job, api)
run.handle_single_job(job, api)

with pytest.raises(run.RetriesExceeded):
run.handle_single_job(job, api)

assert job.state is State.FAILED

spans = get_trace("jobs")
assert spans[-3].name == "EXECUTED"
error_span = spans[-2]
assert error_span.name == "INTERNAL_ERROR"
assert error_span.status.status_code == trace.StatusCode.ERROR
assert error_span.events[0].name == "exception"
assert (
error_span.events[0].attributes["exception.message"]
== f"Too many retries for job {job.id} from executor"
)
assert spans[-1].name == "JOB"

spans = get_trace("loop")
assert len(spans) == 4
assert all(s for s in spans if s.name == "LOOP_JOB")


def test_handle_single_job_retries_not_exceeded(db, monkeypatch):
api = StubExecutorAPI()
job = api.add_test_job(ExecutorState.EXECUTED, State.RUNNING, StatusCode.EXECUTED)

def retry(*args, **kwargs):
raise run.ExecutorRetry("retry")

orig_get_status = api.get_status

monkeypatch.setattr(api, "get_status", retry)
monkeypatch.setattr(config, "MAX_RETRIES", 3)

run.handle_single_job(job, api)
run.handle_single_job(job, api)
run.handle_single_job(job, api)

monkeypatch.setattr(api, "get_status", orig_get_status)

# do *not* blow up this time
run.handle_single_job(job, api)

assert job.state is State.RUNNING
assert job.id not in run.EXECUTOR_RETRIES
assert run.EXECUTOR_RETRIES[job.id] == 2

spans = get_trace("loop")
assert len(spans) == 4
assert all(s for s in spans if s.name == "LOOP_JOB")
assert len(spans) == 2
assert spans[0].attributes["executor_retry"] is True
assert spans[0].attributes["executor_retry_message"] == "retry message"
assert spans[0].attributes["executor_retry_count"] == 1

assert spans[1].attributes["executor_retry"] is True
assert spans[1].attributes["executor_retry_message"] == "retry message"
assert spans[1].attributes["executor_retry_count"] == 2


def test_handle_single_job_shortcuts_synchronous(db):