Skip to content

Commit

Permalink
Simplify partition() by using wrapper function
Browse files Browse the repository at this point in the history
  • Loading branch information
Rindrics committed Dec 20, 2024
1 parent 7e14cbb commit 91de0ed
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 31 deletions.
51 changes: 20 additions & 31 deletions app/(playground)/p/[agentId]/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
ExternalServiceName,
VercelBlobOperation,
createLogger,
wrappedPartition as partition,
waitForTelemetryExport,
withCountMeasurement,
} from "@/lib/opentelemetry";
Expand All @@ -22,43 +23,31 @@ import {
import type { FileData, FileId, Graph } from "./types";

export async function parse(id: FileId, name: string, blobUrl: string) {
const startTime = Date.now();
const logger = createLogger("parse");

const measureParameters = {
startTime: Date.now(),
logger,
};
if (process.env.UNSTRUCTURED_API_KEY === undefined) {
throw new Error("UNSTRUCTURED_API_KEY is not set");
}
const client = new UnstructuredClient({
security: {
apiKeyAuth: process.env.UNSTRUCTURED_API_KEY,
},
});
const pdf = await getDocument(blobUrl).promise;
const content = await pdf.getData();
const strategy = Strategy.Fast;
const partitionResponse = await withCountMeasurement(
logger,
async () => {
const result = await client.general.partition({
partitionParameters: {
files: {
fileName: name,
content,
},
strategy,
splitPdfPage: false,
splitPdfConcurrencyLevel: 1,
},
});
return result;
},
ExternalServiceName.Unstructured,
startTime,
const partitionResponse = await partition(
new UnstructuredClient({
security: {
apiKeyAuth: process.env.UNSTRUCTURED_API_KEY,
},
}),
blobUrl,
{
strategy,
pdf,
fileName: name,
strategy: Strategy.Fast,
splitPdfPage: false,
splitPdfConcurrencyLevel: 1,
},
measureParameters,
);
const startTimePut = Date.now();
const startTime = Date.now();
if (partitionResponse.statusCode !== 200) {
console.error(partitionResponse.rawResponse);
throw new Error(`Failed to parse file: ${partitionResponse.statusCode}`);
Expand All @@ -80,7 +69,7 @@ export async function parse(id: FileId, name: string, blobUrl: string) {
};
},
ExternalServiceName.VercelBlob,
startTimePut,
startTime,
VercelBlobOperation.Put,
);

Expand Down
48 changes: 48 additions & 0 deletions lib/opentelemetry/wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { waitUntil } from "@vercel/functions";
import type { LanguageModelUsage } from "ai";
import type { LanguageModelV1 } from "ai";
import type { PDFDocumentProxy } from "pdfjs-dist";
import { getDocument } from "pdfjs-dist";
import type { UnstructuredClient } from "unstructured-client";
import type { PartitionResponse } from "unstructured-client/sdk/models/operations/partition";
import type { Strategy } from "unstructured-client/sdk/models/shared";
import { captureError } from "./log";
import type { LogSchema, OtelLoggerWrapper } from "./types";
Expand Down Expand Up @@ -251,6 +254,51 @@ export function withCountMeasurement<T>(
return withMeasurement(logger, operation, measurement, measurementStartTime);
}

export type PartitionParameters = {
fileName: string;
strategy: Strategy;
splitPdfPage: boolean;
splitPdfConcurrencyLevel: number;
};

export type MeasureParameters = {
logger: OtelLoggerWrapper;
startTime: number;
};

export async function wrappedPartition(
client: UnstructuredClient,
blobUrl: string,
{
fileName,
strategy,
splitPdfPage,
splitPdfConcurrencyLevel,
}: PartitionParameters,
{ logger, startTime }: MeasureParameters,
): Promise<PartitionResponse> {
const pdf = await getDocument(blobUrl).promise;
const pdfContent = new Blob([await pdf.getData()], {
type: "application/pdf",
});

return withCountMeasurement(
logger,
async () =>
client.general.partition({
partitionParameters: {
files: { fileName, content: pdfContent },
strategy,
splitPdfPage,
splitPdfConcurrencyLevel,
},
}),
ExternalServiceName.Unstructured,
startTime,
{ strategy, pdf },
).finally(() => waitForTelemetryExport());
}

export function withTokenMeasurement<T extends { usage: LanguageModelUsage }>(
logger: OtelLoggerWrapper,
operation: () => Promise<T>,
Expand Down

0 comments on commit 91de0ed

Please sign in to comment.