Skip to content

Commit

Permalink
Merge pull request #250 from toyamarinyon/display-failed-step
Browse files Browse the repository at this point in the history
[feat] Add execution error handling and failure states
  • Loading branch information
toyamarinyon authored Dec 18, 2024
2 parents cfb83fd + 5692789 commit d4d45c6
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 81 deletions.
18 changes: 14 additions & 4 deletions app/(playground)/p/[agentId]/components/viewer.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use client";

import * as Tabs from "@radix-ui/react-tabs";
import { CircleAlertIcon, CircleSlashIcon } from "lucide-react";
import { type DetailedHTMLProps, useMemo } from "react";
import { useExecution } from "../contexts/execution";
import { useGraph } from "../contexts/graph";
Expand Down Expand Up @@ -37,6 +38,12 @@ function StepExecutionButton({
{stepExecution.status === "pending" && (
<SpinnerIcon className="w-[18px] h-[18px] stroke-black-30 fill-transparent" />
)}
{stepExecution.status === "failed" && (
<CircleAlertIcon className="w-[18px] h-[18px] stroke-black-30 fill-transparent" />
)}
{stepExecution.status === "skipped" && (
<CircleSlashIcon className="w-[18px] h-[18px] stroke-black-30 fill-transparent" />
)}
{stepExecution.status === "running" && (
<SpinnerIcon className="w-[18px] h-[18px] stroke-black-30 animate-follow-through-spin fill-transparent" />
)}
Expand Down Expand Up @@ -121,11 +128,14 @@ function ExecutionViewer({
{execution.jobExecutions.flatMap((jobExecution) =>
jobExecution.stepExecutions.map((stepExecution) => (
<Tabs.Content key={stepExecution.id} value={stepExecution.id}>
{stepExecution.artifact == null ? (
<p>Pending</p>
) : (
<Markdown>{stepExecution.artifact.object.content}</Markdown>
{stepExecution.status === "pending" && <p>Pending</p>}
{stepExecution.status === "failed" && (
<p>{stepExecution.error}</p>
)}
{stepExecution.status === "running" ||
(stepExecution.status === "completed" && (
<Markdown>{stepExecution.artifact?.object.content}</Markdown>
))}
{stepExecution.artifact?.type === "generatedArtifact" && (
<div className="mt-[10px] flex gap-[12px]">
<div className="text-[14px] font-bold text-black-70 ">
Expand Down
236 changes: 165 additions & 71 deletions app/(playground)/p/[agentId]/contexts/execution.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ import {
import type {
Artifact,
ArtifactId,
CompletedExecution,
CompletedJobExecution,
CompletedStepExecution,
Execution,
ExecutionId,
FailedExecution,
FailedJobExecution,
FailedStepExecution,
Flow,
FlowId,
JobExecution,
NodeId,
SkippedJobExecution,
StepExecution,
StepId,
TextArtifactObject,
Expand Down Expand Up @@ -90,7 +97,7 @@ const executeStep = async (
updateExecution: (
updater: (prev: Execution | null) => Execution | null,
) => void,
): Promise<number> => {
): Promise<CompletedStepExecution | FailedStepExecution> => {
const stepRunStartedAt = Date.now();
const artifactId = createArtifactId();

Expand Down Expand Up @@ -124,64 +131,98 @@ const executeStep = async (
};
});

// Execute step and process stream
const stream = await executeStepAction(
flowId,
executionId,
stepExecution.stepId,
artifacts,
);
const finalArtifact = await processStreamContent(stream, (content) => {
try {
// Execute step and process stream
const stream = await executeStepAction(
flowId,
executionId,
stepExecution.stepId,
artifacts,
);
const finalArtifact = await processStreamContent(stream, (content) => {
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;
return {
...prev,
artifacts: prev.artifacts.map((artifact) =>
artifact.id === artifactId
? { ...artifact, object: content }
: artifact,
),
};
});
});

// Complete step execution
const stepDurationMs = Date.now() - stepRunStartedAt;

const successStepExecution: CompletedStepExecution = {
...stepExecution,
status: "completed",
runStartedAt: stepRunStartedAt,
durationMs: stepDurationMs,
};
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;

return {
...prev,
jobExecutions: prev.jobExecutions.map((job) => ({
...job,
stepExecutions: job.stepExecutions.map((step) =>
step.id === stepExecution.id ? successStepExecution : step,
),
})),
artifacts: prev.artifacts.map((artifact) =>
artifact.id === artifactId
? { ...artifact, object: content }
? {
id: artifactId,
type: "generatedArtifact",
creatorNodeId: stepExecution.nodeId,
createdAt: Date.now(),
object: finalArtifact,
}
: artifact,
),
};
});
});

// Complete step execution
const stepDurationMs = Date.now() - stepRunStartedAt;
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;

return {
...prev,
jobExecutions: prev.jobExecutions.map((job) => ({
...job,
stepExecutions: job.stepExecutions.map((step) =>
step.id === stepExecution.id
? {
...step,
status: "completed",
runStartedAt: stepRunStartedAt,
durationMs: stepDurationMs,
}
: step,
),
})),
artifacts: prev.artifacts.map((artifact) =>
artifact.id === artifactId
? {
id: artifactId,
type: "generatedArtifact",
creatorNodeId: stepExecution.nodeId,
createdAt: Date.now(),
object: finalArtifact,
}
: artifact,
),
return successStepExecution;
} catch (unknownError) {
const error = toErrorWithMessage(unknownError).message;
const stepDurationMs = Date.now() - stepRunStartedAt;
const failedStepExecution: FailedStepExecution = {
...stepExecution,
status: "failed",
runStartedAt: stepRunStartedAt,
durationMs: stepDurationMs,
error,
};
});
updateExecution((prev) => {
if (!prev || prev.status !== "running") return null;

return stepDurationMs;
return {
...prev,
jobExecutions: prev.jobExecutions.map((job) => ({
...job,
stepExecutions: job.stepExecutions.map((step) =>
step.id === stepExecution.id ? failedStepExecution : step,
),
})),
};
});
return failedStepExecution;
}
};

class ExecuteJobError extends Error {
executeJob: FailedStepExecution;
constructor(executeJob: FailedStepExecution) {
super(executeJob.error);
this.name = "ExecuteJobError";
this.executeJob = executeJob;
}
}
const executeJob = async (
flowId: FlowId,
executionId: ExecutionId,
Expand All @@ -191,7 +232,7 @@ const executeJob = async (
updateExecution: (
updater: (prev: Execution | null) => Execution | null,
) => void,
): Promise<number> => {
): Promise<CompletedJobExecution | FailedJobExecution> => {
const jobRunStartedAt = Date.now();

// Start job execution
Expand All @@ -208,7 +249,7 @@ const executeJob = async (
});

// Execute all steps in parallel
const stepDurations = await Promise.all(
const stepExecutions = await Promise.all(
jobExecution.stepExecutions.map((step) =>
executeStep(
flowId,
Expand All @@ -221,30 +262,52 @@ const executeJob = async (
),
);

const jobDurationMs = stepDurations.reduce(
(sum, duration) => sum + duration,
const jobDurationMs = stepExecutions.reduce(
(sum, duration) => sum + duration.durationMs,
0,
);
const allStepsCompleted = stepExecutions.every(
(step) => step.status === "completed",
);

if (allStepsCompleted) {
// Complete job execution
const completedJobExecution: CompletedJobExecution = {
...jobExecution,
stepExecutions,
status: "completed",
runStartedAt: jobRunStartedAt,
durationMs: jobDurationMs,
};
updateExecution((prev) => {
if (!prev) return null;
return {
...prev,
jobExecutions: prev.jobExecutions.map((job) =>
job.id === jobExecution.id ? completedJobExecution : job,
),
};
});
return completedJobExecution;
}

// Complete job execution
const failedJobExecution: FailedJobExecution = {
...jobExecution,
stepExecutions,
status: "failed",
runStartedAt: jobRunStartedAt,
durationMs: jobDurationMs,
};
updateExecution((prev) => {
if (!prev) return null;
return {
...prev,
jobExecutions: prev.jobExecutions.map((job) =>
job.id === jobExecution.id
? {
...job,
status: "completed",
runStartedAt: jobRunStartedAt,
durationMs: jobDurationMs,
}
: job,
job.id === jobExecution.id ? failedJobExecution : job,
),
};
});

return jobDurationMs;
return failedJobExecution;
};

interface ExecutionContextType {
Expand Down Expand Up @@ -394,10 +457,29 @@ export function ExecutionProvider({
setExecution(currentExecution);

let totalFlowDurationMs = 0;
let hasFailed = false;

// Execute jobs sequentially
for (const jobExecution of jobExecutions) {
const jobDurationMs = await executeJob(
if (hasFailed) {
const skippedJob: SkippedJobExecution = {
...jobExecution,
status: "skipped",
stepExecutions: jobExecution.stepExecutions.map((step) => ({
...step,
status: "skipped",
})),
};
currentExecution = {
...currentExecution,
jobExecutions: currentExecution.jobExecutions.map((job) =>
job.id === jobExecution.id ? skippedJob : job,
),
};
setExecution(currentExecution);
continue;
}
const executedJob = await executeJob(
flowId,
executionId,
jobExecution,
Expand All @@ -411,19 +493,31 @@ export function ExecutionProvider({
}
},
);
totalFlowDurationMs += jobDurationMs;
totalFlowDurationMs += executedJob.durationMs;
if (executedJob.status === "failed") {
hasFailed = true;
}
}
if (hasFailed) {
const failedExecution: FailedExecution = {
...currentExecution,
status: "failed",
runStartedAt: flowRunStartedAt,
durationMs: totalFlowDurationMs,
};
currentExecution = failedExecution;
} else {
const completedExecution: CompletedExecution = {
...currentExecution,
status: "completed",
runStartedAt: flowRunStartedAt,
durationMs: totalFlowDurationMs,
resultArtifact:
currentExecution.artifacts[currentExecution.artifacts.length - 1],
};
currentExecution = completedExecution;
}

currentExecution = {
...currentExecution,
status: "completed",
runStartedAt: flowRunStartedAt,
durationMs: totalFlowDurationMs,
resultArtifact:
currentExecution.artifacts[currentExecution.artifacts.length - 1],
};

// Complete flow execution
setExecution(currentExecution);
const { blobUrl } = await putExecutionAction(currentExecution);
dispatch({
Expand Down
Loading

0 comments on commit d4d45c6

Please sign in to comment.