From 29548b2e27b5ab3cc7445b7709b63bb7a46b66fa Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Wed, 9 Oct 2024 17:56:40 +0200 Subject: [PATCH 1/5] fix(debug): add failing tests --- libs/langgraph/tests/test_pregel.py | 3 +++ libs/langgraph/tests/test_pregel_async.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index 348922e2b..a17cf4e99 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -11917,6 +11917,9 @@ def normalize_config(config: Optional[dict]) -> Optional[dict]: clean_config["thread_id"] = config["configurable"]["thread_id"] clean_config["checkpoint_id"] = config["configurable"]["checkpoint_id"] clean_config["checkpoint_ns"] = config["configurable"]["checkpoint_ns"] + clean_config["checkpoint_map"] = config["configurable"]["checkpoint_map"] + if "checkpoint_map" in config["configurable"]: + clean_config["checkpoint_map"] = config["configurable"]["checkpoint_map"] return clean_config diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 96317d041..928cfc5eb 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -10143,6 +10143,8 @@ def normalize_config(config: Optional[dict]) -> Optional[dict]: clean_config["thread_id"] = config["configurable"]["thread_id"] clean_config["checkpoint_id"] = config["configurable"]["checkpoint_id"] clean_config["checkpoint_ns"] = config["configurable"]["checkpoint_ns"] + if "checkpoint_map" in config["configurable"]: + clean_config["checkpoint_map"] = config["configurable"]["checkpoint_map"] return clean_config From c5ec568cfbf9dcedc26a7d6de699c564383d0479 Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Wed, 9 Oct 2024 17:58:03 +0200 Subject: [PATCH 2/5] Patch config before entering map_debug_checkpoint --- libs/langgraph/langgraph/pregel/loop.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index dadc54b7a..e472bdd1a 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -101,7 +101,7 @@ from langgraph.pregel.utils import get_new_channel_versions from langgraph.store.base import BaseStore from langgraph.types import All, PregelExecutableTask, StreamMode -from langgraph.utils.config import patch_configurable +from langgraph.utils.config import patch_checkpoint_map, patch_configurable V = TypeVar("V") P = ParamSpec("P") @@ -176,6 +176,7 @@ class PregelLoop: checkpoint_pending_writes: List[PendingWrite] checkpoint_previous_versions: dict[str, Union[str, float, int]] prev_checkpoint_config: Optional[RunnableConfig] + prev_checkpoint_metadata: Optional[CheckpointMetadata] step: int stop: int @@ -362,14 +363,16 @@ def tick( "debug", map_debug_checkpoint, self.step - 1, # printing checkpoint for previous step - self.checkpoint_config, + patch_checkpoint_map(self.checkpoint_config, self.checkpoint_metadata), self.channels, self.stream_keys, self.checkpoint_metadata, self.checkpoint, self.tasks.values(), self.checkpoint_pending_writes, - self.prev_checkpoint_config, + patch_checkpoint_map( + self.prev_checkpoint_config, self.prev_checkpoint_metadata + ), self.output_keys, ) @@ -518,13 +521,15 @@ def _put_checkpoint(self, metadata: CheckpointMetadata) -> None: self.checkpoint = create_checkpoint(self.checkpoint, self.channels, self.step) # bail if no checkpointer if self._checkpointer_put_after_previous is not None: + self.prev_checkpoint_metadata = self.checkpoint_metadata + self.checkpoint_metadata = metadata + self.prev_checkpoint_config = ( self.checkpoint_config if CONFIG_KEY_CHECKPOINT_ID in self.checkpoint_config[CONF] and self.checkpoint_config[CONF][CONFIG_KEY_CHECKPOINT_ID] else None ) - self.checkpoint_metadata = metadata self.checkpoint_config = { **self.checkpoint_config, CONF: { From b818bf2fba07dc19a423b287d9f04ade2394815e Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Mon, 14 Oct 2024 15:32:41 -0700 Subject: [PATCH 3/5] Fix up --- libs/langgraph/langgraph/pregel/debug.py | 4 +++- libs/langgraph/langgraph/pregel/loop.py | 12 ++++-------- libs/langgraph/langgraph/utils/config.py | 6 ++++-- libs/langgraph/tests/test_pregel.py | 1 - 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/libs/langgraph/langgraph/pregel/debug.py b/libs/langgraph/langgraph/pregel/debug.py index f70a6e8c4..1fc922761 100644 --- a/libs/langgraph/langgraph/pregel/debug.py +++ b/libs/langgraph/langgraph/pregel/debug.py @@ -32,6 +32,7 @@ from langgraph.pregel.io import read_channels from langgraph.pregel.utils import find_subgraph_pregel from langgraph.types import PregelExecutableTask, PregelTask, StateSnapshot +from langgraph.utils.config import patch_checkpoint_map class TaskPayload(TypedDict): @@ -177,8 +178,9 @@ def map_debug_checkpoint( "timestamp": checkpoint["ts"], "step": step, "payload": { - "config": config, + "config": patch_checkpoint_map(config, metadata), "parent_config": parent_config, + # "parent_config": patch_checkpoint_map(parent_config, metadata), "values": read_channels(channels, stream_channels), "metadata": metadata, "next": [t.name for t in tasks], diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index e472bdd1a..d4f3a52c3 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -101,7 +101,7 @@ from langgraph.pregel.utils import get_new_channel_versions from langgraph.store.base import BaseStore from langgraph.types import All, PregelExecutableTask, StreamMode -from langgraph.utils.config import patch_checkpoint_map, patch_configurable +from langgraph.utils.config import patch_configurable V = TypeVar("V") P = ParamSpec("P") @@ -176,7 +176,6 @@ class PregelLoop: checkpoint_pending_writes: List[PendingWrite] checkpoint_previous_versions: dict[str, Union[str, float, int]] prev_checkpoint_config: Optional[RunnableConfig] - prev_checkpoint_metadata: Optional[CheckpointMetadata] step: int stop: int @@ -363,16 +362,14 @@ def tick( "debug", map_debug_checkpoint, self.step - 1, # printing checkpoint for previous step - patch_checkpoint_map(self.checkpoint_config, self.checkpoint_metadata), + self.checkpoint_config, self.channels, self.stream_keys, self.checkpoint_metadata, self.checkpoint, self.tasks.values(), self.checkpoint_pending_writes, - patch_checkpoint_map( - self.prev_checkpoint_config, self.prev_checkpoint_metadata - ), + self.prev_checkpoint_config, self.output_keys, ) @@ -505,7 +502,7 @@ def _first(self, *, input_keys: Union[str, Sequence[str]]) -> None: ) def _put_checkpoint(self, metadata: CheckpointMetadata) -> None: - # assign step + # assign step and parents metadata["step"] = self.step metadata["parents"] = self.config[CONF].get(CONFIG_KEY_CHECKPOINT_MAP, {}) # debug flag @@ -521,7 +518,6 @@ def _put_checkpoint(self, metadata: CheckpointMetadata) -> None: self.checkpoint = create_checkpoint(self.checkpoint, self.channels, self.step) # bail if no checkpointer if self._checkpointer_put_after_previous is not None: - self.prev_checkpoint_metadata = self.checkpoint_metadata self.checkpoint_metadata = metadata self.prev_checkpoint_config = ( diff --git a/libs/langgraph/langgraph/utils/config.py b/libs/langgraph/langgraph/utils/config.py index 3993df9b2..fe25b6d9a 100644 --- a/libs/langgraph/langgraph/utils/config.py +++ b/libs/langgraph/langgraph/utils/config.py @@ -36,9 +36,11 @@ def patch_configurable( def patch_checkpoint_map( - config: RunnableConfig, metadata: Optional[CheckpointMetadata] + config: Optional[RunnableConfig], metadata: Optional[CheckpointMetadata] ) -> RunnableConfig: - if parents := (metadata.get("parents") if metadata else None): + if config is None: + return config + elif parents := (metadata.get("parents") if metadata else None): conf = config[CONF] return patch_configurable( config, diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index a17cf4e99..febc85f24 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -11917,7 +11917,6 @@ def normalize_config(config: Optional[dict]) -> Optional[dict]: clean_config["thread_id"] = config["configurable"]["thread_id"] clean_config["checkpoint_id"] = config["configurable"]["checkpoint_id"] clean_config["checkpoint_ns"] = config["configurable"]["checkpoint_ns"] - clean_config["checkpoint_map"] = config["configurable"]["checkpoint_map"] if "checkpoint_map" in config["configurable"]: clean_config["checkpoint_map"] = config["configurable"]["checkpoint_map"] From 233bd78ee4e7c3ccfa4b779900ceba6bae30b3ed Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Mon, 14 Oct 2024 15:44:18 -0700 Subject: [PATCH 4/5] Add checkpoint_map to parent_config --- libs/langgraph/langgraph/pregel/__init__.py | 4 +- libs/langgraph/langgraph/pregel/debug.py | 3 +- libs/langgraph/tests/test_pregel.py | 79 +++++++++++++++++++++ libs/langgraph/tests/test_pregel_async.py | 58 +++++++++++++++ 4 files changed, 140 insertions(+), 4 deletions(-) diff --git a/libs/langgraph/langgraph/pregel/__init__.py b/libs/langgraph/langgraph/pregel/__init__.py index 01d53feab..5436d426d 100644 --- a/libs/langgraph/langgraph/pregel/__init__.py +++ b/libs/langgraph/langgraph/pregel/__init__.py @@ -484,7 +484,7 @@ def _prepare_state_snapshot( patch_checkpoint_map(saved.config, saved.metadata), saved.metadata, saved.checkpoint["ts"], - saved.parent_config, + patch_checkpoint_map(saved.parent_config, saved.metadata), tasks_w_writes( next_tasks.values(), saved.pending_writes, @@ -565,7 +565,7 @@ async def _aprepare_state_snapshot( patch_checkpoint_map(saved.config, saved.metadata), saved.metadata, saved.checkpoint["ts"], - saved.parent_config, + patch_checkpoint_map(saved.parent_config, saved.metadata), tasks_w_writes( next_tasks.values(), saved.pending_writes, diff --git a/libs/langgraph/langgraph/pregel/debug.py b/libs/langgraph/langgraph/pregel/debug.py index 1fc922761..d772e7cba 100644 --- a/libs/langgraph/langgraph/pregel/debug.py +++ b/libs/langgraph/langgraph/pregel/debug.py @@ -179,8 +179,7 @@ def map_debug_checkpoint( "step": step, "payload": { "config": patch_checkpoint_map(config, metadata), - "parent_config": parent_config, - # "parent_config": patch_checkpoint_map(parent_config, metadata), + "parent_config": patch_checkpoint_map(parent_config, metadata), "values": read_channels(channels, stream_channels), "metadata": metadata, "next": [t.name for t in tasks], diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index febc85f24..a73083290 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -9092,6 +9092,9 @@ def outer_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("inner:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, ), @@ -9250,6 +9253,9 @@ def outer_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("inner:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, tasks=(PregelTask(AnyStr(), "inner_2", (PULL, "inner_2")),), @@ -9279,6 +9285,9 @@ def outer_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("inner:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, tasks=( @@ -9707,6 +9716,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, ) @@ -9770,6 +9786,15 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr( + re.compile(r"child:.+|child1:") + ): AnyStr(), + } + ), } }, ), @@ -9798,6 +9823,9 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("child:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, ), @@ -10056,6 +10084,9 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("child:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, tasks=(), @@ -10085,6 +10116,9 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("child:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, tasks=( @@ -10170,6 +10204,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, tasks=(), @@ -10208,6 +10249,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, tasks=( @@ -10253,6 +10301,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, tasks=( @@ -10444,6 +10499,12 @@ def edit(state: JokeState): "thread_id": "1", "checkpoint_ns": AnyStr("generate_joke:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("generate_joke:"): AnyStr(), + } + ), } }, tasks=(PregelTask(id=AnyStr(""), name="generate", path=(PULL, "generate")),), @@ -10476,6 +10537,12 @@ def edit(state: JokeState): "thread_id": "1", "checkpoint_ns": AnyStr("generate_joke:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("generate_joke:"): AnyStr(), + } + ), } }, tasks=(PregelTask(id=AnyStr(""), name="generate", path=(PULL, "generate")),), @@ -10931,6 +10998,12 @@ def weather_graph(state: RouterState): "thread_id": "14", "checkpoint_ns": AnyStr("weather_graph:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("weather_graph:"): AnyStr(), + } + ), } }, tasks=( @@ -11020,6 +11093,12 @@ def weather_graph(state: RouterState): "thread_id": "14", "checkpoint_ns": AnyStr("weather_graph:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("weather_graph:"): AnyStr(), + } + ), } }, tasks=(), diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 928cfc5eb..44bee3f72 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -7738,6 +7738,9 @@ def outer_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("inner:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, ), @@ -7903,6 +7906,9 @@ def outer_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("inner:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("inner:"): AnyStr()} + ), } }, tasks=( @@ -7934,6 +7940,9 @@ def outer_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("inner:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("inner:"): AnyStr()} + ), } }, tasks=( @@ -8328,6 +8337,9 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("child:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, ).tasks[0] @@ -8374,6 +8386,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, ) @@ -8439,6 +8458,15 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr( + re.compile(r"child:.+|child1:") + ): AnyStr(), + } + ), } }, ), @@ -8467,6 +8495,9 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("child:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, ), @@ -8732,6 +8763,9 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("child:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, tasks=(), @@ -8761,6 +8795,9 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr("child:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), } }, tasks=( @@ -8850,6 +8887,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, tasks=(), @@ -8888,6 +8932,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, tasks=( @@ -8933,6 +8984,13 @@ def parent_2(state: State): "thread_id": "1", "checkpoint_ns": AnyStr(), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), } }, tasks=( From ff310cc8d609d246844c3bb452111b55f99e1386 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Mon, 14 Oct 2024 15:45:12 -0700 Subject: [PATCH 5/5] One more --- libs/langgraph/tests/test_pregel_async.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 44bee3f72..1c45d414f 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -9626,6 +9626,12 @@ def get_first_in_list(): "thread_id": "14", "checkpoint_ns": AnyStr("weather_graph:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("weather_graph:"): AnyStr(), + } + ), } }, tasks=( @@ -9717,6 +9723,12 @@ def get_first_in_list(): "thread_id": "14", "checkpoint_ns": AnyStr("weather_graph:"), "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("weather_graph:"): AnyStr(), + } + ), } }, tasks=(),