Skip to content

Commit

Permalink
Merge pull request #255 from toyamarinyon/retrun2
Browse files Browse the repository at this point in the history
Retry step
  • Loading branch information
toyamarinyon authored Dec 18, 2024
2 parents 2b8caf0 + 8ba3df0 commit 1e44109
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 47 deletions.
14 changes: 12 additions & 2 deletions app/(playground)/p/[agentId]/components/viewer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,29 @@ function ExecutionViewer({
<Markdown>{stepExecution.artifact?.object.content}</Markdown>
))}
{stepExecution.artifact?.type === "generatedArtifact" && (
<div className="mt-[10px] flex gap-[12px]">
<div className="mt-[10px] flex gap-[12px] items-center">
<div className="text-[14px] font-bold text-black-70 ">
Generated{" "}
{formatTimestamp.toRelativeTime(
stepExecution.artifact.createdAt,
)}
</div>
<div className="text-black-30">
<div className="text-black-30 flex items-center">
<ClipboardButton
text={stepExecution.artifact.object.content}
sizeClassName="w-[16px] h-[16px]"
/>
</div>
<div className="text-black-30 text-[14px]">
<button
type="button"
onClick={() => {
retryFlowExecution(execution.id, stepExecution.stepId);
}}
>
Retry
</button>
</div>
</div>
)}
</Tabs.Content>
Expand Down
106 changes: 61 additions & 45 deletions app/(playground)/p/[agentId]/contexts/execution.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,50 @@ export function createExecutionSnapshot({
} as ExecutionSnapshot;
}

function buildExecutionsFromSnapshot({
executionSnapshot,
forceRetryStepId,
}: { executionSnapshot: ExecutionSnapshot; forceRetryStepId?: StepId }) {
function buildJobExecutionsFromSnapshot(): JobExecution[] {
return executionSnapshot.execution.jobExecutions.map((jobExecution) => {
const hasForceRetryStep = jobExecution.stepExecutions.some(
(stepExecution) => stepExecution.stepId === forceRetryStepId,
);
if (hasForceRetryStep || jobExecution.status !== "completed") {
return {
...jobExecution,
status: "pending",
stepExecutions: jobExecution.stepExecutions.map((stepExecution) =>
stepExecution.stepId === forceRetryStepId ||
stepExecution.status !== "completed"
? {
...stepExecution,
status: "pending",
}
: stepExecution,
),
};
}
return jobExecution;
});
}
function buildArtifacts() {
const retryStepNodeId = executionSnapshot.flow.jobs
.flatMap((job) => job.steps)
.find((step) => step.id === forceRetryStepId)?.nodeId;
return executionSnapshot.execution.artifacts.filter(
(artifact) => artifact.creatorNodeId !== retryStepNodeId,
);
}
return {
id: createExecutionId(),
flowId: executionSnapshot.flow.id,
status: "pending",
artifacts: buildArtifacts(),
jobExecutions: buildJobExecutionsFromSnapshot(),
} as Execution;
}

// Helper functions for execution state management
const createInitialJobExecutions = (flow: Flow): JobExecution[] => {
return flow.jobs.map((job) => ({
Expand Down Expand Up @@ -230,6 +274,9 @@ const executeStep = async (
step.id === stepExecution.id ? failedStepExecution : step,
),
})),
artifacts: prev.artifacts.filter(
(prevArtifact) => prevArtifact.creatorNodeId !== stepExecution.nodeId,
),
};
});
return failedStepExecution;
Expand Down Expand Up @@ -319,7 +366,10 @@ interface ExecutionContextType {
execution: Execution | null;
execute: (nodeId: NodeId) => Promise<void>;
executeFlow: (flowId: FlowId) => Promise<void>;
retryFlowExecution: (executionId: ExecutionId) => Promise<void>;
retryFlowExecution: (
executionId: ExecutionId,
forceRetryStepId?: StepId,
) => Promise<void>;
}

const ExecutionContext = createContext<ExecutionContextType | undefined>(
Expand Down Expand Up @@ -568,7 +618,7 @@ export function ExecutionProvider({
);

const retryFlowExecution = useCallback(
async (retryExecutionId: ExecutionId) => {
async (retryExecutionId: ExecutionId, forceRetryStepId?: StepId) => {
const executionIndex = graph.executionIndexes.find(
(executionIndex) => executionIndex.executionId === retryExecutionId,
);
Expand All @@ -580,45 +630,15 @@ export function ExecutionProvider({
)) as unknown as ExecutionSnapshot;

await flush();
const executionId = createExecutionId();
const flowRunStartedAt = Date.now();
const flowId = retryExecutionSnapshot.flow.id;
let currentExecution: Execution = {
id: retryExecutionId,
flowId,
runStartedAt: flowRunStartedAt,
let currentExecution = buildExecutionsFromSnapshot({
executionSnapshot: retryExecutionSnapshot,
forceRetryStepId,
});
currentExecution = {
...currentExecution,
status: "running",
artifacts: retryExecutionSnapshot.execution.artifacts,
jobExecutions: retryExecutionSnapshot.execution.jobExecutions.map(
(job) =>
job.status === "completed"
? {
...job,
id: createJobExecutionId(),
stepExecutions: job.stepExecutions.map((step) => ({
...step,
id: createStepExecutionId(),
})),
}
: {
...job,
id: createJobExecutionId(),
status: "pending",
stepExecutions: job.stepExecutions.map((step) =>
step.status === "completed"
? {
...step,
id: createStepExecutionId(),
status: "completed",
}
: {
...step,
id: createStepExecutionId(),
status: "pending",
},
),
},
),
runStartedAt: flowRunStartedAt,
};
setExecution(currentExecution);

Expand Down Expand Up @@ -654,7 +674,7 @@ export function ExecutionProvider({
(stepId) =>
retryStepAction(
executionIndex.blobUrl,
executionId,
currentExecution.id,
stepId,
currentExecution.artifacts,
),
Expand All @@ -675,7 +695,6 @@ export function ExecutionProvider({
const failedExecution: FailedExecution = {
...currentExecution,
status: "failed",
runStartedAt: flowRunStartedAt,
durationMs: totalFlowDurationMs,
};
currentExecution = failedExecution;
Expand All @@ -698,15 +717,12 @@ export function ExecutionProvider({
nodes: retryExecutionSnapshot.nodes,
connections: retryExecutionSnapshot.connections,
});
console.log({
executionSnapshot,
});
const { blobUrl } = await putExecutionAction(executionSnapshot);
dispatch({
type: "addExecutionIndex",
input: {
executionIndex: {
executionId,
executionId: currentExecution.id,
blobUrl,
completedAt: Date.now(),
},
Expand Down

0 comments on commit 1e44109

Please sign in to comment.