Skip to content

Commit

Permalink
Merge branch 'refs/heads/feat/workflow-parallel-support' into deploy/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
takatost committed Sep 9, 2024
2 parents 01f180f + 4071ea4 commit d09a2a5
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 89 deletions.
13 changes: 9 additions & 4 deletions api/core/app/apps/workflow_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.route_node_state.start_at,
node_run_index=event.route_node_state.index,
predecessor_node_id=event.predecessor_node_id
predecessor_node_id=event.predecessor_node_id,
in_iteration_id=event.in_iteration_id
)
)
elif isinstance(event, NodeRunSucceededEvent):
Expand All @@ -226,6 +227,7 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
if event.route_node_state.node_run_result else {},
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result else {},
in_iteration_id=event.in_iteration_id
)
)
elif isinstance(event, NodeRunFailedEvent):
Expand All @@ -249,20 +251,23 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
error=event.route_node_state.node_run_result.error
if event.route_node_state.node_run_result
and event.route_node_state.node_run_result.error
else "Unknown error"
else "Unknown error",
in_iteration_id=event.in_iteration_id
)
)
elif isinstance(event, NodeRunStreamChunkEvent):
self._publish_event(
QueueTextChunkEvent(
text=event.chunk_content,
from_variable_selector=event.from_variable_selector
from_variable_selector=event.from_variable_selector,
in_iteration_id=event.in_iteration_id
)
)
elif isinstance(event, NodeRunRetrieverResourceEvent):
self._publish_event(
QueueRetrieverResourcesEvent(
retriever_resources=event.retriever_resources
retriever_resources=event.retriever_resources,
in_iteration_id=event.in_iteration_id
)
)
elif isinstance(event, ParallelBranchRunStartedEvent):
Expand Down
10 changes: 10 additions & 0 deletions api/core/app/entities/queue_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ class QueueTextChunkEvent(AppQueueEvent):
text: str
from_variable_selector: Optional[list[str]] = None
"""from variable selector"""
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""


class QueueAgentMessageEvent(AppQueueEvent):
Expand All @@ -176,6 +178,8 @@ class QueueRetrieverResourcesEvent(AppQueueEvent):
"""
event: QueueEvent = QueueEvent.RETRIEVER_RESOURCES
retriever_resources: list[dict]
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""


class QueueAnnotationReplyEvent(AppQueueEvent):
Expand Down Expand Up @@ -245,6 +249,8 @@ class QueueNodeStartedEvent(AppQueueEvent):
"""parent parallel id if node is in parallel"""
parent_parallel_start_node_id: Optional[str] = None
"""parent parallel start node id if node is in parallel"""
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""
start_at: datetime


Expand All @@ -266,6 +272,8 @@ class QueueNodeSucceededEvent(AppQueueEvent):
"""parent parallel id if node is in parallel"""
parent_parallel_start_node_id: Optional[str] = None
"""parent parallel start node id if node is in parallel"""
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""
start_at: datetime

inputs: Optional[dict[str, Any]] = None
Expand Down Expand Up @@ -294,6 +302,8 @@ class QueueNodeFailedEvent(AppQueueEvent):
"""parent parallel id if node is in parallel"""
parent_parallel_start_node_id: Optional[str] = None
"""parent parallel start node id if node is in parallel"""
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""
start_at: datetime

inputs: Optional[dict[str, Any]] = None
Expand Down
4 changes: 4 additions & 0 deletions api/core/app/entities/task_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class Data(BaseModel):
parallel_start_node_id: Optional[str] = None
parent_parallel_id: Optional[str] = None
parent_parallel_start_node_id: Optional[str] = None
iteration_id: Optional[str] = None

event: StreamEvent = StreamEvent.NODE_STARTED
workflow_run_id: str
Expand All @@ -247,6 +248,7 @@ def to_ignore_detail_dict(self):
"parallel_start_node_id": self.data.parallel_start_node_id,
"parent_parallel_id": self.data.parent_parallel_id,
"parent_parallel_start_node_id": self.data.parent_parallel_start_node_id,
"iteration_id": self.data.iteration_id,
}
}

Expand Down Expand Up @@ -280,6 +282,7 @@ class Data(BaseModel):
parallel_start_node_id: Optional[str] = None
parent_parallel_id: Optional[str] = None
parent_parallel_start_node_id: Optional[str] = None
iteration_id: Optional[str] = None

event: StreamEvent = StreamEvent.NODE_FINISHED
workflow_run_id: str
Expand Down Expand Up @@ -311,6 +314,7 @@ def to_ignore_detail_dict(self):
"parallel_start_node_id": self.data.parallel_start_node_id,
"parent_parallel_id": self.data.parent_parallel_id,
"parent_parallel_start_node_id": self.data.parent_parallel_start_node_id,
"iteration_id": self.data.iteration_id,
}
}

Expand Down
2 changes: 2 additions & 0 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ def _workflow_node_start_to_stream_response(
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
iteration_id=event.in_iteration_id,
),
)

Expand Down Expand Up @@ -448,6 +449,7 @@ def _workflow_node_finish_to_stream_response(
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
iteration_id=event.in_iteration_id,
),
)

Expand Down
13 changes: 10 additions & 3 deletions web/app/components/workflow/hooks/use-workflow-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import {
} from '@/service/workflow'
import { useFeaturesStore } from '@/app/components/base/features/hooks'
import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager'
import type {
NodeFinishedResponse,
NodeStartedResponse,
} from '@/types/workflow'

export const useWorkflowRun = () => {
const store = useStoreApi()
Expand Down Expand Up @@ -87,7 +91,10 @@ export const useWorkflowRun = () => {

const handleRun = useCallback(async (
params: any,
callback?: IOtherOptions,
callback?: Omit<IOtherOptions, 'onNodeStarted' | 'onNodeFinished'> & {
onNodeStarted?: (nodeStarted: NodeStartedResponse, parentId?: string) => void
onNodeFinished?: (nodeFinished: NodeFinishedResponse, parentId?: string) => void
},
) => {
const {
getNodes,
Expand Down Expand Up @@ -298,7 +305,7 @@ export const useWorkflowRun = () => {
setEdges(newEdges)
}
if (onNodeStarted)
onNodeStarted(params)
onNodeStarted(params, node?.parentId)
},
onNodeFinished: (params) => {
const { data } = params
Expand Down Expand Up @@ -367,7 +374,7 @@ export const useWorkflowRun = () => {
}

if (onNodeFinished)
onNodeFinished(params)
onNodeFinished(params, nodeParentId)
},
onIterationStart: (params) => {
const { data } = params
Expand Down
124 changes: 42 additions & 82 deletions web/app/components/workflow/panel/debug-and-preview/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export const useChat = (
}

let isInIteration = false
const iterationInfoMap = {} as Record<string, boolean>

handleResponding(true)

Expand Down Expand Up @@ -319,9 +320,9 @@ export const useChat = (
}))
isInIteration = true
},
onIterationNext: () => {
onIterationNext: ({ data }) => {
const tracing = responseItem.workflowProcess!.tracing!
const iterations = tracing[tracing.length - 1]
const iterations = tracing.find(item => item.node_id === data.node_id)!
iterations.details!.push([])

handleUpdateChatList(produce(chatListRef.current, (draft) => {
Expand All @@ -331,9 +332,9 @@ export const useChat = (
},
onIterationFinish: ({ data }) => {
const tracing = responseItem.workflowProcess!.tracing!
const iterations = tracing[tracing.length - 1]
tracing[tracing.length - 1] = {
...iterations,
const iterationsIndex = tracing.findIndex(item => item.node_id === data.node_id)!
tracing[iterationsIndex] = {
...tracing[iterationsIndex],
...data,
status: NodeRunningStatus.Succeeded,
} as any
Expand All @@ -344,85 +345,44 @@ export const useChat = (

isInIteration = false
},
onNodeStarted: ({ data }) => {
if (isInIteration) {
const tracing = responseItem.workflowProcess!.tracing!
const iterations = tracing[tracing.length - 1]
const currIteration = iterations.details![iterations.details!.length - 1]
currIteration.push({
...data,
status: NodeRunningStatus.Running,
} as any)
handleUpdateChatList(produce(chatListRef.current, (draft) => {
const currentIndex = draft.length - 1
draft[currentIndex] = responseItem
}))
}
else {
responseItem.workflowProcess!.tracing!.push({
...data,
status: NodeRunningStatus.Running,
} as any)
handleUpdateChatList(produce(chatListRef.current, (draft) => {
const currentIndex = draft.findIndex(item => item.id === responseItem.id)
draft[currentIndex] = {
...draft[currentIndex],
...responseItem,
}
}))
}
onNodeStarted: ({ data }, parentId) => {
if (parentId)
return

responseItem.workflowProcess!.tracing!.push({
...data,
status: NodeRunningStatus.Running,
} as any)
handleUpdateChatList(produce(chatListRef.current, (draft) => {
const currentIndex = draft.findIndex(item => item.id === responseItem.id)
draft[currentIndex] = {
...draft[currentIndex],
...responseItem,
}
}))
},
onNodeFinished: ({ data }) => {
if (isInIteration) {
const tracing = responseItem.workflowProcess!.tracing!
const iterations = tracing[tracing.length - 1]
if (iterations && iterations.details) {
const iterationIndex = data.execution_metadata?.iteration_index || 0
if (!iterations.details[iterationIndex])
iterations.details[iterationIndex] = []
const currIteration = iterations.details[iterationIndex]
const nodeIndex = currIteration.findIndex(node =>
node.node_id === data.node_id,
)
if (data.status === NodeRunningStatus.Succeeded) {
if (nodeIndex !== -1) {
currIteration[nodeIndex] = {
...currIteration[nodeIndex],
...data,
} as any
}
else {
currIteration.push({
...data,
} as any)
}
}
onNodeFinished: ({ data }, parentId) => {
if (parentId)
return

const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => {
if (!item.execution_metadata?.parallel_id)
return item.node_id === data.node_id
return item.node_id === data.node_id && item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id
})
responseItem.workflowProcess!.tracing[currentIndex] = {
...(responseItem.workflowProcess!.tracing[currentIndex]?.extras
? { extras: responseItem.workflowProcess!.tracing[currentIndex].extras }
: {}),
...data,
} as any
handleUpdateChatList(produce(chatListRef.current, (draft) => {
const currentIndex = draft.findIndex(item => item.id === responseItem.id)
draft[currentIndex] = {
...draft[currentIndex],
...responseItem,
}
handleUpdateChatList(produce(chatListRef.current, (draft) => {
const currentIndex = draft.length - 1
draft[currentIndex] = responseItem
}))
}
else {
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => {
if (!item.execution_metadata?.parallel_id)
return item.node_id === data.node_id
return item.node_id === data.node_id && item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id
})
responseItem.workflowProcess!.tracing[currentIndex] = {
...(responseItem.workflowProcess!.tracing[currentIndex].extras
? { extras: responseItem.workflowProcess!.tracing[currentIndex].extras }
: {}),
...data,
} as any
handleUpdateChatList(produce(chatListRef.current, (draft) => {
const currentIndex = draft.findIndex(item => item.id === responseItem.id)
draft[currentIndex] = {
...draft[currentIndex],
...responseItem,
}
}))
}
}))
},
},
)
Expand Down

0 comments on commit d09a2a5

Please sign in to comment.