diff --git a/docs/docs/how-tos/map-reduce.ipynb b/docs/docs/how-tos/map-reduce.ipynb index 51a2eb14d..fb00f0bfb 100644 --- a/docs/docs/how-tos/map-reduce.ipynb +++ b/docs/docs/how-tos/map-reduce.ipynb @@ -1,286 +1,286 @@ { - "cells": [ - { - "attachments": { - "a108ffc8-6136-4cd7-a6f9-579e41a5a786.png": { - "image/png": "" - } - }, - "cell_type": "markdown", - "id": "95a87145-34d0-4f97-b45f-5c9fd8532c8a", - "metadata": {}, - "source": [ - "# How to create map-reduce branches for parallel execution\n", - "\n", - "[Map-reduce](https://en.wikipedia.org/wiki/MapReduce) operations are essential for efficient task decomposition and parallel processing. This approach involves breaking a task into smaller sub-tasks, processing each sub-task in parallel, and aggregating the results across all of the completed sub-tasks. \n", - "\n", - "Consider this example: given a general topic from the user, generate a list of related subjects, generate a joke for each subject, and select the best joke from the resulting list. In this design pattern, a first node may generate a list of objects (e.g., related subjects) and we want to apply some other node (e.g., generate a joke) to all those objects (e.g., subjects). However, two main challenges arise.\n", - " \n", - "(1) the number of objects (e.g., subjects) may be unknown ahead of time (meaning the number of edges may not be known) when we lay out the graph and (2) the input State to the downstream Node should be different (one for each generated object).\n", - " \n", - "LangGraph addresses these challenges [through its `Send` API](https://langchain-ai.github.io/langgraph/concepts/low_level/#send). By utilizing conditional edges, `Send` can distribute different states (e.g., subjects) to multiple instances of a node (e.g., joke generation). Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management. \n", - "\n", - "![Screenshot 2024-07-12 at 9.45.40 AM.png](attachment:a108ffc8-6136-4cd7-a6f9-579e41a5a786.png)" - ] - }, - { - "cell_type": "markdown", - "id": "66c58b5f", - "metadata": {}, - "source": [ - "## Setup\n", - "\n", - "First, let's install the required packages and set our API keys" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "3eb04cd1", - "metadata": {}, - "outputs": [], - "source": [ - "%%capture --no-stderr\n", - "%pip install -U langchain-anthropic langgraph" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dc292321", - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "import getpass\n", - "\n", - "\n", - "def _set_env(name: str):\n", - " if not os.getenv(name):\n", - " os.environ[name] = getpass.getpass(f\"{name}: \")\n", - "\n", - "\n", - "_set_env(\"ANTHROPIC_API_KEY\")" - ] - }, - { - "cell_type": "markdown", - "id": "b87911bb", - "metadata": {}, - "source": [ - "
\n", - "

Set up LangSmith for LangGraph development

\n", - "

\n", - " Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here. \n", - "

\n", - "
" - ] - }, - { - "cell_type": "markdown", - "id": "b4e782a0", - "metadata": {}, - "source": [ - "## Define the graph" - ] - }, - { - "cell_type": "markdown", - "id": "66803b55", - "metadata": {}, - "source": [ - "
\n", - "

Using Pydantic with LangChain

\n", - "

\n", - " This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing of Pydantic v1 and v2 BaseModels.\n", - "

\n", - "
" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "0f0f78e4-423d-4e2d-aa1a-01efaec4715f", - "metadata": {}, - "outputs": [], - "source": [ - "import operator\n", - "from typing import Annotated, TypedDict\n", - "\n", - "from langchain_anthropic import ChatAnthropic\n", - "\n", - "from langgraph.constants import Send\n", - "from langgraph.graph import END, StateGraph, START\n", - "\n", - "from pydantic import BaseModel, Field\n", - "\n", - "# Model and prompts\n", - "# Define model and prompts we will use\n", - "subjects_prompt = \"\"\"Generate a comma separated list of between 2 and 5 examples related to: {topic}.\"\"\"\n", - "joke_prompt = \"\"\"Generate a joke about {subject}\"\"\"\n", - "best_joke_prompt = \"\"\"Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.\n", - "\n", - "{jokes}\"\"\"\n", - "\n", - "\n", - "class Subjects(BaseModel):\n", - " subjects: list[str]\n", - "\n", - "\n", - "class Joke(BaseModel):\n", - " joke: str\n", - "\n", - "\n", - "class BestJoke(BaseModel):\n", - " id: int = Field(description=\"Index of the best joke, starting with 0\", ge=0)\n", - "\n", - "\n", - "model = ChatAnthropic(model=\"claude-3-5-sonnet-20240620\")\n", - "\n", - "# Graph components: define the components that will make up the graph\n", - "\n", - "\n", - "# This will be the overall state of the main graph.\n", - "# It will contain a topic (which we expect the user to provide)\n", - "# and then will generate a list of subjects, and then a joke for\n", - "# each subject\n", - "class OverallState(TypedDict):\n", - " topic: str\n", - " subjects: list\n", - " # Notice here we use the operator.add\n", - " # This is because we want combine all the jokes we generate\n", - " # from individual nodes back into one list - this is essentially\n", - " # the \"reduce\" part\n", - " jokes: Annotated[list, operator.add]\n", - " best_selected_joke: str\n", - "\n", - "\n", - "# This will be the state of the node that we will \"map\" all\n", - "# subjects to in order to generate a joke\n", - "class JokeState(TypedDict):\n", - " subject: str\n", - "\n", - "\n", - "# This is the function we will use to generate the subjects of the jokes\n", - "def generate_topics(state: OverallState):\n", - " prompt = subjects_prompt.format(topic=state[\"topic\"])\n", - " response = model.with_structured_output(Subjects).invoke(prompt)\n", - " return {\"subjects\": response.subjects}\n", - "\n", - "\n", - "# Here we generate a joke, given a subject\n", - "def generate_joke(state: JokeState):\n", - " prompt = joke_prompt.format(subject=state[\"subject\"])\n", - " response = model.with_structured_output(Joke).invoke(prompt)\n", - " return {\"jokes\": [response.joke]}\n", - "\n", - "\n", - "# Here we define the logic to map out over the generated subjects\n", - "# We will use this an edge in the graph\n", - "def continue_to_jokes(state: OverallState):\n", - " # We will return a list of `Send` objects\n", - " # Each `Send` object consists of the name of a node in the graph\n", - " # as well as the state to send to that node\n", - " return [Send(\"generate_joke\", {\"subject\": s}) for s in state[\"subjects\"]]\n", - "\n", - "\n", - "# Here we will judge the best joke\n", - "def best_joke(state: OverallState):\n", - " jokes = \"\\n\\n\".join(state[\"jokes\"])\n", - " prompt = best_joke_prompt.format(topic=state[\"topic\"], jokes=jokes)\n", - " response = model.with_structured_output(BestJoke).invoke(prompt)\n", - " return {\"best_selected_joke\": state[\"jokes\"][response.id]}\n", - "\n", - "\n", - "# Construct the graph: here we put everything together to construct our graph\n", - "graph = StateGraph(OverallState)\n", - "graph.add_node(\"generate_topics\", generate_topics)\n", - "graph.add_node(\"generate_joke\", generate_joke)\n", - "graph.add_node(\"best_joke\", best_joke)\n", - "graph.add_edge(START, \"generate_topics\")\n", - "graph.add_conditional_edges(\"generate_topics\", continue_to_jokes, [\"generate_joke\"])\n", - "graph.add_edge(\"generate_joke\", \"best_joke\")\n", - "graph.add_edge(\"best_joke\", END)\n", - "app = graph.compile()" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "37ed1f71-63db-416f-b715-4617b33d4b7f", - "metadata": {}, - "outputs": [ + "cells": [ { - "data": { - "image/jpeg": "", - "text/plain": [ - "" + "attachments": { + "a108ffc8-6136-4cd7-a6f9-579e41a5a786.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "id": "95a87145-34d0-4f97-b45f-5c9fd8532c8a", + "metadata": {}, + "source": [ + "# How to create map-reduce branches for parallel execution\n", + "\n", + "[Map-reduce](https://en.wikipedia.org/wiki/MapReduce) operations are essential for efficient task decomposition and parallel processing. This approach involves breaking a task into smaller sub-tasks, processing each sub-task in parallel, and aggregating the results across all of the completed sub-tasks. \n", + "\n", + "Consider this example: given a general topic from the user, generate a list of related subjects, generate a joke for each subject, and select the best joke from the resulting list. In this design pattern, a first node may generate a list of objects (e.g., related subjects) and we want to apply some other node (e.g., generate a joke) to all those objects (e.g., subjects). However, two main challenges arise.\n", + " \n", + "(1) the number of objects (e.g., subjects) may be unknown ahead of time (meaning the number of edges may not be known) when we lay out the graph and (2) the input State to the downstream Node should be different (one for each generated object).\n", + " \n", + "LangGraph addresses these challenges [through its `Send` API](https://langchain-ai.github.io/langgraph/concepts/low_level/#send). By utilizing conditional edges, `Send` can distribute different states (e.g., subjects) to multiple instances of a node (e.g., joke generation). Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management. \n", + "\n", + "![Screenshot 2024-07-12 at 9.45.40 AM.png](attachment:a108ffc8-6136-4cd7-a6f9-579e41a5a786.png)" ] - }, - "execution_count": 2, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "from IPython.display import Image\n", - "\n", - "Image(app.get_graph().draw_mermaid_png())" - ] - }, - { - "cell_type": "markdown", - "id": "4a0026d8", - "metadata": {}, - "source": [ - "## Use the graph" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "fd90cace", - "metadata": {}, - "outputs": [ + }, + { + "cell_type": "markdown", + "id": "66c58b5f", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, let's install the required packages and set our API keys" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "3eb04cd1", + "metadata": {}, + "outputs": [], + "source": [ + "%%capture --no-stderr\n", + "%pip install -U langchain-anthropic langgraph" + ] + }, { - "name": "stdout", - "output_type": "stream", - "text": [ - "{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}\n", - "{'generate_joke': {'jokes': [\"Why don't elephants use computers? They're afraid of the mouse!\"]}}\n", - "{'generate_joke': {'jokes': [\"Why don't dolphins use smartphones? Because they're afraid of phishing!\"]}}\n", - "{'generate_joke': {'jokes': [\"Why don't you see penguins in Britain? Because they're afraid of Wales!\"]}}\n", - "{'generate_joke': {'jokes': [\"Why don't lions like fast food? Because they can't catch it!\"]}}\n", - "{'best_joke': {'best_selected_joke': \"Why don't dolphins use smartphones? Because they're afraid of phishing!\"}}\n" - ] + "cell_type": "code", + "execution_count": null, + "id": "dc292321", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import getpass\n", + "\n", + "\n", + "def _set_env(name: str):\n", + " if not os.getenv(name):\n", + " os.environ[name] = getpass.getpass(f\"{name}: \")\n", + "\n", + "\n", + "_set_env(\"ANTHROPIC_API_KEY\")" + ] + }, + { + "cell_type": "markdown", + "id": "b87911bb", + "metadata": {}, + "source": [ + "
\n", + "

Set up LangSmith for LangGraph development

\n", + "

\n", + " Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here. \n", + "

\n", + "
" + ] + }, + { + "cell_type": "markdown", + "id": "b4e782a0", + "metadata": {}, + "source": [ + "## Define the graph" + ] + }, + { + "cell_type": "markdown", + "id": "66803b55", + "metadata": {}, + "source": [ + "
\n", + "

Using Pydantic with LangChain

\n", + "

\n", + " This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing of Pydantic v1 and v2 BaseModels.\n", + "

\n", + "
" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "0f0f78e4-423d-4e2d-aa1a-01efaec4715f", + "metadata": {}, + "outputs": [], + "source": [ + "import operator\n", + "from typing import Annotated, TypedDict\n", + "\n", + "from langchain_anthropic import ChatAnthropic\n", + "\n", + "from langgraph.types import Send\n", + "from langgraph.graph import END, StateGraph, START\n", + "\n", + "from pydantic import BaseModel, Field\n", + "\n", + "# Model and prompts\n", + "# Define model and prompts we will use\n", + "subjects_prompt = \"\"\"Generate a comma separated list of between 2 and 5 examples related to: {topic}.\"\"\"\n", + "joke_prompt = \"\"\"Generate a joke about {subject}\"\"\"\n", + "best_joke_prompt = \"\"\"Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.\n", + "\n", + "{jokes}\"\"\"\n", + "\n", + "\n", + "class Subjects(BaseModel):\n", + " subjects: list[str]\n", + "\n", + "\n", + "class Joke(BaseModel):\n", + " joke: str\n", + "\n", + "\n", + "class BestJoke(BaseModel):\n", + " id: int = Field(description=\"Index of the best joke, starting with 0\", ge=0)\n", + "\n", + "\n", + "model = ChatAnthropic(model=\"claude-3-5-sonnet-20240620\")\n", + "\n", + "# Graph components: define the components that will make up the graph\n", + "\n", + "\n", + "# This will be the overall state of the main graph.\n", + "# It will contain a topic (which we expect the user to provide)\n", + "# and then will generate a list of subjects, and then a joke for\n", + "# each subject\n", + "class OverallState(TypedDict):\n", + " topic: str\n", + " subjects: list\n", + " # Notice here we use the operator.add\n", + " # This is because we want combine all the jokes we generate\n", + " # from individual nodes back into one list - this is essentially\n", + " # the \"reduce\" part\n", + " jokes: Annotated[list, operator.add]\n", + " best_selected_joke: str\n", + "\n", + "\n", + "# This will be the state of the node that we will \"map\" all\n", + "# subjects to in order to generate a joke\n", + "class JokeState(TypedDict):\n", + " subject: str\n", + "\n", + "\n", + "# This is the function we will use to generate the subjects of the jokes\n", + "def generate_topics(state: OverallState):\n", + " prompt = subjects_prompt.format(topic=state[\"topic\"])\n", + " response = model.with_structured_output(Subjects).invoke(prompt)\n", + " return {\"subjects\": response.subjects}\n", + "\n", + "\n", + "# Here we generate a joke, given a subject\n", + "def generate_joke(state: JokeState):\n", + " prompt = joke_prompt.format(subject=state[\"subject\"])\n", + " response = model.with_structured_output(Joke).invoke(prompt)\n", + " return {\"jokes\": [response.joke]}\n", + "\n", + "\n", + "# Here we define the logic to map out over the generated subjects\n", + "# We will use this an edge in the graph\n", + "def continue_to_jokes(state: OverallState):\n", + " # We will return a list of `Send` objects\n", + " # Each `Send` object consists of the name of a node in the graph\n", + " # as well as the state to send to that node\n", + " return [Send(\"generate_joke\", {\"subject\": s}) for s in state[\"subjects\"]]\n", + "\n", + "\n", + "# Here we will judge the best joke\n", + "def best_joke(state: OverallState):\n", + " jokes = \"\\n\\n\".join(state[\"jokes\"])\n", + " prompt = best_joke_prompt.format(topic=state[\"topic\"], jokes=jokes)\n", + " response = model.with_structured_output(BestJoke).invoke(prompt)\n", + " return {\"best_selected_joke\": state[\"jokes\"][response.id]}\n", + "\n", + "\n", + "# Construct the graph: here we put everything together to construct our graph\n", + "graph = StateGraph(OverallState)\n", + "graph.add_node(\"generate_topics\", generate_topics)\n", + "graph.add_node(\"generate_joke\", generate_joke)\n", + "graph.add_node(\"best_joke\", best_joke)\n", + "graph.add_edge(START, \"generate_topics\")\n", + "graph.add_conditional_edges(\"generate_topics\", continue_to_jokes, [\"generate_joke\"])\n", + "graph.add_edge(\"generate_joke\", \"best_joke\")\n", + "graph.add_edge(\"best_joke\", END)\n", + "app = graph.compile()" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "37ed1f71-63db-416f-b715-4617b33d4b7f", + "metadata": {}, + "outputs": [ + { + "data": { + "image/jpeg": "", + "text/plain": [ + "" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from IPython.display import Image\n", + "\n", + "Image(app.get_graph().draw_mermaid_png())" + ] + }, + { + "cell_type": "markdown", + "id": "4a0026d8", + "metadata": {}, + "source": [ + "## Use the graph" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "fd90cace", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}\n", + "{'generate_joke': {'jokes': [\"Why don't elephants use computers? They're afraid of the mouse!\"]}}\n", + "{'generate_joke': {'jokes': [\"Why don't dolphins use smartphones? Because they're afraid of phishing!\"]}}\n", + "{'generate_joke': {'jokes': [\"Why don't you see penguins in Britain? Because they're afraid of Wales!\"]}}\n", + "{'generate_joke': {'jokes': [\"Why don't lions like fast food? Because they can't catch it!\"]}}\n", + "{'best_joke': {'best_selected_joke': \"Why don't dolphins use smartphones? Because they're afraid of phishing!\"}}\n" + ] + } + ], + "source": [ + "# Call the graph: here we call it to generate a list of jokes\n", + "for s in app.stream({\"topic\": \"animals\"}):\n", + " print(s)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" } - ], - "source": [ - "# Call the graph: here we call it to generate a list of jokes\n", - "for s in app.stream({\"topic\": \"animals\"}):\n", - " print(s)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 + "nbformat": 4, + "nbformat_minor": 5 } diff --git a/docs/docs/reference/graphs.md b/docs/docs/reference/graphs.md index b2f4acdf2..14392ba2c 100644 --- a/docs/docs/reference/graphs.md +++ b/docs/docs/reference/graphs.md @@ -29,7 +29,7 @@ handler: python ## StreamMode -::: langgraph.pregel.StreamMode +::: langgraph.types.StreamMode ## Constants @@ -69,8 +69,12 @@ builder.add_conditional_edges("my_node", my_condition) ## Send -::: langgraph.constants.Send +::: langgraph.types.Send + +## Interrupt + +::: langgraph.types.Interrupt ## RetryPolicy -::: langgraph.pregel.types.RetryPolicy +::: langgraph.types.RetryPolicy diff --git a/libs/langgraph/langgraph/constants.py b/libs/langgraph/langgraph/constants.py index e8719e664..bde74c438 100644 --- a/libs/langgraph/langgraph/constants.py +++ b/libs/langgraph/langgraph/constants.py @@ -1,133 +1,109 @@ -from dataclasses import dataclass from types import MappingProxyType -from typing import Any, Literal, Mapping +from typing import Any, Mapping +from langgraph.types import Interrupt, Send # noqa: F401 + +# Interrupt, Send re-exported for backwards compatibility + + +# --- Empty read-only containers --- +EMPTY_MAP: Mapping[str, Any] = MappingProxyType({}) +EMPTY_SEQ: tuple[str, ...] = tuple() + +# --- Public constants --- +TAG_HIDDEN = "langsmith:hidden" +# tag to hide a node/edge from certain tracing/streaming environments +START = "__start__" +# the first (maybe virtual) node in graph-style Pregel +END = "__end__" +# the last (maybe virtual) node in graph-style Pregel + +# --- Reserved write keys --- INPUT = "__input__" +# for values passed as input to the graph +INTERRUPT = "__interrupt__" +# for dynamic interrupts raised by nodes +ERROR = "__error__" +# for errors raised by nodes +NO_WRITES = "__no_writes__" +# marker to signal node didn't write anything +SCHEDULED = "__scheduled__" +# marker to signal node was scheduled (in distributed mode) +TASKS = "__pregel_tasks" +# for Send objects returned by nodes/edges, corresponds to PUSH below + +# --- Reserved config.configurable keys --- CONFIG_KEY_SEND = "__pregel_send" +# holds the `write` function that accepts writes to state/edges/reserved keys CONFIG_KEY_READ = "__pregel_read" +# holds the `read` function that returns a copy of the current state CONFIG_KEY_CHECKPOINTER = "__pregel_checkpointer" +# holds a `BaseCheckpointSaver` passed from parent graph to child graphs CONFIG_KEY_STREAM = "__pregel_stream" +# holds a `StreamProtocol` passed from parent graph to child graphs CONFIG_KEY_STREAM_WRITER = "__pregel_stream_writer" +# holds a `StreamWriter` for stream_mode=custom CONFIG_KEY_STORE = "__pregel_store" +# holds a `BaseStore` made available to managed values CONFIG_KEY_RESUMING = "__pregel_resuming" +# holds a boolean indicating if subgraphs should resume from a previous checkpoint CONFIG_KEY_TASK_ID = "__pregel_task_id" +# holds the task ID for the current task CONFIG_KEY_DEDUPE_TASKS = "__pregel_dedupe_tasks" +# holds a boolean indicating if tasks should be deduplicated (for distributed mode) CONFIG_KEY_ENSURE_LATEST = "__pregel_ensure_latest" +# holds a boolean indicating whether to assert the requested checkpoint is the latest +# (for distributed mode) CONFIG_KEY_DELEGATE = "__pregel_delegate" -# this one part of public API so more readable +# holds a boolean indicating whether to delegate subgraphs (for distributed mode) CONFIG_KEY_CHECKPOINT_MAP = "checkpoint_map" -INTERRUPT = "__interrupt__" -ERROR = "__error__" -NO_WRITES = "__no_writes__" -SCHEDULED = "__scheduled__" -TASKS = "__pregel_tasks" # for backwards compat, this is the original name of PUSH +# holds a mapping of checkpoint_ns -> checkpoint_id for parent graphs +CONFIG_KEY_CHECKPOINT_ID = "checkpoint_id" +# holds the current checkpoint_id, if any +CONFIG_KEY_CHECKPOINT_NS = "checkpoint_ns" +# holds the current checkpoint_ns, "" for root graph + +# --- Other constants --- PUSH = "__pregel_push" +# denotes push-style tasks, ie. those created by Send objects PULL = "__pregel_pull" +# denotes pull-style tasks, ie. those triggered by edges RUNTIME_PLACEHOLDER = "__pregel_runtime_placeholder__" +# placeholder for managed values replaced at runtime +NS_SEP = "|" +# for checkpoint_ns, separates each level (ie. graph|subgraph|subsubgraph) +NS_END = ":" +# for checkpoint_ns, for each level, separates the namespace from the task_id + RESERVED = { - SCHEDULED, + TAG_HIDDEN, + # reserved write keys + INPUT, INTERRUPT, ERROR, NO_WRITES, + SCHEDULED, TASKS, - PUSH, - PULL, + # reserved config.configurable keys CONFIG_KEY_SEND, CONFIG_KEY_READ, CONFIG_KEY_CHECKPOINTER, - CONFIG_KEY_CHECKPOINT_MAP, CONFIG_KEY_STREAM, CONFIG_KEY_STREAM_WRITER, CONFIG_KEY_STORE, + CONFIG_KEY_CHECKPOINT_MAP, CONFIG_KEY_RESUMING, CONFIG_KEY_TASK_ID, CONFIG_KEY_DEDUPE_TASKS, CONFIG_KEY_ENSURE_LATEST, CONFIG_KEY_DELEGATE, - INPUT, + CONFIG_KEY_CHECKPOINT_MAP, + CONFIG_KEY_CHECKPOINT_ID, + CONFIG_KEY_CHECKPOINT_NS, + # other constants + PUSH, + PULL, RUNTIME_PLACEHOLDER, + NS_SEP, + NS_END, } -TAG_HIDDEN = "langsmith:hidden" - -START = "__start__" -END = "__end__" - -NS_SEP = "|" -NS_END = ":" - -EMPTY_MAP: Mapping[str, Any] = MappingProxyType({}) - - -class Send: - """A message or packet to send to a specific node in the graph. - - The `Send` class is used within a `StateGraph`'s conditional edges to - dynamically invoke a node with a custom state at the next step. - - Importantly, the sent state can differ from the core graph's state, - allowing for flexible and dynamic workflow management. - - One such example is a "map-reduce" workflow where your graph invokes - the same node multiple times in parallel with different states, - before aggregating the results back into the main graph's state. - - Attributes: - node (str): The name of the target node to send the message to. - arg (Any): The state or message to send to the target node. - - Examples: - >>> from typing import Annotated - >>> import operator - >>> class OverallState(TypedDict): - ... subjects: list[str] - ... jokes: Annotated[list[str], operator.add] - ... - >>> from langgraph.constants import Send - >>> from langgraph.graph import END, START - >>> def continue_to_jokes(state: OverallState): - ... return [Send("generate_joke", {"subject": s}) for s in state['subjects']] - ... - >>> from langgraph.graph import StateGraph - >>> builder = StateGraph(OverallState) - >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]}) - >>> builder.add_conditional_edges(START, continue_to_jokes) - >>> builder.add_edge("generate_joke", END) - >>> graph = builder.compile() - >>> - >>> # Invoking with two subjects results in a generated joke for each - >>> graph.invoke({"subjects": ["cats", "dogs"]}) - {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']} - """ - - node: str - arg: Any - - def __init__(self, /, node: str, arg: Any) -> None: - """ - Initialize a new instance of the Send class. - - Args: - node (str): The name of the target node to send the message to. - arg (Any): The state or message to send to the target node. - """ - self.node = node - self.arg = arg - - def __hash__(self) -> int: - return hash((self.node, self.arg)) - - def __repr__(self) -> str: - return f"Send(node={self.node!r}, arg={self.arg!r})" - - def __eq__(self, value: object) -> bool: - return ( - isinstance(value, Send) - and self.node == value.node - and self.arg == value.arg - ) - - -@dataclass -class Interrupt: - value: Any - when: Literal["during"] = "during" diff --git a/libs/langgraph/langgraph/errors.py b/libs/langgraph/langgraph/errors.py index ec84e0b28..63bc8aff6 100644 --- a/libs/langgraph/langgraph/errors.py +++ b/libs/langgraph/langgraph/errors.py @@ -1,7 +1,9 @@ from typing import Any, Sequence -from langgraph.checkpoint.base import EmptyChannelError -from langgraph.constants import Interrupt +from langgraph.checkpoint.base import EmptyChannelError # noqa: F401 +from langgraph.types import Interrupt + +# EmptyChannelError re-exported for backwards compatibility class GraphRecursionError(RecursionError): @@ -24,13 +26,14 @@ class GraphRecursionError(RecursionError): class InvalidUpdateError(Exception): - """Raised when attempting to update a channel with an invalid sequence of updates.""" + """Raised when attempting to update a channel with an invalid set of updates.""" pass class GraphInterrupt(Exception): - """Raised when a subgraph is interrupted.""" + """Raised when a subgraph is interrupted, suppressed by the root graph. + Never raised directly, or surfaced to the user.""" def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None: super().__init__(interrupts) @@ -44,7 +47,7 @@ def __init__(self, value: Any) -> None: class GraphDelegate(Exception): - """Raised when a graph is delegated.""" + """Raised when a graph is delegated (for distributed mode).""" def __init__(self, *args: dict[str, Any]) -> None: super().__init__(*args) @@ -57,22 +60,22 @@ class EmptyInputError(Exception): class TaskNotFound(Exception): - """Raised when the executor is unable to find a task.""" + """Raised when the executor is unable to find a task (for distributed mode).""" pass class CheckpointNotLatest(Exception): - """Raised when the checkpoint is not the latest version.""" + """Raised when the checkpoint is not the latest version (for distributed mode).""" + + pass + + +class MultipleSubgraphsError(Exception): + """Raised when multiple subgraphs are called inside the same node.""" pass -__all__ = [ - "GraphRecursionError", - "InvalidUpdateError", - "GraphInterrupt", - "NodeInterrupt", - "EmptyInputError", - "EmptyChannelError", -] +_SEEN_CHECKPOINT_NS: set[str] = set() +"""Used for subgraph detection.""" diff --git a/libs/langgraph/langgraph/graph/graph.py b/libs/langgraph/langgraph/graph/graph.py index c5a043ee7..e957a15b9 100644 --- a/libs/langgraph/langgraph/graph/graph.py +++ b/libs/langgraph/langgraph/graph/graph.py @@ -26,7 +26,6 @@ from typing_extensions import Self from langgraph.channels.ephemeral_value import EphemeralValue -from langgraph.checkpoint.base import BaseCheckpointSaver from langgraph.constants import ( END, NS_END, @@ -38,8 +37,8 @@ from langgraph.errors import InvalidUpdateError from langgraph.pregel import Channel, Pregel from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry +from langgraph.types import All, Checkpointer from langgraph.utils.runnable import RunnableCallable, coerce_to_runnable logger = logging.getLogger(__name__) @@ -406,7 +405,7 @@ def validate(self, interrupt: Optional[Sequence[str]] = None) -> Self: def compile( self, - checkpointer: Optional[BaseCheckpointSaver] = None, + checkpointer: Checkpointer = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, debug: bool = False, diff --git a/libs/langgraph/langgraph/graph/state.py b/libs/langgraph/langgraph/graph/state.py index cee0fb849..bc0762c80 100644 --- a/libs/langgraph/langgraph/graph/state.py +++ b/libs/langgraph/langgraph/graph/state.py @@ -32,7 +32,6 @@ from langgraph.channels.ephemeral_value import EphemeralValue from langgraph.channels.last_value import LastValue from langgraph.channels.named_barrier_value import NamedBarrierValue -from langgraph.checkpoint.base import BaseCheckpointSaver from langgraph.constants import NS_END, NS_SEP, TAG_HIDDEN from langgraph.errors import InvalidUpdateError from langgraph.graph.graph import END, START, Branch, CompiledGraph, Graph, Send @@ -45,9 +44,9 @@ is_writable_managed_value, ) from langgraph.pregel.read import ChannelRead, PregelNode -from langgraph.pregel.types import All, RetryPolicy from langgraph.pregel.write import SKIP_WRITE, ChannelWrite, ChannelWriteEntry from langgraph.store.base import BaseStore +from langgraph.types import All, Checkpointer, RetryPolicy from langgraph.utils.fields import get_field_default from langgraph.utils.pydantic import create_model from langgraph.utils.runnable import coerce_to_runnable @@ -400,7 +399,7 @@ def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> Self: def compile( self, - checkpointer: Optional[BaseCheckpointSaver] = None, + checkpointer: Checkpointer = None, *, store: Optional[BaseStore] = None, interrupt_before: Optional[Union[All, list[str]]] = None, @@ -413,7 +412,7 @@ def compile( streamed, batched, and run asynchronously. Args: - checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object. + checkpointer (Checkpointer): An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point. interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before. diff --git a/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py b/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py index 1e2209bd7..c5c64cd24 100644 --- a/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py +++ b/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py @@ -16,13 +16,13 @@ from langchain_core.tools import BaseTool from langgraph._api.deprecation import deprecated_parameter -from langgraph.checkpoint.base import BaseCheckpointSaver from langgraph.graph import StateGraph from langgraph.graph.graph import CompiledGraph from langgraph.graph.message import add_messages from langgraph.managed import IsLastStep from langgraph.prebuilt.tool_executor import ToolExecutor from langgraph.prebuilt.tool_node import ToolNode +from langgraph.types import Checkpointer # We create the AgentState that we will pass around @@ -132,7 +132,7 @@ def create_react_agent( state_schema: Optional[StateSchemaType] = None, messages_modifier: Optional[MessagesModifier] = None, state_modifier: Optional[StateModifier] = None, - checkpointer: Optional[BaseCheckpointSaver] = None, + checkpointer: Checkpointer = None, interrupt_before: Optional[list[str]] = None, interrupt_after: Optional[list[str]] = None, debug: bool = False, diff --git a/libs/langgraph/langgraph/pregel/__init__.py b/libs/langgraph/langgraph/pregel/__init__.py index f1a9aa578..f78d072d6 100644 --- a/libs/langgraph/langgraph/pregel/__init__.py +++ b/libs/langgraph/langgraph/pregel/__init__.py @@ -83,11 +83,11 @@ from langgraph.pregel.read import PregelNode from langgraph.pregel.retry import RetryPolicy from langgraph.pregel.runner import PregelRunner -from langgraph.pregel.types import All, StateSnapshot, StreamMode from langgraph.pregel.utils import get_new_channel_versions from langgraph.pregel.validate import validate_graph, validate_keys from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry from langgraph.store.base import BaseStore +from langgraph.types import All, Checkpointer, StateSnapshot, StreamMode from langgraph.utils.config import ( ensure_config, merge_configs, @@ -197,7 +197,7 @@ class Pregel(Runnable[Union[dict[str, Any], Any], Union[dict[str, Any], Any]]): debug: bool """Whether to print debug information during execution. Defaults to False.""" - checkpointer: Optional[BaseCheckpointSaver] = None + checkpointer: Checkpointer = None """Checkpointer used to save and load graph state. Defaults to None.""" store: Optional[BaseStore] = None @@ -281,7 +281,7 @@ def config_specs(self) -> list[ConfigurableFieldSpec]: [spec for node in self.nodes.values() for spec in node.config_specs] + ( self.checkpointer.config_specs - if self.checkpointer is not None + if isinstance(self.checkpointer, BaseCheckpointSaver) else [] ) + ( @@ -1059,6 +1059,8 @@ def _defaults( Union[All, Sequence[str]], Optional[BaseCheckpointSaver], ]: + if config["recursion_limit"] < 1: + raise ValueError("recursion_limit must be at least 1") debug = debug if debug is not None else self.debug if output_keys is None: output_keys = self.stream_channels_asis @@ -1072,12 +1074,16 @@ def _defaults( if CONFIG_KEY_TASK_ID in config.get("configurable", {}): # if being called as a node in another graph, always use values mode stream_mode = ["values"] - if CONFIG_KEY_CHECKPOINTER in config.get("configurable", {}): - checkpointer: Optional[BaseCheckpointSaver] = config["configurable"][ - CONFIG_KEY_CHECKPOINTER - ] + if self.checkpointer is False: + checkpointer: Optional[BaseCheckpointSaver] = None + elif CONFIG_KEY_CHECKPOINTER in config.get("configurable", {}): + checkpointer = config["configurable"][CONFIG_KEY_CHECKPOINTER] else: checkpointer = self.checkpointer + if checkpointer and not config.get("configurable"): + raise ValueError( + f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in checkpointer.config_specs]}" + ) return ( debug, set(stream_mode), @@ -1193,12 +1199,6 @@ def output() -> Iterator: run_id=config.get("run_id"), ) try: - if config["recursion_limit"] < 1: - raise ValueError("recursion_limit must be at least 1") - if self.checkpointer and not config.get("configurable"): - raise ValueError( - f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}" - ) # assign defaults ( debug, @@ -1414,12 +1414,6 @@ def output() -> Iterator: None, ) try: - if config["recursion_limit"] < 1: - raise ValueError("recursion_limit must be at least 1") - if self.checkpointer and not config.get("configurable"): - raise ValueError( - f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}" - ) # assign defaults ( debug, diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py index 8a63d4cc6..f9b0096c8 100644 --- a/libs/langgraph/langgraph/pregel/algo.py +++ b/libs/langgraph/langgraph/pregel/algo.py @@ -33,6 +33,7 @@ CONFIG_KEY_READ, CONFIG_KEY_SEND, CONFIG_KEY_TASK_ID, + EMPTY_SEQ, INTERRUPT, NO_WRITES, NS_END, @@ -50,15 +51,16 @@ from langgraph.pregel.log import logger from langgraph.pregel.manager import ChannelsManager from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All, PregelExecutableTask, PregelTask +from langgraph.types import All, PregelExecutableTask, PregelTask from langgraph.utils.config import merge_configs, patch_config GetNextVersion = Callable[[Optional[V], BaseChannel], V] -EMPTY_SEQ: tuple[str, ...] = tuple() - class WritesProtocol(Protocol): + """Protocol for objects containing writes to be applied to checkpoint. + Implemented by PregelTaskWrites and PregelExecutableTask.""" + @property def name(self) -> str: ... @@ -70,6 +72,9 @@ def triggers(self) -> Sequence[str]: ... class PregelTaskWrites(NamedTuple): + """Simplest implementation of WritesProtocol, for usage with writes that + don't originate from a runnable task, eg. graph input, update_state, etc.""" + name: str writes: Sequence[tuple[str, Any]] triggers: Sequence[str] @@ -80,6 +85,7 @@ def should_interrupt( interrupt_nodes: Union[All, Sequence[str]], tasks: Iterable[PregelExecutableTask], ) -> list[PregelExecutableTask]: + """Check if the graph should be interrupted based on current state.""" version_type = type(next(iter(checkpoint["channel_versions"].values()), None)) null_version = version_type() # type: ignore[misc] seen = checkpoint["versions_seen"].get(INTERRUPT, {}) @@ -117,6 +123,9 @@ def local_read( select: Union[list[str], str], fresh: bool = False, ) -> Union[dict[str, Any], Any]: + """Function injected under CONFIG_KEY_READ in task config, to read current state. + Used by conditional edges to read a copy of the state with reflecting the writes + from that node only.""" if isinstance(select, str): managed_keys = [] for c, _ in task.writes: @@ -153,6 +162,8 @@ def local_write( managed: ManagedValueMapping, writes: Sequence[tuple[str, Any]], ) -> None: + """Function injected under CONFIG_KEY_SEND in task config, to write to channels. + Validates writes and forwards them to `commit` function.""" for chan, value in writes: if chan == TASKS: if not isinstance(value, Send): @@ -169,6 +180,7 @@ def local_write( def increment(current: Optional[int], channel: BaseChannel) -> int: + """Default channel versioning function, increments the current int version.""" return current + 1 if current is not None else 1 @@ -178,6 +190,9 @@ def apply_writes( tasks: Iterable[WritesProtocol], get_next_version: Optional[GetNextVersion], ) -> dict[str, list[Any]]: + """Apply writes from a set of tasks (usually the tasks from a Pregel step) + to the checkpoint and channels, and return managed values writes to be applied + externally.""" # update seen versions for task in tasks: checkpoint["versions_seen"].setdefault(task.name, {}).update( @@ -297,6 +312,9 @@ def prepare_next_tasks( checkpointer: Optional[BaseCheckpointSaver] = None, manager: Union[None, ParentRunManager, AsyncParentRunManager] = None, ) -> Union[dict[str, PregelTask], dict[str, PregelExecutableTask]]: + """Prepare the set of tasks that will make up the next Pregel step. + This is the union of all PUSH tasks (Sends) and PULL tasks (nodes triggered + by edges).""" tasks: dict[str, Union[PregelTask, PregelExecutableTask]] = {} # Consume pending packets for idx, _ in enumerate(checkpoint["pending_sends"]): @@ -348,6 +366,8 @@ def prepare_single_task( checkpointer: Optional[BaseCheckpointSaver] = None, manager: Union[None, ParentRunManager, AsyncParentRunManager] = None, ) -> Union[None, PregelTask, PregelExecutableTask]: + """Prepares a single task for the next Pregel step, given a task path, which + uniquely identifies a PUSH or PULL task within the graph.""" checkpoint_id = UUID(checkpoint["id"]).bytes configurable = config.get("configurable", {}) parent_ns = configurable.get("checkpoint_ns", "") @@ -568,6 +588,7 @@ def _proc_input( *, for_execution: bool, ) -> Iterator[Any]: + """Prepare input for a PULL task, based on the process's channels and triggers.""" # If all trigger channels subscribed by this process are not empty # then invoke the process with the values of all non-empty channels if isinstance(proc.channels, dict): diff --git a/libs/langgraph/langgraph/pregel/debug.py b/libs/langgraph/langgraph/pregel/debug.py index 56f1eb9c6..982182842 100644 --- a/libs/langgraph/langgraph/pregel/debug.py +++ b/libs/langgraph/langgraph/pregel/debug.py @@ -22,7 +22,7 @@ from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, PendingWrite from langgraph.constants import ERROR, INTERRUPT, TAG_HIDDEN from langgraph.pregel.io import read_channels -from langgraph.pregel.types import PregelExecutableTask, PregelTask, StateSnapshot +from langgraph.types import PregelExecutableTask, PregelTask, StateSnapshot class TaskPayload(TypedDict): @@ -84,6 +84,7 @@ class DebugOutputCheckpoint(DebugOutputBase): def map_debug_tasks( step: int, tasks: Iterable[PregelExecutableTask] ) -> Iterator[DebugOutputTask]: + """Produce "task" events for stream_mode=debug.""" ts = datetime.now(timezone.utc).isoformat() for task in tasks: if task.config is not None and TAG_HIDDEN in task.config.get("tags", []): @@ -107,6 +108,7 @@ def map_debug_task_results( task_tup: tuple[PregelExecutableTask, Sequence[tuple[str, Any]]], stream_keys: Union[str, Sequence[str]], ) -> Iterator[DebugOutputTaskResult]: + """Produce "task_result" events for stream_mode=debug.""" stream_channels_list = ( [stream_keys] if isinstance(stream_keys, str) else stream_keys ) @@ -135,6 +137,7 @@ def map_debug_checkpoint( tasks: Iterable[PregelExecutableTask], pending_writes: list[PendingWrite], ) -> Iterator[DebugOutputCheckpoint]: + """Produce "checkpoint" events for stream_mode=debug.""" yield { "type": "checkpoint", "timestamp": checkpoint["ts"], @@ -213,6 +216,7 @@ def tasks_w_writes( pending_writes: Optional[list[PendingWrite]], states: Optional[dict[str, Union[RunnableConfig, StateSnapshot]]], ) -> tuple[PregelTask, ...]: + """Apply writes / subgraph states to tasks to be returned in a StateSnapshot.""" pending_writes = pending_writes or [] return tuple( PregelTask( diff --git a/libs/langgraph/langgraph/pregel/executor.py b/libs/langgraph/langgraph/pregel/executor.py index 46f1c3f64..691098b7a 100644 --- a/libs/langgraph/langgraph/pregel/executor.py +++ b/libs/langgraph/langgraph/pregel/executor.py @@ -39,6 +39,13 @@ def __call__( class BackgroundExecutor(ContextManager): + """A context manager that runs sync tasks in the background. + Uses a thread pool executor to delegate tasks to separate threads. + On exit, + - cancels any (not yet started) tasks with `__cancel_on_exit__=True` + - waits for all tasks to finish + - re-raises the first exception from tasks with `__reraise_on_exit__=True`""" + def __init__(self, config: RunnableConfig) -> None: self.stack = ExitStack() self.executor = self.stack.enter_context(get_executor_for_config(config)) @@ -49,7 +56,7 @@ def submit( # type: ignore[valid-type] fn: Callable[P, T], *args: P.args, __name__: Optional[str] = None, # currently not used in sync version - __cancel_on_exit__: bool = False, + __cancel_on_exit__: bool = False, # for sync, can cancel only if not started __reraise_on_exit__: bool = True, **kwargs: P.kwargs, ) -> concurrent.futures.Future[T]: @@ -101,6 +108,14 @@ def __exit__( class AsyncBackgroundExecutor(AsyncContextManager): + """A context manager that runs async tasks in the background. + Uses the current event loop to delegate tasks to asyncio tasks. + On exit, + - cancels any tasks with `__cancel_on_exit__=True` + - waits for all tasks to finish + - re-raises the first exception from tasks with `__reraise_on_exit__=True` + ignoring CancelledError""" + def __init__(self) -> None: self.context_not_supported = sys.version_info < (3, 11) self.tasks: dict[asyncio.Task, tuple[bool, bool]] = {} diff --git a/libs/langgraph/langgraph/pregel/io.py b/libs/langgraph/langgraph/pregel/io.py index ad2252c9d..ef9822641 100644 --- a/libs/langgraph/langgraph/pregel/io.py +++ b/libs/langgraph/langgraph/pregel/io.py @@ -3,9 +3,9 @@ from langchain_core.runnables.utils import AddableDict from langgraph.channels.base import BaseChannel, EmptyChannelError -from langgraph.constants import ERROR, INTERRUPT, TAG_HIDDEN +from langgraph.constants import EMPTY_SEQ, ERROR, INTERRUPT, TAG_HIDDEN from langgraph.pregel.log import logger -from langgraph.pregel.types import PregelExecutableTask +from langgraph.types import PregelExecutableTask def read_channel( @@ -97,9 +97,6 @@ def __radd__(self, other: dict[str, Any]) -> "AddableUpdatesDict": raise TypeError("AddableUpdatesDict does not support right-side addition") -EMPTY_SEQ: tuple[str, ...] = tuple() - - def map_output_updates( output_channels: Union[str, Sequence[str]], tasks: list[tuple[PregelExecutableTask, Sequence[tuple[str, Any]]]], diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index 49baa1846..45b8798af 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -44,6 +44,7 @@ CONFIG_KEY_RESUMING, CONFIG_KEY_STREAM, CONFIG_KEY_TASK_ID, + EMPTY_SEQ, ERROR, INPUT, INTERRUPT, @@ -53,10 +54,12 @@ TASKS, ) from langgraph.errors import ( + _SEEN_CHECKPOINT_NS, CheckpointNotLatest, EmptyInputError, GraphDelegate, GraphInterrupt, + MultipleSubgraphsError, ) from langgraph.managed.base import ( ManagedValueMapping, @@ -93,21 +96,20 @@ ) from langgraph.pregel.manager import AsyncChannelsManager, ChannelsManager from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All, PregelExecutableTask, StreamMode from langgraph.pregel.utils import get_new_channel_versions from langgraph.store.base import BaseStore from langgraph.store.batch import AsyncBatchedStore +from langgraph.types import All, PregelExecutableTask, StreamMode from langgraph.utils.config import patch_configurable V = TypeVar("V") P = ParamSpec("P") +StreamChunk = tuple[tuple[str, ...], str, Any] + INPUT_DONE = object() INPUT_RESUMING = object() -EMPTY_SEQ = () SPECIAL_CHANNELS = (ERROR, INTERRUPT, SCHEDULED) -StreamChunk = tuple[tuple[str, ...], str, Any] - class StreamProtocol: __slots__ = ("modes", "__call__") @@ -195,6 +197,7 @@ def __init__( specs: Mapping[str, Union[BaseChannel, ManagedValueSpec]], output_keys: Union[str, Sequence[str]], stream_keys: Union[str, Sequence[str]], + check_subgraphs: bool = True, debug: bool = False, ) -> None: self.stream = stream @@ -220,6 +223,11 @@ def __init__( self.config = patch_configurable( self.config, {"checkpoint_ns": "", "checkpoint_id": None} ) + if check_subgraphs and self.is_nested and self.checkpointer is not None: + if self.config["configurable"]["checkpoint_ns"] in _SEEN_CHECKPOINT_NS: + raise MultipleSubgraphsError + else: + _SEEN_CHECKPOINT_NS.add(self.config["configurable"]["checkpoint_ns"]) if ( CONFIG_KEY_CHECKPOINT_MAP in self.config["configurable"] and self.config["configurable"].get("checkpoint_ns") @@ -634,6 +642,7 @@ def __init__( specs: Mapping[str, Union[BaseChannel, ManagedValueSpec]], output_keys: Union[str, Sequence[str]] = EMPTY_SEQ, stream_keys: Union[str, Sequence[str]] = EMPTY_SEQ, + check_subgraphs: bool = True, debug: bool = False, ) -> None: super().__init__( @@ -646,6 +655,7 @@ def __init__( specs=specs, output_keys=output_keys, stream_keys=stream_keys, + check_subgraphs=check_subgraphs, debug=debug, ) self.stack = ExitStack() @@ -755,6 +765,7 @@ def __init__( specs: Mapping[str, Union[BaseChannel, ManagedValueSpec]], output_keys: Union[str, Sequence[str]] = EMPTY_SEQ, stream_keys: Union[str, Sequence[str]] = EMPTY_SEQ, + check_subgraphs: bool = True, debug: bool = False, ) -> None: super().__init__( @@ -767,6 +778,7 @@ def __init__( specs=specs, output_keys=output_keys, stream_keys=stream_keys, + check_subgraphs=check_subgraphs, debug=debug, ) self.store = AsyncBatchedStore(self.store) if self.store else None diff --git a/libs/langgraph/langgraph/pregel/messages.py b/libs/langgraph/langgraph/pregel/messages.py index 7c3f90b10..d0ae539e2 100644 --- a/libs/langgraph/langgraph/pregel/messages.py +++ b/libs/langgraph/langgraph/pregel/messages.py @@ -24,6 +24,9 @@ class StreamMessagesHandler(BaseCallbackHandler, _StreamingCallbackHandler): + """A callback handler that implements stream_mode=messages. + Collects messages from (1) chat model stream events and (2) node outputs.""" + def __init__(self, stream: Callable[[StreamChunk], None]): self.stream = stream self.metadata: dict[UUID, Meta] = {} diff --git a/libs/langgraph/langgraph/pregel/metadata.py b/libs/langgraph/langgraph/pregel/metadata.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/libs/langgraph/langgraph/pregel/read.py b/libs/langgraph/langgraph/pregel/read.py index 3ad988b89..097e76fa2 100644 --- a/libs/langgraph/langgraph/pregel/read.py +++ b/libs/langgraph/langgraph/pregel/read.py @@ -31,6 +31,9 @@ class ChannelRead(RunnableCallable): + """Implements the logic for reading state from CONFIG_KEY_READ. + Usable both as a runnable as well as a static method to call imperatively.""" + channel: Union[str, list[str]] fresh: bool = False @@ -108,21 +111,39 @@ def do_read( class PregelNode(Runnable): + """A node in a Pregel graph. This won't be invoked as a runnable by the graph + itself, but instead acts as a container for the components necessary to make + a PregelExecutableTask for a node.""" + channels: Union[list[str], Mapping[str, str]] + """The channels that will be passed as input to `bound`. + If a list, the node will be invoked with the first of that isn't empty. + If a dict, the keys are the names of the channels, and the values are the keys + to use in the input to `bound`.""" triggers: list[str] + """If any of these channels is written to, this node will be triggered in + the next step.""" mapper: Optional[Callable[[Any], Any]] + """A function to transform the input before passing it to `bound`.""" writers: list[Runnable] + """A list of writers that will be executed after `bound`, responsible for + taking the output of `bound` and writing it to the appropriate channels.""" bound: Runnable[Any, Any] + """The main logic of the node. This will be invoked with the input from + `channels`.""" retry_policy: Optional[RetryPolicy] + """The retry policy to use when invoking the node.""" tags: Optional[Sequence[str]] + """Tags to attach to the node for tracing.""" metadata: Optional[Mapping[str, Any]] + """Metadata to attach to the node for tracing.""" def __init__( self, @@ -151,7 +172,7 @@ def copy(self, update: dict[str, Any]) -> PregelNode: @cached_property def flat_writers(self) -> list[Runnable]: - """Get writers with optimizations applied.""" + """Get writers with optimizations applied. Dedupes consecutive ChannelWrites.""" writers = self.writers.copy() while ( len(writers) > 1 @@ -170,6 +191,7 @@ def flat_writers(self) -> list[Runnable]: @cached_property def node(self) -> Optional[Runnable[Any, Any]]: + """Get a runnable that combines `bound` and `writers`.""" writers = self.flat_writers if self.bound is DEFAULT_BOUND and not writers: return None diff --git a/libs/langgraph/langgraph/pregel/retry.py b/libs/langgraph/langgraph/pregel/retry.py index 90ccaa7d0..33c60d875 100644 --- a/libs/langgraph/langgraph/pregel/retry.py +++ b/libs/langgraph/langgraph/pregel/retry.py @@ -5,8 +5,8 @@ from typing import Optional, Sequence from langgraph.constants import CONFIG_KEY_RESUMING -from langgraph.errors import GraphInterrupt -from langgraph.pregel.types import PregelExecutableTask, RetryPolicy +from langgraph.errors import _SEEN_CHECKPOINT_NS, GraphInterrupt +from langgraph.types import PregelExecutableTask, RetryPolicy from langgraph.utils.config import patch_configurable logger = logging.getLogger(__name__) @@ -71,6 +71,13 @@ def run_with_retry( ) # signal subgraphs to resume (if available) config = patch_configurable(config, {CONFIG_KEY_RESUMING: True}) + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) + finally: + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) async def arun_with_retry( @@ -137,3 +144,10 @@ async def arun_with_retry( ) # signal subgraphs to resume (if available) config = patch_configurable(config, {CONFIG_KEY_RESUMING: True}) + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) + finally: + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) diff --git a/libs/langgraph/langgraph/pregel/runner.py b/libs/langgraph/langgraph/pregel/runner.py index 14e84352f..b8392b613 100644 --- a/libs/langgraph/langgraph/pregel/runner.py +++ b/libs/langgraph/langgraph/pregel/runner.py @@ -18,10 +18,14 @@ from langgraph.errors import GraphDelegate, GraphInterrupt from langgraph.pregel.executor import Submit from langgraph.pregel.retry import arun_with_retry, run_with_retry -from langgraph.pregel.types import PregelExecutableTask, RetryPolicy +from langgraph.types import PregelExecutableTask, RetryPolicy class PregelRunner: + """Responsible for executing a set of Pregel tasks concurrently, committing + their writes, yielding control to caller when there is output to emit, and + interrupting other tasks if appropriate.""" + def __init__( self, *, @@ -215,6 +219,8 @@ def commit( def _should_stop_others( done: Union[set[concurrent.futures.Future[Any]], set[asyncio.Future[Any]]], ) -> bool: + """Check if any task failed, if so, cancel all other tasks. + GraphInterrupts are not considered failures.""" for fut in done: if fut.cancelled(): return True @@ -227,6 +233,7 @@ def _should_stop_others( def _exception( fut: Union[concurrent.futures.Future[Any], asyncio.Future[Any]], ) -> Optional[BaseException]: + """Return the exception from a future, without raising CancelledError.""" if fut.cancelled(): if isinstance(fut, asyncio.Future): return asyncio.CancelledError() @@ -245,6 +252,7 @@ def _panic_or_proceed( timeout_exc_cls: Type[Exception] = TimeoutError, panic: bool = True, ) -> None: + """Cancel remaining tasks if any failed, re-raise exception if panic is True.""" done: set[Union[concurrent.futures.Future[Any], asyncio.Future[Any]]] = set() inflight: set[Union[concurrent.futures.Future[Any], asyncio.Future[Any]]] = set() for fut, val in futs.items(): diff --git a/libs/langgraph/langgraph/pregel/types.py b/libs/langgraph/langgraph/pregel/types.py index d34845483..7a72b88c9 100644 --- a/libs/langgraph/langgraph/pregel/types.py +++ b/libs/langgraph/langgraph/pregel/types.py @@ -1,124 +1,25 @@ -from collections import deque -from typing import Any, Callable, Literal, NamedTuple, Optional, Sequence, Type, Union - -from langchain_core.runnables import Runnable, RunnableConfig - -from langgraph.checkpoint.base import CheckpointMetadata -from langgraph.constants import Interrupt - - -def default_retry_on(exc: Exception) -> bool: - import httpx - import requests - - if isinstance(exc, ConnectionError): - return True - if isinstance( - exc, - ( - ValueError, - TypeError, - ArithmeticError, - ImportError, - LookupError, - NameError, - SyntaxError, - RuntimeError, - ReferenceError, - StopIteration, - StopAsyncIteration, - OSError, - ), - ): - return False - if isinstance(exc, httpx.HTTPStatusError): - return 500 <= exc.response.status_code < 600 - if isinstance(exc, requests.HTTPError): - return 500 <= exc.response.status_code < 600 if exc.response else True - return True - - -class RetryPolicy(NamedTuple): - """Configuration for retrying nodes.""" - - initial_interval: float = 0.5 - """Amount of time that must elapse before the first retry occurs. In seconds.""" - backoff_factor: float = 2.0 - """Multiplier by which the interval increases after each retry.""" - max_interval: float = 128.0 - """Maximum amount of time that may elapse between retries. In seconds.""" - max_attempts: int = 3 - """Maximum number of attempts to make before giving up, including the first.""" - jitter: bool = True - """Whether to add random jitter to the interval between retries.""" - retry_on: Union[ - Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool] - ] = default_retry_on - """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry.""" - - -class CachePolicy(NamedTuple): - """Configuration for caching nodes.""" - - pass - - -class PregelTask(NamedTuple): - id: str - name: str - path: tuple[Union[str, int], ...] - error: Optional[Exception] = None - interrupts: tuple[Interrupt, ...] = () - state: Union[None, RunnableConfig, "StateSnapshot"] = None - - -class PregelExecutableTask(NamedTuple): - name: str - input: Any - proc: Runnable - writes: deque[tuple[str, Any]] - config: RunnableConfig - triggers: list[str] - retry_policy: Optional[RetryPolicy] - cache_policy: Optional[CachePolicy] - id: str - path: tuple[Union[str, int], ...] - scheduled: bool = False - - -class StateSnapshot(NamedTuple): - """Snapshot of the state of the graph at the beginning of a step.""" - - values: Union[dict[str, Any], Any] - """Current values of channels""" - next: tuple[str, ...] - """The name of the node to execute in each task for this step.""" - config: RunnableConfig - """Config used to fetch this snapshot""" - metadata: Optional[CheckpointMetadata] - """Metadata associated with this snapshot""" - created_at: Optional[str] - """Timestamp of snapshot creation""" - parent_config: Optional[RunnableConfig] - """Config used to fetch the parent snapshot, if any""" - tasks: tuple[PregelTask, ...] - """Tasks to execute in this step. If already attempted, may contain an error.""" - - -All = Literal["*"] - -StreamMode = Literal["values", "updates", "debug", "messages", "custom"] -"""How the stream method should emit outputs. - -- 'values': Emit all values of the state for each step. -- 'updates': Emit only the node name(s) and updates - that were returned by the node(s) **after** each step. -- 'debug': Emit debug events for each step. -- 'messages': Emit LLM messages token-by-token. -- 'custom': Emit custom output `write: StreamWriter` kwarg of each node. -""" - -StreamWriter = Callable[[Any], None] -"""Callable that accepts a single argument and writes it to the output stream. -Always injected into nodes if requested, -but it's a no-op when not using stream_mode="custom".""" +"""Re-export types moved to langgraph.types""" + +from langgraph.types import ( + All, + CachePolicy, + PregelExecutableTask, + PregelTask, + RetryPolicy, + StateSnapshot, + StreamMode, + StreamWriter, + default_retry_on, +) + +__all__ = [ + "All", + "CachePolicy", + "PregelExecutableTask", + "PregelTask", + "RetryPolicy", + "StateSnapshot", + "StreamMode", + "StreamWriter", + "default_retry_on", +] diff --git a/libs/langgraph/langgraph/pregel/utils.py b/libs/langgraph/langgraph/pregel/utils.py index c6dc064d3..3a29e5ed1 100644 --- a/libs/langgraph/langgraph/pregel/utils.py +++ b/libs/langgraph/langgraph/pregel/utils.py @@ -4,7 +4,7 @@ def get_new_channel_versions( previous_versions: ChannelVersions, current_versions: ChannelVersions ) -> ChannelVersions: - """Get new channel versions.""" + """Get subset of current_versions that are newer than previous_versions.""" if previous_versions: version_type = type(next(iter(current_versions.values()), None)) null_version = version_type() # type: ignore[misc] diff --git a/libs/langgraph/langgraph/pregel/validate.py b/libs/langgraph/langgraph/pregel/validate.py index 232014240..cf957dc07 100644 --- a/libs/langgraph/langgraph/pregel/validate.py +++ b/libs/langgraph/langgraph/pregel/validate.py @@ -3,7 +3,7 @@ from langgraph.channels.base import BaseChannel from langgraph.constants import RESERVED from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All +from langgraph.types import All def validate_graph( @@ -17,7 +17,7 @@ def validate_graph( ) -> None: for chan in channels: if chan in RESERVED: - raise ValueError(f"Channel names {RESERVED} are reserved") + raise ValueError(f"Channel names {chan} are reserved") subscribed_channels = set[str]() for name, node in nodes.items(): diff --git a/libs/langgraph/langgraph/pregel/write.py b/libs/langgraph/langgraph/pregel/write.py index 2adcab757..c2795c67c 100644 --- a/libs/langgraph/langgraph/pregel/write.py +++ b/libs/langgraph/langgraph/pregel/write.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio from typing import ( Any, Callable, @@ -22,30 +21,29 @@ TYPE_SEND = Callable[[Sequence[tuple[str, Any]]], None] R = TypeVar("R", bound=Runnable) - SKIP_WRITE = object() PASSTHROUGH = object() class ChannelWriteEntry(NamedTuple): channel: str + """Channel name to write to.""" value: Any = PASSTHROUGH + """Value to write, or PASSTHROUGH to use the input.""" skip_none: bool = False + """Whether to skip writing if the value is None.""" mapper: Optional[Callable] = None + """Function to transform the value before writing.""" class ChannelWrite(RunnableCallable): + """Implements th logic for sending writes to CONFIG_KEY_SEND. + Can be used as a runnable or as a static method to call imperatively.""" + writes: list[Union[ChannelWriteEntry, Send]] - """ - Sequence of write entries, each of which is a tuple of: - - channel name - - runnable to map input, or None to use the input, or any other value to use instead - - whether to skip writing if the mapped value is None - """ + """Sequence of write entries or Send objects to write.""" require_at_least_one_of: Optional[Sequence[str]] - """ - If defined, at least one of these channels must be written to. - """ + """If defined, at least one of these channels must be written to.""" def __init__( self, @@ -145,6 +143,7 @@ def do_write( @staticmethod def is_writer(runnable: Runnable) -> bool: + """Used by PregelNode to distinguish between writers and other runnables.""" return ( isinstance(runnable, ChannelWrite) or getattr(runnable, "_is_channel_writer", False) is True @@ -152,13 +151,9 @@ def is_writer(runnable: Runnable) -> bool: @staticmethod def register_writer(runnable: R) -> R: + """Used to mark a runnable as a writer, so that it can be detected by is_writer. + Instances of ChannelWrite are automatically marked as writers.""" # using object.__setattr__ to work around objects that override __setattr__ # eg. pydantic models and dataclasses object.__setattr__(runnable, "_is_channel_writer", True) return runnable - - -def _mk_future(val: Any) -> asyncio.Future: - fut: asyncio.Future[Any] = asyncio.Future() - fut.set_result(val) - return fut diff --git a/libs/langgraph/langgraph/types.py b/libs/langgraph/langgraph/types.py new file mode 100644 index 000000000..f8a8a74c6 --- /dev/null +++ b/libs/langgraph/langgraph/types.py @@ -0,0 +1,214 @@ +from collections import deque +from dataclasses import dataclass +from typing import ( + Any, + Callable, + Literal, + NamedTuple, + Optional, + Sequence, + Type, + Union, +) + +from langchain_core.runnables import Runnable, RunnableConfig + +from langgraph.checkpoint.base import BaseCheckpointSaver, CheckpointMetadata + +All = Literal["*"] +"""Special value to indicate that graph should interrupt on all nodes.""" + +Checkpointer = Union[None, Literal[False], BaseCheckpointSaver] +"""Type of the checkpointer to use for a subgraph. False disables checkpointing, +even if the parent graph has a checkpointer. None inherits checkpointer.""" + +StreamMode = Literal["values", "updates", "debug", "messages", "custom"] +"""How the stream method should emit outputs. + +- 'values': Emit all values of the state for each step. +- 'updates': Emit only the node name(s) and updates + that were returned by the node(s) **after** each step. +- 'debug': Emit debug events for each step. +- 'messages': Emit LLM messages token-by-token. +- 'custom': Emit custom output `write: StreamWriter` kwarg of each node. +""" + +StreamWriter = Callable[[Any], None] +"""Callable that accepts a single argument and writes it to the output stream. +Always injected into nodes if requested as a keyword argument, but it's a no-op +when not using stream_mode="custom".""" + + +def default_retry_on(exc: Exception) -> bool: + import httpx + import requests + + if isinstance(exc, ConnectionError): + return True + if isinstance( + exc, + ( + ValueError, + TypeError, + ArithmeticError, + ImportError, + LookupError, + NameError, + SyntaxError, + RuntimeError, + ReferenceError, + StopIteration, + StopAsyncIteration, + OSError, + ), + ): + return False + if isinstance(exc, httpx.HTTPStatusError): + return 500 <= exc.response.status_code < 600 + if isinstance(exc, requests.HTTPError): + return 500 <= exc.response.status_code < 600 if exc.response else True + return True + + +class RetryPolicy(NamedTuple): + """Configuration for retrying nodes.""" + + initial_interval: float = 0.5 + """Amount of time that must elapse before the first retry occurs. In seconds.""" + backoff_factor: float = 2.0 + """Multiplier by which the interval increases after each retry.""" + max_interval: float = 128.0 + """Maximum amount of time that may elapse between retries. In seconds.""" + max_attempts: int = 3 + """Maximum number of attempts to make before giving up, including the first.""" + jitter: bool = True + """Whether to add random jitter to the interval between retries.""" + retry_on: Union[ + Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool] + ] = default_retry_on + """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry.""" + + +class CachePolicy(NamedTuple): + """Configuration for caching nodes.""" + + pass + + +@dataclass +class Interrupt: + value: Any + when: Literal["during"] = "during" + + +class PregelTask(NamedTuple): + id: str + name: str + path: tuple[Union[str, int], ...] + error: Optional[Exception] = None + interrupts: tuple[Interrupt, ...] = () + state: Union[None, RunnableConfig, "StateSnapshot"] = None + + +class PregelExecutableTask(NamedTuple): + name: str + input: Any + proc: Runnable + writes: deque[tuple[str, Any]] + config: RunnableConfig + triggers: list[str] + retry_policy: Optional[RetryPolicy] + cache_policy: Optional[CachePolicy] + id: str + path: tuple[Union[str, int], ...] + scheduled: bool = False + + +class StateSnapshot(NamedTuple): + """Snapshot of the state of the graph at the beginning of a step.""" + + values: Union[dict[str, Any], Any] + """Current values of channels""" + next: tuple[str, ...] + """The name of the node to execute in each task for this step.""" + config: RunnableConfig + """Config used to fetch this snapshot""" + metadata: Optional[CheckpointMetadata] + """Metadata associated with this snapshot""" + created_at: Optional[str] + """Timestamp of snapshot creation""" + parent_config: Optional[RunnableConfig] + """Config used to fetch the parent snapshot, if any""" + tasks: tuple[PregelTask, ...] + """Tasks to execute in this step. If already attempted, may contain an error.""" + + +class Send: + """A message or packet to send to a specific node in the graph. + + The `Send` class is used within a `StateGraph`'s conditional edges to + dynamically invoke a node with a custom state at the next step. + + Importantly, the sent state can differ from the core graph's state, + allowing for flexible and dynamic workflow management. + + One such example is a "map-reduce" workflow where your graph invokes + the same node multiple times in parallel with different states, + before aggregating the results back into the main graph's state. + + Attributes: + node (str): The name of the target node to send the message to. + arg (Any): The state or message to send to the target node. + + Examples: + >>> from typing import Annotated + >>> import operator + >>> class OverallState(TypedDict): + ... subjects: list[str] + ... jokes: Annotated[list[str], operator.add] + ... + >>> from langgraph.types import Send + >>> from langgraph.graph import END, START + >>> def continue_to_jokes(state: OverallState): + ... return [Send("generate_joke", {"subject": s}) for s in state['subjects']] + ... + >>> from langgraph.graph import StateGraph + >>> builder = StateGraph(OverallState) + >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]}) + >>> builder.add_conditional_edges(START, continue_to_jokes) + >>> builder.add_edge("generate_joke", END) + >>> graph = builder.compile() + >>> + >>> # Invoking with two subjects results in a generated joke for each + >>> graph.invoke({"subjects": ["cats", "dogs"]}) + {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']} + """ + + __slots__ = ("node", "arg") + + node: str + arg: Any + + def __init__(self, /, node: str, arg: Any) -> None: + """ + Initialize a new instance of the Send class. + + Args: + node (str): The name of the target node to send the message to. + arg (Any): The state or message to send to the target node. + """ + self.node = node + self.arg = arg + + def __hash__(self) -> int: + return hash((self.node, self.arg)) + + def __repr__(self) -> str: + return f"Send(node={self.node!r}, arg={self.arg!r})" + + def __eq__(self, value: object) -> bool: + return ( + isinstance(value, Send) + and self.node == value.node + and self.arg == value.arg + ) diff --git a/libs/langgraph/langgraph/utils/runnable.py b/libs/langgraph/langgraph/utils/runnable.py index bfc2a627b..d81f86e34 100644 --- a/libs/langgraph/langgraph/utils/runnable.py +++ b/libs/langgraph/langgraph/utils/runnable.py @@ -35,7 +35,7 @@ from typing_extensions import TypeGuard from langgraph.constants import CONFIG_KEY_STREAM_WRITER -from langgraph.pregel.types import StreamWriter +from langgraph.types import StreamWriter from langgraph.utils.config import ( ensure_config, get_async_callback_manager_for_config, diff --git a/libs/langgraph/langgraph/version.py b/libs/langgraph/langgraph/version.py index 3368893c0..f5cb757f5 100644 --- a/libs/langgraph/langgraph/version.py +++ b/libs/langgraph/langgraph/version.py @@ -1,4 +1,4 @@ -"""Main entrypoint into package.""" +"""Exports package version.""" from importlib import metadata diff --git a/libs/langgraph/tests/any_str.py b/libs/langgraph/tests/any_str.py index 9a1977a8c..5643a00fb 100644 --- a/libs/langgraph/tests/any_str.py +++ b/libs/langgraph/tests/any_str.py @@ -1,6 +1,28 @@ import re from typing import Any, Sequence, Union +from typing_extensions import Self + + +class FloatBetween(float): + def __new__(cls, min_value: float, max_value: float) -> Self: + return super().__new__(cls, min_value) + + def __init__(self, min_value: float, max_value: float) -> None: + super().__init__() + self.min_value = min_value + self.max_value = max_value + + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, float) + and other >= self.min_value + and other <= self.max_value + ) + + def __hash__(self) -> int: + return hash((float(self), self.min_value, self.max_value)) + class AnyStr(str): def __init__(self, prefix: Union[str, re.Pattern] = "") -> None: diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index 3e43ef28f..605f00747 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -52,8 +52,8 @@ CheckpointTuple, ) from langgraph.checkpoint.memory import MemorySaver -from langgraph.constants import ERROR, PULL, PUSH, Interrupt, Send -from langgraph.errors import InvalidUpdateError, NodeInterrupt +from langgraph.constants import ERROR, PULL, PUSH +from langgraph.errors import InvalidUpdateError, MultipleSubgraphsError, NodeInterrupt from langgraph.graph import END, Graph from langgraph.graph.graph import START from langgraph.graph.message import MessageGraph, add_messages @@ -70,9 +70,9 @@ StateSnapshot, ) from langgraph.pregel.retry import RetryPolicy -from langgraph.pregel.types import PregelTask, StreamWriter from langgraph.store.memory import MemoryStore -from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence +from langgraph.types import Interrupt, PregelTask, Send, StreamWriter +from tests.any_str import AnyDict, AnyStr, AnyVersion, FloatBetween, UnsortedSequence from tests.conftest import ALL_CHECKPOINTERS_SYNC, SHOULD_CHECK_SNAPSHOTS from tests.fake_chat import FakeChatModel from tests.fake_tracer import FakeTracer @@ -1861,7 +1861,12 @@ def test_invoke_two_processes_two_in_join_two_out(mocker: MockerFixture) -> None assert [*executor.map(app.invoke, [2] * 100)] == [[13, 13]] * 100 -def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_SYNC) +def test_invoke_join_then_call_other_pregel( + mocker: MockerFixture, request: pytest.FixtureRequest, checkpointer_name: str +) -> None: + checkpointer = request.getfixturevalue(f"checkpointer_{checkpointer_name}") + add_one = mocker.Mock(side_effect=lambda x: x + 1) add_10_each = mocker.Mock(side_effect=lambda x: [y + 10 for y in x]) @@ -1912,6 +1917,17 @@ def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None: with ThreadPoolExecutor() as executor: assert [*executor.map(app.invoke, [[2, 3]] * 10)] == [27] * 10 + # add checkpointer + app.checkpointer = checkpointer + # subgraph is called twice in the same node, through .map(), so raises + with pytest.raises(MultipleSubgraphsError): + app.invoke([2, 3], {"configurable": {"thread_id": "1"}}) + + # set inner graph checkpointer NeverCheckpoint + inner_app.checkpointer = False + # subgraph still called twice, but checkpointing for inner graph is disabled + assert app.invoke([2, 3], {"configurable": {"thread_id": "1"}}) == 27 + def test_invoke_two_processes_one_in_two_out(mocker: MockerFixture) -> None: add_one = mocker.Mock(side_effect=lambda x: x + 1) @@ -8580,22 +8596,22 @@ def outer_2(state: State): assert chunks == [ # arrives before "inner" finishes ( - 0.0, + FloatBetween(0.0, 0.1), ( (AnyStr("inner:"),), {"inner_1": {"my_key": "got here", "my_other_key": ""}}, ), ), - (0.2, ((), {"outer_1": {"my_key": " and parallel"}})), + (FloatBetween(0.2, 0.3), ((), {"outer_1": {"my_key": " and parallel"}})), ( - 0.5, + FloatBetween(0.5, 0.6), ( (AnyStr("inner:"),), {"inner_2": {"my_key": " and there", "my_other_key": "got here"}}, ), ), - (0.5, ((), {"inner": {"my_key": "got here and there"}})), - (0.5, ((), {"outer_2": {"my_key": " and back again"}})), + (FloatBetween(0.5, 0.6), ((), {"inner": {"my_key": "got here and there"}})), + (FloatBetween(0.5, 0.6), ((), {"outer_2": {"my_key": " and back again"}})), ] diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 640658399..d17925aca 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -51,8 +51,8 @@ CheckpointTuple, ) from langgraph.checkpoint.memory import MemorySaver -from langgraph.constants import ERROR, PULL, PUSH, Interrupt, Send -from langgraph.errors import InvalidUpdateError, NodeInterrupt +from langgraph.constants import ERROR, PULL, PUSH +from langgraph.errors import InvalidUpdateError, MultipleSubgraphsError, NodeInterrupt from langgraph.graph import END, Graph, StateGraph from langgraph.graph.graph import START from langgraph.graph.message import MessageGraph, add_messages @@ -68,9 +68,9 @@ StateSnapshot, ) from langgraph.pregel.retry import RetryPolicy -from langgraph.pregel.types import PregelTask, StreamWriter from langgraph.store.memory import MemoryStore -from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence +from langgraph.types import Interrupt, PregelTask, Send, StreamWriter +from tests.any_str import AnyDict, AnyStr, AnyVersion, FloatBetween, UnsortedSequence from tests.conftest import ( ALL_CHECKPOINTERS_ASYNC, ALL_CHECKPOINTERS_ASYNC_PLUS_NONE, @@ -2080,7 +2080,10 @@ async def test_invoke_two_processes_two_in_join_two_out(mocker: MockerFixture) - ] -async def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_ASYNC) +async def test_invoke_join_then_call_other_pregel( + mocker: MockerFixture, checkpointer_name: str +) -> None: add_one = mocker.Mock(side_effect=lambda x: x + 1) add_10_each = mocker.Mock(side_effect=lambda x: [y + 10 for y in x]) @@ -2133,6 +2136,18 @@ async def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None 27 for _ in range(10) ] + async with awith_checkpointer(checkpointer_name) as checkpointer: + # add checkpointer + app.checkpointer = checkpointer + # subgraph is called twice in the same node, through .map(), so raises + with pytest.raises(MultipleSubgraphsError): + await app.ainvoke([2, 3], {"configurable": {"thread_id": "1"}}) + + # set inner graph checkpointer NeverCheckpoint + inner_app.checkpointer = False + # subgraph still called twice, but checkpointing for inner graph is disabled + assert await app.ainvoke([2, 3], {"configurable": {"thread_id": "1"}}) == 27 + async def test_invoke_two_processes_one_in_two_out(mocker: MockerFixture) -> None: add_one = mocker.Mock(side_effect=lambda x: x + 1) @@ -7187,22 +7202,22 @@ async def outer_2(state: State): assert chunks == [ # arrives before "inner" finishes ( - 0.0, + FloatBetween(0.0, 0.1), ( (AnyStr("inner:"),), {"inner_1": {"my_key": "got here", "my_other_key": ""}}, ), ), - (0.2, ((), {"outer_1": {"my_key": " and parallel"}})), + (FloatBetween(0.2, 0.3), ((), {"outer_1": {"my_key": " and parallel"}})), ( - 0.5, + FloatBetween(0.5, 0.6), ( (AnyStr("inner:"),), {"inner_2": {"my_key": " and there", "my_other_key": "got here"}}, ), ), - (0.5, ((), {"inner": {"my_key": "got here and there"}})), - (0.5, ((), {"outer_2": {"my_key": " and back again"}})), + (FloatBetween(0.5, 0.6), ((), {"inner": {"my_key": "got here and there"}})), + (FloatBetween(0.5, 0.6), ((), {"outer_2": {"my_key": " and back again"}})), ] diff --git a/libs/langgraph/tests/test_tracing_interops.py b/libs/langgraph/tests/test_tracing_interops.py index 9bc8750b5..5b458394b 100644 --- a/libs/langgraph/tests/test_tracing_interops.py +++ b/libs/langgraph/tests/test_tracing_interops.py @@ -5,11 +5,14 @@ from unittest.mock import MagicMock import langsmith as ls +import pytest from langchain_core.runnables import RunnableConfig from langchain_core.tracers import LangChainTracer from langgraph.graph import StateGraph +pytestmark = pytest.mark.anyio + def _get_mock_client(**kwargs: Any) -> ls.Client: mock_session = MagicMock() @@ -52,6 +55,7 @@ def wait_for( raise ValueError(f"Callable did not return within {total_time}") +@pytest.mark.skip("This test times out in CI") async def test_nested_tracing(): lt_py_311 = sys.version_info < (3, 11) mock_client = _get_mock_client() @@ -76,7 +80,7 @@ async def child_node(state: State) -> State: child_builder = StateGraph(State) child_builder.add_node(child_node) child_builder.add_edge("__start__", "child_node") - child_graph = child_builder.compile() + child_graph = child_builder.compile().with_config(run_name="child_graph") parent_builder = StateGraph(State) parent_builder.add_node(parent_node) @@ -101,7 +105,7 @@ def get_posts(): # If the callbacks weren't propagated correctly, we'd # end up with broken dotted_orders parent_run = next(data for data in posts if data["name"] == "parent_node") - child_run = next(data for data in posts if data["name"] == "child_node") + child_run = next(data for data in posts if data["name"] == "child_graph") traceable_run = next(data for data in posts if data["name"] == "some_traceable") assert child_run["dotted_order"].startswith(traceable_run["dotted_order"]) diff --git a/libs/scheduler-kafka/README.md b/libs/scheduler-kafka/README.md index 637a337dd..fd65d7f3c 100644 --- a/libs/scheduler-kafka/README.md +++ b/libs/scheduler-kafka/README.md @@ -95,7 +95,7 @@ You can pass any of the following values as `kwargs` to either `KafkaOrchestrato - batch_max_n (int): Maximum number of messages to include in a single batch. Default: 10. - batch_max_ms (int): Maximum time in milliseconds to wait for messages to include in a batch. Default: 1000. -- retry_policy (langgraph.pregel.types.RetryPolicy): Controls which graph-level errors will be retried when processing messages. A good use for this is to retry database errors thrown by the checkpointer. Defaults to None. +- retry_policy (langgraph.types.RetryPolicy): Controls which graph-level errors will be retried when processing messages. A good use for this is to retry database errors thrown by the checkpointer. Defaults to None. ### Connection settings diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py index 9cbb6bf83..c803239e8 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py @@ -25,7 +25,6 @@ ) from langgraph.pregel.manager import AsyncChannelsManager, ChannelsManager from langgraph.pregel.runner import PregelRunner -from langgraph.pregel.types import RetryPolicy from langgraph.scheduler.kafka.retry import aretry, retry from langgraph.scheduler.kafka.types import ( AsyncConsumer, @@ -38,6 +37,7 @@ Sendable, Topics, ) +from langgraph.types import RetryPolicy from langgraph.utils.config import patch_configurable diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py index 1ad9c5c5b..39e7b755b 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py @@ -24,7 +24,6 @@ from langgraph.pregel import Pregel from langgraph.pregel.executor import BackgroundExecutor, Submit from langgraph.pregel.loop import AsyncPregelLoop, SyncPregelLoop -from langgraph.pregel.types import RetryPolicy from langgraph.scheduler.kafka.retry import aretry, retry from langgraph.scheduler.kafka.types import ( AsyncConsumer, @@ -37,6 +36,7 @@ Producer, Topics, ) +from langgraph.types import RetryPolicy from langgraph.utils.config import patch_configurable @@ -158,6 +158,7 @@ async def attempt(self, msg: MessageToOrchestrator) -> None: specs=graph.channels, output_keys=graph.output_channels, stream_keys=graph.stream_channels, + check_subgraphs=False, ) as loop: if loop.tick( input_keys=graph.input_channels, @@ -347,6 +348,7 @@ def attempt(self, msg: MessageToOrchestrator) -> None: specs=graph.channels, output_keys=graph.output_channels, stream_keys=graph.stream_channels, + check_subgraphs=False, ) as loop: if loop.tick( input_keys=graph.input_channels, diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py index bb80047f8..74dbe3e27 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py @@ -6,7 +6,7 @@ from typing_extensions import ParamSpec -from langgraph.pregel.types import RetryPolicy +from langgraph.types import RetryPolicy logger = logging.getLogger(__name__) P = ParamSpec("P")