Skip to content

Commit

Permalink
Wait until telemetry data are flushed and exported
Browse files Browse the repository at this point in the history
  • Loading branch information
Rindrics committed Nov 4, 2024
1 parent d1c1439 commit ebfe2d3
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ import { streamObject } from "ai";
import { createStreamableValue } from "ai/rsc";

import { getUserSubscriptionId, isRoute06User } from "@/app/(auth)/lib";
import {
flushTelemetry,
log,
loggerProvider,
metricReader,
} from "@/instrumentation.node";
import { metrics } from "@opentelemetry/api";
import { waitUntil } from "@vercel/functions";
import { Langfuse } from "langfuse";
import { schema as artifactSchema } from "../artifact/schema";
import type { SourceIndex } from "../source/types";
Expand Down Expand Up @@ -77,6 +84,8 @@ ${sourcesToText(sources)}
output: result,
});
await lf.shutdownAsync();

waitUntil(flushTelemetry());
},
});

Expand Down
50 changes: 50 additions & 0 deletions instrumentation.node.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
import {
DiagConsoleLogger,
DiagLogLevel,
diag,
metrics,
trace,
} from "@opentelemetry/api";
import { SeverityNumber } from "@opentelemetry/api-logs";
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http";
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
Expand Down Expand Up @@ -69,3 +77,45 @@ registerOTel({

export const logger = loggerProvider.getLogger("giselle");
console.log("-- OTEL registered with metrics, traces, and logs --");

diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

export function log(
severity: SeverityNumber,
message: string,
attributes?: Record<string, string>,
) {
const consoleMethod =
severity <= SeverityNumber.INFO ? console.log : console.error;
consoleMethod(message, attributes);

logger.emit({ severityNumber: severity, body: message, attributes });
}

export async function flushTelemetry() {
try {
log(SeverityNumber.INFO, "Exporting telemetry data", {
runtime: process.env.NEXT_RUNTIME ?? "",
environment: process.env.VERCEL_ENV ?? "",
});

await Promise.all([
metricReader.forceFlush(),
loggerProvider.forceFlush(),
spanProcessor.forceFlush(),
new Promise((resolve) => setTimeout(resolve, 10000)), // wait for exporting
]);

log(SeverityNumber.INFO, "flushTelemetry() completed", {
runtime: process.env.NEXT_RUNTIME ?? "",
environment: process.env.VERCEL_ENV ?? "",
});
} catch (error) {
log(SeverityNumber.ERROR, "Error in flushTelemetry():", {
error: error instanceof Error ? error.message : String(error),
runtime: process.env.NEXT_RUNTIME ?? "",
environment: process.env.VERCEL_ENV ?? "",
});
throw error;
}
}

0 comments on commit ebfe2d3

Please sign in to comment.