From b18bdd947c4253d6a5ff5645ed3a67b63cb2c8e0 Mon Sep 17 00:00:00 2001 From: satoshi toyama Date: Wed, 18 Dec 2024 21:24:21 +0900 Subject: [PATCH 1/4] refactor(execution): Extract text generation logic to shared function Consolidate duplicate code between executeStep and retryStep into a shared generateText function. Created ExecutionContext interface to standardize data passing between functions. Key changes: - Extract core text generation logic into standalone generateText function - Add ExecutionContext interface to encapsulate execution state - Remove duplicated node resolution and artifact handling code - Maintain existing functionality while reducing code complexity --- app/(playground)/p/[agentId]/lib/execution.ts | 222 ++++++------------ 1 file changed, 70 insertions(+), 152 deletions(-) diff --git a/app/(playground)/p/[agentId]/lib/execution.ts b/app/(playground)/p/[agentId]/lib/execution.ts index 5b842b23..4b5f178f 100644 --- a/app/(playground)/p/[agentId]/lib/execution.ts +++ b/app/(playground)/p/[agentId]/lib/execution.ts @@ -229,48 +229,40 @@ function resolveRequirement( } } -export async function executeStep( - agentId: AgentId, - flowId: FlowId, - executionId: ExecutionId, - stepId: StepId, - artifacts: Artifact[], -) { +interface ExecutionContext { + executionId: ExecutionId; + stepId: StepId; + artifacts: Artifact[]; + nodes: Node[]; + connections: ExecutionSnapshot["connections"]; + flow: ExecutionSnapshot["flow"]; +} + +async function generateText(context: ExecutionContext) { const startTime = Date.now(); const lf = new Langfuse(); const trace = lf.trace({ - sessionId: executionId, + sessionId: context.executionId, }); - const agent = await db.query.agents.findFirst({ - where: (agents, { eq }) => eq(agents.id, agentId), - }); - if (agent === undefined || agent.graphUrl === null) { - throw new Error(`Agent with id ${agentId} not found`); - } - const graph = await fetch(agent.graphUrl).then( - (res) => res.json() as unknown as Graph, - ); - const flow = graph.flows.find((flow) => flow.id === flowId); - if (flow === undefined) { - throw new Error(`Flow with id ${flowId} not found`); - } - const step = flow.jobs + const step = context.flow.jobs .flatMap((job) => job.steps) - .find((step) => step.id === stepId); + .find((step) => step.id === context.stepId); + if (step === undefined) { - throw new Error(`Step with id ${stepId} not found`); + throw new Error(`Step with id ${context.stepId} not found`); } - const node = graph.nodes.find((node) => node.id === step.nodeId); + + const node = context.nodes.find((node) => node.id === step.nodeId); if (node === undefined) { throw new Error("Node not found"); } function nodeResolver(nodeHandleId: NodeHandleId) { - const connection = graph.connections.find( + const connection = context.connections.find( (connection) => connection.targetNodeHandleId === nodeHandleId, ); - const node = graph.nodes.find( + const node = context.nodes.find( (node) => node.id === connection?.sourceNodeId, ); if (node === undefined) { @@ -278,8 +270,9 @@ export async function executeStep( } return node; } + function artifactResolver(artifactCreatorNodeId: NodeId) { - const generatedArtifact = artifacts.find( + const generatedArtifact = context.artifacts.find( (artifact) => artifact.creatorNodeId === artifactCreatorNodeId, ); if ( @@ -290,7 +283,7 @@ export async function executeStep( } return generatedArtifact; } - // The main switch statement handles the different types of nodes + switch (node.content.type) { case "textGeneration": { const actionSources = 
await resolveSources(node.content.sources, { @@ -327,6 +320,7 @@ export async function executeStep( temperature: node.content.temperature, }, }); + (async () => { const { partialObjectStream, object, usage } = streamObject({ model, @@ -349,6 +343,7 @@ export async function executeStep( }, }); } + const result = await object; await withTokenMeasurement( createLogger(node.content.type), @@ -370,6 +365,7 @@ export async function executeStep( }); stream.error(error); }); + return stream.value; } default: @@ -377,138 +373,60 @@ export async function executeStep( } } -export async function retryStep( - retryExecutionSnapshotUrl: string, +export async function executeStep( + agentId: AgentId, + flowId: FlowId, executionId: ExecutionId, stepId: StepId, artifacts: Artifact[], ) { - const startTime = Date.now(); - const lf = new Langfuse(); - const trace = lf.trace({ - sessionId: executionId, + const agent = await db.query.agents.findFirst({ + where: (agents, { eq }) => eq(agents.id, agentId), }); - const executionSnapshot = (await fetch(retryExecutionSnapshotUrl).then( - (res) => res.json(), - )) as unknown as ExecutionSnapshot; - const step = executionSnapshot.flow.jobs - .flatMap((job) => job.steps) - .find((step) => step.id === stepId); - if (step === undefined) { - throw new Error(`Step with id ${stepId} not found`); - } - const node = executionSnapshot.nodes.find((node) => node.id === step.nodeId); - if (node === undefined) { - throw new Error("Node not found"); - } - function nodeResolver(nodeHandleId: NodeHandleId) { - const connection = executionSnapshot.connections.find( - (connection) => connection.targetNodeHandleId === nodeHandleId, - ); - const node = executionSnapshot.nodes.find( - (node) => node.id === connection?.sourceNodeId, - ); - if (node === undefined) { - return null; - } - return node; + if (agent === undefined || agent.graphUrl === null) { + throw new Error(`Agent with id ${agentId} not found`); } - function artifactResolver(artifactCreatorNodeId: NodeId) { - const generatedArtifact = artifacts.find( - (artifact) => artifact.creatorNodeId === artifactCreatorNodeId, - ); - if ( - generatedArtifact === undefined || - generatedArtifact.type !== "generatedArtifact" - ) { - return null; - } - return generatedArtifact; + + const graph = await fetch(agent.graphUrl).then( + (res) => res.json() as unknown as Graph, + ); + + const flow = graph.flows.find((flow) => flow.id === flowId); + if (flow === undefined) { + throw new Error(`Flow with id ${flowId} not found`); } - // The main switch statement handles the different types of nodes - switch (node.content.type) { - case "textGeneration": { - const actionSources = await resolveSources(node.content.sources, { - nodeResolver, - artifactResolver, - }); - const requirement = resolveRequirement(node.content.requirement ?? null, { - nodeResolver, - artifactResolver, - }); - const model = resolveLanguageModel(node.content.llm); - const promptTemplate = HandleBars.compile( - node.content.system ?? 
textGenerationPrompt, - ); - const prompt = promptTemplate({ - instruction: node.content.instruction, - requirement, - sources: actionSources, - }); - const topP = node.content.topP; - const temperature = node.content.temperature; - const stream = createStreamableValue(); - trace.update({ - input: prompt, - }); + const context: ExecutionContext = { + executionId, + stepId, + artifacts, + nodes: graph.nodes, + connections: graph.connections, + flow, + }; - const generationTracer = trace.generation({ - name: "generate-text", - input: prompt, - model: langfuseModel(node.content.llm), - modelParameters: { - topP: node.content.topP, - temperature: node.content.temperature, - }, - }); - (async () => { - const { partialObjectStream, object, usage } = streamObject({ - model, - prompt, - schema: jsonSchema>( - toJsonSchema(artifactSchema), - ), - topP, - temperature, - }); + return generateText(context); +} - for await (const partialObject of partialObjectStream) { - stream.update({ - type: "text", - title: partialObject.title ?? "", - content: partialObject.content ?? "", - messages: { - plan: partialObject.plan ?? "", - description: partialObject.description ?? "", - }, - }); - } - const result = await object; - await withTokenMeasurement( - createLogger(node.content.type), - async () => { - generationTracer.end({ output: result }); - trace.update({ output: result }); - await lf.shutdownAsync(); - waitForTelemetryExport(); - return { usage: await usage }; - }, - model, - startTime, - ); - stream.done(); - })().catch((error) => { - generationTracer.update({ - level: "ERROR", - statusMessage: toErrorWithMessage(error).message, - }); - stream.error(error); - }); - return stream.value; - } - default: - throw new Error("Invalid node type"); - } +export async function retryStep( + retryExecutionSnapshotUrl: string, + executionId: ExecutionId, + stepId: StepId, + artifacts: Artifact[], +) { + const executionSnapshot = await fetch(retryExecutionSnapshotUrl).then( + (res) => res.json() as unknown as ExecutionSnapshot, + ); + + const context: ExecutionContext = { + executionId, + stepId, + artifacts, + nodes: executionSnapshot.nodes, + connections: executionSnapshot.connections, + flow: executionSnapshot.flow, + }; + + return generateText(context); } From 2a19a65814521150c4f070c383d6f947e82ead93 Mon Sep 17 00:00:00 2001 From: satoshi toyama Date: Wed, 18 Dec 2024 21:40:52 +0900 Subject: [PATCH 2/4] refactor(execution): Extract shared flow execution logic Consolidate duplicate flow execution code into reusable performFlowExecution function. Simplifies both new executions and retries through a common path. 
Key changes: - Create ExecuteFlowParams interface to standardize execution parameters - Extract core flow execution logic into performFlowExecution - Unify execution state management between new and retry flows - Reduce code duplication while maintaining execution behaviors - Simplify job execution status handling --- .../p/[agentId]/contexts/execution.tsx | 244 +++++++----------- 1 file changed, 98 insertions(+), 146 deletions(-) diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index f0d9c160..2c7c524b 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -504,30 +504,31 @@ export function ExecutionProvider({ [executeAction, dispatch, flush, setTab, addToast], ); - const executeFlow = useCallback( - async (flowId: FlowId) => { - const flow = graph.flows.find((flow) => flow.id === flowId); - if (!flow) throw new Error("Flow not found"); - - setPlaygroundMode("viewer"); - await flush(); - const executionId = createExecutionId(); - const jobExecutions = createInitialJobExecutions(flow); - const flowRunStartedAt = Date.now(); - - // Initialize flow execution - let currentExecution = createInitialExecution( - flowId, - executionId, - jobExecutions, - ); - setExecution(currentExecution); - + interface ExecuteFlowParams { + flowId: FlowId; + initialExecution: Execution; + flow: Flow; + nodes: Node[]; + connections: Connection[]; + executeStepCallback: ( + stepId: StepId, + ) => Promise>; + } + const performFlowExecution = useCallback( + async ({ + flowId, + initialExecution, + flow, + nodes, + connections, + executeStepCallback, + }: ExecuteFlowParams) => { + let currentExecution = initialExecution; let totalFlowDurationMs = 0; let hasFailed = false; // Execute jobs sequentially - for (const jobExecution of jobExecutions) { + for (const jobExecution of currentExecution.jobExecutions) { if (hasFailed) { const skippedJob: SkippedJobExecution = { ...jobExecution, @@ -546,15 +547,15 @@ export function ExecutionProvider({ setExecution(currentExecution); continue; } + + // Skip completed jobs for retry flows + if (jobExecution.status === "completed") { + continue; + } + const executedJob = await executeJob( jobExecution, - (stepExecutionId) => - executeStepAction( - flowId, - executionId, - stepExecutionId, - currentExecution.artifacts, - ), + executeStepCallback, (updater) => { const updated = updater(currentExecution); if (updated) { @@ -563,58 +564,90 @@ export function ExecutionProvider({ } }, ); + totalFlowDurationMs += executedJob.durationMs; if (executedJob.status === "failed") { hasFailed = true; } } + + // Update final execution state if (hasFailed) { - const failedExecution: FailedExecution = { + currentExecution = { ...currentExecution, status: "failed", - runStartedAt: flowRunStartedAt, durationMs: totalFlowDurationMs, - }; - currentExecution = failedExecution; + } as FailedExecution; } else { - const completedExecution: CompletedExecution = { + currentExecution = { ...currentExecution, status: "completed", - runStartedAt: flowRunStartedAt, durationMs: totalFlowDurationMs, resultArtifact: currentExecution.artifacts[currentExecution.artifacts.length - 1], - }; - currentExecution = completedExecution; + } as CompletedExecution; } - setExecution(currentExecution); + // Create and store execution snapshot const executionSnapshot = createExecutionSnapshot({ flow, execution: currentExecution, - nodes: graph.nodes, - connections: graph.connections, + nodes, 
+ connections, }); + const { blobUrl } = await putExecutionAction(executionSnapshot); dispatch({ type: "addExecutionIndex", input: { executionIndex: { - executionId, + executionId: currentExecution.id, blobUrl, completedAt: Date.now(), }, }, }); + + return currentExecution; }, - [ - setPlaygroundMode, - executeStepAction, - putExecutionAction, - dispatch, - flush, - graph, - ], + [dispatch, putExecutionAction], + ); + + const executeFlow = useCallback( + async (flowId: FlowId) => { + const flow = graph.flows.find((flow) => flow.id === flowId); + if (!flow) throw new Error("Flow not found"); + + setPlaygroundMode("viewer"); + await flush(); + const executionId = createExecutionId(); + const jobExecutions = createInitialJobExecutions(flow); + const flowRunStartedAt = Date.now(); + + // Initialize flow execution + const initialExecution = { + ...createInitialExecution(flowId, executionId, jobExecutions), + runStartedAt: flowRunStartedAt, + }; + setExecution(initialExecution); + + const finalExecution = await performFlowExecution({ + flowId, + initialExecution, + flow, + nodes: graph.nodes, + connections: graph.connections, + executeStepCallback: (stepId) => + executeStepAction( + flowId, + executionId, + stepId, + initialExecution.artifacts, + ), + }); + setExecution(finalExecution); + }, + [setPlaygroundMode, executeStepAction, flush, graph, performFlowExecution], ); const retryFlowExecution = useCallback( @@ -631,112 +664,31 @@ export function ExecutionProvider({ await flush(); const flowRunStartedAt = Date.now(); - let currentExecution = buildExecutionsFromSnapshot({ - executionSnapshot: retryExecutionSnapshot, - forceRetryStepId, - }); - currentExecution = { - ...currentExecution, - status: "running", + const initialExecution = { + ...buildExecutionsFromSnapshot({ + executionSnapshot: retryExecutionSnapshot, + forceRetryStepId, + }), + status: "running" as const, runStartedAt: flowRunStartedAt, }; - setExecution(currentExecution); - - let totalFlowDurationMs = 0; - let hasFailed = false; - - // Execute jobs sequentially - for (const jobExecution of currentExecution.jobExecutions) { - if (hasFailed) { - const skippedJob: SkippedJobExecution = { - ...jobExecution, - status: "skipped", - stepExecutions: jobExecution.stepExecutions.map((step) => ({ - ...step, - status: "skipped", - })), - }; - currentExecution = { - ...currentExecution, - jobExecutions: currentExecution.jobExecutions.map((job) => - job.id === jobExecution.id ? 
skippedJob : job, - ), - }; - setExecution(currentExecution); - continue; - } - // if retrying, skip completed jobs - if (jobExecution.status === "completed") { - continue; - } - const executedJob = await executeJob( - jobExecution, - (stepId) => - retryStepAction( - executionIndex.blobUrl, - currentExecution.id, - stepId, - currentExecution.artifacts, - ), - (updater) => { - const updated = updater(currentExecution); - if (updated) { - currentExecution = updated; - setExecution(updated); - } - }, - ); - totalFlowDurationMs += executedJob.durationMs; - if (executedJob.status === "failed") { - hasFailed = true; - } - } - if (hasFailed) { - const failedExecution: FailedExecution = { - ...currentExecution, - status: "failed", - durationMs: totalFlowDurationMs, - }; - currentExecution = failedExecution; - } else { - const completedExecution: CompletedExecution = { - ...currentExecution, - status: "completed", - runStartedAt: flowRunStartedAt, - durationMs: totalFlowDurationMs, - resultArtifact: - currentExecution.artifacts[currentExecution.artifacts.length - 1], - }; - currentExecution = completedExecution; - } - - setExecution(currentExecution); - const executionSnapshot = createExecutionSnapshot({ + const finalExecution = await performFlowExecution({ + flowId: retryExecutionSnapshot.flow.id, + initialExecution, flow: retryExecutionSnapshot.flow, - execution: currentExecution, nodes: retryExecutionSnapshot.nodes, connections: retryExecutionSnapshot.connections, + executeStepCallback: (stepId) => + retryStepAction( + executionIndex.blobUrl, + initialExecution.id, + stepId, + initialExecution.artifacts, + ), }); - const { blobUrl } = await putExecutionAction(executionSnapshot); - dispatch({ - type: "addExecutionIndex", - input: { - executionIndex: { - executionId: currentExecution.id, - blobUrl, - completedAt: Date.now(), - }, - }, - }); + setExecution(finalExecution); }, - [ - graph.executionIndexes, - dispatch, - graph, - flush, - putExecutionAction, - retryStepAction, - ], + [graph.executionIndexes, flush, retryStepAction, performFlowExecution], ); return ( From 8a5a67675699e9e2a2a50cb7f37d200976ba0bfc Mon Sep 17 00:00:00 2001 From: satoshi toyama Date: Wed, 18 Dec 2024 21:46:53 +0900 Subject: [PATCH 3/4] refactor(execution): Simplify execution state initialization Split execution snapshot building logic into focused utility functions and cleanup initialization code for both new and retry executions. 
Key changes: - Split buildExecutionsFromSnapshot into separate utility functions - Remove createInitialExecution in favor of direct object creation - Standardize execution state initialization between new and retry flows - Improve type safety with explicit Execution type annotations The refactoring improves clarity by: - Making execution state construction more explicit - Reducing complexity of snapshot handling - Keeping utility functions focused on single responsibilities --- .../p/[agentId]/contexts/execution.tsx | 113 ++++++++---------- 1 file changed, 52 insertions(+), 61 deletions(-) diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 2c7c524b..26a51825 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -64,48 +64,42 @@ export function createExecutionSnapshot({ } as ExecutionSnapshot; } -function buildExecutionsFromSnapshot({ - executionSnapshot, - forceRetryStepId, -}: { executionSnapshot: ExecutionSnapshot; forceRetryStepId?: StepId }) { - function buildJobExecutionsFromSnapshot(): JobExecution[] { - return executionSnapshot.execution.jobExecutions.map((jobExecution) => { - const hasForceRetryStep = jobExecution.stepExecutions.some( - (stepExecution) => stepExecution.stepId === forceRetryStepId, - ); - if (hasForceRetryStep || jobExecution.status !== "completed") { - return { - ...jobExecution, - status: "pending", - stepExecutions: jobExecution.stepExecutions.map((stepExecution) => - stepExecution.stepId === forceRetryStepId || - stepExecution.status !== "completed" - ? { - ...stepExecution, - status: "pending", - } - : stepExecution, - ), - }; - } - return jobExecution; - }); - } - function buildArtifacts() { - const retryStepNodeId = executionSnapshot.flow.jobs - .flatMap((job) => job.steps) - .find((step) => step.id === forceRetryStepId)?.nodeId; - return executionSnapshot.execution.artifacts.filter( - (artifact) => artifact.creatorNodeId !== retryStepNodeId, +function buildJobExecutionsFromSnapshot( + executionSnapshot: ExecutionSnapshot, + forceRetryStepId?: StepId, +): JobExecution[] { + return executionSnapshot.execution.jobExecutions.map((jobExecution) => { + const hasForceRetryStep = jobExecution.stepExecutions.some( + (stepExecution) => stepExecution.stepId === forceRetryStepId, ); - } - return { - id: createExecutionId(), - flowId: executionSnapshot.flow.id, - status: "pending", - artifacts: buildArtifacts(), - jobExecutions: buildJobExecutionsFromSnapshot(), - } as Execution; + if (hasForceRetryStep || jobExecution.status !== "completed") { + return { + ...jobExecution, + status: "pending", + stepExecutions: jobExecution.stepExecutions.map((stepExecution) => + stepExecution.stepId === forceRetryStepId || + stepExecution.status !== "completed" + ? 
{ + ...stepExecution, + status: "pending", + } + : stepExecution, + ), + }; + } + return jobExecution; + }); +} +function buildArtifactsFromSnapshot( + executionSnapshot: ExecutionSnapshot, + forceRetryStepId?: StepId, +) { + const retryStepNodeId = executionSnapshot.flow.jobs + .flatMap((job) => job.steps) + .find((step) => step.id === forceRetryStepId)?.nodeId; + return executionSnapshot.execution.artifacts.filter( + (artifact) => artifact.creatorNodeId !== retryStepNodeId, + ); } // Helper functions for execution state management @@ -123,19 +117,6 @@ const createInitialJobExecutions = (flow: Flow): JobExecution[] => { })); }; -const createInitialExecution = ( - flowId: FlowId, - executionId: ExecutionId, - jobExecutions: JobExecution[], -): Execution => ({ - id: executionId, - status: "running", - runStartedAt: Date.now(), - flowId, - jobExecutions, - artifacts: [], -}); - const processStreamContent = async ( stream: StreamableValue, updateArtifact: (content: TextArtifactObject) => void, @@ -625,8 +606,12 @@ export function ExecutionProvider({ const flowRunStartedAt = Date.now(); // Initialize flow execution - const initialExecution = { - ...createInitialExecution(flowId, executionId, jobExecutions), + const initialExecution: Execution = { + id: executionId, + status: "running", + flowId, + jobExecutions, + artifacts: [], runStartedAt: flowRunStartedAt, }; setExecution(initialExecution); @@ -664,12 +649,18 @@ export function ExecutionProvider({ await flush(); const flowRunStartedAt = Date.now(); - const initialExecution = { - ...buildExecutionsFromSnapshot({ - executionSnapshot: retryExecutionSnapshot, + const initialExecution: Execution = { + id: createExecutionId(), + flowId: retryExecutionSnapshot.flow.id, + jobExecutions: buildJobExecutionsFromSnapshot( + retryExecutionSnapshot, forceRetryStepId, - }), - status: "running" as const, + ), + artifacts: buildArtifactsFromSnapshot( + retryExecutionSnapshot, + forceRetryStepId, + ), + status: "running", runStartedAt: flowRunStartedAt, }; const finalExecution = await performFlowExecution({ From 5b4b22d8d79ac9c3e1e24a2395348c2815239d51 Mon Sep 17 00:00:00 2001 From: satoshi toyama Date: Wed, 18 Dec 2024 21:49:14 +0900 Subject: [PATCH 4/4] rename --- app/(playground)/p/[agentId]/lib/execution.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/(playground)/p/[agentId]/lib/execution.ts b/app/(playground)/p/[agentId]/lib/execution.ts index 4b5f178f..d1e2cac5 100644 --- a/app/(playground)/p/[agentId]/lib/execution.ts +++ b/app/(playground)/p/[agentId]/lib/execution.ts @@ -238,7 +238,7 @@ interface ExecutionContext { flow: ExecutionSnapshot["flow"]; } -async function generateText(context: ExecutionContext) { +async function performFlowExecution(context: ExecutionContext) { const startTime = Date.now(); const lf = new Langfuse(); const trace = lf.trace({ @@ -406,7 +406,7 @@ export async function executeStep( flow, }; - return generateText(context); + return performFlowExecution(context); } export async function retryStep( @@ -428,5 +428,5 @@ export async function retryStep( flow: executionSnapshot.flow, }; - return generateText(context); + return performFlowExecution(context); }
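
Note (not part of the patch series): the net effect of PATCH 1 and PATCH 4 on lib/execution.ts can be summarized with the minimal TypeScript sketch below. The types here (ExecutionContext fields, Step, Flow, GraphNode, Connection) are reduced stand-ins and executeStepSketch is a hypothetical wrapper; they are not the repository's exact types or signatures.

type ExecutionId = string;
type StepId = string;
type NodeId = string;

interface Artifact {
	creatorNodeId: NodeId;
	type: string;
}

interface Step {
	id: StepId;
	nodeId: NodeId;
}

interface Flow {
	jobs: { steps: Step[] }[];
}

interface GraphNode {
	id: NodeId;
	content: { type: string };
}

interface Connection {
	sourceNodeId: NodeId;
	targetNodeHandleId: string;
}

// Shared shape that both entry points now build before delegating.
interface ExecutionContext {
	executionId: ExecutionId;
	stepId: StepId;
	artifacts: Artifact[];
	nodes: GraphNode[];
	connections: Connection[];
	flow: Flow;
}

// Formerly duplicated in executeStep and retryStep; renamed from
// generateText to performFlowExecution in PATCH 4.
async function performFlowExecution(context: ExecutionContext): Promise<string> {
	const step = context.flow.jobs
		.flatMap((job) => job.steps)
		.find((s) => s.id === context.stepId);
	if (step === undefined) {
		throw new Error(`Step with id ${context.stepId} not found`);
	}
	const node = context.nodes.find((n) => n.id === step.nodeId);
	if (node === undefined) {
		throw new Error("Node not found");
	}
	// ...source/requirement resolution, Langfuse tracing, and the streaming
	// text generation happen here in the real implementation...
	return `generated output for node ${node.id}`;
}

// executeStep fetches the live Graph by agentId/flowId and retryStep fetches a
// stored ExecutionSnapshot; both end by building a context and delegating.
async function executeStepSketch(
	executionId: ExecutionId,
	stepId: StepId,
	artifacts: Artifact[],
	graph: { nodes: GraphNode[]; connections: Connection[]; flow: Flow },
) {
	return performFlowExecution({
		executionId,
		stepId,
		artifacts,
		nodes: graph.nodes,
		connections: graph.connections,
		flow: graph.flow,
	});
}

The only difference left between the two exported entry points is where nodes, connections, and flow come from; everything after context construction is shared.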
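
For the client side, PATCH 2 and PATCH 3 introduce an analogous shared driver in contexts/execution.tsx. The sketch below is likewise illustrative rather than the actual implementation: Execution, JobExecution, and ExecuteFlowParams are trimmed stand-ins (the real params also carry flow, nodes, and connections for the snapshot stored afterwards), and the per-job try/catch stands in for executeJob plus the state-updater callback.

type FlowId = string;
type StepId = string;

interface JobExecution {
	id: string;
	status: "pending" | "completed" | "failed" | "skipped";
}

interface Execution {
	id: string;
	flowId: FlowId;
	status: "running" | "completed" | "failed";
	runStartedAt: number;
	jobExecutions: JobExecution[];
	artifacts: unknown[];
}

interface ExecuteFlowParams {
	flowId: FlowId;
	initialExecution: Execution;
	// executeFlow passes executeStepAction here; retryFlowExecution passes retryStepAction.
	executeStepCallback: (stepId: StepId) => Promise<unknown>;
}

// Runs jobs sequentially, leaves already-completed jobs alone (the retry case),
// marks everything after the first failure as skipped, then finalizes the status.
async function performFlowExecution({
	initialExecution,
	executeStepCallback,
}: ExecuteFlowParams): Promise<Execution> {
	let hasFailed = false;
	const jobExecutions: JobExecution[] = [];
	for (const job of initialExecution.jobExecutions) {
		if (job.status === "completed") {
			jobExecutions.push(job);
			continue;
		}
		if (hasFailed) {
			jobExecutions.push({ ...job, status: "skipped" });
			continue;
		}
		try {
			// Stands in for executeJob(jobExecution, executeStepCallback, updater),
			// which iterates the job's step executions in the real code.
			await executeStepCallback(job.id);
			jobExecutions.push({ ...job, status: "completed" });
		} catch {
			hasFailed = true;
			jobExecutions.push({ ...job, status: "failed" });
		}
	}
	return {
		...initialExecution,
		jobExecutions,
		status: hasFailed ? "failed" : "completed",
	};
}

With this in place, executeFlow and retryFlowExecution differ only in how they assemble initialExecution (built inline after PATCH 3, from createInitialJobExecutions or from the snapshot helpers) and which server action they hand in as executeStepCallback; both then call setExecution with the returned final execution.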