Update comments #2

Merged 6 commits on Oct 5, 2024
171 changes: 145 additions & 26 deletions README.md
[![Integration Tests](https://github.com/langchain-ai/memory-template/actions/workflows/integration-tests.yml/badge.svg)](https://github.com/langchain-ai/memory-template/actions/workflows/integration-tests.yml)
[![Open in - LangGraph Studio](https://img.shields.io/badge/Open_in-LangGraph_Studio-00324d.svg?logo=data:image/svg%2bxml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI4NS4zMzMiIGhlaWdodD0iODUuMzMzIiB2ZXJzaW9uPSIxLjAiIHZpZXdCb3g9IjAgMCA2NCA2NCI+PHBhdGggZD0iTTEzIDcuOGMtNi4zIDMuMS03LjEgNi4zLTYuOCAyNS43LjQgMjQuNi4zIDI0LjUgMjUuOSAyNC41QzU3LjUgNTggNTggNTcuNSA1OCAzMi4zIDU4IDcuMyA1Ni43IDYgMzIgNmMtMTIuOCAwLTE2LjEuMy0xOSAxLjhtMzcuNiAxNi42YzIuOCAyLjggMy40IDQuMiAzLjQgNy42cy0uNiA0LjgtMy40IDcuNkw0Ny4yIDQzSDE2LjhsLTMuNC0zLjRjLTQuOC00LjgtNC44LTEwLjQgMC0xNS4ybDMuNC0zLjRoMzAuNHoiLz48cGF0aCBkPSJNMTguOSAyNS42Yy0xLjEgMS4zLTEgMS43LjQgMi41LjkuNiAxLjcgMS44IDEuNyAyLjcgMCAxIC43IDIuOCAxLjYgNC4xIDEuNCAxLjkgMS40IDIuNS4zIDMuMi0xIC42LS42LjkgMS40LjkgMS41IDAgMi43LS41IDIuNy0xIDAtLjYgMS4xLS44IDIuNi0uNGwyLjYuNy0xLjgtMi45Yy01LjktOS4zLTkuNC0xMi4zLTExLjUtOS44TTM5IDI2YzAgMS4xLS45IDIuNS0yIDMuMi0yLjQgMS41LTIuNiAzLjQtLjUgNC4yLjguMyAyIDEuNyAyLjUgMy4xLjYgMS41IDEuNCAyLjMgMiAyIDEuNS0uOSAxLjItMy41LS40LTMuNS0yLjEgMC0yLjgtMi44LS44LTMuMyAxLjYtLjQgMS42LS41IDAtLjYtMS4xLS4xLTEuNS0uNi0xLjItMS42LjctMS43IDMuMy0yLjEgMy41LS41LjEuNS4yIDEuNi4zIDIuMiAwIC43LjkgMS40IDEuOSAxLjYgMi4xLjQgMi4zLTIuMy4yLTMuMi0uOC0uMy0yLTEuNy0yLjUtMy4xLTEuMS0zLTMtMy4zLTMtLjUiLz48L3N2Zz4=)](https://langgraph-studio.vercel.app/templates/open?githubUrl=https://github.com/langchain-ai/memory-template)

## Motivation

Memory is a powerful way to improve and personalize AI applications. For example, memory can be used to store user-specific information across multiple interactions with that user, but it can also extend to any information you want to preserve across multiple interactions with an application. This template shows how you can build and deploy a long-term memory service using LangGraph by combining a memory service with a simple chatbot application.

![Motivation](./static/memory_motivation.png)

## Quickstart

This quickstart will get your memory service deployed on [LangGraph Cloud](https://langchain-ai.github.io/langgraph/cloud/). Once created, you can interact with it from any API.

Create a `.env` file.

```bash
cp .env.example .env
```

Set the required API keys in your `.env` file.

<!--
Setup instruction auto-generated by `langgraph template lock`. DO NOT EDIT MANUALLY.
-->

To use OpenAI's chat models:

```
OPENAI_API_KEY=your-api-key
```

<!--
End setup instructions
-->

If you want to test the memory service locally, [install the LangGraph Studio desktop app](https://github.com/langchain-ai/langgraph-studio?tab=readme-ov-file#download).

If you want to test in the cloud, [follow these instructions to deploy this repository to LangGraph Cloud](https://langchain-ai.github.io/langgraph/cloud/) and use Studio in your browser.

Open this repository in LangGraph Studio and navigate to the `chatbot` graph.

Optionally, you can set your `user_id`, `model`, or other configurations directly in the Studio UI.

![Flow](./static/studio.png)

Try sending some messages saying your name and other things the bot should remember.

Wait ~10-20 seconds for memories to be created and saved.

Create a *new* thread using the `+` icon.

Then chat with the bot again.

The bot should have access to the memories you've saved, and will use them to personalize its responses.
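
If you want to double-check what was stored, you can also query the deployment's store directly with the LangGraph SDK. The sketch below is an assumption-heavy example: the URL and user id are placeholders, so substitute your deployment URL and the `user_id` you configured.

```python
# Sketch: inspect stored memories from outside the graphs via the LangGraph SDK.
# The URL and user id are assumptions; use your own deployment URL and user_id.
import asyncio

from langgraph_sdk import get_client


async def list_memories(user_id: str) -> None:
    client = get_client(url="http://localhost:2024")
    results = await client.store.search_items((user_id,), limit=10)
    print(results)  # inspect the saved memory items for this user


asyncio.run(list_memories("my-user-id"))
```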

## How it works

There are several problems to solve when building a memory service:

1. What should each memory contain?
2. How should memories be updated?
3. How frequently should memories be updated or created?
4. Where should memories be stored?
5. How should the memory service be called from our application?

We'll address these challenges below, and explain how this LangGraph template approaches them.

### Memory Schema

The memory schema defines what each memory will contain.

By default, this template uses two memory schemas: `User` and `Note`. Both are JSON schemas.

The schemas are defined in [memory_graph/configuration.py](./src/memory_graph/configuration.py).

The `User` schema is used to store a single profile for a user with a set of predefined properties (e.g., name, age, interests).

The `Note` schema is more flexible, containing a `context` and `content` field to capture any type of information as a memory.

These schemas are customizable! You can create new schemas, or add / remove properties as needed for your application.
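
For illustration, a custom schema might look something like the sketch below. The property names here are examples only; the template's actual `User` and `Note` definitions live in [memory_graph/configuration.py](./src/memory_graph/configuration.py).

```python
# Illustrative only: a "User"-style profile memory expressed as a JSON schema.
# The real schemas for this template are defined in src/memory_graph/configuration.py.
example_user_schema = {
    "name": "User",
    "description": "Profile information about the user, kept up to date over time.",
    "update_mode": "patch",  # a single document that is continuously updated
    "parameters": {
        "type": "object",
        "properties": {
            "name": {"type": "string", "description": "The user's preferred name."},
            "age": {"type": "integer", "description": "The user's age, if known."},
            "interests": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Topics the user has expressed interest in.",
            },
        },
    },
}
```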

### Memory Updates

These memory schemas need to be updated with new information over time.

The `User` schema is a single JSON object.

We want to update it with new information about the user as the conversation progresses.

Each `Note`, in contrast, is captured in a list.

We want the flexibility to update existing `Note` entries or add new ones to the list.

We use the [`trustcall` library](https://github.com/hinthornw/trustcall) to do both of these types of updates.

This is a library that we created for updating JSON schemas via an LLM.

These updates are performed in [memory_graph/graph.py](./src/memory_graph/graph.py).
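
As a rough sketch of the pattern (outside the graph), a `trustcall` extractor can patch an existing memory like this. The model, schema, and messages below are illustrative assumptions, and the `existing` payload is keyed the same way the graph stores it:

```python
# A minimal sketch of a trustcall "patch"-style update, mirroring the pattern in
# src/memory_graph/graph.py. The model, schema, and messages are illustrative.
from langchain.chat_models import init_chat_model
from trustcall import create_extractor

llm = init_chat_model("openai:gpt-4o-mini")  # any tool-calling chat model

extractor = create_extractor(
    llm,
    tools=[
        {
            "name": "User",
            "description": "Profile information about the user.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "interests": {"type": "array", "items": {"type": "string"}},
                },
            },
        }
    ],
    tool_choice="User",
)

result = extractor.invoke(
    {
        "messages": [("user", "Hi! I'm Alice and I've started rock climbing.")],
        # Previously saved memory (if any), keyed the same way the graph stores it.
        "existing": {"memory": {"name": "Alice", "interests": []}},
    }
)
updated = result["responses"][0].model_dump(mode="json")
print(updated)  # e.g. {"name": "Alice", "interests": ["rock climbing"]}
```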

The `memory_graph` saves both types of memories.

We can see the graph in LangGraph Studio below, with a branch for each of the defined memory schemas:

* `handle_patch_memory` is for `User` schema memories.
* `handle_insert_memory` is for `Note` schema memories.

![Memory Diagram](./static/memory_graph.png)

### Memory Scheduling

Memory updates need to be scheduled to avoid duplicate processing.

Ideally, we want to wait until a chat is complete before we create memories.

But, we don't know when a chat session will end.

So, we wait a pre-determined interval before invoking the memory graph to save memories to the storage layer.

If the chatbot makes a second call within that interval, the initial memory run is cancelled.

Scheduling is handled by the LangGraph SDK's `after_seconds` parameter.

We call the `memory_graph` from our application (e.g., `chatbot`) using the LangGraph SDK in [chatbot/graph.py](./src/chatbot/graph.py).

![DeBounce](./static/scheduling.png)
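
A condensed sketch of this pattern, similar in spirit to the `schedule_memories` node, is shown below. The deployment URL, delay, and thread-id scheme are assumptions for illustration, not the template's exact values.

```python
# Sketch of debounced scheduling with the LangGraph SDK. The URL, delay, and
# id scheme below are illustrative assumptions.
import uuid

from langgraph_sdk import get_client


async def schedule_memory_run(messages: list, user_id: str, conversation_id: str) -> None:
    client = get_client(url="http://localhost:2024")  # assumed deployment URL
    # Derive a stable thread id from the conversation so repeated calls within the
    # debounce window target the same memory thread.
    memory_thread_id = str(uuid.uuid5(uuid.NAMESPACE_URL, conversation_id))
    await client.threads.create(thread_id=memory_thread_id, if_exists="do_nothing")
    await client.runs.create(
        thread_id=memory_thread_id,
        assistant_id="memory_graph",
        input={"messages": messages},
        config={"configurable": {"user_id": user_id}},
        after_seconds=30,  # wait before processing the conversation
        multitask_strategy="rollback",  # a newer run cancels the one scheduled earlier
    )
```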

### Memory Storage

The LangGraph API comes with a built-in memory storage layer that can be used to store and retrieve information across threads.

Learn more about the Memory Storage layer [here](https://langchain-ai.github.io/langgraph/how-tos/memory/shared-state/).

Importantly, the memory storage layer is namespaced by a tuple; in this case, we use the `user_id` as well as the schema name.

In addition, the memory storage layer is accessible to both the `chatbot` and the `memory_graph` in all graph nodes.
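
As a sketch, a node in either graph can read and write the store like this. The namespace layout follows the template; the node, user id, and values are illustrative.

```python
# Illustrative node showing how the injected store is used. The namespace layout
# (user id + schema name) follows this template; the node and values are examples.
from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore


async def remember_user(state: dict, config: RunnableConfig, *, store: BaseStore) -> dict:
    user_id = config["configurable"]["user_id"]
    namespace = (user_id, "user_states", "User")

    # Upsert the single "patch"-style memory document for this user.
    await store.aput(namespace, "memory", {"name": "Alice", "interests": ["climbing"]})

    # Read it back (the chatbot does something similar before responding).
    item = await store.aget(namespace, "memory")
    return {"profile": item.value if item else None}
```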

This diagram shows how these pieces fit together:

![Memory types](./static/memory_types.png)

### Calling the Memory Service

Studio uses the LangGraph API as its backend, packaging the specified code repository with the storage layer.

The `langgraph.json` file configures the LangGraph API and specifies the graphs to run in Studio:

```json
"graphs": {
"chatbot": "./src/chatbot/graph.py:graph",
"memory_graph": "./src/memory_graph/graph.py:graph"
},
```
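
For reference, a complete `langgraph.json` for a layout like this typically also declares the package dependencies and the env file. The example below is an assumed sketch rather than this repo's exact file.

```json
{
  "dependencies": ["."],
  "graphs": {
    "chatbot": "./src/chatbot/graph.py:graph",
    "memory_graph": "./src/memory_graph/graph.py:graph"
  },
  "env": ".env"
}
```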

The chatbot can directly access all stored memories when it's preparing responses for the user.

You can see this in the `bot` node in [chatbot/graph.py](./src/chatbot/graph.py):

```python
items = await store.asearch(namespace)
```
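
The retrieved items can then be folded into the system prompt. Here is a rough sketch continuing from `items` above; the exact formatting used by the template may differ.

```python
# Rough sketch: format retrieved memories as prompt context.
memories = "\n".join(f"- {item.key}: {item.value}" for item in items)
system_prompt = (
    "You are a helpful assistant.\n"
    "Here is what you remember about this user:\n"
    f"{memories}"
)
```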

To schedule creation of new memories, the chatbot can use the LangGraph SDK to access the memory graph.

This is done in the `schedule_memories` node in [chatbot/graph.py](./src/chatbot/graph.py).

This passes the chatbot's interaction with the user along with the scheduling parameter, `after_seconds`, to the `memory_graph`.

![Flow](./static/memory_template_flow.png)

## Benefits

The separation of concerns between the application logic (the chatbot) and the memory graph provides a few advantages:

1. Minimal overhead: memory creation logic is removed from the hot path of the application, so there is no latency cost for memory creation.

2. Memory creation is handled in a background job, separate from the chatbot, with scheduling to avoid duplicate processing.

3. The memory graph can be updated and/or hosted (as a service) independently of the application (the chatbot).

Here is a schematic of the interaction pattern:

![Interaction Pattern](./static/memory_interactions.png)

## How to evaluate

65 changes: 56 additions & 9 deletions src/memory_graph/graph.py
async def handle_patch_memory(
    state: ProcessorState, config: RunnableConfig, *, store: BaseStore
) -> dict:
    """Extract the user's state from the conversation and update the memory."""
    # Get the overall configuration
    configurable = configuration.Configuration.from_runnable_config(config)

    # Namespace for memory events, where function_name is the name of the memory schema
    namespace = (configurable.user_id, "user_states", state.function_name)

    # Fetch existing memories from the store for this (patch) memory schema
    existing_item = await store.aget(namespace, "memory")
    existing = {existing_item.key: existing_item.value} if existing_item else None

    # Get the configuration for this memory schema (identified by function_name)
    memory_config = next(
        conf for conf in configurable.memory_types if conf.name == state.function_name
    )

    # This is what we use to generate new memories
    extractor = create_extractor(
        utils.init_model(configurable.model),
        # We pass the specified (patch) memory schema as a tool
        tools=[
            {
                # Tool name
                "name": memory_config.name,
                # Tool description
                "description": memory_config.description,
                # Schema for patch memory
                "parameters": memory_config.parameters,
            }
        ],
        tool_choice=memory_config.name,
    )

    # Prepare the messages
    prepared_messages = utils.prepare_messages(
        state.messages, memory_config.system_prompt
    )

    # Pass messages and existing patch to the extractor
    inputs = {"messages": prepared_messages, "existing": existing}
    # Update the patch memory
    result = await extractor.ainvoke(inputs, config)
    extracted = result["responses"][0].model_dump(mode="json")
    # Save to storage
    await store.aput(namespace, "memory", extracted)
    return {"messages": []}


async def handle_insertion_memory(
    state: ProcessorState, config: RunnableConfig, *, store: BaseStore
) -> dict[str, list]:
    """Handle insertion memory events."""
    # Get the overall configuration
    configurable = configuration.Configuration.from_runnable_config(config)

    # Namespace for memory events, where function_name is the name of the memory schema
    namespace = (configurable.user_id, "events", state.function_name)

    # Fetch existing memories from the store (5 most recent ones) for this (insert) memory schema
    existing_items = await store.asearch(namespace, limit=5)

    # Get the configuration for this memory schema (identified by function_name)
    memory_config = next(
        conf for conf in configurable.memory_types if conf.name == state.function_name
    )

    # This is what we use to generate new memories
    extractor = create_extractor(
        utils.init_model(configurable.model),
        # We pass the specified (insert) memory schema as a tool
        tools=[
            {
                # Tool name
                "name": memory_config.name,
                # Tool description
                "description": memory_config.description,
                # Schema for insert memory
                "parameters": memory_config.parameters,
            }
        ],
        tool_choice="any",
        # This allows the extractor to insert new memories
        enable_inserts=True,
    )

    # Generate new memories or update existing memories
    extracted = await extractor.ainvoke(
        {
            # Prepare the messages
            "messages": utils.prepare_messages(
                state.messages, memory_config.system_prompt
            ),
            # Prepare the existing memories
            "existing": (
                [
                    (existing_item.key, state.function_name, existing_item.value)
                    # ... (remaining lines collapsed in the diff) ...
        },
        config,
    )

    # Add the memories to storage
    await asyncio.gather(
        *(
            store.aput(
                # ... (remaining lines collapsed in the diff) ...
    return {"messages": []}


# Create the graph and all nodes
builder = StateGraph(State, config_schema=configuration.Configuration)

builder.add_node(handle_patch_memory, input=ProcessorState)
builder.add_node(handle_insertion_memory, input=ProcessorState)


def scatter_schemas(state: State, config: RunnableConfig) -> list[Send]:
"""Route the memory_types for the memory assistant.
"""Iterate over all memory types in the configuration.

It will route each memory type from configuration to the corresponding memory update node.

These will be executed in parallel.
The memory update nodes will be executed in parallel.
"""
# Get the configuration
configurable = configuration.Configuration.from_runnable_config(config)
sends = []
current_state = asdict(state)

# Loop over all memory types specified in the configuration
for v in configurable.memory_types:
update_mode = v.update_mode

# This specifies the type of memory update to perform from the configuration
match update_mode:
case "patch":
# This is the corresponding node in the graph for the patch-based memory update
target = "handle_patch_memory"
case "insert":
# This is the corresponding node in the graph for the insert-based memory update
target = "handle_insertion_memory"
case _:
raise ValueError(f"Unknown update mode: {update_mode}")

# Use Send API to route to the target node and pass the name of the memory schema as function_name
# Send API allows each memory node to be executed in parallel
sends.append(
Send(
target,
Expand All @@ -137,11 +184,11 @@ def scatter_schemas(state: State, config: RunnableConfig) -> list[Send]:
return sends


# Add conditional edges to the graph
builder.add_conditional_edges(
"__start__", scatter_schemas, ["handle_patch_memory", "handle_insertion_memory"]
)

# Compile the graph
graph = builder.compile()


__all__ = ["graph"]
Binary file added static/memory_motivation.png
Binary file added static/memory_template_flow.png
Binary file added static/memory_types.png
Binary file added static/scheduling.png
Binary file added static/studio.png
3 changes: 1 addition & 2 deletions tests/integration_tests/test_graph.py
import langsmith as ls
import pytest
from langgraph.store.memory import InMemoryStore

from memory_graph.graph import builder
from pydantic import BaseModel, Field


class User(BaseModel):
    # ... (remaining lines collapsed in the diff) ...