Skip to content

Commit

Permalink
fix: remove the single step retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Novice Lee authored and Novice Lee committed Dec 23, 2024
1 parent bb7d8eb commit 3e48f54
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 99 deletions.
4 changes: 2 additions & 2 deletions api/controllers/console/app/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from core.app.entities.app_invoke_entities import InvokeFrom
from factories import variable_factory
from fields.workflow_fields import workflow_fields
from fields.workflow_run_fields import single_step_node_execution_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs import helper
from libs.helper import TimestampField, uuid_value
from libs.login import current_user, login_required
Expand Down Expand Up @@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource):
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(single_step_node_execution_fields)
@marshal_with(workflow_run_node_execution_fields)
def post(self, app_model: App, node_id: str):
"""
Run draft workflow node
Expand Down
6 changes: 3 additions & 3 deletions api/core/helper/ssrf_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
)

retries = 0
stream = kwargs.pop("stream", False)
while retries <= max_retries:
try:
if dify_config.SSRF_PROXY_ALL_URL:
Expand All @@ -63,15 +64,14 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")

except httpx.RequestError as e:
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")
if max_retries == 0:
raise
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")

retries += 1
if retries <= max_retries:
time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))
if max_retries != 0:
raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")


def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
Expand Down
5 changes: 0 additions & 5 deletions api/fields/workflow_run_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@
"finished_at": TimestampField,
}

single_step_node_execution_fields = {
**workflow_run_node_execution_fields,
"retry_events": fields.List(fields.Nested(retry_event_field)),
}

workflow_run_node_execution_list_fields = {
"data": fields.List(fields.Nested(workflow_run_node_execution_fields)),
}
137 changes: 48 additions & 89 deletions api/services/workflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from core.workflow.nodes.base.node import BaseNode
from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.event.event import SingleStepRetryEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.workflow_entry import WorkflowEntry
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
Expand Down Expand Up @@ -221,95 +220,56 @@ def run_draft_workflow_node(

# run draft workflow node
start_at = time.perf_counter()
retries = 0
max_retries = 0
should_retry = True
retry_events = []

try:
while retries <= max_retries and should_retry:
retry_start_at = time.perf_counter()
node_instance, generator = WorkflowEntry.single_step_run(
workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
)
node_instance = cast(BaseNode[BaseNodeData], node_instance)
max_retries = (
node_instance.node_data.retry_config.max_retries if node_instance.node_data.retry_config else 0
)
retry_interval = node_instance.node_data.retry_config.retry_interval_seconds
node_run_result: NodeRunResult | None = None
for event in generator:
if isinstance(event, RunCompletedEvent):
node_run_result = event.run_result

# sign output files
node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
break

if not node_run_result:
raise ValueError("Node run failed with no run result")
# single step debug mode error handling return
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
if (
retries == max_retries
and node_instance.node_type == NodeType.HTTP_REQUEST
and node_run_result.outputs
and not node_instance.should_continue_on_error
):
node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
should_retry = False
else:
if node_instance.should_retry:
node_run_result.status = WorkflowNodeExecutionStatus.RETRY
retries += 1
node_run_result.retry_index = retries
retry_events.append(
SingleStepRetryEvent(
elapsed_time=time.perf_counter() - retry_start_at,
inputs=WorkflowEntry.handle_special_values(node_run_result.inputs),
process_data=WorkflowEntry.handle_special_values(node_run_result.process_data),
outputs=WorkflowEntry.handle_special_values(node_run_result.outputs),
metadata=node_run_result.metadata,
llm_usage=node_run_result.llm_usage,
error=node_run_result.error,
retry_index=node_run_result.retry_index,
)
)
time.sleep(retry_interval)
else:
should_retry = False
if node_instance.should_continue_on_error:
node_error_args = {
"status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error,
"inputs": node_run_result.inputs,
"metadata": {"error_strategy": node_instance.node_data.error_strategy},
}
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
**node_instance.node_data.default_value_dict,
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
else:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
run_succeeded = node_run_result.status in (
WorkflowNodeExecutionStatus.SUCCEEDED,
WorkflowNodeExecutionStatus.EXCEPTION,
)
error = node_run_result.error if not run_succeeded else None
node_instance, generator = WorkflowEntry.single_step_run(
workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
)
node_instance = cast(BaseNode[BaseNodeData], node_instance)
node_run_result: NodeRunResult | None = None
for event in generator:
if isinstance(event, RunCompletedEvent):
node_run_result = event.run_result

# sign output files
node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
break

if not node_run_result:
raise ValueError("Node run failed with no run result")
# single step debug mode error handling return
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
node_error_args = {
"status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error,
"inputs": node_run_result.inputs,
"metadata": {"error_strategy": node_instance.node_data.error_strategy},
}
if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
**node_instance.node_data.default_value_dict,
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
else:
node_run_result = NodeRunResult(
**node_error_args,
outputs={
"error_message": node_run_result.error,
"error_type": node_run_result.error_type,
},
)
run_succeeded = node_run_result.status in (
WorkflowNodeExecutionStatus.SUCCEEDED,
WorkflowNodeExecutionStatus.EXCEPTION,
)
error = node_run_result.error if not run_succeeded else None
except WorkflowNodeRunFailedError as e:
node_instance = e.node_instance
run_succeeded = False
Expand Down Expand Up @@ -358,7 +318,6 @@ def run_draft_workflow_node(

db.session.add(workflow_node_execution)
db.session.commit()
workflow_node_execution.retry_events = retry_events

return workflow_node_execution

Expand Down

0 comments on commit 3e48f54

Please sign in to comment.