refactor[summary]: clean up code to remove deprecated processes (#33)
ArslanSaleem authored and gventuri committed Oct 23, 2024
1 parent 274d310 commit bb1b581
Showing 3 changed files with 4 additions and 221 deletions.
95 changes: 3 additions & 92 deletions backend/app/processing/process_queue.py
@@ -1,5 +1,4 @@
from functools import wraps
import os
from typing import List
from app.database import SessionLocal
from app.exceptions import CreditLimitExceededException
@@ -11,11 +10,8 @@
from app.models import ProcessStatus
from app.requests import (
extract_data,
extract_summary_of_summaries,
highlight_sentences_in_pdf,
)
from datetime import datetime
from app.requests import extract_summary
from app.models.process_step import ProcessStepStatus
from app.repositories import user_repository
from app.config import settings
@@ -72,21 +68,7 @@ def process_step_task(
# Move the expensive external operations out of the DB session
while retries < settings.max_retries and not success:
try:
if process.type == "extractive_summary":
data = extractive_summary_process(
api_key, process, process_step, asset_content
)

if data["summary"]:
summaries.append(data["summary"])

# Update process step output outside the expensive operations
with SessionLocal() as db:
update_process_step_status(
db, process_step, ProcessStepStatus.COMPLETED, output=data
)

elif process.type == "extract":
if process.type == "extract":
                # Handle the data extraction process
data = extract_process(
api_key, process, process_step, asset_content
Expand Down Expand Up @@ -153,6 +135,7 @@ def process_task(process_id: int):

all_process_steps_ready = len(ready_process_steps) == len(process_steps) # Check if all process steps are ready

# Step 3: Concurrently process all process steps
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [
executor.submit(
@@ -168,33 +151,12 @@
# Wait for all submitted tasks to complete
concurrent.futures.wait(futures)

# Step 3: Handle summary extraction (expensive operation) outside the DB session
summary_of_summaries = None
if (
"show_final_summary" in process.details
and process.details["show_final_summary"]
):
logger.log("Extracting summary from summaries")

if process.output:
return

# Extract summary outside the DB session to avoid holding connection
data = extract_summary_of_summaries(
api_key, summaries, process.details["transformation_prompt"]
)
summary_of_summaries = data.get("summary", "")
logger.log("Extracting summary from summaries completed")

# Step 4: After all steps are processed, update the process status and output in the DB
with SessionLocal() as db:
process = process_repository.get_process(db, process_id)

if process.status != ProcessStatus.STOPPED:
# If summary extraction was performed, add it to the process output
if summary_of_summaries:
process.output = {"summary": summary_of_summaries}

if not all_process_steps_ready:
logger.info(f"Process id: [{process.id}] some steps preprocessing is missing moving to waiting queue")
process_execution_scheduler.add_process_to_queue(process.id)
@@ -206,7 +168,7 @@ def process_task(process_id: int):
)
process.completed_at = datetime.utcnow()

db.commit() # Commit the final status and output
db.commit()

except Exception as e:
logger.error(traceback.format_exc())
@@ -234,57 +196,6 @@ def wrapper(*args, **kwargs):
return wrapper


@handle_exceptions
def extractive_summary_process(api_key, process, process_step, asset_content):
try:
data = extract_summary(
api_token=api_key,
config=process.details,
file_path=(
process_step.asset.path
if not asset_content or not asset_content.content
else None
),
pdf_content=(
asset_content.content
if asset_content and asset_content.content
else None
),
)
except Exception as e:
logger.error(f"Error in extract_summary: {str(e)}")
return {
"highlighted_pdf": None,
"summary": "",
}

summary = data.get("summary", "")
summary_sentences = data.get("summary_sentences", "")

# Create directory for highlighted PDF
highlighted_file_dir = os.path.join(
settings.process_dir, str(process.id), str(process_step.id)
)
os.makedirs(highlighted_file_dir, exist_ok=True)

highlighted_file_path = os.path.join(
highlighted_file_dir,
f"highlighted_{process_step.asset.filename}",
)

highlight_sentences_in_pdf(
api_token=api_key,
sentences=summary_sentences,
file_path=process_step.asset.path,
output_path=highlighted_file_path,
)

return {
"highlighted_pdf": highlighted_file_path,
"summary": summary,
}


@handle_exceptions
def extract_process(api_key, process, process_step, asset_content):
pdf_content = ""
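
With the extractive-summary branch gone, the retry loop in process_step_task dispatches on a single process type. A minimal sketch of the resulting control flow, assuming only the names visible in this diff (the real function takes more arguments and also marks steps FAILED after exhausting retries):

    from app.config import settings
    from app.database import SessionLocal
    from app.models.process_step import ProcessStepStatus
    from app.processing.process_queue import extract_process, update_process_step_status

    def run_step_sketch(api_key, process, process_step, asset_content):
        retries, success = 0, False
        while retries < settings.max_retries and not success:
            try:
                if process.type == "extract":
                    # Expensive external call stays outside any DB session
                    data = extract_process(api_key, process, process_step, asset_content)
                    # Open a short-lived session only to persist the step result
                    with SessionLocal() as db:
                        update_process_step_status(
                            db, process_step, ProcessStepStatus.COMPLETED, output=data
                        )
                    success = True
            except Exception:
                retries += 1  # the real code also logs the failure here
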
104 changes: 0 additions & 104 deletions backend/app/requests.py
@@ -1,6 +1,5 @@
import json
import os
from typing import List
from app.exceptions import CreditLimitExceededException
import requests
from app.config import settings
@@ -125,72 +124,6 @@ def extract_field_descriptions(api_token, fields):
raise Exception("Unable to process file!")


def extract_summary(api_token, config, file_path=None, pdf_content=None):
config_data = config if isinstance(config, str) else json.dumps(config)
pdf_content_data = (
        pdf_content if isinstance(pdf_content, str) else json.dumps(pdf_content)
)

# Prepare the headers with the Bearer token
headers = {"x-authorization": f"Bearer {api_token}"}

# Prepare the data and files dictionaries
data = {"config": config_data}
files = {}

if file_path:
if not os.path.isfile(file_path):
raise FileNotFoundError(f"The file at {file_path} does not exist.")

file = open(file_path, "rb")
files["file"] = (os.path.basename(file_path), file)

elif pdf_content:
data["content"] = pdf_content_data

# Send the request
response = requests.post(
f"{settings.pandaetl_server_url}/v1/extract/summary",
files=files if files else None,
data=data,
headers=headers,
timeout=360,
)
# Check the response status code
if response.status_code == 201 or response.status_code == 200:
return response.json()
else:
logger.error(
f"Unable to process file ${file_path} during summary generation. It returned {response.status_code} code: {response.text}"
)
raise Exception("Unable to process file!")


def extract_summary_of_summaries(api_token: str, summaries: List[str], prompt: str):

# Prepare the headers with the Bearer token
headers = {"x-authorization": f"Bearer {api_token}"}

# Prepare the data and files dictionaries
data = {"summaries": summaries, "prompt": prompt}

# Send the request
response = requests.post(
f"{settings.pandaetl_server_url}/v1/extract/summary-of-summaries",
json=data,
headers=headers,
timeout=360,
)
# Check the response status code
if response.status_code == 201 or response.status_code == 200:
return response.json()
else:
logger.error(
f"Unable to process files during summary of summaries generation. It returned {response.status_code} code: {response.text}"
)
raise Exception("Unable to process file!")


def highlight_sentences_in_pdf(api_token, sentences, file_path, output_path):
# Prepare the headers with the Bearer token
headers = {"x-authorization": f"Bearer {api_token}"}
@@ -228,43 +161,6 @@ def highlight_sentences_in_pdf(api_token, sentences, file_path, output_path):
raise Exception("Unable to process file!")


def extract_file_segmentation(api_token, file_path=None, pdf_content=None):

# Prepare the headers with the Bearer token
headers = {"x-authorization": f"Bearer {api_token}"}

# Prepare the data and files dictionaries
data = {}
files = {}

if file_path:
if not os.path.isfile(file_path):
raise FileNotFoundError(f"The file at {file_path} does not exist.")

file = open(file_path, "rb")
files["file"] = (os.path.basename(file_path), file)

elif pdf_content:
data["pdf_content"] = json.dumps(pdf_content)

# Send the request
response = requests.post(
f"{settings.pandaetl_server_url}/v1/extract/file/segment",
files=files if files else None,
data=data,
headers=headers,
timeout=360,
)
# Check the response status code
if response.status_code == 201 or response.status_code == 200:
return response.json()
else:
logger.error(
f"Unable to process file ${file_path} during file segmentation. It returned {response.status_code} code: {response.text}"
)
raise Exception("Unable to process file!")


def chat_query(api_token, query, docs):

# Prepare the headers with the Bearer token
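
All of the removed helpers, and the ones that remain (highlight_sentences_in_pdf, chat_query), share one request shape: a Bearer token in an x-authorization header, a long-timeout POST to the pandaetl server, and a 200/201 success check. A hedged sketch of that shared pattern — post_to_pandaetl is an illustrative name, not a helper in this codebase:

    import logging
    import requests
    from app.config import settings

    logger = logging.getLogger(__name__)  # the real module uses its own logger object

    def post_to_pandaetl(api_token, endpoint, **kwargs):
        """Illustrative only: the request shape shared by the extract/* helpers."""
        headers = {"x-authorization": f"Bearer {api_token}"}
        response = requests.post(
            f"{settings.pandaetl_server_url}{endpoint}",
            headers=headers,
            timeout=360,
            **kwargs,  # files=, data=, or json=, depending on the caller
        )
        if response.status_code in (200, 201):
            return response.json()
        logger.error(
            f"Request to {endpoint} failed with status code "
            f"{response.status_code}: {response.text}"
        )
        raise Exception("Unable to process file!")
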
26 changes: 1 addition & 25 deletions backend/tests/processing/test_process_queue.py
@@ -1,8 +1,7 @@
import pytest
from unittest.mock import Mock, patch, mock_open
from unittest.mock import Mock, patch
from app.processing.process_queue import (
handle_exceptions,
extractive_summary_process,
extract_process,
update_process_step_status,
find_best_match_for_short_reference,
@@ -35,29 +34,6 @@ def test_function():

mock_logger.error.assert_called_with("Credit limit exceeded")

@patch('app.processing.process_queue.extract_summary')
@patch('app.processing.process_queue.highlight_sentences_in_pdf')
@patch('os.path.isfile')
@patch('builtins.open', new_callable=mock_open, read_data="test file content")
def test_extractive_summary_process(mock_file, mock_isfile, mock_highlight, mock_extract_summary):
mock_isfile.return_value = True # Mock the file existence check
mock_extract_summary.return_value = {
"summary": "Test summary",
"summary_sentences": ["Sentence 1", "Sentence 2"]
}
mock_highlight.return_value = b"highlighted pdf content"

process = Mock(id=1, details={})
process_step = Mock(id=1, asset=Mock(path="/test/path", filename="test.pdf"))
asset_content = Mock(content=None)

result = extractive_summary_process("api_key", process, process_step, asset_content)

assert result["summary"] == "Test summary"
assert "highlighted_pdf" in result
mock_extract_summary.assert_called_once()
mock_highlight.assert_called_once()

@patch('app.processing.process_queue.extract_data')
@patch('app.processing.process_queue.ChromaDB')
def test_extract_process(mock_chroma, mock_extract_data):
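
The deleted test mocked each external dependency with a stacked @patch (note the bottom-most decorator maps to the first test parameter); the surviving test_extract_process follows the same pattern. A sketch of that shape under assumed return values — the real test's body and assertions may differ:

    from unittest.mock import Mock, patch
    from app.processing.process_queue import extract_process

    @patch('app.processing.process_queue.extract_data')
    @patch('app.processing.process_queue.ChromaDB')
    def test_extract_process_sketch(mock_chroma, mock_extract_data):
        # Assumed response shape; the real extract_data payload may carry more fields
        mock_extract_data.return_value = {"fields": {"title": "Test"}, "context": [[]]}

        process = Mock(id=1, details={})
        process_step = Mock(id=1, asset=Mock(path="/test/path", filename="test.pdf"))
        asset_content = Mock(content=None)

        result = extract_process("api_key", process, process_step, asset_content)

        mock_extract_data.assert_called_once()
        assert result is not None  # exact output shape depends on extract_process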