diff --git a/app/(playground)/p/[agentId]/actions.ts b/app/(playground)/p/[agentId]/actions.ts index 458275fa..290fd684 100644 --- a/app/(playground)/p/[agentId]/actions.ts +++ b/app/(playground)/p/[agentId]/actions.ts @@ -4,6 +4,7 @@ import { ExternalServiceName, VercelBlobOperation, createLogger, + wrappedPartition as partition, waitForTelemetryExport, withCountMeasurement, } from "@/lib/opentelemetry"; @@ -21,29 +22,31 @@ import { import type { FileData, FileId, Graph } from "./types"; export async function parse(id: FileId, name: string, blobUrl: string) { - const startTime = Date.now(); const logger = createLogger("parse"); + + const measureParameters = { + startTime: Date.now(), + logger, + }; if (process.env.UNSTRUCTURED_API_KEY === undefined) { throw new Error("UNSTRUCTURED_API_KEY is not set"); } - const client = new UnstructuredClient({ - security: { - apiKeyAuth: process.env.UNSTRUCTURED_API_KEY, - }, - }); - const response = await fetch(blobUrl); - const content = await response.blob(); - const partitionResponse = await client.general.partition({ - partitionParameters: { - files: { - fileName: name, - content, + const partitionResponse = await partition( + new UnstructuredClient({ + security: { + apiKeyAuth: process.env.UNSTRUCTURED_API_KEY, }, + }), + blobUrl, + { + fileName: name, strategy: Strategy.Fast, splitPdfPage: false, splitPdfConcurrencyLevel: 1, }, - }); + measureParameters, + ); + const startTime = Date.now(); if (partitionResponse.statusCode !== 200) { console.error(partitionResponse.rawResponse); throw new Error(`Failed to parse file: ${partitionResponse.statusCode}`); diff --git a/app/(playground)/p/[agentId]/prev/beta-proto/files/server-actions.ts b/app/(playground)/p/[agentId]/prev/beta-proto/files/server-actions.ts index d208271e..439d6536 100644 --- a/app/(playground)/p/[agentId]/prev/beta-proto/files/server-actions.ts +++ b/app/(playground)/p/[agentId]/prev/beta-proto/files/server-actions.ts @@ -8,6 +8,7 @@ import { withCountMeasurement, } from "@/lib/opentelemetry"; import { put } from "@vercel/blob"; +import { PDFDocument } from "pdf-lib"; import { UnstructuredClient } from "unstructured-client"; import { Strategy } from "unstructured-client/sdk/models/shared"; import { elementsToMarkdown } from "../utils/unstructured"; @@ -80,8 +81,8 @@ export async function parseFile(args: ParseFileInput) { const strategy = Strategy.Fast; const partitionResponse = await withCountMeasurement( logger, - () => - client.general.partition({ + async () => { + const result = await client.general.partition({ partitionParameters: { files: { fileName: args.name, @@ -91,10 +92,15 @@ export async function parseFile(args: ParseFileInput) { splitPdfPage: false, splitPdfConcurrencyLevel: 1, }, - }), + }); + return result; + }, ExternalServiceName.Unstructured, startTime, - strategy, + { + strategy, + pdf: await PDFDocument.load(await content.arrayBuffer()), + }, ); if (partitionResponse.statusCode !== 200) { console.error(partitionResponse.rawResponse); diff --git a/bun.lockb b/bun.lockb index f1df90c6..c635246a 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/lib/opentelemetry/types.ts b/lib/opentelemetry/types.ts index 89751226..46063121 100644 --- a/lib/opentelemetry/types.ts +++ b/lib/opentelemetry/types.ts @@ -60,6 +60,7 @@ const BasicRequestCountSchema = RequestCount.extend({ const UnstructuredRequestCountSchema = RequestCount.extend({ externalServiceName: z.literal(ExternalServiceName.Unstructured), strategy: z.nativeEnum(Strategy), + numPages: z.number(), // number of PDF pages to handle }); const VercelBlobPutSchema = RequestCount.extend({ diff --git a/lib/opentelemetry/wrapper.ts b/lib/opentelemetry/wrapper.ts index 82596fbc..a01955c8 100644 --- a/lib/opentelemetry/wrapper.ts +++ b/lib/opentelemetry/wrapper.ts @@ -2,6 +2,9 @@ import { getCurrentMeasurementScope, isRoute06User } from "@/app/(auth)/lib"; import { waitUntil } from "@vercel/functions"; import type { LanguageModelUsage } from "ai"; import type { LanguageModelV1 } from "ai"; +import { PDFDocument } from "pdf-lib"; +import type { UnstructuredClient } from "unstructured-client"; +import type { PartitionResponse } from "unstructured-client/sdk/models/operations/partition"; import type { Strategy } from "unstructured-client/sdk/models/shared"; import { captureError } from "./log"; import type { LogSchema, OtelLoggerWrapper } from "./types"; @@ -148,19 +151,30 @@ export const VercelBlobOperation = { type VercelBlobOperationType = (typeof VercelBlobOperation)[keyof typeof VercelBlobOperation]; +interface UnstructuredOptions { + strategy: Strategy; + pdf: PDFDocument; +} + +type ServiceOptions = UnstructuredOptions | VercelBlobOperationType | undefined; + +function getNumPages(pdf: PDFDocument) { + return pdf.getPages().length; +} + export function withCountMeasurement( logger: OtelLoggerWrapper, operation: () => Promise, externalServiceName: typeof APICallBasedService.Unstructured, measurementStartTime: number | undefined, - strategy: Strategy, + serviceOptions: UnstructuredOptions, ): Promise; export function withCountMeasurement( logger: OtelLoggerWrapper, operation: () => Promise, externalServiceName: typeof APICallBasedService.VercelBlob, measurementStartTime: number | undefined, - blobOperation: VercelBlobOperationType, + serviceOptions: VercelBlobOperationType, ): Promise; export function withCountMeasurement( logger: OtelLoggerWrapper, @@ -175,7 +189,7 @@ export function withCountMeasurement( operation: () => Promise, externalServiceName: (typeof APICallBasedService)[keyof typeof APICallBasedService], measurementStartTime?: number, - strategyOrOptions?: Strategy | VercelBlobOperationType | undefined, + serviceOptions?: ServiceOptions, ): Promise { const measurement: MeasurementSchema = ( result, @@ -189,23 +203,38 @@ export function withCountMeasurement( isR06User, requestCount: 1, }; - if (externalServiceName === APICallBasedService.Unstructured) { - if (!strategyOrOptions) { + const unstructuredOptions = serviceOptions as UnstructuredOptions; + if ( + !unstructuredOptions || + !unstructuredOptions.strategy || + !unstructuredOptions.pdf + ) { logger.error( - new Error("'strategy' is required for Unstructured service"), + new Error( + "'strategy' and 'numPages' are required for Unstructured service", + ), "missing required strategy parameter", ); } return { ...baseMetrics, externalServiceName, - strategy: strategyOrOptions as Strategy, + strategy: unstructuredOptions.strategy, + numPages: getNumPages(unstructuredOptions.pdf), }; } if (externalServiceName === APICallBasedService.VercelBlob) { - const operation = strategyOrOptions as VercelBlobOperationType; + const operation = serviceOptions as VercelBlobOperationType; + if (!operation) { + logger.error( + new Error( + "'VercelBlobOperationType' is required for VercelBlob service", + ), + "missing required VercelBlobOperationType parameter", + ); + } const operationResult = operation.measure(result as { size: number }); return { ...baseMetrics, @@ -224,6 +253,48 @@ export function withCountMeasurement( return withMeasurement(logger, operation, measurement, measurementStartTime); } +export type PartitionParameters = { + fileName: string; + strategy: Strategy; + splitPdfPage: boolean; + splitPdfConcurrencyLevel: number; +}; + +export type MeasureParameters = { + logger: OtelLoggerWrapper; + startTime: number; +}; + +export async function wrappedPartition( + client: UnstructuredClient, + blobUrl: string, + { + fileName, + strategy, + splitPdfPage, + splitPdfConcurrencyLevel, + }: PartitionParameters, + { logger, startTime }: MeasureParameters, +): Promise { + const content = await fetch(blobUrl).then((response) => response.blob()); + + return withCountMeasurement( + logger, + async () => + client.general.partition({ + partitionParameters: { + files: { fileName, content }, + strategy, + splitPdfPage, + splitPdfConcurrencyLevel, + }, + }), + ExternalServiceName.Unstructured, + startTime, + { strategy, pdf: await PDFDocument.load(await content.arrayBuffer()) }, + ).finally(() => waitForTelemetryExport()); +} + export function withTokenMeasurement( logger: OtelLoggerWrapper, operation: () => Promise, diff --git a/package.json b/package.json index c2e6c5d7..1fe1cd86 100644 --- a/package.json +++ b/package.json @@ -80,6 +80,7 @@ "next-auth": "^5.0.0-beta.20", "next-themes": "0.3.0", "openai": "4.64.0", + "pdf-lib": "1.17.1", "pino": "9.5.0", "posthog-js": "1.194.2", "react": "19.0.0-rc-4d577fd2-20241104",