Skip to content

Commit

Permalink
fix: support /index endpoint async
Browse files Browse the repository at this point in the history
  • Loading branch information
ishaansehgal99 committed Jan 23, 2025
1 parent 4913522 commit c111179
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 22 deletions.
4 changes: 2 additions & 2 deletions presets/ragengine/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ async def health_check():
raise HTTPException(status_code=500, detail=str(e))

@app.post("/index", response_model=List[DocumentResponse])
async def index_documents(request: IndexRequest): # TODO: Research async/sync what to use (inference is calling)
async def index_documents(request: IndexRequest):
try:
doc_ids = rag_ops.index(request.index_name, request.documents)
doc_ids = await rag_ops.index(request.index_name, request.documents)
documents = [
DocumentResponse(doc_id=doc_id, text=doc.text, metadata=doc.metadata)
for doc_id, doc in zip(doc_ids, request.documents)
Expand Down
29 changes: 15 additions & 14 deletions presets/ragengine/vector_store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,36 +34,36 @@ def generate_doc_id(text: str) -> str:
"""Generates a unique document ID based on the hash of the document text."""
return hashlib.sha256(text.encode('utf-8')).hexdigest()

async def index_documents(self, index_name: str, documents: List[Document]) -> List[str]:
    """Index documents, appending to an existing index or creating a new one.

    Common indexing entry point for all vector stores.

    Args:
        index_name: Name of the target index.
        documents: Documents to index.

    Returns:
        The list of document IDs produced by the append or create step.
    """
    # An index already tracked in index_map is extended in place.
    if index_name in self.index_map:
        return await self._append_documents_to_index(index_name, documents)
    # Otherwise the store-specific subclass builds a fresh index.
    return await self._create_new_index(index_name, documents)

async def _append_documents_to_index(self, index_name: str, documents: List[Document]) -> List[str]:
    """Append documents that are not yet present to an existing index.

    Each document's ID is derived from its text via ``generate_doc_id``.
    Documents for which ``document_exists`` reports True are skipped.

    Args:
        index_name: Name of an index that already exists.
        documents: Candidate documents to add.

    Returns:
        IDs of the documents that were actually added, unique and in the
        order the documents were supplied.
    """
    logger.info(f"Index {index_name} already exists. Appending documents to existing index.")
    # A dict keeps IDs unique while preserving input order, so callers that
    # pair the returned IDs with their documents get a stable ordering
    # (a plain set would return them in arbitrary order).
    indexed_doc_ids = {}

    for doc in documents:
        doc_id = self.generate_doc_id(doc.text)
        if self.document_exists(index_name, doc, doc_id):
            logger.info(f"Document {doc_id} already exists in index {index_name}. Skipping.")
            continue
        await self.add_document_to_index(index_name, doc, doc_id)
        indexed_doc_ids[doc_id] = None

    # Only touch storage when something was actually added.
    if indexed_doc_ids:
        await self._persist(index_name)
    return list(indexed_doc_ids)

@abstractmethod
async def _create_new_index(self, index_name: str, documents: List[Document]) -> List[str]:
    """Create a new index - implementation specific to each vector store.

    Args:
        index_name: Name of the index to create.
        documents: Documents to populate the new index with.

    Returns:
        The IDs of the documents that were indexed.
    """

def _create_index_common(self, index_name: str, documents: List[Document], vector_store) -> List[str]:
async def _create_index_common(self, index_name: str, documents: List[Document], vector_store) -> List[str]:
"""Common logic for creating a new index with documents."""
storage_context = StorageContext.from_defaults(vector_store=vector_store)
llama_docs = []
Expand All @@ -80,11 +80,12 @@ def _create_index_common(self, index_name: str, documents: List[Document], vecto
llama_docs,
storage_context=storage_context,
embed_model=self.embed_model,
use_async=True,
)
index.set_index_id(index_name)
await index.set_index_id(index_name)
self.index_map[index_name] = index
self.index_store.add_index_struct(index.index_struct)
self._persist(index_name)
await self._persist(index_name)
return list(indexed_doc_ids)

async def query(self,
Expand Down Expand Up @@ -151,12 +152,12 @@ async def query(self,
"metadata": query_result.metadata,
}

async def add_document_to_index(self, index_name: str, document: Document, doc_id: str):
    """Add a single document to an existing index.

    Args:
        index_name: Name of the index to insert into.
        document: Document providing ``text`` and ``metadata``.
        doc_id: ID to assign to the stored document.

    Raises:
        ValueError: If ``index_name`` is not present in ``index_map``.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    if index_name not in self.index_map:
        raise ValueError(f"No such index: '{index_name}' exists.")
    llama_doc = LlamaDocument(text=document.text, metadata=document.metadata, id_=doc_id)
    # `insert` is a synchronous call that returns None, so awaiting its result
    # raises TypeError. Run it in a worker thread so the coroutine stays
    # awaitable without blocking the event loop.
    await asyncio.to_thread(self.index_map[index_name].insert, llama_doc)

def list_all_indexed_documents(self) -> Dict[str, Dict[str, Dict[str, str]]]:
"""Common logic for listing all documents."""
Expand Down Expand Up @@ -184,15 +185,15 @@ def _persist_all(self):
for idx in self.index_store.index_structs():
self._persist(idx.index_id)

async def _persist(self, index_name: str):
    """Persist the index store and the storage context of one index.

    Best-effort: any failure is logged and swallowed, never raised.

    Args:
        index_name: Name of the index whose storage context is persisted.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    try:
        logger.info(f"Persisting index {index_name}.")
        self.index_store.persist(os.path.join(VECTOR_DB_PERSIST_DIR, "store.json"))
        # Explicit check instead of `assert`, which is stripped under -O.
        # The message is unchanged, so the logged error text is the same.
        if index_name not in self.index_map:
            raise ValueError(f"No such index: '{index_name}' exists.")
        storage_context = self.index_map[index_name].storage_context
        # Persist the specific index. `persist` is synchronous and returns
        # None, so awaiting its result raised TypeError and logged a false
        # failure; run it in a worker thread instead.
        await asyncio.to_thread(
            storage_context.persist,
            persist_dir=os.path.join(VECTOR_DB_PERSIST_DIR, index_name),
        )
        logger.info(f"Successfully persisted index {index_name}.")
    except Exception as e:
        logger.error(f"Failed to persist index {index_name}. Error: {str(e)}")
4 changes: 2 additions & 2 deletions presets/ragengine/vector_store/chromadb_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ def __init__(self, embedding_manager):
super().__init__(embedding_manager)
self.chroma_client = chromadb.EphemeralClient()

async def _create_new_index(self, index_name: str, documents: List[Document]) -> List[str]:
    """Create a new index backed by a fresh ChromaDB collection.

    Args:
        index_name: Name used for both the Chroma collection and the index.
        documents: Documents to populate the new index with.

    Returns:
        The document IDs returned by ``_create_index_common``.
    """
    chroma_collection = self.chroma_client.create_collection(index_name)
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    return await self._create_index_common(index_name, documents, vector_store)

def document_exists(self, index_name: str, doc: Document, doc_id: str) -> bool:
"""ChromaDB for checking document existence."""
Expand Down
4 changes: 2 additions & 2 deletions presets/ragengine/vector_store/faiss_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def __init__(self, embedding_manager):
super().__init__(embedding_manager)
self.dimension = self.embedding_manager.get_embedding_dimension()

async def _create_new_index(self, index_name: str, documents: List[Document]) -> List[str]:
    """Create a new index backed by a flat L2 FAISS index.

    Args:
        index_name: Name of the index to create.
        documents: Documents to populate the new index with.

    Returns:
        The document IDs returned by ``_create_index_common``.
    """
    # The FAISS index is sized from the dimension stored on this instance.
    faiss_index = faiss.IndexFlatL2(self.dimension)
    vector_store = FaissVectorStore(faiss_index=faiss_index)
    return await self._create_index_common(index_name, documents, vector_store)
4 changes: 2 additions & 2 deletions presets/ragengine/vector_store_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ class VectorStoreManager:
def __init__(self, vector_store: BaseVectorStore):
    """Store the vector store backend this manager delegates to."""
    self.vector_store = vector_store

async def index(self, index_name: str, documents: List[Document]) -> List[str]:
    """Index new documents.

    Delegates to the underlying vector store's ``index_documents``.

    Args:
        index_name: Name of the target index.
        documents: Documents to index.

    Returns:
        The document IDs returned by the vector store.
    """
    return await self.vector_store.index_documents(index_name, documents)

async def query(self,
index_name: str,
Expand Down

0 comments on commit c111179

Please sign in to comment.