Skip to content

Commit

Permalink
Define processStream()
Browse files Browse the repository at this point in the history
  • Loading branch information
Rindrics committed Oct 18, 2024
1 parent 50bc3b3 commit 52bac99
Showing 1 changed file with 20 additions and 22 deletions.
42 changes: 20 additions & 22 deletions app/(playground)/p/[agentId]/beta-proto/graph/server-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const flushMetricsAndShutdown = async (lf: Langfuse, metricReader: any) => {
reject(new Error("Metric flush timeout after 20 seconds"));
}, 20000);

console.log("inside waitUntil()-----")
console.log("inside waitUntil()-----");
Promise.all([metricReader.forceFlush(), lf.shutdownAsync()])
.then(() => {
clearTimeout(timeoutId);
Expand All @@ -54,7 +54,7 @@ export async function generateArtifactStream(
});
const stream = createStreamableValue();

(async () => {
const processStream = async () => {
const model = "gpt-4o-mini";
const generation = trace.generation({
input: params.userPrompt,
Expand All @@ -76,23 +76,6 @@ export async function generateArtifactStream(
subscriptionId,
isR06User,
});
console.log("before waitUntil()-----")
waitUntil(
flushMetricsAndShutdown(lf, metricReader).catch((error) => {
if (error.message === "Metric flush timeout after 20 seconds") {
console.error(
"Metric flush and Langfuse shutdown timed out:",
error,
);
} else {
console.error(
"Error during metric flush and Langfuse shutdown:",
error,
);
}
}),
);
console.log("after waitUntil()-----")
generation.end({
output: result,
});
Expand All @@ -106,9 +89,24 @@ export async function generateArtifactStream(
const result = await object;

stream.done();
})();

console.log("before return-----")
};

await processStream();

console.log("before waitUntil()-----");
waitUntil(
flushMetricsAndShutdown(lf, metricReader).catch((error) => {
if (error.message === "Metric flush timeout after 20 seconds") {
console.error("Metric flush and Langfuse shutdown timed out:", error);
} else {
console.error(
"Error during metric flush and Langfuse shutdown:",
error,
);
}
}),
);
console.log("returning");

return { object: stream.value };
}
Expand Down

0 comments on commit 52bac99

Please sign in to comment.