Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 4, 2024
1 parent 96bff6c commit f5d9f45
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from copy import deepcopy
from dataclasses import dataclass, field
from http.client import HTTPException
from typing import Any, cast
from typing import Any, Final, cast

import dask.typing
import distributed
Expand Down Expand Up @@ -99,7 +99,7 @@
}


_DASK_DEFAULT_TIMEOUT_S = 1
_DASK_DEFAULT_TIMEOUT_S: Final[int] = 1


_UserCallbackInSepThread = Callable[[], None]
Expand Down Expand Up @@ -451,9 +451,9 @@ def _get_pipeline_statuses(
)
if dask_status == "erred":
# find out if this was a cancellation
exception = await distributed.Future(job_id).exception(
timeout=_DASK_DEFAULT_TIMEOUT_S
)
exception = await distributed.Future(
job_id, client=self.backend.client
).exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
assert isinstance(exception, Exception) # nosec

if isinstance(exception, TaskCancelledError):
Expand Down

0 comments on commit f5d9f45

Please sign in to comment.