diff --git a/app/(playground)/p/[agentId]/beta-proto/flow/save-agent-activity.ts b/app/(playground)/p/[agentId]/beta-proto/flow/save-agent-activity.ts index af5967f7..6344cd2e 100644 --- a/app/(playground)/p/[agentId]/beta-proto/flow/save-agent-activity.ts +++ b/app/(playground)/p/[agentId]/beta-proto/flow/save-agent-activity.ts @@ -19,6 +19,6 @@ export async function saveAgentActivity(activity: AgentActivity) { agentDbId, startedAt: activity.startedAt, endedAt: activity.endedAt, - totalDurationMs: activity.totalDurationMs(), + totalDurationMs: activity.totalDurationMs().toString(), }); } diff --git a/app/(playground)/p/[agentId]/beta-proto/flow/server-action.ts b/app/(playground)/p/[agentId]/beta-proto/flow/server-action.ts index 76a05764..5eadb3c6 100644 --- a/app/(playground)/p/[agentId]/beta-proto/flow/server-action.ts +++ b/app/(playground)/p/[agentId]/beta-proto/flow/server-action.ts @@ -5,6 +5,11 @@ import { AgentActivity, hasEnoughAgentTimeCharge, } from "@/services/agents/activities"; +import { stripe } from "@/services/external/stripe"; +import { fetchCurrentTeam, isProPlan } from "@/services/teams"; +import { processUnreportedActivities } from "@/services/usage-based-billing"; +import { AgentTimeUsageDAO } from "@/services/usage-based-billing/agent-time-usage-dao"; +import { captureException } from "@sentry/nextjs"; import { put } from "@vercel/blob"; import { createStreamableValue } from "ai/rsc"; import { eq } from "drizzle-orm"; @@ -191,11 +196,35 @@ export async function executeFlow( stream.done(); agentActivity.end(); await saveAgentActivity(agentActivity); - })(); + if (agentActivity.endedAt == null) { + throw new Error("Activity must be ended before reporting"); + } + await reportActivityToStripe(agentActivity.endedAt); + })().catch((error) => { + console.error(error); + captureException(error); + }); return { streamableValue: stream.value }; } +async function reportActivityToStripe(targetDate: Date) { + const currentTeam = await fetchCurrentTeam(); + if (!isProPlan(currentTeam)) { + return; + } + return processUnreportedActivities( + { + teamDbId: currentTeam.dbId, + targetDate: targetDate, + }, + { + dao: new AgentTimeUsageDAO(db), + stripe: stripe, + }, + ); +} + interface PutFlowInput { flow: Flow; } diff --git a/app/webhooks/stripe/billing-meter/route.ts b/app/webhooks/stripe/billing-meter/route.ts new file mode 100644 index 00000000..e221ac97 --- /dev/null +++ b/app/webhooks/stripe/billing-meter/route.ts @@ -0,0 +1,93 @@ +/** + * Handle Stripe Billing Meter webhooks + * + * Error codes: https://docs.stripe.com/billing/subscriptions/usage-based/recording-usage-api#error-codes + */ +import { stripe } from "@/services/external/stripe"; +import { captureEvent, captureException } from "@sentry/nextjs"; +import type Stripe from "stripe"; + +const relevantEvents = new Set([ + "v1.billing.meter.error_report_triggered", + "v1.billing.meter.no_meter_found", +]); + +export async function POST(req: Request) { + const body = await req.text(); + const sig = req.headers.get("stripe-signature") as string; + const webhookSecret = process.env.STRIPE_BILLING_METER_WEBHOOK_SECRET; + let thinEvent: Stripe.ThinEvent; + + try { + if (!sig || !webhookSecret) + return new Response("Webhook secret not found.", { status: 400 }); + thinEvent = stripe.parseThinEvent(body, sig, webhookSecret); + console.log(`🔔 Webhook received: ${thinEvent.type}`); + } catch (err: unknown) { + console.log(`❌ Error: ${err}`); + captureException(err); + return new Response(`Webhook Error: ${err}`, { status: 400 }); + } + + if (!relevantEvents.has(thinEvent.type)) { + return new Response(`Unsupported event type: ${thinEvent.type}`, { + status: 400, + }); + } + + const event = await stripe.v2.core.events.retrieve(thinEvent.id); + try { + console.error( + ` + Stripe Billing Meter Error Report: + Period: ${event.data.validation_start} - ${event.data.validation_end} + Summary: ${event.data.developer_message_summary} + Error Count: ${event.data.reason.error_count}` + .trim() + .replace(/^\s+/gm, " "), + ); + + for (const errorType of event.data.reason.error_types) { + console.error( + `Error Type: ${errorType.code} (${errorType.error_count} occurrences)`, + ); + for (const error of errorType.sample_errors) { + console.error(` - ${error.error_message}`); + } + } + if ("related_object" in event && event.related_object != null) { + console.error( + ` + Related Object: + ID: ${event.related_object.id} + Type: ${event.related_object.type} + URL: ${event.related_object.url}` + .trim() + .replace(/^\s+/gm, " "), + ); + } + + captureEvent({ + message: "Stripe Billing Meter Error", + level: "error", + extra: { + validationPeriod: { + start: event.data.validation_start, + end: event.data.validation_end, + }, + summary: event.data.developer_message_summary, + errors: event.data.reason.error_types, + relatedObject: "related_object" in event ? event.related_object : null, + }, + }); + } catch (error) { + console.log(error); + return new Response( + "Webhook handler failed. View your Next.js function logs.", + { + status: 400, + }, + ); + } + return new Response(JSON.stringify({ received: true })); +} diff --git a/drizzle/schema.ts b/drizzle/schema.ts index 4fe39b07..74f3b974 100644 --- a/drizzle/schema.ts +++ b/drizzle/schema.ts @@ -477,9 +477,7 @@ export const agentActivities = pgTable( .references(() => agents.dbId, { onDelete: "cascade" }), startedAt: timestamp("started_at").notNull(), endedAt: timestamp("ended_at").notNull(), - // This would be greater than int32 max, but will not be greater than int64 max. - // so, we can safely use number type. - totalDurationMs: numeric("total_duration_ms").$type().notNull(), + totalDurationMs: numeric("total_duration_ms").notNull(), usageReportDbId: integer("usage_report_db_id").references( () => agentTimeUsageReports.dbId, ), @@ -497,11 +495,7 @@ export const agentTimeUsageReports = pgTable( teamDbId: integer("team_db_id") .notNull() .references(() => teams.dbId, { onDelete: "cascade" }), - // This would be greater than int32 max, but will not be greater than int64 max. - // so, we can safely use number type. - accumulatedDurationMs: numeric("accumulated_duration_ms") - .$type() - .notNull(), + accumulatedDurationMs: numeric("accumulated_duration_ms").notNull(), minutesIncrement: integer("minutes_increment").notNull(), stripeMeterEventId: text("stripe_meter_event_id").notNull(), timestamp: timestamp("created_at").defaultNow().notNull(), diff --git a/services/usage-based-billing/agent-time-usage-dao.ts b/services/usage-based-billing/agent-time-usage-dao.ts index d082410d..37e5b6be 100644 --- a/services/usage-based-billing/agent-time-usage-dao.ts +++ b/services/usage-based-billing/agent-time-usage-dao.ts @@ -79,7 +79,12 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess { ), ) .orderBy(agentActivities.dbId); - return activities; + return activities.map((activity) => ({ + dbId: activity.dbId, + totalDurationMs: safeStringToNumber(activity.totalDurationMs), + endedAt: activity.endedAt, + usageReportDbId: activity.usageReportDbId, + })); } async findLastUsageReport( @@ -102,7 +107,16 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess { if (reports.length === 0) { return null; } - return reports[0]; + return { + dbId: reports[0].dbId, + teamDbId: reports[0].teamDbId, + accumulatedDurationMs: safeStringToNumber( + reports[0].accumulatedDurationMs, + ), + minutesIncrement: reports[0].minutesIncrement, + stripeMeterEventId: reports[0].stripeMeterEventId, + timestamp: reports[0].timestamp, + }; } async createUsageReport(params: { @@ -116,7 +130,7 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess { .insert(agentTimeUsageReports) .values({ teamDbId: params.teamDbId, - accumulatedDurationMs: params.accumulatedDurationMs, + accumulatedDurationMs: params.accumulatedDurationMs.toString(), minutesIncrement: params.minutesIncrement, stripeMeterEventId: params.stripeMeterEventId, timestamp: params.timestamp, @@ -125,7 +139,14 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess { if (report == null) { throw new Error("Failed to create usage report"); } - return report; + return { + dbId: report.dbId, + teamDbId: report.teamDbId, + accumulatedDurationMs: safeStringToNumber(report.accumulatedDurationMs), + minutesIncrement: report.minutesIncrement, + stripeMeterEventId: report.stripeMeterEventId, + timestamp: report.timestamp, + }; } async markActivitiesAsProcessed( @@ -143,3 +164,11 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess { .where(inArray(agentActivities.dbId, activityIds)); } } + +function safeStringToNumber(value: string): number { + const num = Number(value); + if (num > Number.MAX_SAFE_INTEGER) { + throw new Error(`Value exceeds MAX_SAFE_INTEGER: ${value}`); + } + return num; +}