diff --git a/README.md b/README.md index 8893af6..5e3c628 100644 --- a/README.md +++ b/README.md @@ -4,32 +4,21 @@ [![Integration Tests](https://github.com/langchain-ai/memory-template/actions/workflows/integration-tests.yml/badge.svg)](https://github.com/langchain-ai/memory-template/actions/workflows/integration-tests.yml) [![Open in - LangGraph Studio](https://img.shields.io/badge/Open_in-LangGraph_Studio-00324d.svg?logo=)](https://langgraph-studio.vercel.app/templates/open?githubUrl=https://github.com/langchain-ai/memory-template) -This repo provides a simple example of a long-term memory service you can build and deploy using LangGraph. +## Motivation -This graph extracts memories from chat interactions and persists them to its store. This information can later be read via the API to provide personalized context when your bot is responding to a particular user. +Memory is a powerful way to improve and personalize AI applications. As an example, memory can be used to store user-specific information across multiple interactions with that user. But, it can also extend to any information that you may want to preserve across multiple interactions with an application. This template show how you can build and deploy a long-term memory service using LangGraph by combining a memory service with a simple chatbot application. -The memory graph handles debouncing when processing individual conversations (to help deduplicate work) and supports continuous updates to a single "memory schema" as well as "event-based" memories that can be fetched by recency and filtered. +![Motivation](./static/memory_motivation.png) -This repo also provides an example chat bot (in this case, also a simple graph) that connects to the memory graph via the SDK. -Any time you send a message to the chat bot, it will query the memory service to fetch the most up-to-date memories (if any) for the configured user. These memories are put in the system prompt. After responding, it will post the conversation to the memory service to schedule long-term memory formation. +## Quickstart -This separation of concerns provides minimal overhead, allows deduplication of memory processing, and ensures you can optimize for better recall. - -![Memory Diagram](./static/memory_graph.png) - -## Getting Started - -This quickstart will get your memory service deployed on [LangGraph Cloud](https://langchain-ai.github.io/langgraph/cloud/). Once created, you can interact with it from any API. - -Assuming you have already [installed LangGraph Studio](https://github.com/langchain-ai/langgraph-studio?tab=readme-ov-file#download), to set up: - -1. Create a `.env` file. +Create a `.env` file. ```bash cp .env.example .env ``` -2. Define required API keys in your `.env` file. +Set the required API keys in your `.env` file. +If you want to test the memory service locally, [install the LangGraph Studio desktop app](https://github.com/langchain-ai/langgraph-studio?tab=readme-ov-file#download). +If you want to test in the cloud, [follow these instructions to deploy this repository to LangGraph Cloud](https://langchain-ai.github.io/langgraph/cloud/) and use Studio in your browser. +Open this repository in LangGraph studio and navigate to the `chatbot` graph. - +Optionally, you can set your `user_id`, `model`, or other configurations directly in the Studio UI. + +![Flow](./static/studio.png) + +Try sending some messages saying your name and other things the bot should remember. + +Wait ~10-20 seconds for memories to be created and saved. -3. Open in LangGraph studio. Navigate to the "`chatbot`" graph and have a conversation with it! Try sending some messages saying your name and other things the bot should remember. +Create a *new* thread using the `+` icon. -Wait ~10-20 seconds and then create a *new* thread using the `+` icon. Then chat with the bot again - if you've completed your setup correctly, the bot should now have access to the memories you've saved! +Then chat with the bot again. + +The bot should have access to the memories you've saved, and will use them to personalize its responses. ## How it works -This chat bot reads from your memory graph's `Store` to easily list extracted memories. +There are several problems to solve when building a memory service: -Connecting to this type of memory service typically follows an interaction pattern similar to the one outlined below: +1. What should each memory contain? +2. How should memories be updated? +3. How frequently should memories be updated or created? +4. Where should memories be stored? +5. How to call the memory service from our application? -![Interaction Pattern](./static/memory_interactions.png) +We'll address these challenges below, and explain how this LangGraph template approaches them. + +### Memory Schema + +The memory schema defines what each memory will contain. + +By default, this template uses two memory schemas: `User` and `Note`. Both are JSON schemas. + +The schemas are defined in [memory_graph/configuration.py](./src/memory_graph/configuration.py). + +The `User` schema is used to store a single profile for a user with a set of predefined properties (e.g., name, age, interests). + +The `Note` schema is more flexible, containing a `context` and `content` field to capture any type of information as a memory. + +These schemas are customizable! You can create new schemas, or add / remove properties as needed for your application. + +### Memory Updates + +These memory schemas need to be updated with new information over time. + +The `User` schema is a single JSON object. + +We want to update it with new information about the user as the conversation progresses. + +Each `Note`, in contrast, is captured in a list. + +We want the flexibility to update existing `Note` schemas or update the list with new ones. + +We use the [`trustcall` library](https://github.com/hinthornw/trustcall) to do both of these types of updates. + +This is a library that we created for updating JSON schemas via a LLM. + +These updates are performed in [memory_graph/graph.py](./src/memory_graph/graph.py). + +The `memory_graph` saves both types of memories. + +We can see the graph here in the LangGraph Studio, with a branch for each of the defined memory schemas: + +* `handle_patch_memory` is for `User` schema memories. +* `handle_insert_memory` is for `Note` schema memories. + +![Memory Diagram](./static/memory_graph.png) -The service waits for a pre-determined interval before it considers the thread "complete". If the user queries a second time within that interval, the memory run is cancelled to avoid duplicate processing of a thread. +### Memory Scheduling + +Memory updates need to be scheduled to avoid duplicate processing. + +Ideally, we want to wait until a chat is complete before we create memories. + +But, we don't know when a chat session will end. + +So, we wait a pre-determined interval before invoking the memory graph to memories to the storage layer. + +If the chatbot makes a call second time within that interval, the initial memory is cancelled. + +Scheduling is handled by the LangGraph SDK's `after_seconds` parameter. + +We call the `memory_graph` from our application (e.g., `chatbot`) using the LangGraph SDK in [chatbot/graph.py](./src/chatbot/graph.py). + +![DeBounce](./static/scheduling.png) + +### Memory Storage + +The LangGraph API comes with a built-in memory storage layer that can be used to store and retrieve information across threads. + +Learn more about the Memory Storage layer [here](https://langchain-ai.github.io/langgraph/how-tos/memory/shared-state/). + +Importantly, the memory storage layer is namespaced by a tuple; in this case, we use the `user_id` as well as the schema name. + +In addition, the memory storage layer is accessible to both the `chatbot` and the `memory_graph` in all graph nodes. + +This diagram shows how these pieces fit together: + +![Memory types](./static/memory_types.png) + +### Calling the Memory Service + +Studio uses the LangGraph API as its backend, packaging the specified code repository with the storage layer. + +The `langgraph.json` file is used to configure the LangGraph API specifies the graphs to be run in Studio: + +```json + "graphs": { + "chatbot": "./src/chatbot/graph.py:graph", + "memory_graph": "./src/memory_graph/graph.py:graph" + }, +``` + +The chatbot can directly access all stored memories when it's preparing responses for the user. + +You can see this in the in the `bot` node in [chatbot/graph.py](./src/chatbot/graph.py): + +```python +items = await store.asearch(namespace) +``` + +To schedule creation of new memories, the chatbot can use the LangGraph SDK to access the memory graph. + +This is done in the `schedule_memories` node in [chatbot/graph.py](./src/chatbot/graph.py) + +This passes the chatbot's interaction with the user along with the scheduling parameter, `after_seconds`, to the `memory_graph`. + +![Flow](./static/memory_template_flow.png) + +## Benefits + +The separation of concerns between the application logic (chatbot) and the memory (the memory graph) a few advantages: + +(1) minimal overhead by removing memory creation logic from the hotpath of the application (e.g., no latency cost for memory creation) + +(2) memory creation logic is handled in a background job, separate from the chatbot, with scheduling to avoid duplicate processing + +(3) memory graph can be updated and / or hosted (as a service) independently of the application (chatbot) + +Here is a schematic of the interaction pattern: + +![Interaction Pattern](./static/memory_interactions.png) ## How to evaluate diff --git a/src/memory_graph/graph.py b/src/memory_graph/graph.py index 9509698..a0b686a 100644 --- a/src/memory_graph/graph.py +++ b/src/memory_graph/graph.py @@ -23,31 +23,49 @@ async def handle_patch_memory( state: ProcessorState, config: RunnableConfig, *, store: BaseStore ) -> dict: """Extract the user's state from the conversation and update the memory.""" + # Get the overall configuration configurable = configuration.Configuration.from_runnable_config(config) + + # Namespace for memory events, where function_name is the name of the memory schema namespace = (configurable.user_id, "user_states", state.function_name) + + # Fetch existing memories from the store for this (patch) memory schema existing_item = await store.aget(namespace, "memory") existing = {existing_item.key: existing_item.value} if existing_item else None + + # Get the configuration for this memory schema (identified by function_name) memory_config = next( conf for conf in configurable.memory_types if conf.name == state.function_name ) + + # This is what we use to generate new memories extractor = create_extractor( utils.init_model(configurable.model), + # We pass the specified (patch) memory schema as a tool tools=[ { + # Tool name "name": memory_config.name, + # Tool description "description": memory_config.description, + # Schema for patch memory "parameters": memory_config.parameters, } ], tool_choice=memory_config.name, ) + + # Prepare the messages prepared_messages = utils.prepare_messages( state.messages, memory_config.system_prompt ) + + # Pass messages and existing patch to the extractor inputs = {"messages": prepared_messages, "existing": existing} + # Update the patch memory result = await extractor.ainvoke(inputs, config) extracted = result["responses"][0].model_dump(mode="json") - # Upsert the memory to storage + # Save to storage await store.aput(namespace, "memory", extracted) return {"messages": []} @@ -55,30 +73,48 @@ async def handle_patch_memory( async def handle_insertion_memory( state: ProcessorState, config: RunnableConfig, *, store: BaseStore ) -> dict[str, list]: - """Upsert memory events.""" + """Handle insertion memory events.""" + # Get the overall configuration configurable = configuration.Configuration.from_runnable_config(config) + + # Namespace for memory events, where function_name is the name of the memory schema namespace = (configurable.user_id, "events", state.function_name) + + # Fetch existing memories from the store (5 most recent ones) for the this (insert) memory schema existing_items = await store.asearch(namespace, limit=5) + + # Get the configuration for this memory schema (identified by function_name) memory_config = next( conf for conf in configurable.memory_types if conf.name == state.function_name ) + + # This is what we use to generate new memories extractor = create_extractor( utils.init_model(configurable.model), + # We pass the specified (insert) memory schema as a tool tools=[ { + # Tool name "name": memory_config.name, + # Tool description "description": memory_config.description, + # Schema for insert memory "parameters": memory_config.parameters, } ], tool_choice="any", + # This allows the extractor to insert new memories enable_inserts=True, ) + + # Generate new memories or update existing memories extracted = await extractor.ainvoke( { + # Prepare the messages "messages": utils.prepare_messages( state.messages, memory_config.system_prompt ), + # Prepare the existing memories "existing": ( [ (existing_item.key, state.function_name, existing_item.value) @@ -90,6 +126,8 @@ async def handle_insertion_memory( }, config, ) + + # Add the memories to storage await asyncio.gather( *( store.aput( @@ -103,31 +141,40 @@ async def handle_insertion_memory( return {"messages": []} -# Create the graph + all nodes +# Create the graph and all nodes builder = StateGraph(State, config_schema=configuration.Configuration) - builder.add_node(handle_patch_memory, input=ProcessorState) builder.add_node(handle_insertion_memory, input=ProcessorState) - def scatter_schemas(state: State, config: RunnableConfig) -> list[Send]: - """Route the memory_types for the memory assistant. + """Iterate over all memory types in the configuration. + + It will route each memory type from configuration to the corresponding memory update node. - These will be executed in parallel. + The memory update nodes will be executed in parallel. """ + # Get the configuration configurable = configuration.Configuration.from_runnable_config(config) sends = [] current_state = asdict(state) + + # Loop over all memory types specified in the configuration for v in configurable.memory_types: update_mode = v.update_mode + + # This specifies the type of memory update to perform from the configuration match update_mode: case "patch": + # This is the corresponding node in the graph for the patch-based memory update target = "handle_patch_memory" case "insert": + # This is the corresponding node in the graph for the insert-based memory update target = "handle_insertion_memory" case _: raise ValueError(f"Unknown update mode: {update_mode}") + # Use Send API to route to the target node and pass the name of the memory schema as function_name + # Send API allows each memory node to be executed in parallel sends.append( Send( target, @@ -137,11 +184,11 @@ def scatter_schemas(state: State, config: RunnableConfig) -> list[Send]: return sends +# Add conditional edges to the graph builder.add_conditional_edges( "__start__", scatter_schemas, ["handle_patch_memory", "handle_insertion_memory"] ) +# Compile the graph graph = builder.compile() - - __all__ = ["graph"] diff --git a/static/memory_motivation.png b/static/memory_motivation.png new file mode 100644 index 0000000..5d1ee54 Binary files /dev/null and b/static/memory_motivation.png differ diff --git a/static/memory_template_flow.png b/static/memory_template_flow.png new file mode 100644 index 0000000..3671191 Binary files /dev/null and b/static/memory_template_flow.png differ diff --git a/static/memory_types.png b/static/memory_types.png new file mode 100644 index 0000000..fa85df3 Binary files /dev/null and b/static/memory_types.png differ diff --git a/static/scheduling.png b/static/scheduling.png new file mode 100644 index 0000000..074740e Binary files /dev/null and b/static/scheduling.png differ diff --git a/static/studio.png b/static/studio.png new file mode 100644 index 0000000..4ba7d0b Binary files /dev/null and b/static/studio.png differ diff --git a/tests/integration_tests/test_graph.py b/tests/integration_tests/test_graph.py index 759b16b..f0ee4a5 100644 --- a/tests/integration_tests/test_graph.py +++ b/tests/integration_tests/test_graph.py @@ -5,9 +5,8 @@ import langsmith as ls import pytest from langgraph.store.memory import InMemoryStore -from pydantic import BaseModel, Field - from memory_graph.graph import builder +from pydantic import BaseModel, Field class User(BaseModel):