Skip to content

Commit

Permalink
set the limit for scraping concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
toyamarinyon committed Oct 21, 2024
1 parent 97a3c2f commit 3066e22
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 31 deletions.
1 change: 1 addition & 0 deletions app/(playground)/p/[agentId]/beta-proto/graph/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ export const generateText =
url: item.url,
title: item.title,
content: webSearchData,
relevance: item.relevance,
});
}
}),
Expand Down
106 changes: 76 additions & 30 deletions app/(playground)/p/[agentId]/beta-proto/web-search/server-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,39 @@ import Langfuse from "langfuse";
import type { GiselleNode } from "../giselle-node/types";
import { webSearchSchema } from "./schema";
import { search } from "./tavily";
import { type WebSearch, webSearchItemStatus, webSearchStatus } from "./types";
import {
type WebSearch,
type WebSearchItemReference,
webSearchItemStatus,
webSearchStatus,
} from "./types";

async function limitConcurrency<T>(
tasks: (() => Promise<T>)[],
maxConcurrent: number,
): Promise<T[]> {
const results: T[] = [];
const runningTasks = new Set<Promise<void>>();

for (const task of tasks) {
if (runningTasks.size >= maxConcurrent) {
await Promise.race(runningTasks);
}

const runningTask = (async () => {
try {
results.push(await task());
} finally {
runningTasks.delete(runningTask);
}
})();

runningTasks.add(runningTask);
}

await Promise.all(runningTasks);
return results;
}

interface GenerateWebSearchStreamInputs {
userPrompt: string;
Expand Down Expand Up @@ -99,37 +131,51 @@ export async function generateWebSearchStream(
}
const app = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY });
let mutableItems = webSearch.items;
const numberOfSubArrays = 5;
const subArrayLength = Math.ceil(
webSearch.items.length / numberOfSubArrays,
);
const chunkedArray: WebSearchItemReference[][] = [];

for (let i = 0; i < numberOfSubArrays; i++) {
chunkedArray.push(
webSearch.items.slice(i * subArrayLength, (i + 1) * subArrayLength),
);
}

await Promise.all(
webSearch.items.map(async (webSearchItem) => {
const scrapeResponse = await app.scrapeUrl(webSearchItem.url, {
formats: ["markdown"],
});
if (scrapeResponse.success) {
const blob = await put(
`webSearch/${webSearchItem.id}.md`,
scrapeResponse.markdown ?? "",
{
access: "public",
contentType: "text/markdown",
},
);
mutableItems = mutableItems.map((item) => {
if (item.id !== webSearchItem.id) {
return item;
}
return {
...webSearchItem,
contentBlobUrl: blob.url,
status: webSearchItemStatus.completed,
};
});
stream.update({
...result,
webSearch: {
...webSearch,
items: mutableItems,
},
chunkedArray.map(async (webSearchItems) => {
for (const webSearchItem of webSearchItems) {
const scrapeResponse = await app.scrapeUrl(webSearchItem.url, {
formats: ["markdown"],
});
if (scrapeResponse.success) {
const blob = await put(
`webSearch/${webSearchItem.id}.md`,
scrapeResponse.markdown ?? "",
{
access: "public",
contentType: "text/markdown",
},
);
mutableItems = mutableItems.map((item) => {
if (item.id !== webSearchItem.id) {
return item;
}
return {
...webSearchItem,
contentBlobUrl: blob.url,
status: webSearchItemStatus.completed,
};
});
stream.update({
...result,
webSearch: {
...webSearch,
items: mutableItems,
},
});
}
}
}),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export async function search(query: string) {
include_images: false,
include_image_descriptions: false,
include_raw_content: false,
max_results: 1,
max_results: 3,
include_domains: [],
exclude_domains: [],
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export interface WebSearchItem {
title: string;
content: string;
url: string;
relevance: number;
}
interface PendingWebSearchItemReference {
id: WebSearchContentId;
Expand Down

0 comments on commit 3066e22

Please sign in to comment.