From 429f6fae96634b277477b0796dd5d9b72d8cb3e3 Mon Sep 17 00:00:00 2001
From: Connor <36115510+ScarFX@users.noreply.github.com>
Date: Mon, 30 Sep 2024 13:13:41 -0400
Subject: [PATCH 1/2] refactor: Async batch processing and env vars for limits/batching configuration

---
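Notes: an illustrative configuration for the new environment variables
documented in README.md below. The values shown are the documented defaults;
MAX_CHUNKS is unset by default, which means no per-file chunk limit.

    # MAX_CHUNKS=              # unset by default: no limit on chunks per file
    EMBEDDING_TIMEOUT=100000   # milliseconds (100 seconds)
    BATCH_SIZE=75              # chunks embedded per API call
    CONCURRENT_LIMIT=20        # max concurrent embedding API calls
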
 README.md       |  5 +++++
 config.py       |  6 ++++++
 main.py         | 14 ++++++++++----
 process_docs.py | 38 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 59 insertions(+), 4 deletions(-)
 create mode 100644 process_docs.py

diff --git a/README.md b/README.md
index 37bdcdf8..c0499cc7 100644
--- a/README.md
+++ b/README.md
@@ -57,6 +57,11 @@ The following environment variables are required to run the application:
 - `COLLECTION_NAME`: (Optional) The name of the collection in the vector store. Default value is "testcollection".
 - `CHUNK_SIZE`: (Optional) The size of the chunks for text processing. Default value is "1500".
 - `CHUNK_OVERLAP`: (Optional) The overlap between chunks during text processing. Default value is "100".
+- `MAX_CHUNKS`: (Optional) The maximum number of chunks to process for a single file. Default is unlimited.
+- `EMBEDDING_TIMEOUT`: (Optional) The time limit, in milliseconds, for processing (embedding) a file's chunks. Default value is "100000" (100 seconds).
+- `BATCH_SIZE`: (Optional) The number of chunks to embed and add to the vector store in each API call. Default value is "75".
+  - Note: The ideal `BATCH_SIZE` (fastest total embedding time) depends on the embeddings provider, model, and file size.
+- `CONCURRENT_LIMIT`: (Optional) The maximum number of embedding API calls allowed to run concurrently. Default value is "20".
 - `RAG_UPLOAD_DIR`: (Optional) The directory where uploaded files are stored. Default value is "./uploads/".
 - `PDF_EXTRACT_IMAGES`: (Optional) A boolean value indicating whether to extract images from PDF files. Default value is "False".
 - `DEBUG_RAG_API`: (Optional) Set to "True" to show more verbose logging output in the server console, and to enable postgresql database routes
diff --git a/config.py b/config.py
index 7b94a976..118b9e09 100644
--- a/config.py
+++ b/config.py
@@ -62,6 +62,12 @@ def get_env_variable(
 ) # Deprecated, backwards compatability
 CHUNK_SIZE = int(get_env_variable("CHUNK_SIZE", "1500"))
 CHUNK_OVERLAP = int(get_env_variable("CHUNK_OVERLAP", "100"))
+maxChunks = get_env_variable("MAX_CHUNKS")
+MAX_CHUNKS = int(maxChunks) if maxChunks else None
+EMBEDDING_TIMEOUT = int(get_env_variable("EMBEDDING_TIMEOUT", "100000"))  # default: 100-second timeout
+
+BATCH_SIZE = int(get_env_variable("BATCH_SIZE", "75"))
+CONCURRENT_LIMIT = int(get_env_variable("CONCURRENT_LIMIT", "20"))
 
 env_value = get_env_variable("PDF_EXTRACT_IMAGES", "False").lower()
 PDF_EXTRACT_IMAGES = True if env_value == "true" else False
diff --git a/main.py b/main.py
index c82b550a..c1632356 100644
--- a/main.py
+++ b/main.py
@@ -1,4 +1,5 @@
 import os
+import time
 import hashlib
 import aiofiles
 import aiofiles.os
@@ -50,6 +51,8 @@
 from mongo import mongo_health_check
 from constants import ERROR_MESSAGES
 from store import AsyncPgVector
+from process_docs import store_documents
+
 load_dotenv(find_dotenv())
 
 
@@ -58,6 +61,7 @@
     debug_mode,
     CHUNK_SIZE,
     CHUNK_OVERLAP,
+    MAX_CHUNKS,
     vector_store,
     RAG_UPLOAD_DIR,
     known_source_ext,
@@ -239,7 +243,11 @@ async def store_data_in_vector_db(
         chunk_size=app.state.CHUNK_SIZE, chunk_overlap=app.state.CHUNK_OVERLAP
     )
     documents = text_splitter.split_documents(data)
-
+    if MAX_CHUNKS and len(documents) > MAX_CHUNKS:
+        raise HTTPException(
+            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
+            detail=f"File too large: attempted to process {len(documents)} chunks, but MAX_CHUNKS is set to {MAX_CHUNKS}",
+        )
     # If `clean_content` is True, clean the page_content of each document (remove null bytes)
     if clean_content:
         for doc in documents:
@@ -261,9 +269,7 @@
 
     try:
         if isinstance(vector_store, AsyncPgVector):
-            ids = await vector_store.aadd_documents(
-                docs, ids=[file_id] * len(documents)
-            )
+            ids = await store_documents(docs, ids=[file_id] * len(documents))
         else:
             ids = vector_store.add_documents(docs, ids=[file_id] * len(documents))
 
diff --git a/process_docs.py b/process_docs.py
new file mode 100644
index 00000000..9395ce03
--- /dev/null
+++ b/process_docs.py
@@ -0,0 +1,38 @@
+from langchain.schema import Document
+import time
+import asyncio
+from config import (
+    CONCURRENT_LIMIT,
+    BATCH_SIZE,
+    EMBEDDING_TIMEOUT,
+    vector_store,
+    logger
+)
+
+
+# Prepare documents to be added to the vector store asynchronously, in batches
+async def store_documents(
+    docs: list[Document], ids: list[str]
+):
+    semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
+    tasks = []
+    logger.info(f"Processing list of documents of length: {len(docs)}")
+    start_time = time.perf_counter()
+    for i in range(0, len(docs), BATCH_SIZE):
+        batch = docs[i : min(i + BATCH_SIZE, len(docs))]
+        #logger.info(f"Sending batch {i} to {i+len(batch)} / {len(docs)}")
+        task = asyncio.create_task(process_batch(batch, ids, semaphore))
+        tasks.append(task)
+    try:
+        idList = await asyncio.wait_for(asyncio.gather(*tasks), timeout=(EMBEDDING_TIMEOUT/1000))
+        end_time = time.perf_counter()
+        elapsed = end_time - start_time
+        logger.info(f"SUCCESS: processed {len(docs)} documents in time: {elapsed}")
+    except asyncio.TimeoutError:
+        raise Exception(f"TIMEOUT: embedding took longer than the {EMBEDDING_TIMEOUT}ms limit; documents may have been only partially added to the database")
+    return [id for sublist in idList for id in sublist]
+
+# Helper for store_documents
+async def process_batch(batch: list[Document], ids: list[str], semaphore):
+    async with semaphore:
+        return await vector_store.aadd_documents(batch, ids=ids)
\ No newline at end of file

From 65b9614dd28b6b0586482c72a5507df8581ff961 Mon Sep 17 00:00:00 2001
From: Connor <36115510+ScarFX@users.noreply.github.com>
Date: Fri, 18 Oct 2024 12:13:10 -0400
Subject: [PATCH 2/2] refactor: debug-level logging for batch progress; Bedrock BATCH_SIZE default

---
 config.py       | 1 +
 process_docs.py | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/config.py b/config.py
index 118b9e09..7d66b990 100644
--- a/config.py
+++ b/config.py
@@ -248,6 +248,7 @@ def init_embeddings(provider, model):
 elif EMBEDDINGS_PROVIDER == EmbeddingsProvider.OLLAMA:
     EMBEDDINGS_MODEL = get_env_variable("EMBEDDINGS_MODEL", "nomic-embed-text")
 elif EMBEDDINGS_PROVIDER == EmbeddingsProvider.BEDROCK:
+    BATCH_SIZE = int(get_env_variable("BATCH_SIZE", "20"))
     EMBEDDINGS_MODEL = get_env_variable(
         "EMBEDDINGS_MODEL", "amazon.titan-embed-text-v1"
     )
diff --git a/process_docs.py b/process_docs.py
index 9395ce03..37e62fec 100644
--- a/process_docs.py
+++ b/process_docs.py
@@ -20,14 +20,14 @@ async def store_documents(
     start_time = time.perf_counter()
     for i in range(0, len(docs), BATCH_SIZE):
         batch = docs[i : min(i + BATCH_SIZE, len(docs))]
-        #logger.info(f"Sending batch {i} to {i+len(batch)} / {len(docs)}")
+        logger.debug(f"Sending batch {i} to {i+len(batch)} / {len(docs)}")
         task = asyncio.create_task(process_batch(batch, ids, semaphore))
         tasks.append(task)
     try:
         idList = await asyncio.wait_for(asyncio.gather(*tasks), timeout=(EMBEDDING_TIMEOUT/1000))
         end_time = time.perf_counter()
         elapsed = end_time - start_time
-        logger.info(f"SUCCESS: processed {len(docs)} documents in time: {elapsed}")
+        logger.debug(f"SUCCESS: processed {len(docs)} documents in time: {elapsed}")
     except asyncio.TimeoutError:
         raise Exception(f"TIMEOUT: embedding took longer than the {EMBEDDING_TIMEOUT}ms limit; documents may have been only partially added to the database")
     return [id for sublist in idList for id in sublist]