Skip to content

Commit

Permalink
fix: thread_pool submit count in parallel workflow not releasing (#8549)
Browse files Browse the repository at this point in the history
  • Loading branch information
takatost authored Sep 19, 2024
1 parent 54b9e1f commit ffd2f61
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions api/core/workflow/graph_engine/graph_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def submit(self, fn, *args, **kwargs):

return super().submit(fn, *args, **kwargs)

def task_done_callback(self, future):
self.submit_count -= 1

def check_is_full(self) -> None:
print(f"submit_count: {self.submit_count}, max_submit_count: {self.max_submit_count}")
if self.submit_count > self.max_submit_count:
Expand Down Expand Up @@ -426,20 +429,22 @@ def _run_parallel_branches(
):
continue

futures.append(
self.thread_pool.submit(
self._run_parallel_node,
**{
"flask_app": current_app._get_current_object(), # type: ignore[attr-defined]
"q": q,
"parallel_id": parallel_id,
"parallel_start_node_id": edge.target_node_id,
"parent_parallel_id": in_parallel_id,
"parent_parallel_start_node_id": parallel_start_node_id,
},
)
future = self.thread_pool.submit(
self._run_parallel_node,
**{
"flask_app": current_app._get_current_object(), # type: ignore[attr-defined]
"q": q,
"parallel_id": parallel_id,
"parallel_start_node_id": edge.target_node_id,
"parent_parallel_id": in_parallel_id,
"parent_parallel_start_node_id": parallel_start_node_id,
},
)

future.add_done_callback(self.thread_pool.task_done_callback)

futures.append(future)

succeeded_count = 0
while True:
try:
Expand Down

0 comments on commit ffd2f61

Please sign in to comment.