Skip to content

Commit

Permalink
Merge pull request #238 from giselles-ai/report-agent-time-usage-to-s…
Browse files Browse the repository at this point in the history
…tripe-2

Send meterEvent to Stripe (Agent v1)
  • Loading branch information
satococoa authored Dec 17, 2024
2 parents acf8b15 + 6e29980 commit 8402587
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ export async function saveAgentActivity(activity: AgentActivity) {
agentDbId,
startedAt: activity.startedAt,
endedAt: activity.endedAt,
totalDurationMs: activity.totalDurationMs(),
totalDurationMs: activity.totalDurationMs().toString(),
});
}
31 changes: 30 additions & 1 deletion app/(playground)/p/[agentId]/beta-proto/flow/server-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import {
AgentActivity,
hasEnoughAgentTimeCharge,
} from "@/services/agents/activities";
import { stripe } from "@/services/external/stripe";
import { fetchCurrentTeam, isProPlan } from "@/services/teams";
import { processUnreportedActivities } from "@/services/usage-based-billing";
import { AgentTimeUsageDAO } from "@/services/usage-based-billing/agent-time-usage-dao";
import { captureException } from "@sentry/nextjs";
import { put } from "@vercel/blob";
import { createStreamableValue } from "ai/rsc";
import { eq } from "drizzle-orm";
Expand Down Expand Up @@ -191,11 +196,35 @@ export async function executeFlow(
stream.done();
agentActivity.end();
await saveAgentActivity(agentActivity);
})();
if (agentActivity.endedAt == null) {
throw new Error("Activity must be ended before reporting");
}
await reportActivityToStripe(agentActivity.endedAt);
})().catch((error) => {
console.error(error);
captureException(error);
});

return { streamableValue: stream.value };
}

async function reportActivityToStripe(targetDate: Date) {
const currentTeam = await fetchCurrentTeam();
if (!isProPlan(currentTeam)) {
return;
}
return processUnreportedActivities(
{
teamDbId: currentTeam.dbId,
targetDate: targetDate,
},
{
dao: new AgentTimeUsageDAO(db),
stripe: stripe,
},
);
}

interface PutFlowInput {
flow: Flow;
}
Expand Down
93 changes: 93 additions & 0 deletions app/webhooks/stripe/billing-meter/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Handle Stripe Billing Meter webhooks
*
* Error codes: https://docs.stripe.com/billing/subscriptions/usage-based/recording-usage-api#error-codes
*/
import { stripe } from "@/services/external/stripe";
import { captureEvent, captureException } from "@sentry/nextjs";
import type Stripe from "stripe";

const relevantEvents = new Set([
"v1.billing.meter.error_report_triggered",
"v1.billing.meter.no_meter_found",
]);

export async function POST(req: Request) {
const body = await req.text();
const sig = req.headers.get("stripe-signature") as string;
const webhookSecret = process.env.STRIPE_BILLING_METER_WEBHOOK_SECRET;
let thinEvent: Stripe.ThinEvent;

try {
if (!sig || !webhookSecret)
return new Response("Webhook secret not found.", { status: 400 });
thinEvent = stripe.parseThinEvent(body, sig, webhookSecret);
console.log(`🔔 Webhook received: ${thinEvent.type}`);
} catch (err: unknown) {
console.log(`❌ Error: ${err}`);
captureException(err);
return new Response(`Webhook Error: ${err}`, { status: 400 });
}

if (!relevantEvents.has(thinEvent.type)) {
return new Response(`Unsupported event type: ${thinEvent.type}`, {
status: 400,
});
}

const event = await stripe.v2.core.events.retrieve(thinEvent.id);
try {
console.error(
`
Stripe Billing Meter Error Report:
Period: ${event.data.validation_start} - ${event.data.validation_end}
Summary: ${event.data.developer_message_summary}
Error Count: ${event.data.reason.error_count}`
.trim()
.replace(/^\s+/gm, " "),
);

for (const errorType of event.data.reason.error_types) {
console.error(
`Error Type: ${errorType.code} (${errorType.error_count} occurrences)`,
);
for (const error of errorType.sample_errors) {
console.error(` - ${error.error_message}`);
}
}
if ("related_object" in event && event.related_object != null) {
console.error(
`
Related Object:
ID: ${event.related_object.id}
Type: ${event.related_object.type}
URL: ${event.related_object.url}`
.trim()
.replace(/^\s+/gm, " "),
);
}

captureEvent({
message: "Stripe Billing Meter Error",
level: "error",
extra: {
validationPeriod: {
start: event.data.validation_start,
end: event.data.validation_end,
},
summary: event.data.developer_message_summary,
errors: event.data.reason.error_types,
relatedObject: "related_object" in event ? event.related_object : null,
},
});
} catch (error) {
console.log(error);
return new Response(
"Webhook handler failed. View your Next.js function logs.",
{
status: 400,
},
);
}
return new Response(JSON.stringify({ received: true }));
}
10 changes: 2 additions & 8 deletions drizzle/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,7 @@ export const agentActivities = pgTable(
.references(() => agents.dbId, { onDelete: "cascade" }),
startedAt: timestamp("started_at").notNull(),
endedAt: timestamp("ended_at").notNull(),
// This would be greater than int32 max, but will not be greater than int64 max.
// so, we can safely use number type.
totalDurationMs: numeric("total_duration_ms").$type<number>().notNull(),
totalDurationMs: numeric("total_duration_ms").notNull(),
usageReportDbId: integer("usage_report_db_id").references(
() => agentTimeUsageReports.dbId,
),
Expand All @@ -497,11 +495,7 @@ export const agentTimeUsageReports = pgTable(
teamDbId: integer("team_db_id")
.notNull()
.references(() => teams.dbId, { onDelete: "cascade" }),
// This would be greater than int32 max, but will not be greater than int64 max.
// so, we can safely use number type.
accumulatedDurationMs: numeric("accumulated_duration_ms")
.$type<number>()
.notNull(),
accumulatedDurationMs: numeric("accumulated_duration_ms").notNull(),
minutesIncrement: integer("minutes_increment").notNull(),
stripeMeterEventId: text("stripe_meter_event_id").notNull(),
timestamp: timestamp("created_at").defaultNow().notNull(),
Expand Down
37 changes: 33 additions & 4 deletions services/usage-based-billing/agent-time-usage-dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess {
),
)
.orderBy(agentActivities.dbId);
return activities;
return activities.map((activity) => ({
dbId: activity.dbId,
totalDurationMs: safeStringToNumber(activity.totalDurationMs),
endedAt: activity.endedAt,
usageReportDbId: activity.usageReportDbId,
}));
}

async findLastUsageReport(
Expand All @@ -102,7 +107,16 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess {
if (reports.length === 0) {
return null;
}
return reports[0];
return {
dbId: reports[0].dbId,
teamDbId: reports[0].teamDbId,
accumulatedDurationMs: safeStringToNumber(
reports[0].accumulatedDurationMs,
),
minutesIncrement: reports[0].minutesIncrement,
stripeMeterEventId: reports[0].stripeMeterEventId,
timestamp: reports[0].timestamp,
};
}

async createUsageReport(params: {
Expand All @@ -116,7 +130,7 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess {
.insert(agentTimeUsageReports)
.values({
teamDbId: params.teamDbId,
accumulatedDurationMs: params.accumulatedDurationMs,
accumulatedDurationMs: params.accumulatedDurationMs.toString(),
minutesIncrement: params.minutesIncrement,
stripeMeterEventId: params.stripeMeterEventId,
timestamp: params.timestamp,
Expand All @@ -125,7 +139,14 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess {
if (report == null) {
throw new Error("Failed to create usage report");
}
return report;
return {
dbId: report.dbId,
teamDbId: report.teamDbId,
accumulatedDurationMs: safeStringToNumber(report.accumulatedDurationMs),
minutesIncrement: report.minutesIncrement,
stripeMeterEventId: report.stripeMeterEventId,
timestamp: report.timestamp,
};
}

async markActivitiesAsProcessed(
Expand All @@ -143,3 +164,11 @@ export class AgentTimeUsageDAO implements AgentTimeUsageDataAccess {
.where(inArray(agentActivities.dbId, activityIds));
}
}

function safeStringToNumber(value: string): number {
const num = Number(value);
if (num > Number.MAX_SAFE_INTEGER) {
throw new Error(`Value exceeds MAX_SAFE_INTEGER: ${value}`);
}
return num;
}

0 comments on commit 8402587

Please sign in to comment.