Skip to content

Commit

Permalink
Optimized _partition_tasks function by reducing iterations over task_history
Browse files Browse the repository at this point in the history

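Refactored the _partition_tasks function in execution_summary.py to improve performance. The function now iterates over task_history once, in a single loop, instead of running a separate set comprehension over it for each status category. Each of those passes was already linear in the length of task_history, so this reduces the number of passes (a constant-factor speedup) rather than the asymptotic complexity of task partitioning.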
  • Loading branch information
arjun-234 committed Nov 16, 2023
1 parent fe7ecf4 commit a193a4b
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions luigi/execution_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,45 @@ def _partition_tasks(worker):
Still_pending_not_ext is only used to get upstream_failure, upstream_missing_dependency and run_by_other_worker
"""
task_history = worker._add_task_history
pending_tasks = {task for (task, status, ext) in task_history if status == 'PENDING'}
set_tasks = {}
set_tasks["completed"] = {task for (task, status, ext) in task_history if status == 'DONE' and task in pending_tasks}
set_tasks["already_done"] = {task for (task, status, ext) in task_history
if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]}
set_tasks["ever_failed"] = {task for (task, status, ext) in task_history if status == 'FAILED'}
set_tasks["failed"] = set_tasks["ever_failed"] - set_tasks["completed"]
set_tasks["scheduling_error"] = {task for (task, status, ext) in task_history if status == 'UNKNOWN'}
set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history
if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and not ext}
set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history
if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and ext}
set_tasks["run_by_other_worker"] = set()
set_tasks["upstream_failure"] = set()
set_tasks["upstream_missing_dependency"] = set()
set_tasks["upstream_run_by_other_worker"] = set()
set_tasks["upstream_scheduling_error"] = set()
set_tasks["not_run"] = set()

set_tasks = {
"completed": set(),
"already_done": set(),
"ever_failed": set(),
"failed": set(),
"scheduling_error": set(),
"still_pending_ext": set(),
"still_pending_not_ext": set(),
"run_by_other_worker": set(),
"upstream_failure": set(),
"upstream_missing_dependency": set(),
"upstream_run_by_other_worker": set(),
"upstream_scheduling_error": set(),
"not_run": set()
}

pending_tasks = set()

for task, status, ext in task_history:
if status == 'PENDING':
pending_tasks.add(task)
if task not in set_tasks["ever_failed"] and task not in set_tasks["completed"]:
if ext:
set_tasks["still_pending_not_ext"].add(task)
else:
set_tasks["still_pending_ext"].add(task)

Check warning on line 123 in luigi/execution_summary.py

View check run for this annotation

Codecov / codecov/patch

luigi/execution_summary.py#L123

Added line #L123 was not covered by tests
elif status == 'DONE':
if task in pending_tasks:
set_tasks["completed"].add(task)
elif task not in set_tasks["completed"]:
set_tasks["already_done"].add(task)

Check warning on line 128 in luigi/execution_summary.py

View check run for this annotation

Codecov / codecov/patch

luigi/execution_summary.py#L127-L128

Added lines #L127 - L128 were not covered by tests
elif status == 'FAILED':
set_tasks["ever_failed"].add(task)
if task not in set_tasks["completed"]:
set_tasks["failed"].add(task)
elif status == 'UNKNOWN':
set_tasks["scheduling_error"].add(task)

Check warning on line 134 in luigi/execution_summary.py

View check run for this annotation

Codecov / codecov/patch

luigi/execution_summary.py#L133-L134

Added lines #L133 - L134 were not covered by tests

return set_tasks


Expand Down

0 comments on commit a193a4b

Please sign in to comment.