From e5708ece346401471375399280fe124e7ccf78e8 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 2 Dec 2024 14:45:17 +0100 Subject: [PATCH] a future must be linked to a client --- .../tests/unit/test_modules_dask_client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index e75b96d3fd1..adc29f4b1f7 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -613,7 +613,9 @@ def fake_sidecar_fct( ) assert published_computation_task[0].node_id in image_params.fake_tasks # creating a new future shows that it is not done???? - assert not distributed.Future(published_computation_task[0].job_id).done() + assert not distributed.Future( + published_computation_task[0].job_id, client=dask_client.backend.client + ).done() # as the task is published on the dask-scheduler when sending, it shall still be published on the dask scheduler list_of_persisted_datasets = await dask_client.backend.client.list_datasets() # type: ignore @@ -636,7 +638,9 @@ def fake_sidecar_fct( assert isinstance(task_result, TaskOutputData) assert task_result.get("some_output_key") == 123 # try to create another future and this one is already done - assert distributed.Future(published_computation_task[0].job_id).done() + assert distributed.Future( + published_computation_task[0].job_id, client=dask_client.backend.client + ).done() async def test_abort_computation_tasks( @@ -1022,7 +1026,9 @@ def fake_remote_fct( assert published_computation_task[0].node_id in cpu_image.fake_tasks # let's get a dask future for the task here so dask will not remove the task from the scheduler at the end - computation_future = distributed.Future(key=published_computation_task[0].job_id) + computation_future = distributed.Future( + key=published_computation_task[0].job_id, client=dask_client.backend.client + ) assert computation_future await _assert_wait_for_task_status(