diff --git a/app/(playground)/p/[agentId]/components/viewer.tsx b/app/(playground)/p/[agentId]/components/viewer.tsx index c606ea34..f35bf75c 100644 --- a/app/(playground)/p/[agentId]/components/viewer.tsx +++ b/app/(playground)/p/[agentId]/components/viewer.tsx @@ -1,6 +1,7 @@ "use client"; import * as Tabs from "@radix-ui/react-tabs"; +import { CircleAlertIcon, CircleSlashIcon } from "lucide-react"; import { type DetailedHTMLProps, useMemo } from "react"; import { useExecution } from "../contexts/execution"; import { useGraph } from "../contexts/graph"; @@ -37,6 +38,12 @@ function StepExecutionButton({ {stepExecution.status === "pending" && ( )} + {stepExecution.status === "failed" && ( + + )} + {stepExecution.status === "skipped" && ( + + )} {stepExecution.status === "running" && ( )} @@ -121,11 +128,14 @@ function ExecutionViewer({ {execution.jobExecutions.flatMap((jobExecution) => jobExecution.stepExecutions.map((stepExecution) => ( - {stepExecution.artifact == null ? ( -

Pending

- ) : ( - {stepExecution.artifact.object.content} + {stepExecution.status === "pending" &&

Pending

} + {stepExecution.status === "failed" && ( +

{stepExecution.error}

)} + {stepExecution.status === "running" || + (stepExecution.status === "completed" && ( + {stepExecution.artifact?.object.content} + ))} {stepExecution.artifact?.type === "generatedArtifact" && (
diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 902da17d..54716eab 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -18,12 +18,19 @@ import { import type { Artifact, ArtifactId, + CompletedExecution, + CompletedJobExecution, + CompletedStepExecution, Execution, ExecutionId, + FailedExecution, + FailedJobExecution, + FailedStepExecution, Flow, FlowId, JobExecution, NodeId, + SkippedJobExecution, StepExecution, StepId, TextArtifactObject, @@ -90,7 +97,7 @@ const executeStep = async ( updateExecution: ( updater: (prev: Execution | null) => Execution | null, ) => void, -): Promise => { +): Promise => { const stepRunStartedAt = Date.now(); const artifactId = createArtifactId(); @@ -124,64 +131,98 @@ const executeStep = async ( }; }); - // Execute step and process stream - const stream = await executeStepAction( - flowId, - executionId, - stepExecution.stepId, - artifacts, - ); - const finalArtifact = await processStreamContent(stream, (content) => { + try { + // Execute step and process stream + const stream = await executeStepAction( + flowId, + executionId, + stepExecution.stepId, + artifacts, + ); + const finalArtifact = await processStreamContent(stream, (content) => { + updateExecution((prev) => { + if (!prev || prev.status !== "running") return null; + return { + ...prev, + artifacts: prev.artifacts.map((artifact) => + artifact.id === artifactId + ? { ...artifact, object: content } + : artifact, + ), + }; + }); + }); + + // Complete step execution + const stepDurationMs = Date.now() - stepRunStartedAt; + + const successStepExecution: CompletedStepExecution = { + ...stepExecution, + status: "completed", + runStartedAt: stepRunStartedAt, + durationMs: stepDurationMs, + }; updateExecution((prev) => { if (!prev || prev.status !== "running") return null; + return { ...prev, + jobExecutions: prev.jobExecutions.map((job) => ({ + ...job, + stepExecutions: job.stepExecutions.map((step) => + step.id === stepExecution.id ? successStepExecution : step, + ), + })), artifacts: prev.artifacts.map((artifact) => artifact.id === artifactId - ? { ...artifact, object: content } + ? { + id: artifactId, + type: "generatedArtifact", + creatorNodeId: stepExecution.nodeId, + createdAt: Date.now(), + object: finalArtifact, + } : artifact, ), }; }); - }); - // Complete step execution - const stepDurationMs = Date.now() - stepRunStartedAt; - updateExecution((prev) => { - if (!prev || prev.status !== "running") return null; - - return { - ...prev, - jobExecutions: prev.jobExecutions.map((job) => ({ - ...job, - stepExecutions: job.stepExecutions.map((step) => - step.id === stepExecution.id - ? { - ...step, - status: "completed", - runStartedAt: stepRunStartedAt, - durationMs: stepDurationMs, - } - : step, - ), - })), - artifacts: prev.artifacts.map((artifact) => - artifact.id === artifactId - ? { - id: artifactId, - type: "generatedArtifact", - creatorNodeId: stepExecution.nodeId, - createdAt: Date.now(), - object: finalArtifact, - } - : artifact, - ), + return successStepExecution; + } catch (unknownError) { + const error = toErrorWithMessage(unknownError).message; + const stepDurationMs = Date.now() - stepRunStartedAt; + const failedStepExecution: FailedStepExecution = { + ...stepExecution, + status: "failed", + runStartedAt: stepRunStartedAt, + durationMs: stepDurationMs, + error, }; - }); + updateExecution((prev) => { + if (!prev || prev.status !== "running") return null; - return stepDurationMs; + return { + ...prev, + jobExecutions: prev.jobExecutions.map((job) => ({ + ...job, + stepExecutions: job.stepExecutions.map((step) => + step.id === stepExecution.id ? failedStepExecution : step, + ), + })), + }; + }); + return failedStepExecution; + } }; +class ExecuteJobError extends Error { + executeJob: FailedStepExecution; + constructor(executeJob: FailedStepExecution) { + super(executeJob.error); + this.name = "ExecuteJobError"; + this.executeJob = executeJob; + } +} const executeJob = async ( flowId: FlowId, executionId: ExecutionId, @@ -191,7 +232,7 @@ const executeJob = async ( updateExecution: ( updater: (prev: Execution | null) => Execution | null, ) => void, -): Promise => { +): Promise => { const jobRunStartedAt = Date.now(); // Start job execution @@ -208,7 +249,7 @@ const executeJob = async ( }); // Execute all steps in parallel - const stepDurations = await Promise.all( + const stepExecutions = await Promise.all( jobExecution.stepExecutions.map((step) => executeStep( flowId, @@ -221,30 +262,52 @@ const executeJob = async ( ), ); - const jobDurationMs = stepDurations.reduce( - (sum, duration) => sum + duration, + const jobDurationMs = stepExecutions.reduce( + (sum, duration) => sum + duration.durationMs, 0, ); + const allStepsCompleted = stepExecutions.every( + (step) => step.status === "completed", + ); + + if (allStepsCompleted) { + // Complete job execution + const completedJobExecution: CompletedJobExecution = { + ...jobExecution, + stepExecutions, + status: "completed", + runStartedAt: jobRunStartedAt, + durationMs: jobDurationMs, + }; + updateExecution((prev) => { + if (!prev) return null; + return { + ...prev, + jobExecutions: prev.jobExecutions.map((job) => + job.id === jobExecution.id ? completedJobExecution : job, + ), + }; + }); + return completedJobExecution; + } - // Complete job execution + const failedJobExecution: FailedJobExecution = { + ...jobExecution, + stepExecutions, + status: "failed", + runStartedAt: jobRunStartedAt, + durationMs: jobDurationMs, + }; updateExecution((prev) => { if (!prev) return null; return { ...prev, jobExecutions: prev.jobExecutions.map((job) => - job.id === jobExecution.id - ? { - ...job, - status: "completed", - runStartedAt: jobRunStartedAt, - durationMs: jobDurationMs, - } - : job, + job.id === jobExecution.id ? failedJobExecution : job, ), }; }); - - return jobDurationMs; + return failedJobExecution; }; interface ExecutionContextType { @@ -394,10 +457,29 @@ export function ExecutionProvider({ setExecution(currentExecution); let totalFlowDurationMs = 0; + let hasFailed = false; // Execute jobs sequentially for (const jobExecution of jobExecutions) { - const jobDurationMs = await executeJob( + if (hasFailed) { + const skippedJob: SkippedJobExecution = { + ...jobExecution, + status: "skipped", + stepExecutions: jobExecution.stepExecutions.map((step) => ({ + ...step, + status: "skipped", + })), + }; + currentExecution = { + ...currentExecution, + jobExecutions: currentExecution.jobExecutions.map((job) => + job.id === jobExecution.id ? skippedJob : job, + ), + }; + setExecution(currentExecution); + continue; + } + const executedJob = await executeJob( flowId, executionId, jobExecution, @@ -411,19 +493,31 @@ export function ExecutionProvider({ } }, ); - totalFlowDurationMs += jobDurationMs; + totalFlowDurationMs += executedJob.durationMs; + if (executedJob.status === "failed") { + hasFailed = true; + } + } + if (hasFailed) { + const failedExecution: FailedExecution = { + ...currentExecution, + status: "failed", + runStartedAt: flowRunStartedAt, + durationMs: totalFlowDurationMs, + }; + currentExecution = failedExecution; + } else { + const completedExecution: CompletedExecution = { + ...currentExecution, + status: "completed", + runStartedAt: flowRunStartedAt, + durationMs: totalFlowDurationMs, + resultArtifact: + currentExecution.artifacts[currentExecution.artifacts.length - 1], + }; + currentExecution = completedExecution; } - currentExecution = { - ...currentExecution, - status: "completed", - runStartedAt: flowRunStartedAt, - durationMs: totalFlowDurationMs, - resultArtifact: - currentExecution.artifacts[currentExecution.artifacts.length - 1], - }; - - // Complete flow execution setExecution(currentExecution); const { blobUrl } = await putExecutionAction(currentExecution); dispatch({ diff --git a/app/(playground)/p/[agentId]/types.ts b/app/(playground)/p/[agentId]/types.ts index 209b6833..a7f064cc 100644 --- a/app/(playground)/p/[agentId]/types.ts +++ b/app/(playground)/p/[agentId]/types.ts @@ -256,15 +256,29 @@ interface RunningStepExecution extends StepExecutionBase { runStartedAt: number; } -interface CompletedStepExecution extends StepExecutionBase { +export interface CompletedStepExecution extends StepExecutionBase { status: "completed"; runStartedAt: number; durationMs: number; } + +export interface FailedStepExecution extends StepExecutionBase { + status: "failed"; + runStartedAt: number; + durationMs: number; + error: string; +} + +interface SkippedStepExecution extends StepExecutionBase { + status: "skipped"; +} + export type StepExecution = | PendingStepExecution | RunningStepExecution - | CompletedStepExecution; + | CompletedStepExecution + | FailedStepExecution + | SkippedStepExecution; export type JobExecutionId = `jbex_${string}`; interface JobExecutionBase { @@ -280,15 +294,25 @@ interface RunningJobExecution extends JobExecutionBase { status: "running"; runStartedAt: number; } -interface CompletedJobExecution extends JobExecutionBase { +export interface CompletedJobExecution extends JobExecutionBase { status: "completed"; runStartedAt: number; durationMs: number; } +export interface FailedJobExecution extends JobExecutionBase { + status: "failed"; + runStartedAt: number; + durationMs: number; +} +export interface SkippedJobExecution extends JobExecutionBase { + status: "skipped"; +} export type JobExecution = | PendingJobExecution | RunningJobExecution - | CompletedJobExecution; + | CompletedJobExecution + | FailedJobExecution + | SkippedJobExecution; export type ExecutionId = `exct_${string}`; interface ExecutionBase { id: ExecutionId; @@ -303,16 +327,22 @@ interface RunningExecution extends ExecutionBase { status: "running"; runStartedAt: number; } -interface CompletedExecution extends ExecutionBase { +export interface CompletedExecution extends ExecutionBase { status: "completed"; runStartedAt: number; durationMs: number; resultArtifact: Artifact; } +export interface FailedExecution extends ExecutionBase { + status: "failed"; + runStartedAt: number; + durationMs: number; +} export type Execution = | PendingExecution | RunningExecution - | CompletedExecution; + | CompletedExecution + | FailedExecution; export interface ExecutionIndex { executionId: ExecutionId;