From 29d8f303b0f8d8cf78e3ff797d85cb99315d5512 Mon Sep 17 00:00:00 2001
From: HendricksJudy <61645034+HendricksJudy@users.noreply.github.com>
Date: Sat, 14 Dec 2024 23:41:24 +0800
Subject: [PATCH] RAG_sys_BackBone_0.0.6

---
 OvStudent/app.py        | 363 ++++++++++++++++++++++++++++++++++++++++
 OvStudent/rag_system.py | 350 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 713 insertions(+)
 create mode 100644 OvStudent/app.py
 create mode 100644 OvStudent/rag_system.py

diff --git a/OvStudent/app.py b/OvStudent/app.py
new file mode 100644
index 00000000..a5478c1d
--- /dev/null
+++ b/OvStudent/app.py
@@ -0,0 +1,363 @@
+# --- START OF FILE app.py ---
+import streamlit as st
+from datetime import datetime, timezone
+import subprocess
+import time
+import requests
+import getpass
+from pathlib import Path
+import logging
+from logging.handlers import RotatingFileHandler
+
+from rag_system import RAGSystem
+from config_manager import ConfigManager
+from system_monitor import SystemMonitor
+from rate_limiter import RateLimiter
+from query_cache import QueryCache
+from query_manager import QueryManager
+
+
+# Set up logging with a rotating file handler so the log cannot grow unbounded
+def setup_logging():
+    log_dir = Path("logs")
+    log_dir.mkdir(exist_ok=True)
+
+    handler = RotatingFileHandler(
+        log_dir / 'streamlit_app.log',
+        maxBytes=10 * 1024 * 1024,  # 10 MB
+        backupCount=5
+    )
+
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s - %(levelname)s - %(message)s',
+        handlers=[
+            logging.StreamHandler(),
+            handler
+        ]
+    )
+
+
+setup_logging()
+
+
+# Initialize session state
+def initialize_session_state():
+    default_state = {
+        'ollama_ready': False,
+        'models_installed': False,
+        'query_history': [],
+        'rate_limiter': None,
+        'query_cache': None,
+        'config': {
+            'file_selection_model': 'qwen2.5-coder:3b',
+            'query_processing_model': 'qwen2.5-coder:7b',
+            'rate_limit': 5,  # seconds between queries
+            # Add your default directories here
+            'converted_jsons_directory': "/Users/kq_m3m/PycharmProjects/OVMaster/Converted_Jsons",
+            'annotated_scripts_directory': "/Users/kq_m3m/PycharmProjects/OVMaster/Converted_Scripts_Annotated"
+        },
+        'current_user': getpass.getuser()  # Get the current username
+    }
+
+    for key, value in default_state.items():
+        if key not in st.session_state:
+            st.session_state[key] = value
+
+    # Initialize RateLimiter if not already set
+    if st.session_state['rate_limiter'] is None:
+        st.session_state['rate_limiter'] = RateLimiter(st.session_state['config']['rate_limit'])
+
+    # Initialize QueryCache if not already set
+    if st.session_state['query_cache'] is None:
+        st.session_state['query_cache'] = QueryCache()
+
+
+initialize_session_state()
+
+
+# Cache the RAGSystem with @st.cache_resource so a single instance is shared
+@st.cache_resource
+def get_rag_system(converted_jsons_directory, annotated_scripts_directory):
+    try:
+        return RAGSystem(converted_jsons_directory, annotated_scripts_directory)
+    except Exception as e:
+        logging.error(f"Failed to initialize RAG system: {str(e)}")
+        return None
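+
+
+# NOTE: st.cache_resource keys the cached instance on the directory
+# arguments, so saving new paths in the sidebar builds a fresh RAGSystem on
+# the next rerun. If a forced rebuild is ever needed, Streamlit also exposes
+# get_rag_system.clear() to drop the cached instance.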
+
+
+# Function to display the header
+def show_header():
+    col1, col2, col3 = st.columns([2, 1, 1])
+    with col1:
+        st.title("Agentic OmicVerse 🧬")
+    with col2:
+        # Display the current UTC time (rendered once per script run)
+        current_time = datetime.now(timezone.utc)
+        st.info(f"📅 UTC: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
+    with col3:
+        # Display the current username
+        st.info(f"👤 User: {st.session_state['current_user']}")
+
+
+# Function to display system status
+def show_system_status():
+    stats = SystemMonitor.get_system_stats()
+    with st.sidebar:
+        st.header("System Status 📊")
+        col1, col2 = st.columns(2)
+        with col1:
+            st.metric("Memory (MB)", f"{stats['memory_usage']:.1f}")
+            st.metric("CPU %", f"{stats['cpu_percent']:.1f}")
+        with col2:
+            st.metric("Uptime", SystemMonitor.format_uptime(stats['uptime']))
+            st.metric("Memory Usage %", f"{stats['system_memory']['percent']:.1f}")
+            st.progress(stats['system_memory']['percent'] / 100)
+
+
+# Function to check if the Ollama server is running
+def check_ollama_server() -> bool:
+    try:
+        response = requests.get("http://localhost:11434/api/version", timeout=5)
+        return response.status_code == 200
+    except requests.RequestException:
+        return False
+
+
+# Function to display health status
+def display_health_status():
+    healthy, checks = check_system_health()
+    with st.sidebar:
+        st.header("System Health ✅" if healthy else "System Health ⚠️")
+        for component, status in checks.items():
+            if status:
+                st.success(f"{component} is running")
+            else:
+                st.error(f"{component} is not running")
+
+
+# Function to perform health checks
+def check_system_health():
+    health_checks = {
+        'Ollama Server': check_ollama_server(),
+    }
+    all_healthy = all(health_checks.values())
+    return all_healthy, health_checks
+
+
+# Function to display configuration settings
+def show_configuration():
+    with st.sidebar:
+        st.header("Configuration ⚙️")
+        with st.expander("Model Settings"):
+            file_model_options = ["qwen2.5-coder:3b", "qwen2.5-coder:7b", "gemini-pro",
+                                  "gemini-1.5-flash-8b", "gemini-2.0-flash-exp"]
+            file_selection_model = st.selectbox(
+                "File Selection Model",
+                file_model_options,
+                index=file_model_options.index(
+                    st.session_state['config'].get('file_selection_model', "qwen2.5-coder:3b")
+                )
+            )
+            query_model_options = ["qwen2.5-coder:7b", "qwen2.5-coder:3b", "gemini-pro",
+                                   "gemini-1.5-flash-8b", "gemini-2.0-flash-exp"]
+            query_processing_model = st.selectbox(
+                "Query Processing Model",
+                query_model_options,
+                index=query_model_options.index(
+                    st.session_state['config'].get('query_processing_model', "qwen2.5-coder:7b")
+                )
+            )
+
+            # If using Gemini, request the API key (optional if not needed anymore)
+            # gemini_api_key = None  # The redesigned rag_system.py doesn't require this
+
+            rate_limit = st.slider(
+                "Rate Limit (seconds)",
+                min_value=1,
+                max_value=30,
+                value=st.session_state['config']['rate_limit']
+            )
+            current_user = st.text_input("Username", value=st.session_state['current_user'])
+
+            # Directories
+            converted_jsons_directory = st.text_input(
+                "Converted JSONs Directory",
+                value=st.session_state['config']['converted_jsons_directory'])
+            annotated_scripts_directory = st.text_input(
+                "Annotated Scripts Directory",
+                value=st.session_state['config']['annotated_scripts_directory'])
+
+            if st.button("Save Configuration"):
+                st.session_state['config'].update({
+                    'file_selection_model': file_selection_model,
+                    'query_processing_model': query_processing_model,
+                    'rate_limit': rate_limit,
+                    'converted_jsons_directory': converted_jsons_directory,
+                    'annotated_scripts_directory': annotated_scripts_directory
+                })
+                st.session_state['current_user'] = current_user
+                ConfigManager.save_config(st.session_state['config'])
+                st.session_state['rate_limiter'] = RateLimiter(rate_limit)
+                st.session_state['query_cache'] = QueryCache()
+                st.success("Configuration saved successfully.")
successfully.") + + +# Function to process query with progress tracking using the new RAG logic +def process_query_with_progress(query, rag_system): + if not query or not isinstance(query, str): + raise ValueError("Invalid query: Query must be a non-empty string") + + progress_bar = st.progress(0) + status_text = st.empty() + try: + logging.info(f"Processing query: {query}") + logging.info(f"RAG System State: {rag_system is not None}") + + status_text.text("Finding relevant documents...") + progress_bar.progress(25) + + # Pass query directly as a string + relevant_files = rag_system.find_relevant_files(query) + + status_text.text("Generating answer from annotated scripts...") + progress_bar.progress(50) + answer = rag_system.answer_query_with_annotated_scripts(query, relevant_files) + logging.info(f"Answer: {answer}") + + status_text.text("Updating history...") + progress_bar.progress(75) + + query_time = datetime.now(timezone.utc) + st.session_state.query_history.append({ + 'query': query, + 'file': relevant_files, + 'answer': answer, + 'timestamp': query_time, + 'user': st.session_state['current_user'] + }) + + st.session_state['rate_limiter'].record_request() + progress_bar.progress(100) + status_text.text("Complete!") + time.sleep(1) + progress_bar.empty() + status_text.empty() + return relevant_files, answer + except Exception as e: + logging.error(f"Query processing error: {str(e)}", exc_info=True) + progress_bar.empty() + status_text.text(f"Error: {e}") + raise e + + +# Function to display query history +def show_query_history(): + with st.sidebar: + st.header("Query History 📜") + for idx, item in enumerate(reversed(st.session_state.query_history[-10:])): + with st.expander(f"Query {len(st.session_state.query_history) - idx}: {item['query'][:30]}..."): + st.markdown(f"**Time:** {item['timestamp'].strftime('%Y-%m-%d %H:%M:%S')} UTC") + st.markdown(f"**User:** {item['user']}") + st.markdown(f"**Document(s):** {item['file']}") + st.markdown(f"**Answer:** {item['answer']}") + st.markdown("---") + + +# Main function +def main(): + show_header() + show_system_status() + display_health_status() + show_configuration() + + if st.button("Reset System"): + st.session_state.query_history = [] + st.session_state['rate_limiter'] = RateLimiter(st.session_state['config']['rate_limit']) + st.session_state['query_cache'] = QueryCache() + st.rerun() + + if not st.session_state['ollama_ready']: + if not check_ollama_server(): + st.error("❌ Ollama server is not running") + if st.button("🚀 Start Ollama Server"): + try: + subprocess.Popen(['ollama', 'serve']) + time.sleep(5) + if check_ollama_server(): + st.session_state['ollama_ready'] = True + st.success("✅ Ollama server started successfully") + st.rerun() + except FileNotFoundError: + st.error("❌ Ollama is not installed") + return + else: + st.session_state['ollama_ready'] = True + + # Initialize RAGSystem via cached function + converted_jsons_directory = st.session_state['config']['converted_jsons_directory'] + annotated_scripts_directory = st.session_state['config']['annotated_scripts_directory'] + rag_system = get_rag_system(converted_jsons_directory, annotated_scripts_directory) + + if rag_system is None: + st.error("Failed to initialize RAG system.") + return + + st.markdown("### Query Interface 🔍") + query = st.text_area( + "Enter your query:", + height=100, + placeholder="Enter your question about the documents..." 
+
+
+# Main function
+def main():
+    show_header()
+    show_system_status()
+    display_health_status()
+    show_configuration()
+
+    if st.button("Reset System"):
+        st.session_state.query_history = []
+        st.session_state['rate_limiter'] = RateLimiter(st.session_state['config']['rate_limit'])
+        st.session_state['query_cache'] = QueryCache()
+        st.rerun()
+
+    if not st.session_state['ollama_ready']:
+        if not check_ollama_server():
+            st.error("❌ Ollama server is not running")
+            if st.button("🚀 Start Ollama Server"):
+                try:
+                    subprocess.Popen(['ollama', 'serve'])
+                    time.sleep(5)
+                    if check_ollama_server():
+                        st.session_state['ollama_ready'] = True
+                        st.success("✅ Ollama server started successfully")
+                        st.rerun()
+                except FileNotFoundError:
+                    st.error("❌ Ollama is not installed")
+            return
+        else:
+            st.session_state['ollama_ready'] = True
+
+    # Initialize the RAGSystem via the cached factory
+    converted_jsons_directory = st.session_state['config']['converted_jsons_directory']
+    annotated_scripts_directory = st.session_state['config']['annotated_scripts_directory']
+    rag_system = get_rag_system(converted_jsons_directory, annotated_scripts_directory)
+
+    if rag_system is None:
+        st.error("Failed to initialize RAG system.")
+        return
+
+    st.markdown("### Query Interface 🔍")
+    query = st.text_area(
+        "Enter your query:",
+        height=100,
+        placeholder="Enter your question about the documents..."
+    )
+
+    col1, col2 = st.columns([1, 5])
+    with col1:
+        submit = st.button("🚀 Submit")
+    with col2:
+        if st.button("🗑️ Clear History"):
+            st.session_state.query_history = []
+            st.rerun()
+
+    if submit and query:
+        # Validate that the query is not empty or just whitespace
+        if not query.strip():
+            st.error("Query cannot be empty. Please enter a valid query.")
+            return
+
+        is_valid, error_message = QueryManager.validate_query(query)
+        if not is_valid:
+            st.error(error_message)
+            return
+
+        if not st.session_state['rate_limiter'].can_make_request():
+            wait_time = st.session_state['rate_limiter'].time_until_next_request()
+            st.warning(f"Please wait {wait_time:.1f} seconds before making another query.")
+            return
+
+        try:
+            with st.spinner("Processing query..."):
+                # Log the query and state before processing
+                logging.info(f"Processing query: {query!r}")
+                logging.info(f"RAG system state: initialized={rag_system is not None}")
+
+                relevant_files, answer = process_query_with_progress(query, rag_system)
+                st.success(f"📄 Selected document(s): {relevant_files}")
+                st.markdown("### Answer 💡")
+                st.markdown(answer)
+        except ValueError as ve:
+            st.error(f"Invalid query: {str(ve)}")
+            logging.error(f"ValueError in query processing: {str(ve)}", exc_info=True)
+        except Exception as e:
+            st.error(f"An error occurred: {str(e)}")
+            logging.error("Query processing error", exc_info=True)
+
+    show_query_history()
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except Exception as e:
+        logging.error(f"Application error: {str(e)}", exc_info=True)
+        st.error(f"An unexpected error occurred: {str(e)}")
+# --- END OF FILE app.py ---
\ No newline at end of file
diff --git a/OvStudent/rag_system.py b/OvStudent/rag_system.py
new file mode 100644
index 00000000..22e9908b
--- /dev/null
+++ b/OvStudent/rag_system.py
@@ -0,0 +1,350 @@
+# --- START OF FILE rag_system.py ---
+import os
+import time
+import json
+from typing import List, Dict, Any, Union
+from uuid import uuid4
+import logging
+from logging.handlers import RotatingFileHandler
+
+import chromadb
+from langchain.chains import RetrievalQA
+from langchain.docstore.document import Document
+from langchain.prompts import PromptTemplate
+from langchain.callbacks.manager import CallbackManager
+from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
+
+# Use the original GPT4AllEmbeddings and Ollama LLM from the initial code.
+from langchain_community.embeddings import GPT4AllEmbeddings
+from langchain_community.llms import Ollama
+from langchain_community.vectorstores import Chroma
+from langchain.text_splitter import CharacterTextSplitter
+
+
+def setup_logging():
+    logger = logging.getLogger('rag_system_code_optimized')
+    logger.setLevel(logging.INFO)
+
+    os.makedirs('logs', exist_ok=True)
+
+    file_handler = RotatingFileHandler(
+        'logs/rag_system_code_optimized.log',
+        maxBytes=10 * 1024 * 1024,  # 10 MB
+        backupCount=5
+    )
+    file_handler.setFormatter(
+        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+    )
+    logger.addHandler(file_handler)
+
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(
+        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+    )
+    logger.addHandler(console_handler)
+
+    return logger
+
+
+logger = setup_logging()
+
+
+class CodeAwareTextSplitter(CharacterTextSplitter):
+    """
+    A custom text splitter that tries to split code more gracefully.
+    You can enhance this by:
+      - Splitting on `def ` and `class ` boundaries (see the sketch below).
+      - Avoiding splits in the middle of a function.
+    For now, this is a placeholder that uses line-based splitting,
+    but it can be improved as needed.
+    """
+    def __init__(self, chunk_size=2000, chunk_overlap=200):
+        super().__init__(separator="\n", chunk_size=chunk_size, chunk_overlap=chunk_overlap)
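+
+
+# A possible refinement of CodeAwareTextSplitter, kept as an illustrative
+# sketch (it is not wired into the pipeline): pre-split source on top-level
+# `def `/`class ` boundaries so chunks align with whole definitions, then let
+# the character splitter handle any oversized piece.
+def split_code_on_definitions(source: str) -> List[str]:
+    """Split source at top-level def/class boundaries (illustrative only)."""
+    import re  # local import keeps the sketch self-contained
+    # The lookahead keeps each `def `/`class ` keyword attached to its chunk.
+    parts = re.split(r"\n(?=def |class )", source)
+    return [part for part in parts if part.strip()]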
+
+
+class FirstStageRAG:
+    def __init__(
+        self,
+        converted_jsons_directory: str,
+        persist_dir: str,
+        file_selection_model: str,
+        chroma_client: chromadb.Client,
+        top_k_files: int = 3
+    ):
+        self.converted_jsons_directory = converted_jsons_directory
+        self.persist_dir = persist_dir
+        self.file_selection_model = file_selection_model
+        self.chroma_client = chroma_client
+        self.top_k_files = top_k_files
+
+        logger.info("Initializing FirstStageRAG for code retrieval...")
+
+        # Build the embedding model once and reuse it for indexing and queries.
+        self.embeddings = GPT4AllEmbeddings(model=self.file_selection_model)
+
+        self.collection_name = "file_descriptions"
+        self.collection = self._load_or_create_collection()
+
+        if self.collection.count() == 0:
+            logger.info("Populating first-stage collection with file descriptions...")
+            self._index_file_descriptions()
+            logger.info("File descriptions indexed successfully.")
+
+        logger.info("FirstStageRAG initialized successfully.")
+
+    def _load_or_create_collection(self):
+        try:
+            return self.chroma_client.get_collection(name=self.collection_name)
+        except Exception:
+            logger.info(f"Creating a new Chroma collection: {self.collection_name}")
+            return self.chroma_client.create_collection(name=self.collection_name)
+
+    def _index_file_descriptions(self):
+        docs = []
+        metadatas = []
+        ids = []
+
+        for fname in os.listdir(self.converted_jsons_directory):
+            if fname.endswith(".json"):
+                fpath = os.path.join(self.converted_jsons_directory, fname)
+                with open(fpath, 'r', encoding='utf-8') as f:
+                    try:
+                        data = json.load(f)
+                        description = data.get("description", "")
+                        file_name = data.get("file", fname)
+
+                        if not description:
+                            continue
+
+                        docs.append(description)
+                        metadatas.append({"file": file_name})
+                        ids.append(file_name)
+                    except Exception as e:
+                        logger.error(f"Error reading {fpath}: {str(e)}", exc_info=True)
+
+        if docs:
+            doc_embeddings = self.embeddings.embed_documents(docs)
+            self.collection.add(documents=docs, metadatas=metadatas, ids=ids, embeddings=doc_embeddings)
+        else:
+            logger.warning("No documents to index in FirstStageRAG.")
+
+    def find_relevant_files(self, query: Union[str, Dict[str, Any]]) -> List[str]:
+        if isinstance(query, dict):
+            query_text = query.get("query", "")
+        else:
+            query_text = query
+
+        if not query_text.strip():
+            logger.warning("Empty query received in FirstStageRAG.")
+            return []
+
+        query_embedding = self.embeddings.embed_query(query_text)
+
+        results = self.collection.query(query_embeddings=[query_embedding], n_results=self.top_k_files)
+
+        matched_files = []
+        if results and "metadatas" in results and results["metadatas"]:
+            for meta in results["metadatas"][0]:
+                file_name = meta.get("file", None)
+                if file_name:
+                    matched_files.append(file_name)
+
+        logger.info(f"Top {self.top_k_files} matched files for query '{query_text}': {matched_files}")
+        return matched_files
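+
+
+# Expected shape of each converted JSON, inferred from the .get() calls in
+# _index_file_descriptions above (the example values are hypothetical):
+#   {
+#       "file": "preprocessing_example.py",
+#       "description": "Loads an AnnData object and normalizes counts ..."
+#   }
+# Files without a non-empty "description" are skipped during indexing.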
+
+
+class SecondStageRAG:
+    def __init__(
+        self,
+        annotated_scripts_directory: str,
+        query_processing_model: str,
+        chroma_client: chromadb.Client,
+        code_chunk_size: int = 2000,
+        code_chunk_overlap: int = 200,
+        top_k_chunks: int = 3
+    ):
+        self.annotated_scripts_directory = annotated_scripts_directory
+        self.query_processing_model = query_processing_model
+        self.chroma_client = chroma_client
+        self.code_chunk_size = code_chunk_size
+        self.code_chunk_overlap = code_chunk_overlap
+        self.top_k_chunks = top_k_chunks
+
+        logger.info("Initializing SecondStageRAG for code generation...")
+        logger.info("SecondStageRAG initialized successfully.")
+
+    def answer_query_with_annotated_scripts(self, query: str, relevant_files: List[str]) -> str:
+        if not relevant_files:
+            logger.info("No relevant files provided to second stage. Returning fallback answer.")
+            return "I could not find relevant code files for your request."
+
+        documents = []
+        text_splitter = CodeAwareTextSplitter(chunk_size=self.code_chunk_size,
+                                              chunk_overlap=self.code_chunk_overlap)
+
+        for file_name in relevant_files:
+            file_path = os.path.join(self.annotated_scripts_directory, file_name)
+            if os.path.exists(file_path):
+                with open(file_path, 'r', encoding='utf-8') as f:
+                    content = f.read()
+
+                docs = text_splitter.split_text(content)
+                for i, doc_chunk in enumerate(docs):
+                    documents.append(
+                        Document(
+                            page_content=doc_chunk,
+                            metadata={"source_file": file_name, "chunk_id": i}
+                        )
+                    )
+            else:
+                logger.warning(f"File {file_name} not found in annotated scripts directory.")
+
+        if not documents:
+            logger.info("No documents found to answer the query in second stage.")
+            return "I could not find relevant code content."
+
+        embeddings = GPT4AllEmbeddings(model=self.query_processing_model)
+
+        # A uuid suffix avoids name collisions when two queries arrive within
+        # the same second (time.time() alone is not unique enough).
+        collection_name = f"annotated_docs_{uuid4().hex}"
+        vectorstore = Chroma.from_documents(
+            documents=documents,
+            embedding=embeddings,
+            collection_name=collection_name,
+            client=self.chroma_client
+        )
+
+        retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": self.top_k_chunks})
+
+        # System prompt and template for code generation
+        system_prompt = (
+            "You are a Python code generation assistant. "
+            "You will be provided with context code snippets related to the user query. "
+            "Please produce clear, correct, and well-commented Python code that addresses the user's request. "
+            "Follow PEP8 standards. If you propose changes, ensure they run without syntax errors."
+        )
+
+        # Prepend the system prompt so it actually reaches the model
+        # (previously it was defined but never used).
+        prompt_template = PromptTemplate(
+            input_variables=["context", "question"],
+            template=(
+                system_prompt + "\n\n"
+                "{context}\n\n"
+                "User Request: {question}\n\n"
+                "Now generate the best possible Python code solution given the above context. "
+                "If appropriate, include function definitions, classes, or usage examples. "
+                "Make sure the final answer is strictly Python code."
+            )
+        )
+
+        llm = Ollama(
+            model=self.query_processing_model,
+            callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])
+        )
+
+        # "stuff" concatenates every retrieved chunk into a single prompt;
+        # with top_k_chunks chunks of ~code_chunk_size characters this stays
+        # well within typical qwen2.5-coder context windows.
+        chain = RetrievalQA.from_chain_type(
+            llm=llm,
+            chain_type="stuff",
+            retriever=retriever,
+            return_source_documents=False,
+            chain_type_kwargs={"prompt": prompt_template}
+        )
+
+        logger.info(f"Running retrieval QA for code generation query: {query}")
+        answer = chain.run(query)
+
+        # Clean up the temporary collection after use
+        try:
+            self.chroma_client.delete_collection(name=collection_name)
+        except Exception as e:
+            logger.warning(f"Failed to clean up temporary collection {collection_name}: {e}")
+
+        return answer
+
+
+class RAGSystem:
+    def __init__(
+        self,
+        converted_jsons_directory: str,
+        annotated_scripts_directory: str,
+        persist_dir: str = "./chroma_db_code",
+        file_selection_model: str = "qwen2.5-coder:3b",
+        query_processing_model: str = "qwen2.5-coder:7b",
+        top_k_files: int = 3,
+        top_k_chunks: int = 3,
+        code_chunk_size: int = 2000,
+        code_chunk_overlap: int = 200
+    ):
+        logger.info("Initializing RAG System for Python code generation...")
+
+        self.converted_jsons_directory = converted_jsons_directory
+        self.annotated_scripts_directory = annotated_scripts_directory
+        self.persist_directory = persist_dir
+        self.file_selection_model = file_selection_model
+        self.query_processing_model = query_processing_model
+        self.top_k_files = top_k_files
+        self.top_k_chunks = top_k_chunks
+        self.code_chunk_size = code_chunk_size
+        self.code_chunk_overlap = code_chunk_overlap
+
+        self._validate_directories()
+
+        os.makedirs(self.persist_directory, exist_ok=True)
+
+        self.chroma_client = chromadb.Client(
+            chromadb.config.Settings(
+                anonymized_telemetry=False,
+                is_persistent=True,
+                persist_directory=self.persist_directory,
+                allow_reset=True  # required for the reset() call in cleanup()
+            )
+        )
+
+        self.first_stage = FirstStageRAG(
+            converted_jsons_directory=self.converted_jsons_directory,
+            persist_dir=self.persist_directory,
+            file_selection_model=self.file_selection_model,
+            chroma_client=self.chroma_client,
+            top_k_files=self.top_k_files
+        )
+
+        self.second_stage = SecondStageRAG(
+            annotated_scripts_directory=self.annotated_scripts_directory,
+            query_processing_model=self.query_processing_model,
+            chroma_client=self.chroma_client,
+            top_k_chunks=self.top_k_chunks,
+            code_chunk_size=self.code_chunk_size,
+            code_chunk_overlap=self.code_chunk_overlap
+        )
+
+        logger.info("RAG System for code generation initialized successfully")
+
+    def _validate_directories(self):
+        for directory in [self.converted_jsons_directory, self.annotated_scripts_directory]:
+            if not os.path.exists(directory):
+                raise ValueError(f"Directory does not exist: {directory}")
+            if not os.path.isdir(directory):
+                raise ValueError(f"Path is not a directory: {directory}")
+            if not os.access(directory, os.R_OK):
+                raise ValueError(f"Directory is not readable: {directory}")
+        logger.info("All directories validated successfully.")
+
+    def find_relevant_files(self, query: Union[str, Dict[str, Any]]) -> List[str]:
+        return self.first_stage.find_relevant_files(query)
+
+    def answer_query_with_annotated_scripts(
+        self,
+        query: str,
+        relevant_files: List[str]
+    ) -> str:
+        return self.second_stage.answer_query_with_annotated_scripts(query, relevant_files)
+
+    def cleanup(self):
+        logger.info("Cleaning up resources...")
+        try:
+            self.chroma_client.reset()
+            if os.path.exists(self.persist_directory):
+                import shutil
+                shutil.rmtree(self.persist_directory, ignore_errors=True)
+            logger.info("Cleanup completed.")
+        except Exception as e:
+            logger.error(f"Error during cleanup: {str(e)}", exc_info=True)
+            raise
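+
+
+# Minimal usage sketch (assumes the two directories below exist and are
+# populated; the paths and the query are placeholders, not shipped defaults):
+if __name__ == "__main__":
+    rag = RAGSystem(
+        converted_jsons_directory="./Converted_Jsons",
+        annotated_scripts_directory="./Converted_Scripts_Annotated"
+    )
+    files = rag.find_relevant_files("How do I normalize counts in an AnnData object?")
+    print(rag.answer_query_with_annotated_scripts(
+        "How do I normalize counts in an AnnData object?", files))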
logger.error(f"Error during cleanup: {str(e)}", exc_info=True) + raise + +# --- END OF FILE rag_system_code.py ---