Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(debug): send checkpoint_map as well #2065

Merged
merged 5 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/langgraph/langgraph/pregel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

Check notice on line 1 in libs/langgraph/langgraph/pregel/__init__.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

......................................... fanout_to_subgraph_10x: Mean +- std dev: 47.8 ms +- 1.0 ms ......................................... fanout_to_subgraph_10x_sync: Mean +- std dev: 45.8 ms +- 3.1 ms ......................................... fanout_to_subgraph_10x_checkpoint: Mean +- std dev: 76.7 ms +- 1.8 ms ......................................... fanout_to_subgraph_10x_checkpoint_sync: Mean +- std dev: 85.2 ms +- 2.0 ms ......................................... fanout_to_subgraph_100x: Mean +- std dev: 476 ms +- 12 ms ......................................... fanout_to_subgraph_100x_sync: Mean +- std dev: 433 ms +- 14 ms ......................................... fanout_to_subgraph_100x_checkpoint: Mean +- std dev: 791 ms +- 37 ms ......................................... fanout_to_subgraph_100x_checkpoint_sync: Mean +- std dev: 835 ms +- 17 ms ......................................... react_agent_10x: Mean +- std dev: 28.4 ms +- 0.7 ms ......................................... react_agent_10x_sync: Mean +- std dev: 20.7 ms +- 0.2 ms ......................................... react_agent_10x_checkpoint: Mean +- std dev: 46.4 ms +- 3.3 ms ......................................... react_agent_10x_checkpoint_sync: Mean +- std dev: 36.4 ms +- 3.4 ms ......................................... react_agent_100x: Mean +- std dev: 322 ms +- 14 ms ......................................... react_agent_100x_sync: Mean +- std dev: 258 ms +- 14 ms ......................................... react_agent_100x_checkpoint: Mean +- std dev: 906 ms +- 15 ms ......................................... react_agent_100x_checkpoint_sync: Mean +- std dev: 809 ms +- 14 ms ......................................... wide_state_25x300: Mean +- std dev: 18.4 ms +- 0.4 ms ......................................... wide_state_25x300_sync: Mean +- std dev: 10.9 ms +- 0.1 ms ......................................... wide_state_25x300_checkpoint: Mean +- std dev: 271 ms +- 4 ms ......................................... wide_state_25x300_checkpoint_sync: Mean +- std dev: 260 ms +- 3 ms ......................................... wide_state_15x600: Mean +- std dev: 21.3 ms +- 0.5 ms ......................................... wide_state_15x600_sync: Mean +- std dev: 12.5 ms +- 0.1 ms ......................................... wide_state_15x600_checkpoint: Mean +- std dev: 469 ms +- 3 ms ......................................... wide_state_15x600_checkpoint_sync: Mean +- std dev: 458 ms +- 3 ms ......................................... wide_state_9x1200: Mean +- std dev: 21.4 ms +- 0.4 ms ......................................... wide_state_9x1200_sync: Mean +- std dev: 12.6 ms +- 0.2 ms ......................................... wide_state_9x1200_checkpoint: Mean +- std dev: 307 ms +- 6 ms ......................................... wide_state_9x1200_checkpoint_sync: Mean +- std dev: 293 ms +- 3 ms

Check notice on line 1 in libs/langgraph/langgraph/pregel/__init__.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------+---------+-----------------------+ | Benchmark | main | changes | +=========================================+=========+=======================+ | fanout_to_subgraph_100x_checkpoint | 882 ms | 791 ms: 1.12x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x_checkpoint_sync | 882 ms | 809 ms: 1.09x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x | 511 ms | 476 ms: 1.07x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x_checkpoint | 966 ms | 906 ms: 1.07x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_10x_checkpoint_sync | 38.5 ms | 36.4 ms: 1.06x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_10x_checkpoint | 49.0 ms | 46.4 ms: 1.06x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_sync | 47.4 ms | 45.8 ms: 1.03x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_25x300_checkpoint | 280 ms | 271 ms: 1.03x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x_checkpoint_sync | 859 ms | 835 ms: 1.03x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_25x300_checkpoint_sync | 267 ms | 260 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x_sync | 444 ms | 433 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_25x300_sync | 11.1 ms | 10.9 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_checkpoint_sync | 87.1 ms | 85.2 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_checkpoint | 78.4 ms | 76.7 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_sync | 12.8 ms | 12.5 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_25x300 | 18.8 ms | 18.4 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x_sync | 263 ms | 258 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x | 327 ms | 322 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600 | 21.7 ms | 21.3 ms: 1.02x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_checkpoint | 476 ms | 469 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_9x1200 | 21.7 ms | 21.4 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_10x_sync | 21.0 ms | 20.7 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x | 48.4 ms | 47.8 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_checkpoint_sync | 463 ms | 458 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_9x1200_sync | 12.7 ms | 12.6 ms: 1.01x faster | +---------------------------------------

import asyncio
import concurrent
Expand Down Expand Up @@ -484,7 +484,7 @@
patch_checkpoint_map(saved.config, saved.metadata),
saved.metadata,
saved.checkpoint["ts"],
saved.parent_config,
patch_checkpoint_map(saved.parent_config, saved.metadata),
tasks_w_writes(
next_tasks.values(),
saved.pending_writes,
Expand Down Expand Up @@ -565,7 +565,7 @@
patch_checkpoint_map(saved.config, saved.metadata),
saved.metadata,
saved.checkpoint["ts"],
saved.parent_config,
patch_checkpoint_map(saved.parent_config, saved.metadata),
tasks_w_writes(
next_tasks.values(),
saved.pending_writes,
Expand Down
5 changes: 3 additions & 2 deletions libs/langgraph/langgraph/pregel/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from langgraph.pregel.io import read_channels
from langgraph.pregel.utils import find_subgraph_pregel
from langgraph.types import PregelExecutableTask, PregelTask, StateSnapshot
from langgraph.utils.config import patch_checkpoint_map


class TaskPayload(TypedDict):
Expand Down Expand Up @@ -177,8 +178,8 @@ def map_debug_checkpoint(
"timestamp": checkpoint["ts"],
"step": step,
"payload": {
"config": config,
"parent_config": parent_config,
"config": patch_checkpoint_map(config, metadata),
"parent_config": patch_checkpoint_map(parent_config, metadata),
"values": read_channels(channels, stream_channels),
"metadata": metadata,
"next": [t.name for t in tasks],
Expand Down
5 changes: 3 additions & 2 deletions libs/langgraph/langgraph/pregel/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ def _first(self, *, input_keys: Union[str, Sequence[str]]) -> None:
)

def _put_checkpoint(self, metadata: CheckpointMetadata) -> None:
# assign step
# assign step and parents
metadata["step"] = self.step
metadata["parents"] = self.config[CONF].get(CONFIG_KEY_CHECKPOINT_MAP, {})
# debug flag
Expand All @@ -518,13 +518,14 @@ def _put_checkpoint(self, metadata: CheckpointMetadata) -> None:
self.checkpoint = create_checkpoint(self.checkpoint, self.channels, self.step)
# bail if no checkpointer
if self._checkpointer_put_after_previous is not None:
self.checkpoint_metadata = metadata

self.prev_checkpoint_config = (
self.checkpoint_config
if CONFIG_KEY_CHECKPOINT_ID in self.checkpoint_config[CONF]
and self.checkpoint_config[CONF][CONFIG_KEY_CHECKPOINT_ID]
else None
)
self.checkpoint_metadata = metadata
self.checkpoint_config = {
**self.checkpoint_config,
CONF: {
Expand Down
6 changes: 4 additions & 2 deletions libs/langgraph/langgraph/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ def patch_configurable(


def patch_checkpoint_map(
config: RunnableConfig, metadata: Optional[CheckpointMetadata]
config: Optional[RunnableConfig], metadata: Optional[CheckpointMetadata]
) -> RunnableConfig:
if parents := (metadata.get("parents") if metadata else None):
if config is None:
return config
elif parents := (metadata.get("parents") if metadata else None):
conf = config[CONF]
return patch_configurable(
config,
Expand Down
81 changes: 81 additions & 0 deletions libs/langgraph/tests/test_pregel.py
Original file line number Diff line number Diff line change
Expand Up @@ -9092,6 +9092,9 @@ def outer_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("inner:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
),
Expand Down Expand Up @@ -9250,6 +9253,9 @@ def outer_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("inner:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(PregelTask(AnyStr(), "inner_2", (PULL, "inner_2")),),
Expand Down Expand Up @@ -9279,6 +9285,9 @@ def outer_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("inner:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(
Expand Down Expand Up @@ -9707,6 +9716,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
)
Expand Down Expand Up @@ -9770,6 +9786,15 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(
re.compile(r"child:.+|child1:")
): AnyStr(),
}
),
}
},
),
Expand Down Expand Up @@ -9798,6 +9823,9 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("child:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
),
Expand Down Expand Up @@ -10056,6 +10084,9 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("child:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(),
Expand Down Expand Up @@ -10085,6 +10116,9 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("child:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(
Expand Down Expand Up @@ -10170,6 +10204,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
tasks=(),
Expand Down Expand Up @@ -10208,6 +10249,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
tasks=(
Expand Down Expand Up @@ -10253,6 +10301,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
tasks=(
Expand Down Expand Up @@ -10444,6 +10499,12 @@ def edit(state: JokeState):
"thread_id": "1",
"checkpoint_ns": AnyStr("generate_joke:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("generate_joke:"): AnyStr(),
}
),
}
},
tasks=(PregelTask(id=AnyStr(""), name="generate", path=(PULL, "generate")),),
Expand Down Expand Up @@ -10476,6 +10537,12 @@ def edit(state: JokeState):
"thread_id": "1",
"checkpoint_ns": AnyStr("generate_joke:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("generate_joke:"): AnyStr(),
}
),
}
},
tasks=(PregelTask(id=AnyStr(""), name="generate", path=(PULL, "generate")),),
Expand Down Expand Up @@ -10931,6 +10998,12 @@ def weather_graph(state: RouterState):
"thread_id": "14",
"checkpoint_ns": AnyStr("weather_graph:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("weather_graph:"): AnyStr(),
}
),
}
},
tasks=(
Expand Down Expand Up @@ -11020,6 +11093,12 @@ def weather_graph(state: RouterState):
"thread_id": "14",
"checkpoint_ns": AnyStr("weather_graph:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("weather_graph:"): AnyStr(),
}
),
}
},
tasks=(),
Expand Down Expand Up @@ -11917,6 +11996,8 @@ def normalize_config(config: Optional[dict]) -> Optional[dict]:
clean_config["thread_id"] = config["configurable"]["thread_id"]
clean_config["checkpoint_id"] = config["configurable"]["checkpoint_id"]
clean_config["checkpoint_ns"] = config["configurable"]["checkpoint_ns"]
if "checkpoint_map" in config["configurable"]:
clean_config["checkpoint_map"] = config["configurable"]["checkpoint_map"]

return clean_config

Expand Down
Loading
Loading