Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GraphRAG #6

Merged
merged 9 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified app/__pycache__/azure_openai.cpython-311.pyc
Binary file not shown.
Binary file modified app/__pycache__/queue_processor.cpython-311.pyc
Binary file not shown.
Binary file modified app/__pycache__/research.cpython-311.pyc
Binary file not shown.
Binary file modified app/__pycache__/routes.cpython-311.pyc
Binary file not shown.
72 changes: 48 additions & 24 deletions app/routes.py → app/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,40 @@
import os
import PyPDF2
from io import BytesIO
import base64
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceNotFoundError
import asyncio

from .utils import get_user_id
from .blob_service import (
from app.ingestion.indexing_queue import queue_indexing_job
from app.integration.identity import get_user_id
from app.integration.blob_service import (
upload_file_to_lz, create_index_containers, list_files_in_container,
delete_file_from_blob, list_indexes, delete_index, initialize_blob_service,
get_blob_url
)
from .queue_processor import queue_file_for_processing
from .ingestion_job import create_ingestion_job, check_job_status, delete_ingestion_index
from .research import research_with_data
from .chat_service import chat_with_data, refine_message
from .index_manager import create_index_manager, ContainerNameTooLongError, IndexConfig
from .utils import easyauth_enabled
from .ask import AskService
from .pdf_processing import get_pdf_page_count
from app.ingestion.upload_queue import queue_file_for_processing
from app.ingestion.ingestion_job import create_ingestion_job, check_job_status, delete_ingestion_index
from app.query.research import research_with_data
from app.query.chat_service import chat_with_data, refine_message
from app.integration.index_manager import create_index_manager, ContainerNameTooLongError, IndexConfig
from app.integration.identity import easyauth_enabled
from app.query.ask import AskService
from app.ingestion.pdf_processing import get_pdf_page_count
from app.query.voice_chat_service import intro_message, voice_chat_with_data

def are_operations_restricted():
    """Return True when the RESTRICT_OPERATIONS env var is 'true' (case-insensitive).

    Defaults to False when the variable is unset.
    """
    flag = os.getenv('RESTRICT_OPERATIONS', 'false')
    return flag.lower() == 'true'

class RouteConfigurator:
def __init__(self, app: Flask, socketio: SocketIO, blob_service=None, ingestion_job=None, research=None, chat_service=None):
def __init__(self, app: Flask, socketio: SocketIO, blob_service=None, ingestion_job=None, research=None, chat_service=None, voice_chat_service=None):
self.app = app
self.socketio = socketio
self.blob_service = blob_service or initialize_blob_service()
self.ingestion_job = ingestion_job or create_ingestion_job
self.research = research or research_with_data
self.chat_service = chat_service or chat_with_data
self.voice_chat_service = voice_chat_service or voice_chat_with_data
self.refine_service = refine_message
self.operations_restricted = are_operations_restricted()

Expand All @@ -44,8 +49,12 @@ def configure_routes(self) -> Flask:
self._add_pdf_route()
self._add_config_route()
self._add_ask_route()
self._add_voice_chat_route()
self._add_intro_route()

return self.app


def _add_ask_route(self):
self.app.route('/ask', methods=['POST'])(self._handle_ask)

Expand All @@ -69,14 +78,31 @@ def _add_chat_routes(self):
self.app.route('/chat', methods=['POST'])(self._chat)
self.app.route('/refine', methods=['POST'])(self._refine)

def _add_voice_chat_route(self):
    """Register POST /voice_chat.

    Fix: the handler previously called the module-level
    `voice_chat_with_data` directly, bypassing the injectable
    `self.voice_chat_service` configured in __init__ and making the
    route untestable with a substitute service. It now delegates to the
    injected service, consistent with the other route registrations.
    The payload is still read from `request.form`.
    """
    @self.app.route('/voice_chat', methods=['POST'])
    def voice_chat():
        user_id = get_user_id(request)
        return self.voice_chat_service(request.form, user_id)

def _add_intro_route(self):
self.app.route('/intro', methods=['POST'])(self._intro)

def _voice_chat(self):
    """Voice-chat handler reading a form-encoded payload."""
    # NOTE(review): a second `_voice_chat` is defined later in this class
    # that reads `request.json` instead of `request.form`. In Python the
    # later definition replaces this one, so this variant is dead code —
    # confirm which payload type /voice_chat should accept and remove the
    # duplicate.
    user_id = get_user_id(request)
    return self.voice_chat_service(request.form, user_id)

def _add_pdf_route(self):
self.app.route('/pdf/<index_name>/<path:filename>', methods=['GET'])(self._get_pdf)

def _handle_ask(self):
def _intro(self):
    """Handle POST /intro: return the voice-chat introductory message."""
    return intro_message()

async def _handle_ask(self):
    """Handle POST /ask: answer a JSON question scoped to the requesting user.

    Returns the service's (response, status_code) pair as a Flask response.
    """
    requester = get_user_id(request)
    payload = request.json
    service = self._get_ask_service()
    body, status = await service.ask_question(payload, requester)
    return jsonify(body), status

def _get_indexes(self):
Expand All @@ -92,7 +118,7 @@ def _create_index(self):
return jsonify({"error": "Operation not allowed"}), 403

user_id = get_user_id(request)
data = request.json
data = request.get_json()
index_config = self._validate_index_creation_data(data, user_id)

if isinstance(index_config, tuple):
Expand Down Expand Up @@ -154,20 +180,15 @@ def _upload_file(self, index_name: str):

filename = secure_filename(file.filename)

# Save file to memory
file_bytes = file.read()
file_buffer = BytesIO(file_bytes)

# Get number of pages
num_pages = get_pdf_page_count(file_buffer)

# Reset buffer position
file_buffer.seek(0)

# Upload to landing zone
blob_url = upload_file_to_lz(file_buffer, filename, user_id, index_name, is_restricted, self.blob_service)

# Queue the file for processing
queue_file_for_processing(filename, user_id, index_name, is_restricted, num_pages, blob_url, is_multimodal)

return jsonify({
Expand Down Expand Up @@ -230,13 +251,12 @@ def _index_files(self, index_name: str):
index_manager = self._get_index_manager(user_id, index_name, is_restricted)
if isinstance(index_manager, tuple):
return index_manager

ingestion_container = index_manager.get_ingestion_container()

try:
result = create_ingestion_job(ingestion_container)
return jsonify(result), 202
queue_indexing_job(ingestion_container, user_id, index_name, is_restricted)
return jsonify({"status": "initiated", "job_id": ingestion_container, "message": "Indexing job initiated successfully"}), 202
except Exception as e:
print(f"Error initiating indexing job: {str(e)}")
return jsonify({"error": str(e)}), 500

def _check_index_status(self, index_name: str):
Expand All @@ -260,6 +280,11 @@ def _chat(self):
data = request.json
return self.chat_service(data, user_id)

def _voice_chat(self):
    """Handle POST /voice_chat via the injected voice chat service."""
    # NOTE(review): this duplicates an earlier `_voice_chat` definition in
    # the same class (which read `request.form`); this later definition is
    # the one Python keeps, and it reads `request.json` — confirm the
    # intended payload type and delete the other copy.
    user_id = get_user_id(request)
    data = request.json
    return self.voice_chat_service(data, user_id)

def _refine(self):
user_id = get_user_id(request)
data = request.json
Expand Down Expand Up @@ -297,7 +322,6 @@ def _get_pdf(self, index_name: str, filename: str) -> Tuple[Response, int]:
self.app.logger.error(f"Error retrieving PDF: {str(e)}")
return jsonify({"error": f"Error retrieving PDF: {str(e)}"}), 500


def _validate_index_creation_data(self, data, user_id):
index_name = data.get('name')
is_restricted = data.get('is_restricted', True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from azure.ai.documentintelligence.models import ContentFormat
from azure.core.credentials import AzureKeyCredential
from PIL import Image
from .azure_openai import get_azure_openai_client, analyze_image
from app.integration.azure_openai import get_azure_openai_client, analyze_image
from .table_postprocessor import enhance_markdown

def refine_figures(content, png_path: str) -> str:
Expand Down
27 changes: 27 additions & 0 deletions app/ingestion/graphrag_ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import logging
from graphrag.config import create_graphrag_config
from graphrag.index import create_pipeline_config
from graphrag.index.run import run_pipeline_with_config
from graphrag.index.progress import PrintProgressReporter
from app.integration.graphrag_config import GraphRagConfig

logger = logging.getLogger(__name__)

class GraphRagIngestion:
    """Runs the GraphRAG indexing pipeline for a single index.

    The pipeline configuration is derived from the injected GraphRagConfig.
    """

    def __init__(self, config: GraphRagConfig):
        self.config = config

    async def process(self):
        """Build the pipeline config and run the GraphRAG pipeline to completion.

        Per-workflow errors are logged as they occur; a single overall
        completion message is emitted once the pipeline has finished.
        """
        config = self.config.get_config()
        parameters = create_graphrag_config(config, ".")
        pipeline_config = create_pipeline_config(parameters, True)

        logger.info(f"Starting GraphRAG processing for index: {self.config.index_name}")
        had_errors = False
        async for workflow_result in run_pipeline_with_config(
            config_or_path=pipeline_config,
            progress_reporter=PrintProgressReporter("Running GraphRAG pipeline..."),
        ):
            if workflow_result.errors:
                had_errors = True
                logger.error(f"Errors found in GraphRAG workflow result for index {self.config.index_name}: {workflow_result.errors}")
        # Fix: the original logged "completed successfully" inside the loop,
        # once per error-free workflow result, before the pipeline finished.
        # Report completion exactly once, reflecting whether errors occurred.
        if had_errors:
            logger.warning(f"GraphRAG processing finished with errors for index: {self.config.index_name}")
        else:
            logger.info(f"GraphRAG processing completed successfully for index: {self.config.index_name}")
117 changes: 117 additions & 0 deletions app/ingestion/indexing_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import asyncio
import os
import json
import logging
from typing import Dict, Any
from azure.storage.queue import QueueClient
from azure.data.tables import TableServiceClient
from azure.identity import DefaultAzureCredential
from azure.core.exceptions import ResourceExistsError

from dotenv import load_dotenv
load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IndexingQueueSettings:
    """Configuration for the indexing queue and its companion status table."""

    # Azure Storage account credentials, read from the environment at import
    # time. STORAGE_ACCOUNT_KEY may be unset; the queue client then falls
    # back to DefaultAzureCredential (see AzureClientManager).
    STORAGE_ACCOUNT_NAME = os.getenv('STORAGE_ACCOUNT_NAME')
    STORAGE_ACCOUNT_KEY = os.getenv('STORAGE_ACCOUNT_KEY')
    # Queue and table intentionally share the name "indexing".
    INDEXING_QUEUE_NAME = "indexing"
    INDEXING_TABLE_NAME = "indexing"
    # Maximum messages fetched per queue poll.
    MAX_MESSAGES = 32
    # Seconds a received message stays invisible to other consumers before
    # it reappears for retry.
    VISIBILITY_TIMEOUT = 600
    # Seconds to sleep between queue polls.
    SLEEP_TIME = 10

def get_env_variable(name: str) -> str:
    """Return the required setting *name* from IndexingQueueSettings.

    Raises ValueError when the setting is missing or empty (the settings
    class reads these values from the environment at import time).
    """
    setting = getattr(IndexingQueueSettings, name, None)
    if setting:
        return setting
    raise ValueError(f"{name} environment variable is not set")

class AzureClientManager:
    """Factory for Azure Queue/Table clients, creating the resources if absent."""

    @staticmethod
    def initialize_queue_client(queue_name: str) -> QueueClient:
        """Return a QueueClient for *queue_name*, creating the queue if needed.

        Authenticates with the storage account key when configured,
        otherwise falls back to DefaultAzureCredential.
        """
        account_name = get_env_variable('STORAGE_ACCOUNT_NAME')
        storage_key = IndexingQueueSettings.STORAGE_ACCOUNT_KEY
        credential = storage_key if storage_key else DefaultAzureCredential()
        queue_client = QueueClient(
            account_url=f"https://{account_name}.queue.core.windows.net",
            queue_name=queue_name,
            credential=credential
        )

        AzureClientManager._create_if_not_exists(queue_client.create_queue, f"Queue '{queue_name}'")
        return queue_client

    @staticmethod
    def initialize_table_client(table_name: str) -> TableServiceClient:
        """Return a client for *table_name*, creating the table if needed.

        NOTE(review): despite the annotation this returns the object from
        get_table_client() (a TableClient), not the TableServiceClient.
        """
        account_name = get_env_variable('STORAGE_ACCOUNT_NAME')
        storage_key = IndexingQueueSettings.STORAGE_ACCOUNT_KEY
        if storage_key:
            connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={storage_key};EndpointSuffix=core.windows.net"
            table_service_client = TableServiceClient.from_connection_string(connection_string)
        else:
            # Fix: the original always built a connection string, which
            # embedded the literal "AccountKey=None" when no key was set.
            # Mirror the queue client and fall back to DefaultAzureCredential.
            table_service_client = TableServiceClient(
                endpoint=f"https://{account_name}.table.core.windows.net",
                credential=DefaultAzureCredential()
            )

        AzureClientManager._create_if_not_exists(lambda: table_service_client.create_table(table_name), f"Table '{table_name}'")
        return table_service_client.get_table_client(table_name)

    @staticmethod
    def _create_if_not_exists(create_func, resource_name: str):
        """Invoke *create_func*, logging creation; ignore 'already exists'."""
        try:
            create_func()
            logger.info(f"{resource_name} created successfully.")
        except ResourceExistsError:
            logger.debug(f"{resource_name} already exists.")

class IndexingJobManager:
    """Coordinates the 'indexing' queue and its companion status table."""

    def __init__(self):
        self.queue_client = AzureClientManager.initialize_queue_client(IndexingQueueSettings.INDEXING_QUEUE_NAME)
        self.table_client = AzureClientManager.initialize_table_client(IndexingQueueSettings.INDEXING_TABLE_NAME)

    def queue_indexing_job(self, container_name: str, user_id: str, index_name: str, is_restricted: bool) -> str:
        """Enqueue one indexing job and record a 'queued' status row.

        The container name doubles as the job id, which is returned.
        """
        job_id = container_name
        payload = {
            "job_id": job_id,
            "container_name": container_name,
            "user_id": user_id,
            "index_name": index_name,
            "is_restricted": is_restricted,
        }
        self.queue_client.send_message(json.dumps(payload))

        status_row = {
            "PartitionKey": "indexing",
            "RowKey": job_id,
            "status": "queued",
        }
        self.table_client.upsert_entity(status_row)

        logger.info(f"Queued indexing job for container: {container_name}, job_id: {job_id}")
        return job_id

    async def process_indexing_queue(self, process_job_func):
        """Poll the queue forever, handing each message to *process_job_func*."""
        logger.info("Indexing queue processor started. Waiting for messages...")
        while True:
            batch = self.queue_client.receive_messages(
                max_messages=IndexingQueueSettings.MAX_MESSAGES,
                visibility_timeout=IndexingQueueSettings.VISIBILITY_TIMEOUT
            )
            for msg in batch:
                await self._process_message(msg, process_job_func)
            await asyncio.sleep(IndexingQueueSettings.SLEEP_TIME)

    async def _process_message(self, message, process_job_func):
        """Run one job; the message is deleted only after successful processing.

        On failure the message stays on the queue and reappears after the
        visibility timeout, so the job is retried.
        """
        try:
            job_info = json.loads(message.content)
            logger.info(f"Processing indexing job for container: {job_info['container_name']}")
            await process_job_func(job_info)
            self.queue_client.delete_message(message)
            logger.info(f"Completed and deleted message for container: {job_info['container_name']}")
        except Exception as e:
            logger.error(f"Error processing indexing job: {str(e)}")

def queue_indexing_job(container_name: str, user_id: str, index_name: str, is_restricted: bool) -> str:
    """Module-level convenience wrapper: enqueue one indexing job, return its id."""
    manager = IndexingJobManager()
    return manager.queue_indexing_job(container_name, user_id, index_name, is_restricted)

async def process_indexing_queue(process_job_func):
    """Module-level convenience wrapper: run the queue-processing loop."""
    manager = IndexingJobManager()
    await manager.process_indexing_queue(process_job_func)
Loading
Loading