diff --git a/app/(playground)/p/[agentId]/actions.ts b/app/(playground)/p/[agentId]/actions.ts index 9c3fe792..7a6ecf19 100644 --- a/app/(playground)/p/[agentId]/actions.ts +++ b/app/(playground)/p/[agentId]/actions.ts @@ -4,6 +4,7 @@ import { ExternalServiceName, VercelBlobOperation, createLogger, + wrappedPartition as partition, waitForTelemetryExport, withCountMeasurement, } from "@/lib/opentelemetry"; @@ -22,43 +23,31 @@ import { import type { FileData, FileId, Graph } from "./types"; export async function parse(id: FileId, name: string, blobUrl: string) { - const startTime = Date.now(); const logger = createLogger("parse"); + + const measureParameters = { + startTime: Date.now(), + logger, + }; if (process.env.UNSTRUCTURED_API_KEY === undefined) { throw new Error("UNSTRUCTURED_API_KEY is not set"); } - const client = new UnstructuredClient({ - security: { - apiKeyAuth: process.env.UNSTRUCTURED_API_KEY, - }, - }); - const pdf = await getDocument(blobUrl).promise; - const content = await pdf.getData(); - const strategy = Strategy.Fast; - const partitionResponse = await withCountMeasurement( - logger, - async () => { - const result = await client.general.partition({ - partitionParameters: { - files: { - fileName: name, - content, - }, - strategy, - splitPdfPage: false, - splitPdfConcurrencyLevel: 1, - }, - }); - return result; - }, - ExternalServiceName.Unstructured, - startTime, + const partitionResponse = await partition( + new UnstructuredClient({ + security: { + apiKeyAuth: process.env.UNSTRUCTURED_API_KEY, + }, + }), + blobUrl, { - strategy, - pdf, + fileName: name, + strategy: Strategy.Fast, + splitPdfPage: false, + splitPdfConcurrencyLevel: 1, }, + measureParameters, ); - const startTimePut = Date.now(); + const startTime = Date.now(); if (partitionResponse.statusCode !== 200) { console.error(partitionResponse.rawResponse); throw new Error(`Failed to parse file: ${partitionResponse.statusCode}`); @@ -80,7 +69,7 @@ export async function parse(id: FileId, name: string, blobUrl: string) { }; }, ExternalServiceName.VercelBlob, - startTimePut, + startTime, VercelBlobOperation.Put, ); diff --git a/lib/opentelemetry/wrapper.ts b/lib/opentelemetry/wrapper.ts index d03d0e0b..81b4b986 100644 --- a/lib/opentelemetry/wrapper.ts +++ b/lib/opentelemetry/wrapper.ts @@ -3,6 +3,9 @@ import { waitUntil } from "@vercel/functions"; import type { LanguageModelUsage } from "ai"; import type { LanguageModelV1 } from "ai"; import type { PDFDocumentProxy } from "pdfjs-dist"; +import { getDocument } from "pdfjs-dist"; +import type { UnstructuredClient } from "unstructured-client"; +import type { PartitionResponse } from "unstructured-client/sdk/models/operations/partition"; import type { Strategy } from "unstructured-client/sdk/models/shared"; import { captureError } from "./log"; import type { LogSchema, OtelLoggerWrapper } from "./types"; @@ -251,6 +254,51 @@ export function withCountMeasurement( return withMeasurement(logger, operation, measurement, measurementStartTime); } +export type PartitionParameters = { + fileName: string; + strategy: Strategy; + splitPdfPage: boolean; + splitPdfConcurrencyLevel: number; +}; + +export type MeasureParameters = { + logger: OtelLoggerWrapper; + startTime: number; +}; + +export async function wrappedPartition( + client: UnstructuredClient, + blobUrl: string, + { + fileName, + strategy, + splitPdfPage, + splitPdfConcurrencyLevel, + }: PartitionParameters, + { logger, startTime }: MeasureParameters, +): Promise { + const pdf = await getDocument(blobUrl).promise; + const pdfContent = new Blob([await pdf.getData()], { + type: "application/pdf", + }); + + return withCountMeasurement( + logger, + async () => + client.general.partition({ + partitionParameters: { + files: { fileName, content: pdfContent }, + strategy, + splitPdfPage, + splitPdfConcurrencyLevel, + }, + }), + ExternalServiceName.Unstructured, + startTime, + { strategy, pdf }, + ).finally(() => waitForTelemetryExport()); +} + export function withTokenMeasurement( logger: OtelLoggerWrapper, operation: () => Promise,