Skip to content

Commit

Permalink
fix(workflow_service): assign UUID to workflow_node_execution id and …
Browse files Browse the repository at this point in the history
…update optional fields in WorkflowRun and WorkflowNodeExecution models (#12096)

Signed-off-by: -LAN- <[email protected]>
  • Loading branch information
laipz8200 authored Dec 25, 2024
1 parent d5f3321 commit 822af70
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 329 deletions.
187 changes: 119 additions & 68 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
from models.workflow import (
Workflow,
WorkflowNodeExecution,
WorkflowRun,
WorkflowRunStatus,
)

Expand Down Expand Up @@ -104,10 +103,12 @@ def __init__(
)

if isinstance(user, EndUser):
self._user_id = user.session_id
self._user_id = user.id
user_session_id = user.session_id
self._created_by_role = CreatedByRole.END_USER
elif isinstance(user, Account):
self._user_id = user.id
user_session_id = user.id
self._created_by_role = CreatedByRole.ACCOUNT
else:
raise NotImplementedError(f"User type not supported: {type(user)}")
Expand All @@ -125,7 +126,7 @@ def __init__(
SystemVariableKey.QUERY: message.query,
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.CONVERSATION_ID: conversation.id,
SystemVariableKey.USER_ID: self._user_id,
SystemVariableKey.USER_ID: user_session_id,
SystemVariableKey.DIALOGUE_COUNT: dialogue_count,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
Expand All @@ -137,6 +138,7 @@ def __init__(

self._conversation_name_generate_thread = None
self._recorded_files: list[Mapping[str, Any]] = []
self._workflow_run_id = ""

def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
"""
Expand Down Expand Up @@ -266,7 +268,6 @@ def _process_stream_response(
"""
# init fake graph runtime state
graph_runtime_state: Optional[GraphRuntimeState] = None
workflow_run: Optional[WorkflowRun] = None

for queue_message in self._queue_manager.listen():
event = queue_message.event
Expand All @@ -291,111 +292,163 @@ def _process_stream_response(
user_id=self._user_id,
created_by_role=self._created_by_role,
)
self._workflow_run_id = workflow_run.id
message = self._get_message(session=session)
if not message:
raise ValueError(f"Message not found: {self._message_id}")
message.workflow_run_id = workflow_run.id
session.commit()

workflow_start_resp = self._workflow_start_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
session.commit()

yield workflow_start_resp
elif isinstance(
event,
QueueNodeRetryEvent,
):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

node_retry_resp = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
workflow_node_execution = self._handle_workflow_node_execution_retried(
session=session, workflow_run=workflow_run, event=event
)
node_retry_resp = self._workflow_node_retry_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()

if node_retry_resp:
yield node_retry_resp
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
workflow_node_execution = self._handle_node_execution_start(
session=session, workflow_run=workflow_run, event=event
)

node_start_resp = self._workflow_node_start_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
node_start_resp = self._workflow_node_start_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()

if node_start_resp:
yield node_start_resp
elif isinstance(event, QueueNodeSucceededEvent):
workflow_node_execution = self._handle_workflow_node_execution_success(event)

# Record files if it's an answer node or end node
if event.node_type in [NodeType.ANSWER, NodeType.END]:
self._recorded_files.extend(self._fetch_files_from_node_outputs(event.outputs or {}))

node_finish_resp = self._workflow_node_finish_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
with Session(db.engine) as session:
workflow_node_execution = self._handle_workflow_node_execution_success(session=session, event=event)

node_finish_resp = self._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()

if node_finish_resp:
yield node_finish_resp
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
with Session(db.engine) as session:
workflow_node_execution = self._handle_workflow_node_execution_failed(session=session, event=event)

node_finish_resp = self._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()

node_finish_resp = self._workflow_node_finish_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
if node_finish_resp:
yield node_finish_resp

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
parallel_start_resp = self._workflow_parallel_branch_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield parallel_start_resp
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
parallel_finish_resp = self._workflow_parallel_branch_finished_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield parallel_finish_resp
elif isinstance(event, QueueIterationStartEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_start_resp = self._workflow_iteration_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield iter_start_resp
elif isinstance(event, QueueIterationNextEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_next_resp = self._workflow_iteration_next_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield iter_next_resp
elif isinstance(event, QueueIterationCompletedEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
with Session(db.engine) as session:
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
iter_finish_resp = self._workflow_iteration_completed_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield iter_finish_resp
elif isinstance(event, QueueWorkflowSucceededEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
Expand All @@ -404,7 +457,7 @@ def _process_stream_response(
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_success(
session=session,
workflow_run=workflow_run,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
Expand All @@ -421,16 +474,15 @@ def _process_stream_response(
yield workflow_finish_resp
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")

with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_partial_success(
session=session,
workflow_run=workflow_run,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
Expand All @@ -439,7 +491,6 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)

workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
Expand All @@ -448,16 +499,15 @@ def _process_stream_response(
yield workflow_finish_resp
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowFailedEvent):
if not workflow_run:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise ValueError("graph runtime state not initialized.")

with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_failed(
session=session,
workflow_run=workflow_run,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
Expand All @@ -473,15 +523,16 @@ def _process_stream_response(
err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_run.error}"))
err = self._handle_error(event=err_event, session=session, message_id=self._message_id)
session.commit()

yield workflow_finish_resp
yield self._error_to_stream_response(err)
break
elif isinstance(event, QueueStopEvent):
if workflow_run and graph_runtime_state:
if self._workflow_run_id and graph_runtime_state:
with Session(db.engine) as session:
workflow_run = self._handle_workflow_run_failed(
session=session,
workflow_run=workflow_run,
workflow_run_id=self._workflow_run_id,
start_at=graph_runtime_state.start_at,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
Expand All @@ -490,7 +541,6 @@ def _process_stream_response(
conversation_id=self._conversation_id,
trace_manager=trace_manager,
)

workflow_finish_resp = self._workflow_finish_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
Expand All @@ -499,6 +549,7 @@ def _process_stream_response(
# Save message
self._save_message(session=session, graph_runtime_state=graph_runtime_state)
session.commit()

yield workflow_finish_resp

yield self._message_end_to_stream_response()
Expand Down
Loading

0 comments on commit 822af70

Please sign in to comment.