Skip to content

Commit

Permalink
feat(execution): Implement error handling and status visualization
Browse files Browse the repository at this point in the history
Add comprehensive error handling and skip logic for execution flow,
including visual indicators for failed and skipped states.

- Add error handling in step and job execution flows
- Implement skip logic for subsequent jobs after failure
- Add visual indicators for failed and skipped states
- Update execution viewer to display error messages
- Export execution interface types for broader usage

BREAKING CHANGE: executeStep and executeJob now return specific completion
types instead of duration numbers
  • Loading branch information
toyamarinyon committed Dec 18, 2024
1 parent e70fc50 commit 5692789
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 82 deletions.
18 changes: 14 additions & 4 deletions app/(playground)/p/[agentId]/components/viewer.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use client";

import * as Tabs from "@radix-ui/react-tabs";
import { CircleAlertIcon, CircleSlashIcon } from "lucide-react";
import { type DetailedHTMLProps, useMemo } from "react";
import { useExecution } from "../contexts/execution";
import { useGraph } from "../contexts/graph";
Expand Down Expand Up @@ -37,6 +38,12 @@ function StepExecutionButton({
{stepExecution.status === "pending" && (
<SpinnerIcon className="w-[18px] h-[18px] stroke-black-30 fill-transparent" />
)}
{stepExecution.status === "failed" && (
<CircleAlertIcon className="w-[18px] h-[18px] stroke-black-30 fill-transparent" />
)}
{stepExecution.status === "skipped" && (
<CircleSlashIcon className="w-[18px] h-[18px] stroke-black-30 fill-transparent" />
)}
{stepExecution.status === "running" && (
<SpinnerIcon className="w-[18px] h-[18px] stroke-black-30 animate-follow-through-spin fill-transparent" />
)}
Expand Down Expand Up @@ -121,11 +128,14 @@ function ExecutionViewer({
{execution.jobExecutions.flatMap((jobExecution) =>
jobExecution.stepExecutions.map((stepExecution) => (
<Tabs.Content key={stepExecution.id} value={stepExecution.id}>
{stepExecution.artifact == null ? (
<p>Pending</p>
) : (
<Markdown>{stepExecution.artifact.object.content}</Markdown>
{stepExecution.status === "pending" && <p>Pending</p>}
{stepExecution.status === "failed" && (
<p>{stepExecution.error}</p>
)}
{stepExecution.status === "running" ||
(stepExecution.status === "completed" && (
<Markdown>{stepExecution.artifact?.object.content}</Markdown>
))}
{stepExecution.artifact?.type === "generatedArtifact" && (
<div className="mt-[10px] flex gap-[12px]">
<div className="text-[14px] font-bold text-black-70 ">
Expand Down
236 changes: 165 additions & 71 deletions app/(playground)/p/[agentId]/contexts/execution.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ import {
import type {
Artifact,
ArtifactId,
CompletedExecution,
CompletedJobExecution,
CompletedStepExecution,
Execution,
ExecutionId,
FailedExecution,
FailedJobExecution,
FailedStepExecution,
Flow,
FlowId,
JobExecution,
NodeId,
SkippedJobExecution,
StepExecution,
StepId,
TextArtifactObject,
Expand Down Expand Up @@ -90,7 +97,7 @@ const executeStep = async (
updateExecution: (
updater: (prev: Execution | null) => Execution | null,
) => void,
): Promise<number> => {
): Promise<CompletedStepExecution | FailedStepExecution> => {
const stepRunStartedAt = Date.now();
const artifactId = createArtifactId();

Expand Down Expand Up @@ -124,64 +131,98 @@ const executeStep = async (
};
});

// Execute step and process stream
const stream = await executeStepAction(
flowId,
executionId,
stepExecution.stepId,
artifacts,
);
const finalArtifact = await processStreamContent(stream, (content) => {
try {
// Execute step and process stream
const stream = await executeStepAction(
flowId,
executionId,
stepExecution.stepId,
artifacts,
);
const finalArtifact = await processStreamContent(stream, (content) => {
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;
return {
...prev,
artifacts: prev.artifacts.map((artifact) =>
artifact.id === artifactId
? { ...artifact, object: content }
: artifact,
),
};
});
});

// Complete step execution
const stepDurationMs = Date.now() - stepRunStartedAt;

const successStepExecution: CompletedStepExecution = {
...stepExecution,
status: "completed",
runStartedAt: stepRunStartedAt,
durationMs: stepDurationMs,
};
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;

return {
...prev,
jobExecutions: prev.jobExecutions.map((job) => ({
...job,
stepExecutions: job.stepExecutions.map((step) =>
step.id === stepExecution.id ? successStepExecution : step,
),
})),
artifacts: prev.artifacts.map((artifact) =>
artifact.id === artifactId
? { ...artifact, object: content }
? {
id: artifactId,
type: "generatedArtifact",
creatorNodeId: stepExecution.nodeId,
createdAt: Date.now(),
object: finalArtifact,
}
: artifact,
),
};
});
});

// Complete step execution
const stepDurationMs = Date.now() - stepRunStartedAt;
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;

return {
...prev,
jobExecutions: prev.jobExecutions.map((job) => ({
...job,
stepExecutions: job.stepExecutions.map((step) =>
step.id === stepExecution.id
? {
...step,
status: "completed",
runStartedAt: stepRunStartedAt,
durationMs: stepDurationMs,
}
: step,
),
})),
artifacts: prev.artifacts.map((artifact) =>
artifact.id === artifactId
? {
id: artifactId,
type: "generatedArtifact",
creatorNodeId: stepExecution.nodeId,
createdAt: Date.now(),
object: finalArtifact,
}
: artifact,
),
return successStepExecution;
} catch (unknownError) {
const error = toErrorWithMessage(unknownError).message;
const stepDurationMs = Date.now() - stepRunStartedAt;
const failedStepExecution: FailedStepExecution = {
...stepExecution,
status: "failed",
runStartedAt: stepRunStartedAt,
durationMs: stepDurationMs,
error,
};
});
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;

return stepDurationMs;
return {
...prev,
jobExecutions: prev.jobExecutions.map((job) => ({
...job,
stepExecutions: job.stepExecutions.map((step) =>
step.id === stepExecution.id ? failedStepExecution : step,
),
})),
};
});
return failedStepExecution;
}
};

class ExecuteJobError extends Error {
executeJob: FailedStepExecution;
constructor(executeJob: FailedStepExecution) {
super(executeJob.error);
this.name = "ExecuteJobError";
this.executeJob = executeJob;
}
}
const executeJob = async (
flowId: FlowId,
executionId: ExecutionId,
Expand All @@ -191,7 +232,7 @@ const executeJob = async (
updateExecution: (
updater: (prev: Execution | null) => Execution | null,
) => void,
): Promise<number> => {
): Promise<CompletedJobExecution | FailedJobExecution> => {
const jobRunStartedAt = Date.now();

// Start job execution
Expand All @@ -208,7 +249,7 @@ const executeJob = async (
});

// Execute all steps in parallel
const stepDurations = await Promise.all(
const stepExecutions = await Promise.all(
jobExecution.stepExecutions.map((step) =>
executeStep(
flowId,
Expand All @@ -221,30 +262,52 @@ const executeJob = async (
),
);

const jobDurationMs = stepDurations.reduce(
(sum, duration) => sum + duration,
const jobDurationMs = stepExecutions.reduce(
(sum, duration) => sum + duration.durationMs,
0,
);
const allStepsCompleted = stepExecutions.every(
(step) => step.status === "completed",
);

if (allStepsCompleted) {
// Complete job execution
const completedJobExecution: CompletedJobExecution = {
...jobExecution,
stepExecutions,
status: "completed",
runStartedAt: jobRunStartedAt,
durationMs: jobDurationMs,
};
updateExecution((prev) => {
if (!prev) return null;
return {
...prev,
jobExecutions: prev.jobExecutions.map((job) =>
job.id === jobExecution.id ? completedJobExecution : job,
),
};
});
return completedJobExecution;
}

// Complete job execution
const failedJobExecution: FailedJobExecution = {
...jobExecution,
stepExecutions,
status: "failed",
runStartedAt: jobRunStartedAt,
durationMs: jobDurationMs,
};
updateExecution((prev) => {
if (!prev) return null;
return {
...prev,
jobExecutions: prev.jobExecutions.map((job) =>
job.id === jobExecution.id
? {
...job,
status: "completed",
runStartedAt: jobRunStartedAt,
durationMs: jobDurationMs,
}
: job,
job.id === jobExecution.id ? failedJobExecution : job,
),
};
});

return jobDurationMs;
return failedJobExecution;
};

interface ExecutionContextType {
Expand Down Expand Up @@ -394,10 +457,29 @@ export function ExecutionProvider({
setExecution(currentExecution);

let totalFlowDurationMs = 0;
let hasFailed = false;

// Execute jobs sequentially
for (const jobExecution of jobExecutions) {
const jobDurationMs = await executeJob(
if (hasFailed) {
const skippedJob: SkippedJobExecution = {
...jobExecution,
status: "skipped",
stepExecutions: jobExecution.stepExecutions.map((step) => ({
...step,
status: "skipped",
})),
};
currentExecution = {
...currentExecution,
jobExecutions: currentExecution.jobExecutions.map((job) =>
job.id === jobExecution.id ? skippedJob : job,
),
};
setExecution(currentExecution);
continue;
}
const executedJob = await executeJob(
flowId,
executionId,
jobExecution,
Expand All @@ -411,19 +493,31 @@ export function ExecutionProvider({
}
},
);
totalFlowDurationMs += jobDurationMs;
totalFlowDurationMs += executedJob.durationMs;
if (executedJob.status === "failed") {
hasFailed = true;
}
}
if (hasFailed) {
const failedExecution: FailedExecution = {
...currentExecution,
status: "failed",
runStartedAt: flowRunStartedAt,
durationMs: totalFlowDurationMs,
};
currentExecution = failedExecution;
} else {
const completedExecution: CompletedExecution = {
...currentExecution,
status: "completed",
runStartedAt: flowRunStartedAt,
durationMs: totalFlowDurationMs,
resultArtifact:
currentExecution.artifacts[currentExecution.artifacts.length - 1],
};
currentExecution = completedExecution;
}

currentExecution = {
...currentExecution,
status: "completed",
runStartedAt: flowRunStartedAt,
durationMs: totalFlowDurationMs,
resultArtifact:
currentExecution.artifacts[currentExecution.artifacts.length - 1],
};

// Complete flow execution
setExecution(currentExecution);
const { blobUrl } = await putExecutionAction(currentExecution);
dispatch({
Expand Down
Loading

0 comments on commit 5692789

Please sign in to comment.