Skip to content

Commit

Permalink
Re-raise exceptions from thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
noah-weingarden committed Nov 22, 2023
1 parent a5d6976 commit 6fffb63
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions madoop/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ def map_stage(exe, input_dir, output_dir):
chunk,
))
part_num += 1
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished map executions: %s", part_num)


Expand Down Expand Up @@ -369,7 +372,10 @@ def reduce_stage(exe, input_dir, output_dir):
input_path,
output_path,
))
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished reduce executions: %s", i+1)


Expand Down

0 comments on commit 6fffb63

Please sign in to comment.