From 0ae228d7735ad4f866e4975a7a3046a184d5ea89 Mon Sep 17 00:00:00 2001 From: bloodearnest Date: Fri, 6 Jan 2023 12:01:12 +0000 Subject: [PATCH 1/2] fix: don't error the job on ExecutorRetry Instead, record that it happened, but don't fail the job. The idea is that we can then be alerted to it happening, but when it's over, the job continues. Also, increase get_status timeout from 10s to 15s, we a) occasionally we saw it in GHA CI, and b) having it slow down a bit when docker is blocking is no bad thing --- jobrunner/executors/local.py | 8 +++-- jobrunner/run.py | 19 ++++++----- tests/test_local_executor.py | 9 ++++-- tests/test_run.py | 61 +++++++----------------------------- 4 files changed, 32 insertions(+), 65 deletions(-) diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py index 822083b5..fee80d0c 100644 --- a/jobrunner/executors/local.py +++ b/jobrunner/executors/local.py @@ -194,16 +194,18 @@ def cleanup(self, job): RESULTS.pop(job.id, None) return JobStatus(ExecutorState.UNKNOWN) - def get_status(self, job): + def get_status(self, job, timeout=15): name = container_name(job) try: container = docker.container_inspect( name, none_if_not_exists=True, - timeout=10, + timeout=timeout, ) except docker.DockerTimeoutError: - raise ExecutorRetry("timed out inspecting container {name}") + raise ExecutorRetry( + f"docker timed out after {timeout}s inspecting container {name}" + ) if container is None: # container doesn't exist # timestamp file presence means we have finished preparing diff --git a/jobrunner/run.py b/jobrunner/run.py index bb1c54c2..5bfd35ba 100644 --- a/jobrunner/run.py +++ b/jobrunner/run.py @@ -31,7 +31,6 @@ log = logging.getLogger(__name__) tracer = trace.get_tracer("loop") -# used to track the number of times an executor has asked to retry a job EXECUTOR_RETRIES = {} @@ -224,15 +223,15 @@ def handle_job(job, api, mode=None, paused=None): try: initial_status = api.get_status(definition) - except ExecutorRetry: - retries = EXECUTOR_RETRIES.get(job.id, 0) - if retries >= config.MAX_RETRIES: - raise RetriesExceeded( - f"Too many retries for job {job.id} from executor" - ) from ExecutorRetry - else: - EXECUTOR_RETRIES[job.id] = retries + 1 - return + except ExecutorRetry as retry: + job_retries = EXECUTOR_RETRIES.get(job.id, 0) + 1 + EXECUTOR_RETRIES[job.id] = job_retries + span = trace.get_current_span() + span.set_attribute("executor_retry", True) + span.set_attribute("executor_retry_message", str(retry)) + span.set_attribute("executor_retry_count", job_retries) + log.info(f"ExecutorRetry: {retry}") + return else: EXECUTOR_RETRIES.pop(job.id, None) diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 213eced4..73db885b 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -646,5 +646,10 @@ def inspect(*args, **kwargs): monkeypatch.setattr(local.docker, "container_inspect", inspect) api = local.LocalDockerAPI() - with pytest.raises(local.ExecutorRetry): - api.get_status(job) + with pytest.raises(local.ExecutorRetry) as exc: + api.get_status(job, timeout=11) + + assert ( + str(exc.value) + == "docker timed out after 11s inspecting container os-job-test_get_status_timeout" + ) diff --git a/tests/test_run.py b/tests/test_run.py index 8b96f188..31027e11 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -575,69 +575,30 @@ def error(*args, **kwargs): assert "final_state" not in spans[0].attributes -def test_handle_single_job_retries_exceeded(db, monkeypatch): +def test_handle_single_job_with_executor_retry(db, monkeypatch): api = StubExecutorAPI() job = api.add_test_job(ExecutorState.EXECUTED, State.RUNNING, StatusCode.EXECUTED) def retry(*args, **kwargs): - raise run.ExecutorRetry("retry") + raise run.ExecutorRetry("retry message") monkeypatch.setattr(api, "get_status", retry) - monkeypatch.setattr(config, "MAX_RETRIES", 3) run.handle_single_job(job, api) run.handle_single_job(job, api) - run.handle_single_job(job, api) - - with pytest.raises(run.RetriesExceeded): - run.handle_single_job(job, api) - - assert job.state is State.FAILED - - spans = get_trace("jobs") - assert spans[-3].name == "EXECUTED" - error_span = spans[-2] - assert error_span.name == "INTERNAL_ERROR" - assert error_span.status.status_code == trace.StatusCode.ERROR - assert error_span.events[0].name == "exception" - assert ( - error_span.events[0].attributes["exception.message"] - == f"Too many retries for job {job.id} from executor" - ) - assert spans[-1].name == "JOB" - - spans = get_trace("loop") - assert len(spans) == 4 - assert all(s for s in spans if s.name == "LOOP_JOB") - - -def test_handle_single_job_retries_not_exceeded(db, monkeypatch): - api = StubExecutorAPI() - job = api.add_test_job(ExecutorState.EXECUTED, State.RUNNING, StatusCode.EXECUTED) - - def retry(*args, **kwargs): - raise run.ExecutorRetry("retry") - - orig_get_status = api.get_status - - monkeypatch.setattr(api, "get_status", retry) - monkeypatch.setattr(config, "MAX_RETRIES", 3) - - run.handle_single_job(job, api) - run.handle_single_job(job, api) - run.handle_single_job(job, api) - - monkeypatch.setattr(api, "get_status", orig_get_status) - - # do *not* blow up this time - run.handle_single_job(job, api) assert job.state is State.RUNNING - assert job.id not in run.EXECUTOR_RETRIES + assert run.EXECUTOR_RETRIES[job.id] == 2 spans = get_trace("loop") - assert len(spans) == 4 - assert all(s for s in spans if s.name == "LOOP_JOB") + assert len(spans) == 2 + assert spans[0].attributes["executor_retry"] is True + assert spans[0].attributes["executor_retry_message"] == "retry message" + assert spans[0].attributes["executor_retry_count"] == 1 + + assert spans[1].attributes["executor_retry"] is True + assert spans[1].attributes["executor_retry_message"] == "retry message" + assert spans[1].attributes["executor_retry_count"] == 2 def test_handle_single_job_shortcuts_synchronous(db): From 3387027e4157baa0041b25ed1beac692c915284d Mon Sep 17 00:00:00 2001 From: bloodearnest Date: Fri, 6 Jan 2023 13:29:34 +0000 Subject: [PATCH 2/2] fix: check for workspace/docker errors first The local executors prepare() implementation was calling get_status() before checking for archived workspaces or missing docker images. This means that we can't test for that logic on windows, where we'd need docker. So, we perform those checks first, so the tests can run on windows --- jobrunner/executors/local.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py index fee80d0c..c39c18ff 100644 --- a/jobrunner/executors/local.py +++ b/jobrunner/executors/local.py @@ -99,10 +99,6 @@ class LocalDockerAPI(ExecutorAPI): synchronous_transitions = [ExecutorState.PREPARING, ExecutorState.FINALIZING] def prepare(self, job): - current = self.get_status(job) - if current.state != ExecutorState.UNKNOWN: - return current - # Check the workspace is not archived workspace_dir = get_high_privacy_workspace(job.workspace) if not workspace_dir.exists(): @@ -124,6 +120,10 @@ def prepare(self, job): f"Docker image {job.image} is not currently available", ) + current = self.get_status(job) + if current.state != ExecutorState.UNKNOWN: + return current + try: prepare_job(job) except docker.DockerDiskSpaceError as e: