From 794d7d584fa7d1d92a77be30b10a7a98c9a0cbd3 Mon Sep 17 00:00:00 2001 From: jeanluciano Date: Wed, 24 Jul 2024 10:34:37 -0500 Subject: [PATCH] test formatting --- src/prefect/futures.py | 16 ++++++++-------- tests/test_futures.py | 8 +++++--- tests/test_task_runners.py | 3 +++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/prefect/futures.py b/src/prefect/futures.py index 8018478e56d0..55b9cb0a8b9d 100644 --- a/src/prefect/futures.py +++ b/src/prefect/futures.py @@ -166,7 +166,7 @@ def result( def add_done_callback(self, fn: Callable[[PrefectFuture], None]): if not self._final_state: - def call_with_self(): + def call_with_self(future): fn(self) self._wrapped_future.add_done_callback(call_with_self) @@ -345,24 +345,24 @@ def as_completed( pending = unique_futures - done yield from done - event = threading.Event() - lock = threading.Lock() + finished_event = threading.Event() + finished_lock = threading.Lock() finished_futures = [] def add_to_done(future): - with lock: + with finished_lock: finished_futures.append(future) - event.set() + finished_event.set() for future in pending: future.add_done_callback(add_to_done) while pending: - event.wait() - with lock: + finished_event.wait() + with finished_lock: done = finished_futures finished_futures = [] - event.clear() + finished_event.clear() for future in done: pending.remove(future) diff --git a/tests/test_futures.py b/tests/test_futures.py index 65aa3845e266..46e335d0af4e 100644 --- a/tests/test_futures.py +++ b/tests/test_futures.py @@ -91,7 +91,9 @@ def my_test_task(seconds): with ThreadPoolTaskRunner() as runner: futures = [] - for i in range(5, 25, 5): + timings = [1, 5, 10] + + for i in timings: parameters = {"seconds": i} future = runner.submit(my_test_task, parameters) future.parameters = parameters @@ -99,7 +101,7 @@ def my_test_task(seconds): results = [] for future in as_completed(futures): results.append(future.result()) - assert results == [5, 10, 15, 20] + assert results == timings async def test_as_completed_yields_correct_order_dist(self, task_run): @task @@ -110,7 +112,7 @@ async def my_task(seconds): return seconds futures = [] - timings = [1, 3, 5] + timings = [1, 5, 10] for i in timings: task_run = await my_task.create_run(parameters={"seconds": i}) future = PrefectDistributedFuture(task_run_id=task_run.id) diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index 9e761670e3d3..fb673daeee9e 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -61,6 +61,9 @@ def result( self.wait() return self._state.result() + def add_done_callback(self, fn): + return super().add_done_callback(fn) + @property def state(self) -> Any: return self._state