Skip to content

Commit

Permalink
test formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano committed Jul 24, 2024
1 parent dfc529c commit 794d7d5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
16 changes: 8 additions & 8 deletions src/prefect/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def result(
def add_done_callback(self, fn: Callable[[PrefectFuture], None]):
if not self._final_state:

def call_with_self():
def call_with_self(future):
fn(self)

self._wrapped_future.add_done_callback(call_with_self)
Expand Down Expand Up @@ -345,24 +345,24 @@ def as_completed(
pending = unique_futures - done
yield from done

event = threading.Event()
lock = threading.Lock()
finished_event = threading.Event()
finished_lock = threading.Lock()
finished_futures = []

def add_to_done(future):
with lock:
with finished_lock:
finished_futures.append(future)
event.set()
finished_event.set()

for future in pending:
future.add_done_callback(add_to_done)

while pending:
event.wait()
with lock:
finished_event.wait()
with finished_lock:
done = finished_futures
finished_futures = []
event.clear()
finished_event.clear()

for future in done:
pending.remove(future)
Expand Down
8 changes: 5 additions & 3 deletions tests/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,17 @@ def my_test_task(seconds):

with ThreadPoolTaskRunner() as runner:
futures = []
for i in range(5, 25, 5):
timings = [1, 5, 10]

for i in timings:
parameters = {"seconds": i}
future = runner.submit(my_test_task, parameters)
future.parameters = parameters
futures.append(future)
results = []
for future in as_completed(futures):
results.append(future.result())
assert results == [5, 10, 15, 20]
assert results == timings

async def test_as_completed_yields_correct_order_dist(self, task_run):
@task
Expand All @@ -110,7 +112,7 @@ async def my_task(seconds):
return seconds

futures = []
timings = [1, 3, 5]
timings = [1, 5, 10]
for i in timings:
task_run = await my_task.create_run(parameters={"seconds": i})
future = PrefectDistributedFuture(task_run_id=task_run.id)
Expand Down
3 changes: 3 additions & 0 deletions tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def result(
self.wait()
return self._state.result()

def add_done_callback(self, fn):
return super().add_done_callback(fn)

@property
def state(self) -> Any:
return self._state
Expand Down

0 comments on commit 794d7d5

Please sign in to comment.