diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index ec85412c1e62dd..17097268876a67 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -203,7 +203,8 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) parent_parallel_start_node_id=event.parent_parallel_start_node_id, start_at=event.route_node_state.start_at, node_run_index=event.route_node_state.index, - predecessor_node_id=event.predecessor_node_id + predecessor_node_id=event.predecessor_node_id, + in_iteration_id=event.in_iteration_id ) ) elif isinstance(event, NodeRunSucceededEvent): @@ -226,6 +227,7 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) if event.route_node_state.node_run_result else {}, execution_metadata=event.route_node_state.node_run_result.metadata if event.route_node_state.node_run_result else {}, + in_iteration_id=event.in_iteration_id ) ) elif isinstance(event, NodeRunFailedEvent): @@ -249,20 +251,23 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) error=event.route_node_state.node_run_result.error if event.route_node_state.node_run_result and event.route_node_state.node_run_result.error - else "Unknown error" + else "Unknown error", + in_iteration_id=event.in_iteration_id ) ) elif isinstance(event, NodeRunStreamChunkEvent): self._publish_event( QueueTextChunkEvent( text=event.chunk_content, - from_variable_selector=event.from_variable_selector + from_variable_selector=event.from_variable_selector, + in_iteration_id=event.in_iteration_id ) ) elif isinstance(event, NodeRunRetrieverResourceEvent): self._publish_event( QueueRetrieverResourcesEvent( - retriever_resources=event.retriever_resources + retriever_resources=event.retriever_resources, + in_iteration_id=event.in_iteration_id ) ) elif isinstance(event, ParallelBranchRunStartedEvent): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index e4d2ab44d5b5a3..4c86b7eee13e3e 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -152,6 +152,8 @@ class QueueTextChunkEvent(AppQueueEvent): text: str from_variable_selector: Optional[list[str]] = None """from variable selector""" + in_iteration_id: Optional[str] = None + """iteration id if node is in iteration""" class QueueAgentMessageEvent(AppQueueEvent): @@ -176,6 +178,8 @@ class QueueRetrieverResourcesEvent(AppQueueEvent): """ event: QueueEvent = QueueEvent.RETRIEVER_RESOURCES retriever_resources: list[dict] + in_iteration_id: Optional[str] = None + """iteration id if node is in iteration""" class QueueAnnotationReplyEvent(AppQueueEvent): @@ -245,6 +249,8 @@ class QueueNodeStartedEvent(AppQueueEvent): """parent parallel id if node is in parallel""" parent_parallel_start_node_id: Optional[str] = None """parent parallel start node id if node is in parallel""" + in_iteration_id: Optional[str] = None + """iteration id if node is in iteration""" start_at: datetime @@ -266,6 +272,8 @@ class QueueNodeSucceededEvent(AppQueueEvent): """parent parallel id if node is in parallel""" parent_parallel_start_node_id: Optional[str] = None """parent parallel start node id if node is in parallel""" + in_iteration_id: Optional[str] = None + """iteration id if node is in iteration""" start_at: datetime inputs: Optional[dict[str, Any]] = None @@ -294,6 +302,8 @@ class QueueNodeFailedEvent(AppQueueEvent): """parent parallel id if node is in parallel""" parent_parallel_start_node_id: Optional[str] = None """parent parallel start node id if node is in parallel""" + in_iteration_id: Optional[str] = None + """iteration id if node is in iteration""" start_at: datetime inputs: Optional[dict[str, Any]] = None diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index b8b17e08969f9a..7cab6ca4e0d4bd 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -223,6 +223,7 @@ class Data(BaseModel): parallel_start_node_id: Optional[str] = None parent_parallel_id: Optional[str] = None parent_parallel_start_node_id: Optional[str] = None + iteration_id: Optional[str] = None event: StreamEvent = StreamEvent.NODE_STARTED workflow_run_id: str @@ -247,6 +248,7 @@ def to_ignore_detail_dict(self): "parallel_start_node_id": self.data.parallel_start_node_id, "parent_parallel_id": self.data.parent_parallel_id, "parent_parallel_start_node_id": self.data.parent_parallel_start_node_id, + "iteration_id": self.data.iteration_id, } } @@ -280,6 +282,7 @@ class Data(BaseModel): parallel_start_node_id: Optional[str] = None parent_parallel_id: Optional[str] = None parent_parallel_start_node_id: Optional[str] = None + iteration_id: Optional[str] = None event: StreamEvent = StreamEvent.NODE_FINISHED workflow_run_id: str @@ -311,6 +314,7 @@ def to_ignore_detail_dict(self): "parallel_start_node_id": self.data.parallel_start_node_id, "parent_parallel_id": self.data.parent_parallel_id, "parent_parallel_start_node_id": self.data.parent_parallel_start_node_id, + "iteration_id": self.data.iteration_id, } } diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index e4c712ccb014b5..ed3225310a8ec4 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -394,6 +394,7 @@ def _workflow_node_start_to_stream_response( parallel_start_node_id=event.parallel_start_node_id, parent_parallel_id=event.parent_parallel_id, parent_parallel_start_node_id=event.parent_parallel_start_node_id, + iteration_id=event.in_iteration_id, ), ) @@ -448,6 +449,7 @@ def _workflow_node_finish_to_stream_response( parallel_start_node_id=event.parallel_start_node_id, parent_parallel_id=event.parent_parallel_id, parent_parallel_start_node_id=event.parent_parallel_start_node_id, + iteration_id=event.in_iteration_id, ), ) diff --git a/web/app/components/workflow/hooks/use-workflow-run.ts b/web/app/components/workflow/hooks/use-workflow-run.ts index 4f8259b2e4b8a6..f4d356afcf29b1 100644 --- a/web/app/components/workflow/hooks/use-workflow-run.ts +++ b/web/app/components/workflow/hooks/use-workflow-run.ts @@ -24,6 +24,10 @@ import { } from '@/service/workflow' import { useFeaturesStore } from '@/app/components/base/features/hooks' import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager' +import type { + NodeFinishedResponse, + NodeStartedResponse, +} from '@/types/workflow' export const useWorkflowRun = () => { const store = useStoreApi() @@ -87,7 +91,10 @@ export const useWorkflowRun = () => { const handleRun = useCallback(async ( params: any, - callback?: IOtherOptions, + callback?: Omit & { + onNodeStarted?: (nodeStarted: NodeStartedResponse, parentId?: string) => void + onNodeFinished?: (nodeFinished: NodeFinishedResponse, parentId?: string) => void + }, ) => { const { getNodes, @@ -298,7 +305,7 @@ export const useWorkflowRun = () => { setEdges(newEdges) } if (onNodeStarted) - onNodeStarted(params) + onNodeStarted(params, node?.parentId) }, onNodeFinished: (params) => { const { data } = params @@ -367,7 +374,7 @@ export const useWorkflowRun = () => { } if (onNodeFinished) - onNodeFinished(params) + onNodeFinished(params, nodeParentId) }, onIterationStart: (params) => { const { data } = params diff --git a/web/app/components/workflow/panel/debug-and-preview/hooks.ts b/web/app/components/workflow/panel/debug-and-preview/hooks.ts index 315b8dad913e74..59252e5e4d77b6 100644 --- a/web/app/components/workflow/panel/debug-and-preview/hooks.ts +++ b/web/app/components/workflow/panel/debug-and-preview/hooks.ts @@ -181,6 +181,7 @@ export const useChat = ( } let isInIteration = false + const iterationInfoMap = {} as Record handleResponding(true) @@ -319,9 +320,9 @@ export const useChat = ( })) isInIteration = true }, - onIterationNext: () => { + onIterationNext: ({ data }) => { const tracing = responseItem.workflowProcess!.tracing! - const iterations = tracing[tracing.length - 1] + const iterations = tracing.find(item => item.node_id === data.node_id)! iterations.details!.push([]) handleUpdateChatList(produce(chatListRef.current, (draft) => { @@ -331,9 +332,9 @@ export const useChat = ( }, onIterationFinish: ({ data }) => { const tracing = responseItem.workflowProcess!.tracing! - const iterations = tracing[tracing.length - 1] - tracing[tracing.length - 1] = { - ...iterations, + const iterationsIndex = tracing.findIndex(item => item.node_id === data.node_id)! + tracing[iterationsIndex] = { + ...tracing[iterationsIndex], ...data, status: NodeRunningStatus.Succeeded, } as any @@ -344,85 +345,44 @@ export const useChat = ( isInIteration = false }, - onNodeStarted: ({ data }) => { - if (isInIteration) { - const tracing = responseItem.workflowProcess!.tracing! - const iterations = tracing[tracing.length - 1] - const currIteration = iterations.details![iterations.details!.length - 1] - currIteration.push({ - ...data, - status: NodeRunningStatus.Running, - } as any) - handleUpdateChatList(produce(chatListRef.current, (draft) => { - const currentIndex = draft.length - 1 - draft[currentIndex] = responseItem - })) - } - else { - responseItem.workflowProcess!.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - } as any) - handleUpdateChatList(produce(chatListRef.current, (draft) => { - const currentIndex = draft.findIndex(item => item.id === responseItem.id) - draft[currentIndex] = { - ...draft[currentIndex], - ...responseItem, - } - })) - } + onNodeStarted: ({ data }, parentId) => { + if (parentId) + return + + responseItem.workflowProcess!.tracing!.push({ + ...data, + status: NodeRunningStatus.Running, + } as any) + handleUpdateChatList(produce(chatListRef.current, (draft) => { + const currentIndex = draft.findIndex(item => item.id === responseItem.id) + draft[currentIndex] = { + ...draft[currentIndex], + ...responseItem, + } + })) }, - onNodeFinished: ({ data }) => { - if (isInIteration) { - const tracing = responseItem.workflowProcess!.tracing! - const iterations = tracing[tracing.length - 1] - if (iterations && iterations.details) { - const iterationIndex = data.execution_metadata?.iteration_index || 0 - if (!iterations.details[iterationIndex]) - iterations.details[iterationIndex] = [] - const currIteration = iterations.details[iterationIndex] - const nodeIndex = currIteration.findIndex(node => - node.node_id === data.node_id, - ) - if (data.status === NodeRunningStatus.Succeeded) { - if (nodeIndex !== -1) { - currIteration[nodeIndex] = { - ...currIteration[nodeIndex], - ...data, - } as any - } - else { - currIteration.push({ - ...data, - } as any) - } - } + onNodeFinished: ({ data }, parentId) => { + if (parentId) + return + + const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => { + if (!item.execution_metadata?.parallel_id) + return item.node_id === data.node_id + return item.node_id === data.node_id && item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id + }) + responseItem.workflowProcess!.tracing[currentIndex] = { + ...(responseItem.workflowProcess!.tracing[currentIndex]?.extras + ? { extras: responseItem.workflowProcess!.tracing[currentIndex].extras } + : {}), + ...data, + } as any + handleUpdateChatList(produce(chatListRef.current, (draft) => { + const currentIndex = draft.findIndex(item => item.id === responseItem.id) + draft[currentIndex] = { + ...draft[currentIndex], + ...responseItem, } - handleUpdateChatList(produce(chatListRef.current, (draft) => { - const currentIndex = draft.length - 1 - draft[currentIndex] = responseItem - })) - } - else { - const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => { - if (!item.execution_metadata?.parallel_id) - return item.node_id === data.node_id - return item.node_id === data.node_id && item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id - }) - responseItem.workflowProcess!.tracing[currentIndex] = { - ...(responseItem.workflowProcess!.tracing[currentIndex].extras - ? { extras: responseItem.workflowProcess!.tracing[currentIndex].extras } - : {}), - ...data, - } as any - handleUpdateChatList(produce(chatListRef.current, (draft) => { - const currentIndex = draft.findIndex(item => item.id === responseItem.id) - draft[currentIndex] = { - ...draft[currentIndex], - ...responseItem, - } - })) - } + })) }, }, )