diff --git a/backend/app/processing/process_queue.py b/backend/app/processing/process_queue.py
index 453e6c4..37140cf 100644
--- a/backend/app/processing/process_queue.py
+++ b/backend/app/processing/process_queue.py
@@ -1,5 +1,4 @@
 from functools import wraps
-import os
 from typing import List
 from app.database import SessionLocal
 from app.exceptions import CreditLimitExceededException
@@ -11,11 +10,8 @@
 from app.models import ProcessStatus
 from app.requests import (
     extract_data,
-    extract_summary_of_summaries,
-    highlight_sentences_in_pdf,
 )
 from datetime import datetime
-from app.requests import extract_summary
 from app.models.process_step import ProcessStepStatus
 from app.repositories import user_repository
 from app.config import settings
@@ -72,21 +68,7 @@ def process_step_task(
     # Move the expensive external operations out of the DB session
     while retries < settings.max_retries and not success:
         try:
-            if process.type == "extractive_summary":
-                data = extractive_summary_process(
-                    api_key, process, process_step, asset_content
-                )
-
-                if data["summary"]:
-                    summaries.append(data["summary"])
-
-                # Update process step output outside the expensive operations
-                with SessionLocal() as db:
-                    update_process_step_status(
-                        db, process_step, ProcessStepStatus.COMPLETED, output=data
-                    )
-
-            elif process.type == "extract":
+            if process.type == "extract":
                 # Handle non-extractive summary process
                 data = extract_process(
                     api_key, process, process_step, asset_content
@@ -153,6 +135,7 @@ def process_task(process_id: int):
             all_process_steps_ready = len(ready_process_steps) == len(process_steps)

             # Check if all process steps are ready
+            # Step 3: Concurrently process all process steps
             with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
                 futures = [
                     executor.submit(
@@ -168,33 +151,12 @@ def process_task(process_id: int):
             # Wait for all submitted tasks to complete
             concurrent.futures.wait(futures)

-            # Step 3: Handle summary extraction (expensive operation) outside the DB session
-            summary_of_summaries = None
-            if (
-                "show_final_summary" in process.details
-                and process.details["show_final_summary"]
-            ):
-                logger.log("Extracting summary from summaries")
-
-                if process.output:
-                    return
-
-                # Extract summary outside the DB session to avoid holding connection
-                data = extract_summary_of_summaries(
-                    api_key, summaries, process.details["transformation_prompt"]
-                )
-                summary_of_summaries = data.get("summary", "")
-                logger.log("Extracting summary from summaries completed")
-
             # Step 4: After all steps are processed, update the process status and output in the DB
             with SessionLocal() as db:
                 process = process_repository.get_process(db, process_id)

                 if process.status != ProcessStatus.STOPPED:
                     # If summary extraction was performed, add it to the process output
-                    if summary_of_summaries:
-                        process.output = {"summary": summary_of_summaries}
-
                     if not all_process_steps_ready:
                         logger.info(f"Process id: [{process.id}] some steps preprocessing is missing moving to waiting queue")
                         process_execution_scheduler.add_process_to_queue(process.id)
@@ -206,7 +168,7 @@ def process_task(process_id: int):
                 )
                 process.completed_at = datetime.utcnow()

-            db.commit()  # Commit the final status and output
+            db.commit()

     except Exception as e:
         logger.error(traceback.format_exc())
@@ -234,57 +196,6 @@ def wrapper(*args, **kwargs):
     return wrapper


-@handle_exceptions
-def extractive_summary_process(api_key, process, process_step, asset_content):
-    try:
-        data = extract_summary(
-            api_token=api_key,
-            config=process.details,
-            file_path=(
-                process_step.asset.path
-                if not asset_content or not asset_content.content
-                else None
-            ),
-            pdf_content=(
-                asset_content.content
-                if asset_content and asset_content.content
-                else None
-            ),
-        )
-    except Exception as e:
-        logger.error(f"Error in extract_summary: {str(e)}")
-        return {
-            "highlighted_pdf": None,
-            "summary": "",
-        }
-
-    summary = data.get("summary", "")
-    summary_sentences = data.get("summary_sentences", "")
-
-    # Create directory for highlighted PDF
-    highlighted_file_dir = os.path.join(
-        settings.process_dir, str(process.id), str(process_step.id)
-    )
-    os.makedirs(highlighted_file_dir, exist_ok=True)
-
-    highlighted_file_path = os.path.join(
-        highlighted_file_dir,
-        f"highlighted_{process_step.asset.filename}",
-    )
-
-    highlight_sentences_in_pdf(
-        api_token=api_key,
-        sentences=summary_sentences,
-        file_path=process_step.asset.path,
-        output_path=highlighted_file_path,
-    )
-
-    return {
-        "highlighted_pdf": highlighted_file_path,
-        "summary": summary,
-    }
-
-
 @handle_exceptions
 def extract_process(api_key, process, process_step, asset_content):
     pdf_content = ""
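The process_queue.py hunks above drop the `extractive_summary` branch from `process_step_task`, the summary-of-summaries pass from `process_task`, and the `extractive_summary_process` helper itself, leaving `extract` as the only step type. The fan-out that survives (the new "Step 3" comment marks it) is easy to misread, so here is a minimal sketch of that pattern; `work` and `run_steps` are hypothetical stand-ins rather than project functions, and only the `concurrent.futures` usage mirrors the diff.

```python
import concurrent.futures

def work(step_id: int) -> str:
    # Hypothetical stand-in for process_step_task: each call does one
    # step's expensive work outside any shared DB session.
    return f"step {step_id} done"

def run_steps(step_ids):
    # Mirrors the diff: a bounded pool (max_workers=3), submit every step,
    # then block until all futures settle before the final DB update.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(work, sid) for sid in step_ids]
        concurrent.futures.wait(futures)
    return [f.result() for f in futures]
```

Waiting on all futures before reopening `SessionLocal()` keeps the database connection out of the slow path, which is the point of the surrounding comments.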
diff --git a/backend/app/requests.py b/backend/app/requests.py
index a0f4236..cd42fd1 100644
--- a/backend/app/requests.py
+++ b/backend/app/requests.py
@@ -1,6 +1,5 @@
 import json
 import os
-from typing import List
 from app.exceptions import CreditLimitExceededException
 import requests
 from app.config import settings
@@ -125,72 +124,6 @@ def extract_field_descriptions(api_token, fields):
         raise Exception("Unable to process file!")


-def extract_summary(api_token, config, file_path=None, pdf_content=None):
-    config_data = config if isinstance(config, str) else json.dumps(config)
-    pdf_content_data = (
-        config if isinstance(pdf_content, str) else json.dumps(pdf_content)
-    )
-
-    # Prepare the headers with the Bearer token
-    headers = {"x-authorization": f"Bearer {api_token}"}
-
-    # Prepare the data and files dictionaries
-    data = {"config": config_data}
-    files = {}
-
-    if file_path:
-        if not os.path.isfile(file_path):
-            raise FileNotFoundError(f"The file at {file_path} does not exist.")
-
-        file = open(file_path, "rb")
-        files["file"] = (os.path.basename(file_path), file)
-
-    elif pdf_content:
-        data["content"] = pdf_content_data
-
-    # Send the request
-    response = requests.post(
-        f"{settings.pandaetl_server_url}/v1/extract/summary",
-        files=files if files else None,
-        data=data,
-        headers=headers,
-        timeout=360,
-    )
-    # Check the response status code
-    if response.status_code == 201 or response.status_code == 200:
-        return response.json()
-    else:
-        logger.error(
-            f"Unable to process file ${file_path} during summary generation. It returned {response.status_code} code: {response.text}"
-        )
-        raise Exception("Unable to process file!")
-
-
-def extract_summary_of_summaries(api_token: str, summaries: List[str], prompt: str):
-
-    # Prepare the headers with the Bearer token
-    headers = {"x-authorization": f"Bearer {api_token}"}
-
-    # Prepare the data and files dictionaries
-    data = {"summaries": summaries, "prompt": prompt}
-
-    # Send the request
-    response = requests.post(
-        f"{settings.pandaetl_server_url}/v1/extract/summary-of-summaries",
-        json=data,
-        headers=headers,
-        timeout=360,
-    )
-    # Check the response status code
-    if response.status_code == 201 or response.status_code == 200:
-        return response.json()
-    else:
-        logger.error(
-            f"Unable to process files during summary of summaries generation. It returned {response.status_code} code: {response.text}"
-        )
-        raise Exception("Unable to process file!")
-
-
 def highlight_sentences_in_pdf(api_token, sentences, file_path, output_path):
     # Prepare the headers with the Bearer token
     headers = {"x-authorization": f"Bearer {api_token}"}
@@ -228,43 +161,6 @@ def highlight_sentences_in_pdf(api_token, sentences, file_path, output_path):
         raise Exception("Unable to process file!")


-def extract_file_segmentation(api_token, file_path=None, pdf_content=None):
-
-    # Prepare the headers with the Bearer token
-    headers = {"x-authorization": f"Bearer {api_token}"}
-
-    # Prepare the data and files dictionaries
-    data = {}
-    files = {}
-
-    if file_path:
-        if not os.path.isfile(file_path):
-            raise FileNotFoundError(f"The file at {file_path} does not exist.")
-
-        file = open(file_path, "rb")
-        files["file"] = (os.path.basename(file_path), file)
-
-    elif pdf_content:
-        data["pdf_content"] = json.dumps(pdf_content)
-
-    # Send the request
-    response = requests.post(
-        f"{settings.pandaetl_server_url}/v1/extract/file/segment",
-        files=files if files else None,
-        data=data,
-        headers=headers,
-        timeout=360,
-    )
-    # Check the response status code
-    if response.status_code == 201 or response.status_code == 200:
-        return response.json()
-    else:
-        logger.error(
-            f"Unable to process file ${file_path} during file segmentation. It returned {response.status_code} code: {response.text}"
-        )
-        raise Exception("Unable to process file!")
-
-
 def chat_query(api_token, query, docs):

     # Prepare the headers with the Bearer token
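In requests.py, `extract_summary`, `extract_summary_of_summaries`, and `extract_file_segmentation` go away along with their endpoints; note that the deleted `extract_summary` also carried a copy-paste slip (`config if isinstance(pdf_content, str) ...` where `pdf_content` was clearly meant), which disappears with it. The helpers that remain, such as `highlight_sentences_in_pdf` and `chat_query`, all share one request shape. Below is a minimal sketch of that shape; the `/v1/example` endpoint and the `post_with_optional_file` name are hypothetical, while the header, multipart, and status-code conventions come from the surviving code.

```python
import os
import requests

def post_with_optional_file(api_token, base_url, file_path=None, data=None):
    # Same conventions as the surviving helpers: Bearer token in the
    # x-authorization header, multipart upload when a path is given,
    # and 200/201 treated as success.
    headers = {"x-authorization": f"Bearer {api_token}"}
    files = None
    if file_path:
        if not os.path.isfile(file_path):
            raise FileNotFoundError(f"The file at {file_path} does not exist.")
        files = {"file": (os.path.basename(file_path), open(file_path, "rb"))}
    response = requests.post(
        f"{base_url}/v1/example",  # hypothetical endpoint
        files=files,
        data=data or {},
        headers=headers,
        timeout=360,
    )
    if response.status_code in (200, 201):
        return response.json()
    raise Exception("Unable to process file!")
```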
diff --git a/backend/tests/processing/test_process_queue.py b/backend/tests/processing/test_process_queue.py
index 8c2b3d5..5668aaa 100644
--- a/backend/tests/processing/test_process_queue.py
+++ b/backend/tests/processing/test_process_queue.py
@@ -1,8 +1,7 @@
 import pytest
-from unittest.mock import Mock, patch, mock_open
+from unittest.mock import Mock, patch
 from app.processing.process_queue import (
     handle_exceptions,
-    extractive_summary_process,
     extract_process,
     update_process_step_status,
     find_best_match_for_short_reference,
@@ -35,29 +34,6 @@ def test_function():
     mock_logger.error.assert_called_with("Credit limit exceeded")


-@patch('app.processing.process_queue.extract_summary')
-@patch('app.processing.process_queue.highlight_sentences_in_pdf')
-@patch('os.path.isfile')
-@patch('builtins.open', new_callable=mock_open, read_data="test file content")
-def test_extractive_summary_process(mock_file, mock_isfile, mock_highlight, mock_extract_summary):
-    mock_isfile.return_value = True  # Mock the file existence check
-    mock_extract_summary.return_value = {
-        "summary": "Test summary",
-        "summary_sentences": ["Sentence 1", "Sentence 2"]
-    }
-    mock_highlight.return_value = b"highlighted pdf content"
-
-    process = Mock(id=1, details={})
-    process_step = Mock(id=1, asset=Mock(path="/test/path", filename="test.pdf"))
-    asset_content = Mock(content=None)
-
-    result = extractive_summary_process("api_key", process, process_step, asset_content)
-
-    assert result["summary"] == "Test summary"
-    assert "highlighted_pdf" in result
-    mock_extract_summary.assert_called_once()
-    mock_highlight.assert_called_once()
-
 @patch('app.processing.process_queue.extract_data')
 @patch('app.processing.process_queue.ChromaDB')
 def test_extract_process(mock_chroma, mock_extract_data):
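The test diff removes `test_extractive_summary_process` together with the now-unused `mock_open` import. If a guard against accidental reintroduction is wanted, a hypothetical regression test (not part of this diff) could pin the removal down:

```python
import pytest

def test_extractive_summary_process_stays_removed():
    # Hypothetical guard: importing the deleted helper must fail.
    with pytest.raises(ImportError):
        from app.processing.process_queue import extractive_summary_process  # noqa: F401
```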