From 063f93441317a50a6fab108c04f40a8b034eb88c Mon Sep 17 00:00:00 2001 From: Austin Weisgrau <62900254+austinweisgrau@users.noreply.github.com> Date: Wed, 21 Jun 2023 15:18:26 -0700 Subject: [PATCH] Fix condition to break while loop awaiting batch completion (#17) --- src/prefecto/concurrency.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/prefecto/concurrency.py b/src/prefecto/concurrency.py index 4bedfe6..2bbac05 100644 --- a/src/prefecto/concurrency.py +++ b/src/prefecto/concurrency.py @@ -171,14 +171,11 @@ def _map(self, batches: list[dict[str, list[Any]]]) -> list[PrefectFuture]: futures = self.task.map(**batch) results.extend(futures) # Poll futures to ensure they are not active. - is_processing: bool = False + is_processing: bool = True while is_processing: - for f in futures: - if not states.is_terminal(f.get_state()): - # If any future is still processing, break and poll again. - is_processing = True - break - else: + # If any future is still processing, loop and poll again. + # If all futures are terminal, break while loop and continue + if all(states.is_terminal(f.get_state()) for f in futures): is_processing = False # Map the last batch