Skip to content

Commit

Permalink
add try catch
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 5, 2024
1 parent 205e67e commit 2ef87d6
Showing 1 changed file with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,23 +450,30 @@ def _get_pipeline_statuses(
DaskSchedulerTaskState | None, task_statuses.get(job_id, "lost")
)
if dask_status == "erred":
# find out if this was a cancellation
exception = await distributed.Future(
job_id, client=self.backend.client
).exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
assert isinstance(exception, Exception) # nosec

if isinstance(exception, TaskCancelledError):
running_states.append(DaskClientTaskState.ABORTED)
else:
assert exception # nosec
try:
# find out if this was a cancellation
exception = await distributed.Future(
job_id, client=self.backend.client
).exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
assert isinstance(exception, Exception) # nosec

if isinstance(exception, TaskCancelledError):
running_states.append(DaskClientTaskState.ABORTED)
else:
assert exception # nosec
_logger.warning(
"Task %s completed in error:\n%s\nTrace:\n%s",
job_id,
exception,
"".join(traceback.format_exception(exception)),
)
running_states.append(DaskClientTaskState.ERRED)
except TimeoutError:
_logger.warning(
"Task %s completed in error:\n%s\nTrace:\n%s",
job_id,
exception,
"".join(traceback.format_exception(exception)),
"Task %s completed in error but was lost from dask-scheduler since then."
"TIP: This can happen when the future just disappeared from the dask-scheduler when this call was done."
)
running_states.append(DaskClientTaskState.ERRED)
running_states.append(DaskClientTaskState.LOST)
elif dask_status is None:
running_states.append(DaskClientTaskState.LOST)
else:
Expand Down

0 comments on commit 2ef87d6

Please sign in to comment.