Skip to content

Commit

Permalink
fix: remove the unused retry index field
Browse files Browse the repository at this point in the history
  • Loading branch information
Novice Lee authored and Novice Lee committed Dec 20, 2024
1 parent dacd457 commit 204d314
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 124 deletions.
4 changes: 2 additions & 2 deletions api/controllers/console/app/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from core.app.entities.app_invoke_entities import InvokeFrom
from factories import variable_factory
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from fields.workflow_run_fields import single_step_node_execution_fields
from libs import helper
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
Expand Down Expand Up @@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource):
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_node_execution_fields)
@marshal_with(single_step_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Run draft workflow node
Expand Down
31 changes: 16 additions & 15 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ def _process_stream_response(
yield self._workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
Expand Down Expand Up @@ -331,22 +347,7 @@ def _process_stream_response(

if response:
yield response
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
Expand Down
32 changes: 16 additions & 16 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,22 @@ def _process_stream_response(
yield self._workflow_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
Expand Down Expand Up @@ -289,22 +305,6 @@ def _process_stream_response(
)
if node_failed_response:
yield node_failed_response
elif isinstance(
event,
QueueNodeRetryEvent,
):
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)

response = self._workflow_node_retry_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)

if response:
yield response

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
Expand Down
62 changes: 32 additions & 30 deletions api/core/app/apps/workflow_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,38 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
)
elif isinstance(event, GraphRunFailedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
elif isinstance(event, NodeRunRetryEvent):
self._publish_event(
QueueNodeRetryEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_data=event.node_data,
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.start_at,
node_run_index=event.node_run_index,
predecessor_node_id=event.predecessor_node_id,
in_iteration_id=event.in_iteration_id,
parallel_mode_run_id=event.parallel_mode_run_id,
inputs=event.route_node_state.node_run_result.inputs
if event.route_node_state.node_run_result
else {},
process_data=event.route_node_state.node_run_result.process_data
if event.route_node_state.node_run_result
else {},
outputs=event.route_node_state.node_run_result.outputs
if event.route_node_state.node_run_result
else {},
error=event.error,
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
retry_index=event.retry_index,
)
)
elif isinstance(event, NodeRunStartedEvent):
self._publish_event(
QueueNodeStartedEvent(
Expand Down Expand Up @@ -422,36 +454,6 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
error=event.error if isinstance(event, IterationRunFailedEvent) else None,
)
)
elif isinstance(event, NodeRunRetryEvent):
self._publish_event(
QueueNodeRetryEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_data=event.node_data,
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.start_at,
inputs=event.route_node_state.node_run_result.inputs
if event.route_node_state.node_run_result
else {},
process_data=event.route_node_state.node_run_result.process_data
if event.route_node_state.node_run_result
else {},
outputs=event.route_node_state.node_run_result.outputs
if event.route_node_state.node_run_result
else {},
error=event.error,
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
in_iteration_id=event.in_iteration_id,
retry_index=event.retry_index,
start_index=event.start_index,
)
)

def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
"""
Expand Down
19 changes: 1 addition & 18 deletions api/core/app/entities/queue_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,35 +314,18 @@ class QueueNodeSucceededEvent(AppQueueEvent):
iteration_duration_map: Optional[dict[str, float]] = None


class QueueNodeRetryEvent(AppQueueEvent):
class QueueNodeRetryEvent(QueueNodeStartedEvent):
"""QueueNodeRetryEvent entity"""

event: QueueEvent = QueueEvent.RETRY

node_execution_id: str
node_id: str
node_type: NodeType
node_data: BaseNodeData
parallel_id: Optional[str] = None
"""parallel id if node is in parallel"""
parallel_start_node_id: Optional[str] = None
"""parallel start node id if node is in parallel"""
parent_parallel_id: Optional[str] = None
"""parent parallel id if node is in parallel"""
parent_parallel_start_node_id: Optional[str] = None
"""parent parallel start node id if node is in parallel"""
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""
start_at: datetime

inputs: Optional[dict[str, Any]] = None
process_data: Optional[dict[str, Any]] = None
outputs: Optional[dict[str, Any]] = None
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None

error: str
retry_index: int # retry index
start_index: int # start index


class QueueNodeInIterationFailedEvent(AppQueueEvent):
Expand Down
5 changes: 4 additions & 1 deletion api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def _handle_workflow_node_execution_retried(
workflow_node_execution.workflow_id = workflow_run.workflow_id
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
workflow_node_execution.workflow_run_id = workflow_run.id
workflow_node_execution.predecessor_node_id = event.predecessor_node_id
workflow_node_execution.node_execution_id = event.node_execution_id
workflow_node_execution.node_id = event.node_id
workflow_node_execution.node_type = event.node_type.value
Expand All @@ -461,9 +462,11 @@ def _handle_workflow_node_execution_retried(
workflow_node_execution.execution_metadata = json.dumps(
{
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
}
)
workflow_node_execution.index = event.start_index
workflow_node_execution.index = event.node_run_index

db.session.add(workflow_node_execution)
db.session.commit()
Expand Down
6 changes: 4 additions & 2 deletions api/core/helper/ssrf_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")

except httpx.RequestError as e:
if max_retries == 0:
raise
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")

retries += 1
if retries <= max_retries:
time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))

raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
if max_retries != 0:
raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")


def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions api/core/workflow/graph_engine/entities/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ class NodeInIterationFailedEvent(BaseNodeEvent):
error: str = Field(..., description="error")


class NodeRunRetryEvent(BaseNodeEvent):
class NodeRunRetryEvent(NodeRunStartedEvent):
error: str = Field(..., description="error")
retry_index: int = Field(..., description="which retry attempt is about to be performed")
start_at: datetime = Field(..., description="retry start time")
start_index: int = Field(..., description="retry start index")
node_run_index: int = Field(..., description="retry run index")


###########################################
Expand Down
7 changes: 4 additions & 3 deletions api/core/workflow/graph_engine/graph_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,14 +649,15 @@ def _run_node(
node_type=node_instance.node_type,
node_data=node_instance.node_data,
route_node_state=route_node_state,
error=run_result.error,
retry_index=retries,
predecessor_node_id=node_instance.previous_node_id,
parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
error=run_result.error,
retry_index=retries,
start_at=retry_start_at,
start_index=self.graph_runtime_state.node_run_steps,
node_run_index=self.graph_runtime_state.node_run_steps,
)
time.sleep(retry_interval)
continue
Expand Down
2 changes: 1 addition & 1 deletion api/core/workflow/nodes/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SingleStepRetryEvent(BaseModel):

inputs: dict | None = Field(..., description="input")
error: str = Field(..., description="error")
outputs: dict = Field(..., description="output")
outputs: dict | None = Field(..., description="output")
retry_index: int = Field(..., description="Retry attempt number")
error: str = Field(..., description="error")
elapsed_time: float = Field(..., description="elapsed time")
Expand Down
2 changes: 2 additions & 0 deletions api/core/workflow/nodes/http_request/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response:
response = getattr(ssrf_proxy, self.method)(**request_args)
except ssrf_proxy.MaxRetriesExceededError as e:
raise HttpRequestNodeError(str(e))
except httpx.RequestError as e:
raise HttpRequestNodeError(str(e))
return response

def invoke(self) -> Response:
Expand Down
4 changes: 4 additions & 0 deletions api/fields/workflow_run_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
"created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True),
"created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True),
"finished_at": TimestampField,
}

single_step_node_execution_fields = {
**workflow_run_node_execution_fields,
"retry_events": fields.List(fields.Nested(retry_event_field)),
}

Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion api/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,6 @@ class WorkflowNodeExecution(db.Model):
created_by_role = db.Column(db.String(255), nullable=False)
created_by = db.Column(StringUUID, nullable=False)
finished_at = db.Column(db.DateTime)
retry_index = db.Column(db.Integer, server_default=db.text("0"))

@property
def created_by_account(self):
Expand Down

0 comments on commit 204d314

Please sign in to comment.