From 537068cfde3e089cb8f047beecc034d8b13a3e71 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 9 Dec 2024 15:41:20 +0800 Subject: [PATCH] refactor(iteration_node): use Sequence and Mapping in parameters (#11483) Signed-off-by: -LAN- --- .../workflow/graph_engine/entities/event.py | 17 +++++----- .../nodes/iteration/iteration_node.py | 31 ++++++++++--------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index 3736e632c3f1eb..cb73da3cd613f6 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -1,3 +1,4 @@ +from collections.abc import Mapping from datetime import datetime from typing import Any, Optional @@ -140,8 +141,8 @@ class BaseIterationEvent(GraphEngineEvent): class IterationRunStartedEvent(BaseIterationEvent): start_at: datetime = Field(..., description="start at") - inputs: Optional[dict[str, Any]] = None - metadata: Optional[dict[str, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + metadata: Optional[Mapping[str, Any]] = None predecessor_node_id: Optional[str] = None @@ -153,18 +154,18 @@ class IterationRunNextEvent(BaseIterationEvent): class IterationRunSucceededEvent(BaseIterationEvent): start_at: datetime = Field(..., description="start at") - inputs: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None - metadata: Optional[dict[str, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None + metadata: Optional[Mapping[str, Any]] = None steps: int = 0 iteration_duration_map: Optional[dict[str, float]] = None class IterationRunFailedEvent(BaseIterationEvent): start_at: datetime = Field(..., description="start at") - inputs: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None - metadata: Optional[dict[str, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None + metadata: Optional[Mapping[str, Any]] = None steps: int = 0 error: str = Field(..., description="failed reason") diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 75a2fb78a2700b..5b3853c5d6279c 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -167,17 +167,17 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: for index, item in enumerate(iterator_list_value): future: Future = thread_pool.submit( self._run_single_iter_parallel, - current_app._get_current_object(), # type: ignore - q, - iterator_list_value, - inputs, - outputs, - start_at, - graph_engine, - iteration_graph, - index, - item, - iter_run_map, + flask_app=current_app._get_current_object(), # type: ignore + q=q, + iterator_list_value=iterator_list_value, + inputs=inputs, + outputs=outputs, + start_at=start_at, + graph_engine=graph_engine, + iteration_graph=iteration_graph, + index=index, + item=item, + iter_run_map=iter_run_map, ) future.add_done_callback(thread_pool.task_done_callback) futures.append(future) @@ -370,9 +370,9 @@ def _handle_event_metadata( def _run_single_iter( self, *, - iterator_list_value: list[str], + iterator_list_value: Sequence[str], variable_pool: VariablePool, - inputs: dict[str, list], + inputs: Mapping[str, list], outputs: list, start_at: datetime, graph_engine: "GraphEngine", @@ -559,10 +559,11 @@ def _run_single_iter( def _run_single_iter_parallel( self, + *, flask_app: Flask, q: Queue, - iterator_list_value: list[str], - inputs: dict[str, list], + iterator_list_value: Sequence[str], + inputs: Mapping[str, list], outputs: list, start_at: datetime, graph_engine: "GraphEngine",