From 1214f3eedb5ba3f915ce560b769d7cf9f0072223 Mon Sep 17 00:00:00 2001 From: Satoshi Ebisawa Date: Wed, 18 Dec 2024 14:58:13 +0900 Subject: [PATCH 01/13] Add saving agentActivities functionality --- .../p/[agentId]/contexts/execution.tsx | 24 +++++++++++++++-- .../p/[agentId]/lib/agent-activity.ts | 27 +++++++++++++++++++ app/(playground)/p/[agentId]/page.tsx | 16 +++++++++++ 3 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 app/(playground)/p/[agentId]/lib/agent-activity.ts diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 902da17d..67c7b370 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -251,6 +251,11 @@ interface ExecutionContextType { execution: Execution | null; execute: (nodeId: NodeId) => Promise; executeFlow: (flowId: FlowId) => Promise; + saveAgentActivityAction: ( + startedAt: number, + endedAt: number, + totalDurationMs: number, + ) => Promise; } const ExecutionContext = createContext( @@ -271,6 +276,11 @@ interface ExecutionProviderProps { ) => Promise>; executeStepAction: ExecuteStepAction; putExecutionAction: (execution: Execution) => Promise<{ blobUrl: string }>; + saveAgentActivityAction: ( + startedAt: number, + endedAt: number, + totalDurationMs: number, + ) => Promise; } export function ExecutionProvider({ @@ -278,6 +288,7 @@ export function ExecutionProvider({ executeAction, executeStepAction, putExecutionAction, + saveAgentActivityAction, }: ExecutionProviderProps) { const { dispatch, flush, graph } = useGraph(); const { setTab } = usePropertiesPanel(); @@ -426,28 +437,37 @@ export function ExecutionProvider({ // Complete flow execution setExecution(currentExecution); const { blobUrl } = await putExecutionAction(currentExecution); + const flowRunEndedAt = Date.now(); dispatch({ type: "addExecutionIndex", input: { executionIndex: { executionId, blobUrl, - completedAt: Date.now(), + completedAt: flowRunEndedAt, }, }, }); + await saveAgentActivityAction( + flowRunStartedAt, + flowRunEndedAt, + totalFlowDurationMs, + ); }, [ setPlaygroundMode, graph.flows, executeStepAction, putExecutionAction, + saveAgentActivityAction, dispatch, flush, ], ); return ( - + {children} ); diff --git a/app/(playground)/p/[agentId]/lib/agent-activity.ts b/app/(playground)/p/[agentId]/lib/agent-activity.ts new file mode 100644 index 00000000..b0a81251 --- /dev/null +++ b/app/(playground)/p/[agentId]/lib/agent-activity.ts @@ -0,0 +1,27 @@ +import { agentActivities, agents, db } from "@/drizzle"; +import { toUTCDate } from "@/lib/date"; +import type { AgentId } from "@/services/agents"; +import { eq } from "drizzle-orm"; + +export async function saveAgentActivity( + agentId: AgentId, + startedAt: number, + endedAt: number, + totalDurationMs: number, +) { + const records = await db + .select({ agentDbId: agents.dbId }) + .from(agents) + .where(eq(agents.id, agentId)); + if (records.length === 0) { + throw new Error(`Agent with id ${agentId} not found`); + } + const agentDbId = records[0].agentDbId; + + await db.insert(agentActivities).values({ + agentDbId, + startedAt: toUTCDate(new Date(startedAt)), + endedAt: toUTCDate(new Date(endedAt)), + totalDurationMs: totalDurationMs.toString(), + }); +} diff --git a/app/(playground)/p/[agentId]/page.tsx b/app/(playground)/p/[agentId]/page.tsx index 69544b45..3459fabb 100644 --- a/app/(playground)/p/[agentId]/page.tsx +++ b/app/(playground)/p/[agentId]/page.tsx @@ -24,6 +24,7 @@ import { PlaygroundModeProvider } from "./contexts/playground-mode"; import { PropertiesPanelProvider } from "./contexts/properties-panel"; import { ToastProvider } from "./contexts/toast"; import { ToolbarContextProvider } from "./contexts/toolbar"; +import { saveAgentActivity } from "./lib/agent-activity"; import { executeStep } from "./lib/execution"; import { isLatestVersion, migrateGraph } from "./lib/graph"; import { buildGraphExecutionPath, buildGraphFolderPath } from "./lib/utils"; @@ -144,6 +145,20 @@ export default async function Page({ return await action(artifactId, agentId, nodeId); } + async function saveAgentActivityAction( + startedAt: number, + endedAt: number, + totalDurationMs: number, + ) { + "use server"; + return await saveAgentActivity( + agentId, + startedAt, + endedAt, + totalDurationMs, + ); + } + async function executeStepAction( flowId: FlowId, executionId: ExecutionId, @@ -201,6 +216,7 @@ export default async function Page({ executeAction={execute} executeStepAction={executeStepAction} putExecutionAction={putExecutionAction} + saveAgentActivityAction={saveAgentActivityAction} > From 961f935fcf7065432cbf8335276e5a42c0cffe3a Mon Sep 17 00:00:00 2001 From: Satoshi Ebisawa Date: Wed, 18 Dec 2024 15:05:40 +0900 Subject: [PATCH 02/13] move save-agent-activity to services/agents/activities --- app/(playground)/p/[agentId]/page.tsx | 2 +- .../activities/{types.ts => deprecated-agent-activity.ts} | 3 +++ services/agents/activities/index.ts | 3 ++- .../agents/activities/save-agent-activity.ts | 0 4 files changed, 6 insertions(+), 2 deletions(-) rename services/agents/activities/{types.ts => deprecated-agent-activity.ts} (97%) rename app/(playground)/p/[agentId]/lib/agent-activity.ts => services/agents/activities/save-agent-activity.ts (100%) diff --git a/app/(playground)/p/[agentId]/page.tsx b/app/(playground)/p/[agentId]/page.tsx index 3459fabb..16046e21 100644 --- a/app/(playground)/p/[agentId]/page.tsx +++ b/app/(playground)/p/[agentId]/page.tsx @@ -9,6 +9,7 @@ import { withCountMeasurement, } from "@/lib/opentelemetry"; import { getUser } from "@/lib/supabase"; +import { saveAgentActivity } from "@/services/agents/activities"; import { del, list, put } from "@vercel/blob"; import { ReactFlowProvider } from "@xyflow/react"; import { eq } from "drizzle-orm"; @@ -24,7 +25,6 @@ import { PlaygroundModeProvider } from "./contexts/playground-mode"; import { PropertiesPanelProvider } from "./contexts/properties-panel"; import { ToastProvider } from "./contexts/toast"; import { ToolbarContextProvider } from "./contexts/toolbar"; -import { saveAgentActivity } from "./lib/agent-activity"; import { executeStep } from "./lib/execution"; import { isLatestVersion, migrateGraph } from "./lib/graph"; import { buildGraphExecutionPath, buildGraphFolderPath } from "./lib/utils"; diff --git a/services/agents/activities/types.ts b/services/agents/activities/deprecated-agent-activity.ts similarity index 97% rename from services/agents/activities/types.ts rename to services/agents/activities/deprecated-agent-activity.ts index fb05612b..92275e43 100644 --- a/services/agents/activities/types.ts +++ b/services/agents/activities/deprecated-agent-activity.ts @@ -1,5 +1,8 @@ import type { AgentId } from "../types"; +/** + * @deprecated + */ export class AgentActivity { private actions: AgentActivityAction[] = []; public agentId: AgentId; diff --git a/services/agents/activities/index.ts b/services/agents/activities/index.ts index 8e538a9c..05906a70 100644 --- a/services/agents/activities/index.ts +++ b/services/agents/activities/index.ts @@ -1,5 +1,6 @@ export { calculateAgentTimeUsageMs } from "./agent-time-usage"; export { AGENT_TIME_CHARGE_LIMIT_MINUTES } from "./constants"; +export { AgentActivity } from "./deprecated-agent-activity"; export { hasEnoughAgentTimeCharge } from "./has-enough-agent-time-charge"; -export { AgentActivity } from "./types"; +export { saveAgentActivity } from "./save-agent-activity"; export { getMonthlyBillingCycle } from "./utils"; diff --git a/app/(playground)/p/[agentId]/lib/agent-activity.ts b/services/agents/activities/save-agent-activity.ts similarity index 100% rename from app/(playground)/p/[agentId]/lib/agent-activity.ts rename to services/agents/activities/save-agent-activity.ts From a045a7043ca6822e2e5ec98cb12d4df12151b609 Mon Sep 17 00:00:00 2001 From: Satoshi Ebisawa Date: Wed, 18 Dec 2024 15:20:11 +0900 Subject: [PATCH 03/13] Add functionality to send agent time usage to Stripe --- .../p/[agentId]/contexts/execution.tsx | 12 +++++----- app/(playground)/p/[agentId]/page.tsx | 13 ++++------- services/agents/activities/index.ts | 1 + .../agents/activities/record-agent-usage.ts | 21 ++++++++++++++++++ .../agents/activities/save-agent-activity.ts | 9 ++++---- .../report-agent-time-usage.ts | 22 +++++++++++++++++++ 6 files changed, 58 insertions(+), 20 deletions(-) create mode 100644 services/agents/activities/record-agent-usage.ts create mode 100644 services/usage-based-billing/report-agent-time-usage.ts diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 67c7b370..ace10dd1 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -251,7 +251,7 @@ interface ExecutionContextType { execution: Execution | null; execute: (nodeId: NodeId) => Promise; executeFlow: (flowId: FlowId) => Promise; - saveAgentActivityAction: ( + recordAgentUsageAction: ( startedAt: number, endedAt: number, totalDurationMs: number, @@ -276,7 +276,7 @@ interface ExecutionProviderProps { ) => Promise>; executeStepAction: ExecuteStepAction; putExecutionAction: (execution: Execution) => Promise<{ blobUrl: string }>; - saveAgentActivityAction: ( + recordAgentUsageAction: ( startedAt: number, endedAt: number, totalDurationMs: number, @@ -288,7 +288,7 @@ export function ExecutionProvider({ executeAction, executeStepAction, putExecutionAction, - saveAgentActivityAction, + recordAgentUsageAction, }: ExecutionProviderProps) { const { dispatch, flush, graph } = useGraph(); const { setTab } = usePropertiesPanel(); @@ -448,7 +448,7 @@ export function ExecutionProvider({ }, }, }); - await saveAgentActivityAction( + await recordAgentUsageAction( flowRunStartedAt, flowRunEndedAt, totalFlowDurationMs, @@ -459,14 +459,14 @@ export function ExecutionProvider({ graph.flows, executeStepAction, putExecutionAction, - saveAgentActivityAction, + recordAgentUsageAction, dispatch, flush, ], ); return ( {children} diff --git a/app/(playground)/p/[agentId]/page.tsx b/app/(playground)/p/[agentId]/page.tsx index 16046e21..d458d85a 100644 --- a/app/(playground)/p/[agentId]/page.tsx +++ b/app/(playground)/p/[agentId]/page.tsx @@ -9,7 +9,7 @@ import { withCountMeasurement, } from "@/lib/opentelemetry"; import { getUser } from "@/lib/supabase"; -import { saveAgentActivity } from "@/services/agents/activities"; +import { recordAgentUsage } from "@/services/agents/activities"; import { del, list, put } from "@vercel/blob"; import { ReactFlowProvider } from "@xyflow/react"; import { eq } from "drizzle-orm"; @@ -145,18 +145,13 @@ export default async function Page({ return await action(artifactId, agentId, nodeId); } - async function saveAgentActivityAction( + async function recordAgentUsageAction( startedAt: number, endedAt: number, totalDurationMs: number, ) { "use server"; - return await saveAgentActivity( - agentId, - startedAt, - endedAt, - totalDurationMs, - ); + return await recordAgentUsage(agentId, startedAt, endedAt, totalDurationMs); } async function executeStepAction( @@ -216,7 +211,7 @@ export default async function Page({ executeAction={execute} executeStepAction={executeStepAction} putExecutionAction={putExecutionAction} - saveAgentActivityAction={saveAgentActivityAction} + recordAgentUsageAction={recordAgentUsageAction} > diff --git a/services/agents/activities/index.ts b/services/agents/activities/index.ts index 05906a70..84a48a3f 100644 --- a/services/agents/activities/index.ts +++ b/services/agents/activities/index.ts @@ -2,5 +2,6 @@ export { calculateAgentTimeUsageMs } from "./agent-time-usage"; export { AGENT_TIME_CHARGE_LIMIT_MINUTES } from "./constants"; export { AgentActivity } from "./deprecated-agent-activity"; export { hasEnoughAgentTimeCharge } from "./has-enough-agent-time-charge"; +export { recordAgentUsage } from "./record-agent-usage"; export { saveAgentActivity } from "./save-agent-activity"; export { getMonthlyBillingCycle } from "./utils"; diff --git a/services/agents/activities/record-agent-usage.ts b/services/agents/activities/record-agent-usage.ts new file mode 100644 index 00000000..1fcc4246 --- /dev/null +++ b/services/agents/activities/record-agent-usage.ts @@ -0,0 +1,21 @@ +import { toUTCDate } from "@/lib/date"; +import { reportAgentTimeUsage } from "@/services/usage-based-billing/report-agent-time-usage"; +import type { AgentId } from "../types"; +import { saveAgentActivity } from "./save-agent-activity"; + +export async function recordAgentUsage( + agentId: AgentId, + startedAt: number, + endedAt: number, + totalDurationMs: number, +) { + const startedAtDateUTC = toUTCDate(new Date(startedAt)); + const endedAtDateUTC = toUTCDate(new Date(endedAt)); + await saveAgentActivity( + agentId, + startedAtDateUTC, + endedAtDateUTC, + totalDurationMs, + ); + await reportAgentTimeUsage(endedAtDateUTC); +} diff --git a/services/agents/activities/save-agent-activity.ts b/services/agents/activities/save-agent-activity.ts index b0a81251..9c02a01d 100644 --- a/services/agents/activities/save-agent-activity.ts +++ b/services/agents/activities/save-agent-activity.ts @@ -1,12 +1,11 @@ import { agentActivities, agents, db } from "@/drizzle"; -import { toUTCDate } from "@/lib/date"; import type { AgentId } from "@/services/agents"; import { eq } from "drizzle-orm"; export async function saveAgentActivity( agentId: AgentId, - startedAt: number, - endedAt: number, + startedAt: Date, + endedAt: Date, totalDurationMs: number, ) { const records = await db @@ -20,8 +19,8 @@ export async function saveAgentActivity( await db.insert(agentActivities).values({ agentDbId, - startedAt: toUTCDate(new Date(startedAt)), - endedAt: toUTCDate(new Date(endedAt)), + startedAt: startedAt, + endedAt: endedAt, totalDurationMs: totalDurationMs.toString(), }); } diff --git a/services/usage-based-billing/report-agent-time-usage.ts b/services/usage-based-billing/report-agent-time-usage.ts new file mode 100644 index 00000000..b9932f26 --- /dev/null +++ b/services/usage-based-billing/report-agent-time-usage.ts @@ -0,0 +1,22 @@ +import { db } from "@/drizzle"; +import { stripe } from "@/services/external/stripe"; +import { fetchCurrentTeam } from "@/services/teams"; +import { processUnreportedActivities } from "@/services/usage-based-billing"; +import { AgentTimeUsageDAO } from "@/services/usage-based-billing/agent-time-usage-dao"; + +export async function reportAgentTimeUsage(targetDate: Date) { + const currentTeam = await fetchCurrentTeam(); + if (currentTeam.activeSubscriptionId == null) { + return; + } + return processUnreportedActivities( + { + teamDbId: currentTeam.dbId, + targetDate: targetDate, + }, + { + dao: new AgentTimeUsageDAO(db), + stripe: stripe, + }, + ); +} From cfb1b0789582e9a2f5fd8ba021d608675d28e81a Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Wed, 18 Dec 2024 22:47:36 +0900 Subject: [PATCH 04/13] refactor(execution): Replace stepId with step in ExecutionContext - Update ExecutionContext to use a Step type instead of StepId - Simplify step resolution logic in performFlowExecution and retryStep - Improve readability and maintainability of flow execution code --- app/(playground)/p/[agentId]/lib/execution.ts | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/app/(playground)/p/[agentId]/lib/execution.ts b/app/(playground)/p/[agentId]/lib/execution.ts index d1e2cac5..049907aa 100644 --- a/app/(playground)/p/[agentId]/lib/execution.ts +++ b/app/(playground)/p/[agentId]/lib/execution.ts @@ -20,6 +20,7 @@ import * as v from "valibot"; import type { AgentId, Artifact, + Connection, ExecutionId, ExecutionSnapshot, FlowId, @@ -28,6 +29,7 @@ import type { NodeHandle, NodeHandleId, NodeId, + Step, StepId, TextArtifactObject, TextGenerateActionContent, @@ -231,11 +233,10 @@ function resolveRequirement( interface ExecutionContext { executionId: ExecutionId; - stepId: StepId; + step: Step; artifacts: Artifact[]; nodes: Node[]; - connections: ExecutionSnapshot["connections"]; - flow: ExecutionSnapshot["flow"]; + connections: Connection[]; } async function performFlowExecution(context: ExecutionContext) { @@ -245,15 +246,7 @@ async function performFlowExecution(context: ExecutionContext) { sessionId: context.executionId, }); - const step = context.flow.jobs - .flatMap((job) => job.steps) - .find((step) => step.id === context.stepId); - - if (step === undefined) { - throw new Error(`Step with id ${context.stepId} not found`); - } - - const node = context.nodes.find((node) => node.id === step.nodeId); + const node = context.nodes.find((node) => node.id === context.step.nodeId); if (node === undefined) { throw new Error("Node not found"); } @@ -397,13 +390,20 @@ export async function executeStep( throw new Error(`Flow with id ${flowId} not found`); } + const step = flow.jobs + .flatMap((job) => job.steps) + .find((step) => step.id === stepId); + + if (step === undefined) { + throw new Error(`Step with id ${stepId} not found`); + } + const context: ExecutionContext = { executionId, - stepId, + step, artifacts, nodes: graph.nodes, connections: graph.connections, - flow, }; return performFlowExecution(context); @@ -419,13 +419,20 @@ export async function retryStep( (res) => res.json() as unknown as ExecutionSnapshot, ); + const step = executionSnapshot.flow.jobs + .flatMap((job) => job.steps) + .find((step) => step.id === stepId); + + if (step === undefined) { + throw new Error(`Step with id ${stepId} not found`); + } + const context: ExecutionContext = { executionId, - stepId, + step, artifacts, nodes: executionSnapshot.nodes, connections: executionSnapshot.connections, - flow: executionSnapshot.flow, }; return performFlowExecution(context); From cb183330d46c18cffdf453922190cebfd9e3f5d4 Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Wed, 18 Dec 2024 23:08:51 +0900 Subject: [PATCH 05/13] refactor(execution): Simplify node and artifact resolution - Extract node and artifact resolution logic into separate functions - Pass ExecutionContext directly where needed to streamline resolutions - Improve code organization and maintainability --- app/(playground)/p/[agentId]/lib/execution.ts | 109 +++++++++--------- 1 file changed, 56 insertions(+), 53 deletions(-) diff --git a/app/(playground)/p/[agentId]/lib/execution.ts b/app/(playground)/p/[agentId]/lib/execution.ts index 049907aa..68d9f465 100644 --- a/app/(playground)/p/[agentId]/lib/execution.ts +++ b/app/(playground)/p/[agentId]/lib/execution.ts @@ -64,6 +64,35 @@ function resolveLanguageModel( throw new Error("Unsupported model provider"); } +function nodeResolver(nodeHandleId: NodeHandleId, context: ExecutionContext) { + const connection = context.connections.find( + (connection) => connection.targetNodeHandleId === nodeHandleId, + ); + const node = context.nodes.find( + (node) => node.id === connection?.sourceNodeId, + ); + if (node === undefined) { + return null; + } + return node; +} + +function artifactResolver( + artifactCreatorNodeId: NodeId, + context: ExecutionContext, +) { + const generatedArtifact = context.artifacts.find( + (artifact) => artifact.creatorNodeId === artifactCreatorNodeId, + ); + if ( + generatedArtifact === undefined || + generatedArtifact.type !== "generatedArtifact" + ) { + return null; + } + return generatedArtifact; +} + const artifactSchema = v.object({ plan: v.pipe( v.string(), @@ -112,10 +141,13 @@ interface TextGenerationSource extends ExecutionSourceBase { } type ExecutionSource = TextSource | TextGenerationSource | FileSource; -async function resolveSources(sources: NodeHandle[], resolver: SourceResolver) { +async function resolveSources( + sources: NodeHandle[], + context: ExecutionContext, +) { return Promise.all( sources.map(async (source) => { - const node = resolver.nodeResolver(source.id); + const node = nodeResolver(source.id, context); switch (node?.content.type) { case "text": return { @@ -179,7 +211,7 @@ async function resolveSources(sources: NodeHandle[], resolver: SourceResolver) { ); } case "textGeneration": { - const generatedArtifact = resolver.artifactResolver(node.id); + const generatedArtifact = artifactResolver(node.id, context); if ( generatedArtifact === null || generatedArtifact.type !== "generatedArtifact" @@ -200,24 +232,19 @@ async function resolveSources(sources: NodeHandle[], resolver: SourceResolver) { ).then((sources) => sources.filter((source) => source !== null).flat()); } -interface RequirementResolver { - nodeResolver: NodeResolver; - artifactResolver: ArtifactResolver; -} - function resolveRequirement( requirement: NodeHandle | null, - resolver: RequirementResolver, + context: ExecutionContext, ) { if (requirement === null) { return null; } - const node = resolver.nodeResolver(requirement.id); + const node = nodeResolver(requirement.id, context); switch (node?.content.type) { case "text": return node.content.text; case "textGeneration": { - const generatedArtifact = resolver.artifactResolver(node.id); + const generatedArtifact = artifactResolver(node.id, context); if ( generatedArtifact === null || generatedArtifact.type === "generatedArtifact" @@ -233,7 +260,7 @@ function resolveRequirement( interface ExecutionContext { executionId: ExecutionId; - step: Step; + node: Node; artifacts: Artifact[]; nodes: Node[]; connections: Connection[]; @@ -245,48 +272,15 @@ async function performFlowExecution(context: ExecutionContext) { const trace = lf.trace({ sessionId: context.executionId, }); - - const node = context.nodes.find((node) => node.id === context.step.nodeId); - if (node === undefined) { - throw new Error("Node not found"); - } - - function nodeResolver(nodeHandleId: NodeHandleId) { - const connection = context.connections.find( - (connection) => connection.targetNodeHandleId === nodeHandleId, - ); - const node = context.nodes.find( - (node) => node.id === connection?.sourceNodeId, - ); - if (node === undefined) { - return null; - } - return node; - } - - function artifactResolver(artifactCreatorNodeId: NodeId) { - const generatedArtifact = context.artifacts.find( - (artifact) => artifact.creatorNodeId === artifactCreatorNodeId, - ); - if ( - generatedArtifact === undefined || - generatedArtifact.type !== "generatedArtifact" - ) { - return null; - } - return generatedArtifact; - } + const node = context.node; switch (node.content.type) { case "textGeneration": { - const actionSources = await resolveSources(node.content.sources, { - nodeResolver, - artifactResolver, - }); - const requirement = resolveRequirement(node.content.requirement ?? null, { - nodeResolver, - artifactResolver, - }); + const actionSources = await resolveSources(node.content.sources, context); + const requirement = resolveRequirement( + node.content.requirement ?? null, + context, + ); const model = resolveLanguageModel(node.content.llm); const promptTemplate = HandleBars.compile( node.content.system ?? textGenerationPrompt, @@ -397,10 +391,14 @@ export async function executeStep( if (step === undefined) { throw new Error(`Step with id ${stepId} not found`); } + const node = graph.nodes.find((node) => node.id === step.nodeId); + if (node === undefined) { + throw new Error("Node not found"); + } const context: ExecutionContext = { executionId, - step, + node, artifacts, nodes: graph.nodes, connections: graph.connections, @@ -427,9 +425,14 @@ export async function retryStep( throw new Error(`Step with id ${stepId} not found`); } + const node = executionSnapshot.nodes.find((node) => node.id === step.nodeId); + if (node === undefined) { + throw new Error("Node not found"); + } + const context: ExecutionContext = { executionId, - step, + node, artifacts, nodes: executionSnapshot.nodes, connections: executionSnapshot.connections, From 455137a3cb28de12b13fc0fa3f9b053a89ec8a05 Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Wed, 18 Dec 2024 23:48:46 +0900 Subject: [PATCH 06/13] refactor(execution): Remove unused flowId parameter from execution - Eliminate the flowId parameter from ExecuteFlowParams and related calls - Simplify function signatures by focusing on necessary parameters - Enhance code clarity and reduce redundancy --- app/(playground)/p/[agentId]/contexts/execution.tsx | 4 ---- 1 file changed, 4 deletions(-) diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 26a51825..6ddd2a94 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -486,7 +486,6 @@ export function ExecutionProvider({ ); interface ExecuteFlowParams { - flowId: FlowId; initialExecution: Execution; flow: Flow; nodes: Node[]; @@ -497,7 +496,6 @@ export function ExecutionProvider({ } const performFlowExecution = useCallback( async ({ - flowId, initialExecution, flow, nodes, @@ -617,7 +615,6 @@ export function ExecutionProvider({ setExecution(initialExecution); const finalExecution = await performFlowExecution({ - flowId, initialExecution, flow, nodes: graph.nodes, @@ -664,7 +661,6 @@ export function ExecutionProvider({ runStartedAt: flowRunStartedAt, }; const finalExecution = await performFlowExecution({ - flowId: retryExecutionSnapshot.flow.id, initialExecution, flow: retryExecutionSnapshot.flow, nodes: retryExecutionSnapshot.nodes, From 0a1f74a5fe7bf19ad4d1b122a077d7751b433eb7 Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Wed, 18 Dec 2024 23:48:57 +0900 Subject: [PATCH 07/13] remove --- app/(playground)/p/[agentId]/lib/graph.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/app/(playground)/p/[agentId]/lib/graph.test.ts b/app/(playground)/p/[agentId]/lib/graph.test.ts index ca097819..d64e2c90 100644 --- a/app/(playground)/p/[agentId]/lib/graph.test.ts +++ b/app/(playground)/p/[agentId]/lib/graph.test.ts @@ -150,7 +150,6 @@ describe("deriveFlows", () => { expect(flows[1].nodes.length).toBe(3); }); test("ignore ghost connectors", () => { - console.log(flows[1].jobs[2].steps); expect(flows[1].jobs[2].steps.length).toBe(1); }); }); From 4117d6b6f4b4020c704c938fc74395e1b232dfe2 Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Wed, 18 Dec 2024 23:58:28 +0900 Subject: [PATCH 08/13] feat(execution): Add node execution capability - Introduce executeNode function to handle execution for a single node - Update ExecutionProvider to include executeNode method and related logic - Modify PropertiesPanel to call executeNode instead of the previous execute method - Implement flow derivation from a node context for cleaner execution handling - Enhance graph utilities with a test case for one-node scenarios --- .../[agentId]/components/properties-panel.tsx | 6 +- .../p/[agentId]/contexts/execution.tsx | 55 ++++++++++++++++++- app/(playground)/p/[agentId]/lib/execution.ts | 33 +++++++++++ .../p/[agentId]/lib/graph.test.ts | 24 ++++++++ app/(playground)/p/[agentId]/lib/graph.ts | 4 +- app/(playground)/p/[agentId]/page.tsx | 8 ++- 6 files changed, 124 insertions(+), 6 deletions(-) diff --git a/app/(playground)/p/[agentId]/components/properties-panel.tsx b/app/(playground)/p/[agentId]/components/properties-panel.tsx index 677b2173..54bfeace 100644 --- a/app/(playground)/p/[agentId]/components/properties-panel.tsx +++ b/app/(playground)/p/[agentId]/components/properties-panel.tsx @@ -288,7 +288,7 @@ export function PropertiesPanel() { const { graph, dispatch, flush } = useGraph(); const selectedNode = useSelectedNode(); const { open, setOpen, tab, setTab } = usePropertiesPanel(); - const { execute } = useExecution(); + const { executeNode } = useExecution(); return (
execute(selectedNode.id)} + onClick={() => executeNode(selectedNode.id)} > Generate @@ -630,7 +630,7 @@ export function PropertiesPanel() { setTab("Prompt"); }} onEditPrompt={() => setTab("Prompt")} - onGenerateText={() => execute(selectedNode.id)} + onGenerateText={() => executeNode(selectedNode.id)} /> )} diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 6ddd2a94..388babff 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -8,6 +8,7 @@ import { useContext, useState, } from "react"; +import { deriveFlows } from "../lib/graph"; import { createArtifactId, createExecutionId, @@ -346,6 +347,7 @@ const executeJob = async ( interface ExecutionContextType { execution: Execution | null; execute: (nodeId: NodeId) => Promise; + executeNode: (nodeId: NodeId) => Promise; executeFlow: (flowId: FlowId) => Promise; retryFlowExecution: ( executionId: ExecutionId, @@ -381,6 +383,10 @@ interface ExecutionProviderProps { executionSnapshot: ExecutionSnapshot, ) => Promise<{ blobUrl: string }>; retryStepAction: RetryStepAction; + executeNodeAction: ( + executionId: ExecutionId, + nodeId: NodeId, + ) => Promise>; } export function ExecutionProvider({ @@ -389,6 +395,7 @@ export function ExecutionProvider({ executeStepAction, putExecutionAction, retryStepAction, + executeNodeAction, }: ExecutionProviderProps) { const { dispatch, flush, graph } = useGraph(); const { setTab } = usePropertiesPanel(); @@ -678,9 +685,55 @@ export function ExecutionProvider({ [graph.executionIndexes, flush, retryStepAction, performFlowExecution], ); + const executeNode = useCallback( + async (nodeId: NodeId) => { + const executionId = createExecutionId(); + const flowRunStartedAt = Date.now(); + const node = graph.nodes.find((node) => node.id === nodeId); + if (node === undefined) { + throw new Error("Node not found"); + } + + const tmpFlows = deriveFlows({ + nodes: [node], + connections: [], + }); + if (tmpFlows.length !== 1) { + throw new Error("Unexpected number of flows"); + } + const tmpFlow = tmpFlows[0]; + + // Initialize flow execution + const initialExecution: Execution = { + id: executionId, + status: "running", + jobExecutions: createInitialJobExecutions(tmpFlow), + artifacts: [], + runStartedAt: flowRunStartedAt, + }; + setExecution(initialExecution); + const finalExecution = await performFlowExecution({ + initialExecution, + flow: tmpFlow, + nodes: graph.nodes, + connections: graph.connections, + executeStepCallback: (stepId) => + executeNodeAction(executionId, node.id), + }); + setExecution(finalExecution); + }, + [executeNodeAction, graph.connections, graph.nodes, performFlowExecution], + ); + return ( {children} diff --git a/app/(playground)/p/[agentId]/lib/execution.ts b/app/(playground)/p/[agentId]/lib/execution.ts index 68d9f465..2d972c5c 100644 --- a/app/(playground)/p/[agentId]/lib/execution.ts +++ b/app/(playground)/p/[agentId]/lib/execution.ts @@ -440,3 +440,36 @@ export async function retryStep( return performFlowExecution(context); } + +export async function executeNode( + agentId: AgentId, + executionId: ExecutionId, + nodeId: NodeId, +) { + const agent = await db.query.agents.findFirst({ + where: (agents, { eq }) => eq(agents.id, agentId), + }); + + if (agent === undefined || agent.graphUrl === null) { + throw new Error(`Agent with id ${agentId} not found`); + } + + const graph = await fetch(agent.graphUrl).then( + (res) => res.json() as unknown as Graph, + ); + + const node = graph.nodes.find((node) => node.id === nodeId); + if (node === undefined) { + throw new Error("Node not found"); + } + + const context: ExecutionContext = { + executionId, + node, + artifacts: graph.artifacts, + nodes: graph.nodes, + connections: graph.connections, + }; + + return performFlowExecution(context); +} diff --git a/app/(playground)/p/[agentId]/lib/graph.test.ts b/app/(playground)/p/[agentId]/lib/graph.test.ts index d64e2c90..4f4919ae 100644 --- a/app/(playground)/p/[agentId]/lib/graph.test.ts +++ b/app/(playground)/p/[agentId]/lib/graph.test.ts @@ -152,6 +152,30 @@ describe("deriveFlows", () => { test("ignore ghost connectors", () => { expect(flows[1].jobs[2].steps.length).toBe(1); }); + test("one node graph", () => { + const testFlows = deriveFlows({ + nodes: [ + { + id: "nd_onenode", + name: "Summary", + position: { x: 420, y: 180 }, + selected: false, + type: "action", + content: { + type: "textGeneration", + llm: "anthropic:claude-3-5-sonnet-latest", + temperature: 0.7, + topP: 1, + instruction: "Please let me know key takeaway about ", + sources: [], + }, + }, + ], + connections: [], + }); + expect(testFlows.length).toBe(1); + expect(testFlows[0].jobs[0].steps[0].nodeId).toBe("nd_onenode"); + }); }); describe("isLatestVersion", () => { diff --git a/app/(playground)/p/[agentId]/lib/graph.ts b/app/(playground)/p/[agentId]/lib/graph.ts index edc9b966..2ab40e96 100644 --- a/app/(playground)/p/[agentId]/lib/graph.ts +++ b/app/(playground)/p/[agentId]/lib/graph.ts @@ -12,7 +12,9 @@ import type { } from "../types"; import { createFlowId, createJobId, createStepId } from "./utils"; -export function deriveFlows(graph: Graph): Flow[] { +export function deriveFlows( + graph: Pick, +): Flow[] { const processedNodes = new Set(); const flows: Flow[] = []; const connectionMap = new Map>(); diff --git a/app/(playground)/p/[agentId]/page.tsx b/app/(playground)/p/[agentId]/page.tsx index bc02892a..2b3521b1 100644 --- a/app/(playground)/p/[agentId]/page.tsx +++ b/app/(playground)/p/[agentId]/page.tsx @@ -24,7 +24,7 @@ import { PlaygroundModeProvider } from "./contexts/playground-mode"; import { PropertiesPanelProvider } from "./contexts/properties-panel"; import { ToastProvider } from "./contexts/toast"; import { ToolbarContextProvider } from "./contexts/toolbar"; -import { executeStep, retryStep } from "./lib/execution"; +import { executeNode, executeStep, retryStep } from "./lib/execution"; import { isLatestVersion, migrateGraph } from "./lib/graph"; import { buildGraphExecutionPath, buildGraphFolderPath } from "./lib/utils"; import type { @@ -197,6 +197,11 @@ export default async function Page({ ); } + async function executeNodeAction(executionId: ExecutionId, nodeId: NodeId) { + "use server"; + return await executeNode(agentId, executionId, nodeId); + } + return ( From c03bc27e1e41f7e3f12a58e69f47cd822860de2a Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Thu, 19 Dec 2024 00:21:32 +0900 Subject: [PATCH 09/13] feat(execution): Enhance artifact updates during execution - Introduce updateArtifact callback to handle live updates of artifacts - Refactor executeStep and executeJob to utilize the new callback - Improve performance of artifact management during flow execution - Ensure consistent state updates across artifact transformations --- .../p/[agentId]/contexts/execution.tsx | 74 +++++++++++++++---- 1 file changed, 59 insertions(+), 15 deletions(-) diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 388babff..7c9d0134 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -146,6 +146,7 @@ const executeStep = async ( updateExecution: ( updater: (prev: Execution | null) => Execution | null, ) => void, + updateArtifact: (artifactId: ArtifactId, content: TextArtifactObject) => void, ): Promise => { if (stepExecution.status === "completed") { return stepExecution; @@ -186,19 +187,9 @@ const executeStep = async ( try { // Execute step and process stream const stream = await executeStepAction(stepExecution.stepId); - const finalArtifact = await processStreamContent(stream, (content) => { - updateExecution((prev) => { - if (!prev || prev.status !== "running") return null; - return { - ...prev, - artifacts: prev.artifacts.map((artifact) => - artifact.id === artifactId - ? { ...artifact, object: content } - : artifact, - ), - }; - }); - }); + const finalArtifact = await processStreamContent(stream, (content) => + updateArtifact(artifactId, content), + ); // Complete step execution const stepDurationMs = Date.now() - stepRunStartedAt; @@ -273,6 +264,7 @@ const executeJob = async ( updateExecution: ( updater: (prev: Execution | null) => Execution | null, ) => void, + updateArtifact: (artifactId: ArtifactId, content: TextArtifactObject) => void, ): Promise => { const jobRunStartedAt = Date.now(); @@ -292,7 +284,7 @@ const executeJob = async ( // Execute all steps in parallel const stepExecutions = await Promise.all( jobExecution.stepExecutions.map((step) => - executeStep(step, executeStepAction, updateExecution), + executeStep(step, executeStepAction, updateExecution, updateArtifact), ), ); @@ -500,6 +492,10 @@ export function ExecutionProvider({ executeStepCallback: ( stepId: StepId, ) => Promise>; + updateArtifactCallback: ( + artifactId: ArtifactId, + content: TextArtifactObject, + ) => void; } const performFlowExecution = useCallback( async ({ @@ -508,6 +504,7 @@ export function ExecutionProvider({ nodes, connections, executeStepCallback, + updateArtifactCallback, }: ExecuteFlowParams) => { let currentExecution = initialExecution; let totalFlowDurationMs = 0; @@ -549,6 +546,7 @@ export function ExecutionProvider({ setExecution(updated); } }, + updateArtifactCallback, ); totalFlowDurationMs += executedJob.durationMs; @@ -633,6 +631,19 @@ export function ExecutionProvider({ stepId, initialExecution.artifacts, ), + updateArtifactCallback: (artifactId, content) => { + setExecution((prev) => { + if (!prev || prev.status !== "running") return null; + return { + ...prev, + artifacts: prev.artifacts.map((artifact) => + artifact.id === artifactId + ? { ...artifact, object: content } + : artifact, + ), + }; + }); + }, }); setExecution(finalExecution); }, @@ -679,6 +690,19 @@ export function ExecutionProvider({ stepId, initialExecution.artifacts, ), + updateArtifactCallback: (artifactId, content) => { + setExecution((prev) => { + if (!prev || prev.status !== "running") return null; + return { + ...prev, + artifacts: prev.artifacts.map((artifact) => + artifact.id === artifactId + ? { ...artifact, object: content } + : artifact, + ), + }; + }); + }, }); setExecution(finalExecution); }, @@ -719,10 +743,30 @@ export function ExecutionProvider({ connections: graph.connections, executeStepCallback: (stepId) => executeNodeAction(executionId, node.id), + updateArtifactCallback: (artifactId, content) => { + dispatch({ + type: "upsertArtifact", + input: { + nodeId, + artifact: { + id: artifactId, + type: "streamArtifact", + creatorNodeId: nodeId, + object: content, + }, + }, + }); + }, }); setExecution(finalExecution); }, - [executeNodeAction, graph.connections, graph.nodes, performFlowExecution], + [ + executeNodeAction, + graph.connections, + graph.nodes, + performFlowExecution, + dispatch, + ], ); return ( From b0f243aa04d1b1e2efbab1e091e0a30dc0dd6c0c Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Thu, 19 Dec 2024 00:21:59 +0900 Subject: [PATCH 10/13] refactor(actions): Remove unused action function and imports - Eliminate the action function and its associated logic, as it is no longer needed - Clean up imports in actions, properties panel, and execution contexts to reflect the removal - Streamline codebase by removing redundant artifact management logic in execution --- app/(playground)/p/[agentId]/actions.ts | 350 +----------------- .../[agentId]/components/properties-panel.tsx | 2 +- .../p/[agentId]/contexts/execution.tsx | 96 ----- app/(playground)/p/[agentId]/page.tsx | 8 +- 4 files changed, 4 insertions(+), 452 deletions(-) diff --git a/app/(playground)/p/[agentId]/actions.ts b/app/(playground)/p/[agentId]/actions.ts index 10594435..458275fa 100644 --- a/app/(playground)/p/[agentId]/actions.ts +++ b/app/(playground)/p/[agentId]/actions.ts @@ -1,370 +1,24 @@ "use server"; -import { db } from "@/drizzle"; import { ExternalServiceName, VercelBlobOperation, createLogger, waitForTelemetryExport, withCountMeasurement, - withTokenMeasurement, } from "@/lib/opentelemetry"; -import { anthropic } from "@ai-sdk/anthropic"; -import { google } from "@ai-sdk/google"; -import { openai } from "@ai-sdk/openai"; -import { toJsonSchema } from "@valibot/to-json-schema"; import { type ListBlobResult, del, list, put } from "@vercel/blob"; -import { type LanguageModelV1, jsonSchema, streamObject } from "ai"; -import { createStreamableValue } from "ai/rsc"; -import { MockLanguageModelV1, simulateReadableStream } from "ai/test"; -import HandleBars from "handlebars"; -import Langfuse from "langfuse"; import { UnstructuredClient } from "unstructured-client"; import { Strategy } from "unstructured-client/sdk/models/shared"; -import * as v from "valibot"; -import { vercelBlobFileFolder, vercelBlobGraphFolder } from "./constants"; +import { vercelBlobFileFolder } from "./constants"; -import { textGenerationPrompt } from "./lib/prompts"; import { buildFileFolderPath, buildGraphPath, elementsToMarkdown, - langfuseModel, pathJoin, - toErrorWithMessage, } from "./lib/utils"; -import type { - AgentId, - ArtifactId, - FileData, - FileId, - Graph, - GraphId, - NodeHandle, - NodeHandleId, - NodeId, - TextArtifactObject, - TextGenerateActionContent, -} from "./types"; - -function resolveLanguageModel( - llm: TextGenerateActionContent["llm"], -): LanguageModelV1 { - const [provider, model] = llm.split(":"); - if (provider === "openai") { - return openai(model); - } - if (provider === "anthropic") { - return anthropic(model); - } - if (provider === "google") { - return google(model); - } - if (provider === "dev") { - return new MockLanguageModelV1({ - defaultObjectGenerationMode: "json", - doStream: async () => ({ - stream: simulateReadableStream({ - chunks: [{ type: "error", error: "a" }], - }), - rawCall: { rawPrompt: null, rawSettings: {} }, - }), - }); - } - throw new Error("Unsupported model provider"); -} - -const artifactSchema = v.object({ - plan: v.pipe( - v.string(), - v.description( - "How you think about the content of the artefact (purpose, structure, essentials) and how you intend to output it", - ), - ), - title: v.pipe(v.string(), v.description("The title of the artefact")), - content: v.pipe( - v.string(), - v.description("The content of the artefact formatted markdown."), - ), - description: v.pipe( - v.string(), - v.description( - "Explanation of the Artifact and what the intention was in creating this Artifact. Add any suggestions for making it even better.", - ), - ), -}); - -interface ActionSourceBase { - type: string; - nodeId: NodeId; -} - -interface TextSource extends ActionSourceBase { - type: "text"; - content: string; -} -interface FileSource extends ActionSourceBase { - type: "file"; - title: string; - content: string; -} -interface TextGenerationSource extends ActionSourceBase { - type: "textGeneration"; - title: string; - content: string; -} - -type ActionSource = TextSource | TextGenerationSource | FileSource; - -export async function action( - artifactId: ArtifactId, - agentId: AgentId, - nodeId: NodeId, -) { - const startTime = Date.now(); - const lf = new Langfuse(); - const trace = lf.trace({ - sessionId: artifactId, - }); - - const agent = await db.query.agents.findFirst({ - where: (agents, { eq }) => eq(agents.id, agentId), - }); - if (agent === undefined || agent.graphUrl === null) { - throw new Error(`Agent with id ${agentId} not found`); - } - - const graph = await fetch(agent.graphUrl).then( - (res) => res.json() as unknown as Graph, - ); - const node = graph.nodes.find((node) => node.id === nodeId); - if (node === undefined) { - throw new Error("Node not found"); - } - /** - * This function is a helper that retrieves a node from the graph - * based on its NodeHandleId. It looks for a connection in the - * graph that matches the provided handleId and returns the - * corresponding node if found, or null if no such node exists. - */ - function findNode(handleId: NodeHandleId) { - const connection = graph.connections.find( - (connection) => connection.targetNodeHandleId === handleId, - ); - const node = graph.nodes.find( - (node) => node.id === connection?.sourceNodeId, - ); - if (node === undefined) { - return null; - } - return node; - } - - /** - * The resolveSources function maps over an array of NodeHandles, - * finds the corresponding nodes in the graph, and returns an - * array of ActionSources. It handles both text and text generation - * sources and filters out any null results. If a text node is - * found, it extracts the text content; if a textGeneration node - * is found, it retrieves the corresponding generatedArtifact. - */ - async function resolveSources(sources: NodeHandle[]) { - return Promise.all( - sources.map(async (source) => { - const node = findNode(source.id); - switch (node?.content.type) { - case "text": - return { - type: "text", - content: node.content.text, - nodeId: node.id, - } satisfies ActionSource; - case "file": { - if (node.content.data == null) { - throw new Error("File not found"); - } - if (node.content.data.status === "uploading") { - /** @todo Let user know file is uploading*/ - throw new Error("File is uploading"); - } - if (node.content.data.status === "processing") { - /** @todo Let user know file is processing*/ - throw new Error("File is processing"); - } - if (node.content.data.status === "failed") { - return null; - } - const text = await fetch(node.content.data.textDataUrl).then( - (res) => res.text(), - ); - return { - type: "file", - title: node.content.data.name, - content: text, - nodeId: node.id, - } satisfies ActionSource; - } - - case "files": { - return await Promise.all( - node.content.data.map(async (file) => { - if (file == null) { - throw new Error("File not found"); - } - if (file.status === "uploading") { - /** @todo Let user know file is uploading*/ - throw new Error("File is uploading"); - } - if (file.status === "processing") { - /** @todo Let user know file is processing*/ - throw new Error("File is processing"); - } - if (file.status === "failed") { - return null; - } - const text = await fetch(file.textDataUrl).then((res) => - res.text(), - ); - return { - type: "file", - title: file.name, - content: text, - nodeId: node.id, - } satisfies ActionSource; - }), - ); - } - case "textGeneration": { - const generatedArtifact = graph.artifacts.find( - (artifact) => artifact.creatorNodeId === node.id, - ); - if ( - generatedArtifact === undefined || - generatedArtifact.type !== "generatedArtifact" - ) { - return null; - } - return { - type: "textGeneration", - title: generatedArtifact.object.title, - content: generatedArtifact.object.content, - nodeId: node.id, - } satisfies ActionSource; - } - default: - return null; - } - }), - ).then((sources) => sources.filter((source) => source !== null).flat()); - } - - /** - * The resolveRequirement function retrieves the content of a - * specified requirement node, if it exists. It looks for - * the node in the graph based on the given NodeHandle. - * If the node is of type "text", it returns the text - * content; if it is of type "textGeneration", it looks - * for the corresponding generated artifact and returns - * its content. If the node is not found or does not match - * the expected types, it returns null. - */ - function resolveRequirement(requirement?: NodeHandle) { - if (requirement === undefined) { - return null; - } - const node = findNode(requirement.id); - switch (node?.content.type) { - case "text": - return node.content.text; - case "textGeneration": { - const generatedArtifact = graph.artifacts.find( - (artifact) => artifact.creatorNodeId === node.id, - ); - if ( - generatedArtifact === undefined || - generatedArtifact.type === "generatedArtifact" - ) { - return null; - } - return generatedArtifact.object.content; - } - default: - return null; - } - } - - // The main switch statement handles the different types of nodes - switch (node.content.type) { - case "textGeneration": { - const actionSources = await resolveSources(node.content.sources); - const requirement = resolveRequirement(node.content.requirement); - const model = resolveLanguageModel(node.content.llm); - const promptTemplate = HandleBars.compile( - node.content.system ?? textGenerationPrompt, - ); - const prompt = promptTemplate({ - instruction: node.content.instruction, - requirement, - sources: actionSources, - }); - const topP = node.content.topP; - const temperature = node.content.temperature; - const stream = createStreamableValue(); - - const generationTracer = trace.generation({ - name: "generate-text", - input: prompt, - model: langfuseModel(node.content.llm), - modelParameters: { - topP: node.content.topP, - temperature: node.content.temperature, - }, - }); - (async () => { - const { partialObjectStream, object, usage } = streamObject({ - model, - prompt, - schema: jsonSchema>( - toJsonSchema(artifactSchema), - ), - topP, - temperature, - }); - - for await (const partialObject of partialObjectStream) { - stream.update({ - type: "text", - title: partialObject.title ?? "", - content: partialObject.content ?? "", - messages: { - plan: partialObject.plan ?? "", - description: partialObject.description ?? "", - }, - }); - } - const result = await object; - - await withTokenMeasurement( - createLogger(node.content.type), - async () => { - generationTracer.end({ output: result }); - await lf.shutdownAsync(); - waitForTelemetryExport(); - return { usage: await usage }; - }, - model, - startTime, - ); - stream.done(); - })().catch((error) => { - stream.error(error); - }); - return stream.value; - } - default: - throw new Error("Invalid node type"); - } -} +import type { FileData, FileId, Graph } from "./types"; export async function parse(id: FileId, name: string, blobUrl: string) { const startTime = Date.now(); diff --git a/app/(playground)/p/[agentId]/components/properties-panel.tsx b/app/(playground)/p/[agentId]/components/properties-panel.tsx index 54bfeace..0121bc93 100644 --- a/app/(playground)/p/[agentId]/components/properties-panel.tsx +++ b/app/(playground)/p/[agentId]/components/properties-panel.tsx @@ -29,7 +29,7 @@ import { useMemo, useState, } from "react"; -import { action, parse, remove } from "../actions"; +import { parse, remove } from "../actions"; import { vercelBlobFileFolder } from "../constants"; import { useDeveloperMode } from "../contexts/developer-mode"; import { useExecution } from "../contexts/execution"; diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 7c9d0134..b49fc5dc 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -338,7 +338,6 @@ const executeJob = async ( interface ExecutionContextType { execution: Execution | null; - execute: (nodeId: NodeId) => Promise; executeNode: (nodeId: NodeId) => Promise; executeFlow: (flowId: FlowId) => Promise; retryFlowExecution: ( @@ -366,10 +365,6 @@ type RetryStepAction = ( ) => Promise>; interface ExecutionProviderProps { children: ReactNode; - executeAction: ( - artifactId: ArtifactId, - nodeId: NodeId, - ) => Promise>; executeStepAction: ExecuteStepAction; putExecutionAction: ( executionSnapshot: ExecutionSnapshot, @@ -383,7 +378,6 @@ interface ExecutionProviderProps { export function ExecutionProvider({ children, - executeAction, executeStepAction, putExecutionAction, retryStepAction, @@ -395,95 +389,6 @@ export function ExecutionProvider({ const { setPlaygroundMode } = usePlaygroundMode(); const [execution, setExecution] = useState(null); - const execute = useCallback( - async (nodeId: NodeId) => { - const artifactId = createArtifactId(); - dispatch({ - type: "upsertArtifact", - input: { - nodeId, - artifact: { - id: artifactId, - type: "streamArtifact", - creatorNodeId: nodeId, - object: { - type: "text", - title: "", - content: "", - messages: { - plan: "", - description: "", - }, - }, - }, - }, - }); - setTab("Result"); - await flush(); - try { - const stream = await executeAction(artifactId, nodeId); - - let textArtifactObject: TextArtifactObject = { - type: "text", - title: "", - content: "", - messages: { - plan: "", - description: "", - }, - }; - for await (const streamContent of readStreamableValue(stream)) { - if (streamContent === undefined) { - continue; - } - dispatch({ - type: "upsertArtifact", - input: { - nodeId, - artifact: { - id: artifactId, - type: "streamArtifact", - creatorNodeId: nodeId, - object: streamContent, - }, - }, - }); - textArtifactObject = { - ...textArtifactObject, - ...streamContent, - }; - } - dispatch({ - type: "upsertArtifact", - input: { - nodeId, - artifact: { - id: artifactId, - type: "generatedArtifact", - creatorNodeId: nodeId, - createdAt: Date.now(), - object: textArtifactObject, - }, - }, - }); - } catch (error) { - addToast({ - type: "error", - title: "Execution failed", - message: toErrorWithMessage(error).message, - }); - dispatch({ - type: "upsertArtifact", - input: { - nodeId, - artifact: null, - }, - }); - } - }, - [executeAction, dispatch, flush, setTab, addToast], - ); - interface ExecuteFlowParams { initialExecution: Execution; flow: Flow; @@ -773,7 +678,6 @@ export function ExecutionProvider({ Date: Thu, 19 Dec 2024 01:01:41 +0900 Subject: [PATCH 11/13] feat(execution): Enhance step and job execution with callbacks - Refactor executeStep and executeJob to accept callback functions for success and failure handling - Introduce onStepFinish and onStepFail callbacks for better integration of execution outcomes - Update the ExecutionProvider to manage artifact updates and provide step execution feedback - Improve overall artifact management during node processing and execution flow --- .../p/[agentId]/contexts/execution.tsx | 120 +++++++++++++----- app/(playground)/p/[agentId]/types.ts | 2 +- 2 files changed, 88 insertions(+), 34 deletions(-) diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index b49fc5dc..6cd096a6 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -32,12 +32,14 @@ import type { FailedStepExecution, Flow, FlowId, + GeneratedArtifact, JobExecution, Node, NodeId, SkippedJobExecution, StepExecution, StepId, + TextArtifact, TextArtifactObject, } from "../types"; import { useGraph } from "./graph"; @@ -138,16 +140,28 @@ const processStreamContent = async ( return textArtifactObject; }; -const executeStep = async ( - stepExecution: StepExecution, +const executeStep = async ({ + stepExecution, + executeStepAction, + updateExecution, + updateArtifact, + onStepFinish, + onStepFail, +}: { + stepExecution: StepExecution; executeStepAction: ( stepId: StepId, - ) => Promise>, + ) => Promise>; updateExecution: ( updater: (prev: Execution | null) => Execution | null, - ) => void, - updateArtifact: (artifactId: ArtifactId, content: TextArtifactObject) => void, -): Promise => { + ) => void; + updateArtifact: (artifactId: ArtifactId, content: TextArtifactObject) => void; + onStepFinish?: ( + stepExecution: CompletedStepExecution, + artifact: TextArtifact, + ) => void; + onStepFail?: (stepExecution: FailedStepExecution) => void; +}): Promise => { if (stepExecution.status === "completed") { return stepExecution; } @@ -194,12 +208,19 @@ const executeStep = async ( // Complete step execution const stepDurationMs = Date.now() - stepRunStartedAt; - const successStepExecution: CompletedStepExecution = { + const completedStepExecution: CompletedStepExecution = { ...stepExecution, status: "completed", runStartedAt: stepRunStartedAt, durationMs: stepDurationMs, }; + const generatedArtifact = { + id: artifactId, + type: "generatedArtifact", + creatorNodeId: stepExecution.nodeId, + createdAt: Date.now(), + object: finalArtifact, + } satisfies TextArtifact; updateExecution((prev) => { if (!prev || prev.status !== "running") return null; @@ -208,24 +229,16 @@ const executeStep = async ( jobExecutions: prev.jobExecutions.map((job) => ({ ...job, stepExecutions: job.stepExecutions.map((step) => - step.id === stepExecution.id ? successStepExecution : step, + step.id === stepExecution.id ? completedStepExecution : step, ), })), artifacts: prev.artifacts.map((artifact) => - artifact.id === artifactId - ? { - id: artifactId, - type: "generatedArtifact", - creatorNodeId: stepExecution.nodeId, - createdAt: Date.now(), - object: finalArtifact, - } - : artifact, + artifact.id === artifactId ? generatedArtifact : artifact, ), }; }); - - return successStepExecution; + onStepFinish?.(completedStepExecution, generatedArtifact); + return completedStepExecution; } catch (unknownError) { const error = toErrorWithMessage(unknownError).message; const stepDurationMs = Date.now() - stepRunStartedAt; @@ -252,20 +265,33 @@ const executeStep = async ( ), }; }); + onStepFail?.(failedStepExecution); return failedStepExecution; } }; -const executeJob = async ( - jobExecution: JobExecution, +const executeJob = async ({ + jobExecution, + executeStepAction, + updateArtifact, + updateExecution, + onStepFinish, + onStepFail, +}: { + jobExecution: JobExecution; executeStepAction: ( stepId: StepId, - ) => Promise>, + ) => Promise>; updateExecution: ( updater: (prev: Execution | null) => Execution | null, - ) => void, - updateArtifact: (artifactId: ArtifactId, content: TextArtifactObject) => void, -): Promise => { + ) => void; + updateArtifact: (artifactId: ArtifactId, content: TextArtifactObject) => void; + onStepFinish?: ( + stepExecution: CompletedStepExecution, + artifact: TextArtifact, + ) => void; + onStepFail?: (stepExecution: FailedStepExecution) => void; +}): Promise => { const jobRunStartedAt = Date.now(); // Start job execution @@ -283,8 +309,15 @@ const executeJob = async ( // Execute all steps in parallel const stepExecutions = await Promise.all( - jobExecution.stepExecutions.map((step) => - executeStep(step, executeStepAction, updateExecution, updateArtifact), + jobExecution.stepExecutions.map((stepExecution) => + executeStep({ + stepExecution, + executeStepAction, + updateExecution, + updateArtifact, + onStepFinish, + onStepFail, + }), ), ); @@ -401,6 +434,10 @@ export function ExecutionProvider({ artifactId: ArtifactId, content: TextArtifactObject, ) => void; + onStepFinish?: ( + stepExecution: CompletedStepExecution, + artifact: TextArtifact, + ) => void; } const performFlowExecution = useCallback( async ({ @@ -410,6 +447,7 @@ export function ExecutionProvider({ connections, executeStepCallback, updateArtifactCallback, + onStepFinish, }: ExecuteFlowParams) => { let currentExecution = initialExecution; let totalFlowDurationMs = 0; @@ -441,18 +479,25 @@ export function ExecutionProvider({ continue; } - const executedJob = await executeJob( + const executedJob = await executeJob({ jobExecution, - executeStepCallback, - (updater) => { + executeStepAction: executeStepCallback, + updateExecution: (updater) => { const updated = updater(currentExecution); if (updated) { currentExecution = updated; setExecution(updated); } }, - updateArtifactCallback, - ); + updateArtifact: updateArtifactCallback, + onStepFinish, + onStepFail: (failedStep) => { + addToast({ + type: "error", + message: failedStep.error, + }); + }, + }); totalFlowDurationMs += executedJob.durationMs; if (executedJob.status === "failed") { @@ -499,7 +544,7 @@ export function ExecutionProvider({ return currentExecution; }, - [dispatch, putExecutionAction], + [dispatch, putExecutionAction, addToast], ); const executeFlow = useCallback( @@ -662,6 +707,15 @@ export function ExecutionProvider({ }, }); }, + onStepFinish: (execution, artifact) => { + dispatch({ + type: "upsertArtifact", + input: { + nodeId, + artifact, + }, + }); + }, }); setExecution(finalExecution); }, diff --git a/app/(playground)/p/[agentId]/types.ts b/app/(playground)/p/[agentId]/types.ts index 67f8db59..52606eb3 100644 --- a/app/(playground)/p/[agentId]/types.ts +++ b/app/(playground)/p/[agentId]/types.ts @@ -162,7 +162,7 @@ export interface TextArtifactObject extends ArtifactObjectBase { completionTokens: number; }; } -interface TextArtifact extends GeneratedArtifact { +export interface TextArtifact extends GeneratedArtifact { object: TextArtifactObject; } interface TextStreamArtifact extends StreamAtrifact { From 0a50751e456291f86f3877f2491e9ad436c99b5e Mon Sep 17 00:00:00 2001 From: toyamarinyon Date: Thu, 19 Dec 2024 01:06:18 +0900 Subject: [PATCH 12/13] feat(execution): Set tab on node execution invocation - Update executeNode function to set the active tab to "Result" when a node is executed - Improve user experience by providing immediate feedback in the UI during execution --- app/(playground)/p/[agentId]/contexts/execution.tsx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/(playground)/p/[agentId]/contexts/execution.tsx b/app/(playground)/p/[agentId]/contexts/execution.tsx index 6cd096a6..9b1af535 100644 --- a/app/(playground)/p/[agentId]/contexts/execution.tsx +++ b/app/(playground)/p/[agentId]/contexts/execution.tsx @@ -661,6 +661,7 @@ export function ExecutionProvider({ const executeNode = useCallback( async (nodeId: NodeId) => { + setTab("Result"); const executionId = createExecutionId(); const flowRunStartedAt = Date.now(); const node = graph.nodes.find((node) => node.id === nodeId); @@ -720,6 +721,7 @@ export function ExecutionProvider({ setExecution(finalExecution); }, [ + setTab, executeNodeAction, graph.connections, graph.nodes, From 40439245152ac17f01d012997f41473bb2658a6d Mon Sep 17 00:00:00 2001 From: Satoshi Ebisawa Date: Thu, 19 Dec 2024 09:58:57 +0900 Subject: [PATCH 13/13] Make @/services/external/stripe be lazy initilization --- services/external/stripe/config.ts | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/services/external/stripe/config.ts b/services/external/stripe/config.ts index b58ffc4a..dfaa4bff 100644 --- a/services/external/stripe/config.ts +++ b/services/external/stripe/config.ts @@ -1,6 +1,21 @@ import { Stripe } from "stripe"; -export const stripe = new Stripe(process.env.STRIPE_SECRET_KEY as string, { - // https://github.com/stripe/stripe-node#configuration - apiVersion: "2024-11-20.acacia", -}); +let stripeInstance: Stripe | null = null; + +const handler: ProxyHandler = { + get: (_target, prop: keyof Stripe | symbol) => { + if (!stripeInstance) { + const key = process.env.STRIPE_SECRET_KEY; + if (!key) { + throw new Error("STRIPE_SECRET_KEY is not configured"); + } + stripeInstance = new Stripe(key, { + // https://github.com/stripe/stripe-node#configuration + apiVersion: "2024-11-20.acacia", + }); + } + return stripeInstance[prop as keyof Stripe]; + }, +}; + +export const stripe: Stripe = new Proxy(new Stripe("dummy"), handler);