get_tasks_status fixed for new version of dask
sanderegg committed Dec 5, 2024
1 parent 511ca1e commit d24571b
Showing 2 changed files with 21 additions and 27 deletions.
@@ -536,14 +536,21 @@ async def get_task_result(self, job_id: str) -> TaskOutputData:
    async def release_task_result(self, job_id: str) -> None:
        _logger.debug("releasing results for %s", f"{job_id=}")
        try:
            # NOTE: The distributed Variable holds the future of the tasks in the dask-scheduler
            # Alas, deleting the variable is done asynchronously and there is no way to ensure
            # the variable was effectively deleted.
            # This is annoying as one can re-create the variable without error.
-           var = distributed.Variable(job_id, client=self.backend.client)
-           var.delete()
+           # first check if the key exists
+           await dask_utils.wrap_client_async_routine(
+               self.backend.client.get_dataset(name=job_id)
+           )
+
+           await dask_utils.wrap_client_async_routine(
+               self.backend.client.unpublish_dataset(name=job_id)
+           )
+           distributed.Variable(job_id, client=self.backend.client).delete()

        except KeyError:
            _logger.warning("Unknown task cannot be unpublished: %s", f"{job_id=}")
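Note: the fix above leans on dask.distributed's published-dataset API. Below is a minimal standalone sketch of that round-trip on a throwaway local cluster (illustrative names, not the project's code): Client.get_dataset raises KeyError for a name that was never published, which release_task_result converts into the warning above instead of a crash, and the Variable is only deleted once the dataset is unpublished.

    import distributed

    if __name__ == "__main__":
        with distributed.Client(processes=False) as client:
            future = client.submit(sum, [1, 2, 3])
            # publishing keeps the finished task alive on the scheduler
            client.publish_dataset(future, name="job-123")
            assert "job-123" in client.list_datasets()

            client.get_dataset("job-123")        # raises KeyError if unknown
            client.unpublish_dataset("job-123")  # scheduler may now drop the task
            distributed.Variable("job-123", client=client).delete()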
services/director-v2/tests/unit/test_modules_dask_client.py (13 additions & 26 deletions)
@@ -91,7 +91,7 @@ async def _assert_wait_for_task_status(
    job_id: str,
    dask_client: DaskClient,
    expected_status: DaskClientTaskState,
-   timeout: int | None = None,
+   timeout: int | None = None,  # noqa: ASYNC109
):
    async for attempt in AsyncRetrying(
        reraise=True,
@@ -104,24 +104,20 @@ async def _assert_wait_for_task_status(
f"waiting for task to be {expected_status=}, "
f"Attempt={attempt.retry_state.attempt_number}"
)
current_task_status = (await dask_client.get_tasks_status([job_id]))[0]
assert isinstance(current_task_status, DaskClientTaskState)
print(f"{current_task_status=} vs {expected_status=}")
if (
current_task_status is DaskClientTaskState.ERRED
and expected_status
not in [
DaskClientTaskState.ERRED,
DaskClientTaskState.LOST,
]
):
got = (await dask_client.get_tasks_status([job_id]))[0]
assert isinstance(got, DaskClientTaskState)
print(f"{got=} vs {expected_status=}")
if got is DaskClientTaskState.ERRED and expected_status not in [
DaskClientTaskState.ERRED,
DaskClientTaskState.LOST,
]:
try:
# we can fail fast here
# this will raise and we catch the Assertion to not reraise too long
await dask_client.get_task_result(job_id)
except AssertionError as exc:
raise RuntimeError from exc
assert current_task_status is expected_status
assert got is expected_status


@pytest.fixture
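The _assert_wait_for_task_status helper above drives tenacity's AsyncRetrying. Here is a minimal sketch of that polling pattern, with made-up stop/wait arguments, since the actual ones sit in lines folded out of this hunk: an AssertionError inside the attempt triggers a retry until the deadline passes.

    import asyncio
    from tenacity import (
        AsyncRetrying,
        retry_if_exception_type,
        stop_after_delay,
        wait_fixed,
    )

    async def wait_until(probe, expected, timeout: float = 30.0):
        # retry the assertion until it holds or the deadline passes
        async for attempt in AsyncRetrying(
            reraise=True,
            stop=stop_after_delay(timeout),
            wait=wait_fixed(0.1),
            retry=retry_if_exception_type(AssertionError),
        ):
            with attempt:
                assert await probe() == expected

    async def _demo():
        counter = {"n": 0}

        async def probe():
            counter["n"] += 1
            return counter["n"] >= 3  # succeeds on the third poll

        await wait_until(probe, True)

    if __name__ == "__main__":
        asyncio.run(_demo())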
@@ -1024,11 +1020,6 @@ def fake_remote_fct(
    assert len(published_computation_task) == 1

    assert published_computation_task[0].node_id in cpu_image.fake_tasks
-   # let's get a dask future for the task here so dask will not remove the task from the scheduler at the end
-   computation_future = distributed.Future(
-       key=published_computation_task[0].job_id, client=dask_client.backend.client
-   )
-   assert computation_future

    await _assert_wait_for_task_status(
        published_computation_task[0].job_id,
@@ -1047,15 +1038,11 @@ )
    )
    # release the task results
    await dask_client.release_task_result(published_computation_task[0].job_id)
-   # the task is still present since we hold a future here
    await _assert_wait_for_task_status(
        published_computation_task[0].job_id,
        dask_client,
        DaskClientTaskState.ERRED if fail_remote_fct else DaskClientTaskState.SUCCESS,
    )

-   # removing the future will let dask eventually delete the task from its memory, so its status becomes undefined
-   del computation_future
-   await asyncio.sleep(
-       5  # NOTE: here we wait to be sure that the dask-scheduler properly updates its state
-   )
+   # the task is gone, since the distributed Variable was removed above
    await _assert_wait_for_task_status(
        published_computation_task[0].job_id,
        dask_client,
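The test deletions above drop the old trick of pinning a finished task by constructing a distributed.Future from its key. A sketch of that old pattern on a throwaway local cluster follows; presumably this Future-based reference counting is what behaves differently on newer dask versions, which is why the test now leans on release_task_result (dataset removal plus the Variable) instead. Names here are illustrative.

    import distributed

    if __name__ == "__main__":
        with distributed.Client(processes=False) as client:
            original = client.submit(sum, [1, 2, 3], key="job-1")
            assert original.result() == 6

            # a second reference to the same key keeps the scheduler from
            # forgetting the finished task when `original` goes away
            pin = distributed.Future(key="job-1", client=client)
            del original
            # "job-1" is still tracked here because `pin` references it
            del pin
            # the scheduler is now free to forget the task, and its status
            # eventually becomes unknown (what the updated test asserts)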
