diff --git a/vortex/api/data_models/__init__.py b/vortex/api/data_models/__init__.py index 5ee4d24..0ea2153 100644 --- a/vortex/api/data_models/__init__.py +++ b/vortex/api/data_models/__init__.py @@ -2,7 +2,8 @@ import os from datetime import datetime -from sqlalchemy import Column, DateTime, Integer, String, Text, create_engine +from sqlalchemy import (JSON, Column, DateTime, Integer, String, Text, + create_engine) from sqlalchemy.engine import URL from sqlalchemy.orm import declarative_base, sessionmaker @@ -31,6 +32,12 @@ class Conversation(Base): response = Column(String) created_at = Column(DateTime, default=datetime.utcnow) +class ChatsHistory(Base): + __tablename__ = 'chats_history' + # id = Column(Integer, primary_key=True, index=True) + sender = Column(String, primary_key=True, index=True) + history = Column(Text) + updated_at = Column(DateTime, default=datetime.utcnow) Base.metadata.create_all(engine) diff --git a/vortex/io/twilio.py b/vortex/io/twilio.py index a18eb29..ef54d4c 100644 --- a/vortex/io/twilio.py +++ b/vortex/io/twilio.py @@ -1,19 +1,28 @@ +#%% +import base64 import logging import os +import pickle import weakref from typing import Dict from dotenv import load_dotenv from fastapi import Depends, FastAPI, Form, Request +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session from twilio.rest import Client from vortex.ai.agents import VortexAgent -from vortex.api.data_models import Conversation, SessionLocal, get_db +from vortex.api.data_models import (ChatsHistory, Conversation, SessionLocal, + get_db) load_dotenv() +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + # Find your Account SID and Auth Token at twilio.com/console # and set the environment variables. See http://twil.io/secure account_sid = os.environ.get("TWILIO_ACCOUNT_SID") @@ -21,20 +30,21 @@ client = Client(account_sid, auth_token) twilio_number = "+14155238886" or os.environ.get("TWILIO_NUMBER") -# Set up logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# agent = VortexAgent() # TODO: should be linked to a specific user/context/session - agents: Dict[str, weakref.ref] = weakref.WeakValueDictionary() def get_or_create_agent(phone_number: str, db) -> VortexAgent: agent = agents.get(phone_number) - if agent is None: - chat_history = get_chat_history(db, phone_number) or [] + chat_history = get_chat_history(db, phone_number) + if agent is not None and chat_history: # Same session stil kept + print(f'Using existing agent {agent}') + ... + elif agent is None and chat_history: # New session but existing user agent = VortexAgent(context=chat_history) # Initialize a new agent instance - agents[phone_number] = agent # Store it using the phone number as the key + print(f'using reloaded agent with history {chat_history}') + elif agent is None and not chat_history: + agent = VortexAgent() + print('using a new agent') + agents[phone_number] = agent return agent @@ -47,14 +57,41 @@ def store_message(whatsapp_number, Body, langchain_response, db): logger.info(f"Conversation #{conversation.id} stored in database") +def store_chat_history(whatsapp_number, agent_history, db): + history = pickle.dumps(agent_history) + # Upsert statement + stmt = ( + insert(ChatsHistory) + .values( + sender=whatsapp_number, + history=history, + ) + .on_conflict_do_update( + index_elements=["sender"], # Specify the conflict target + set_={"history": history}, # Update these fields upon conflict + ) + ) + # Execute the upsert + db.execute(stmt) + db.commit() + logger.info(f"Upsert chat history for user {whatsapp_number}") + + def get_chat_history(db_session, phone_number: str) -> list: - return ( - db_session.query(Conversation) - .filter(Conversation.phone_number == phone_number) - .order_by(Conversation.created_at.asc()) + history = ( + db_session.query(ChatsHistory) + .filter(ChatsHistory.sender == phone_number) + .order_by(ChatsHistory.updated_at.asc()) .all() or [] ) + if not history: + return [] + chat_history = str(history[0]) + print(chat_history) + loaded = pickle.loads(chat_history) + print(f'loaded history {loaded}') + return loaded def send_message(to_number, body_text): @@ -82,9 +119,12 @@ async def handle_wapp_message( # Store the conversation in the database try: store_message(whatsapp_number, Body, langchain_response, db) + store_chat_history(whatsapp_number, agent.chat_history, db) except SQLAlchemyError as e: db.rollback() logger.error(f"Error storing conversation in database: {e}") # Lastly, send message back to user send_message(whatsapp_number, langchain_response) return {"response": langchain_response} + +# %%