Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove the unused retry index field #11903

Merged
merged 12 commits into from
Dec 23, 2024
61 changes: 32 additions & 29 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
else:
continue

raise Exception("Queue listening stopped unexpectedly.")
raise ValueError("queue listening stopped unexpectedly.")

def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
Expand Down Expand Up @@ -291,9 +291,27 @@ def _process_stream_response(
yield self._workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
elif isinstance(
event,
QueueNodeRetryEvent,
):
if not workflow_run:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)

Expand Down Expand Up @@ -331,63 +349,48 @@ def _process_stream_response(

if response:
yield response
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationStartEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationNextEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationCompletedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueWorkflowSucceededEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("workflow run not initialized.")

workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
Expand All @@ -406,10 +409,10 @@ def _process_stream_response(
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_partial_success(
workflow_run=workflow_run,
Expand All @@ -429,10 +432,10 @@ def _process_stream_response(
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_failed(
workflow_run=workflow_run,
Expand Down Expand Up @@ -522,7 +525,7 @@ def _process_stream_response(
yield self._message_replace_to_stream_response(answer=event.text)
elif isinstance(event, QueueAdvancedChatMessageEndEvent):
if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
if output_moderation_answer:
Expand Down
62 changes: 32 additions & 30 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
else:
continue

raise Exception("Queue listening stopped unexpectedly.")
raise ValueError("queue listening stopped unexpectedly.")

def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
Expand Down Expand Up @@ -218,7 +218,7 @@ def _wrapper_process_stream_response(
break
else:
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
except Exception as e:
except Exception:
logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
break
if tts_publisher:
Expand Down Expand Up @@ -254,9 +254,27 @@ def _process_stream_response(
yield self._workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
elif isinstance(
event,
QueueNodeRetryEvent,
):
if not workflow_run:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)

Expand Down Expand Up @@ -289,64 +307,48 @@ def _process_stream_response(
)
if node_failed_response:
yield node_failed_response
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationStartEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationNextEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationCompletedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueWorkflowSucceededEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
Expand All @@ -366,10 +368,10 @@ def _process_stream_response(
)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_partial_success(
workflow_run=workflow_run,
Expand All @@ -390,10 +392,10 @@ def _process_stream_response(
)
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")
workflow_run = self._handle_workflow_run_failed(
workflow_run=workflow_run,
start_at=graph_runtime_state.start_at,
Expand Down
Loading
Loading