Skip to content

Commit

Permalink
Test parallel store writes
Browse files Browse the repository at this point in the history
  • Loading branch information
hinthornw committed Nov 21, 2024
1 parent 7082e26 commit de27a37
Show file tree
Hide file tree
Showing 10 changed files with 553 additions and 257 deletions.
20 changes: 4 additions & 16 deletions libs/checkpoint-postgres/langgraph/checkpoint/postgres/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import threading

Check notice on line 1 in libs/checkpoint-postgres/langgraph/checkpoint/postgres/__init__.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

......................................... fanout_to_subgraph_10x: Mean +- std dev: 61.1 ms +- 1.5 ms ......................................... fanout_to_subgraph_10x_sync: Mean +- std dev: 51.4 ms +- 1.0 ms ......................................... fanout_to_subgraph_10x_checkpoint: Mean +- std dev: 90.8 ms +- 6.8 ms ......................................... fanout_to_subgraph_10x_checkpoint_sync: Mean +- std dev: 93.8 ms +- 0.8 ms ......................................... fanout_to_subgraph_100x: Mean +- std dev: 584 ms +- 20 ms ......................................... fanout_to_subgraph_100x_sync: Mean +- std dev: 499 ms +- 5 ms ......................................... fanout_to_subgraph_100x_checkpoint: Mean +- std dev: 891 ms +- 37 ms ......................................... fanout_to_subgraph_100x_checkpoint_sync: Mean +- std dev: 925 ms +- 15 ms ......................................... react_agent_10x: Mean +- std dev: 31.2 ms +- 0.6 ms ......................................... react_agent_10x_sync: Mean +- std dev: 22.4 ms +- 0.3 ms ......................................... react_agent_10x_checkpoint: Mean +- std dev: 47.0 ms +- 0.9 ms ......................................... react_agent_10x_checkpoint_sync: Mean +- std dev: 36.9 ms +- 0.4 ms ......................................... react_agent_100x: Mean +- std dev: 341 ms +- 7 ms ......................................... react_agent_100x_sync: Mean +- std dev: 270 ms +- 4 ms ......................................... react_agent_100x_checkpoint: Mean +- std dev: 936 ms +- 10 ms ......................................... react_agent_100x_checkpoint_sync: Mean +- std dev: 834 ms +- 9 ms ......................................... wide_state_25x300: Mean +- std dev: 24.2 ms +- 0.4 ms ......................................... wide_state_25x300_sync: Mean +- std dev: 15.4 ms +- 0.1 ms ......................................... wide_state_25x300_checkpoint: Mean +- std dev: 279 ms +- 4 ms ......................................... WARNING: the benchmark result may be unstable * the maximum (473 ms) is 78% greater than the mean (266 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. wide_state_25x300_checkpoint_sync: Mean +- std dev: 266 ms +- 19 ms ......................................... wide_state_15x600: Mean +- std dev: 28.2 ms +- 0.4 ms ......................................... wide_state_15x600_sync: Mean +- std dev: 17.8 ms +- 0.2 ms ......................................... wide_state_15x600_checkpoint: Mean +- std dev: 479 ms +- 7 ms ......................................... wide_state_15x600_checkpoint_sync: Mean +- std dev: 464 ms +- 11 ms ......................................... wide_state_9x1200: Mean +- std dev: 28.3 ms +- 0.5 ms ......................................... wide_state_9x1200_sync: Mean +- std dev: 17.8 ms +- 0.1 ms ......................................... wide_state_9x1200_checkpoint: Mean +- std dev: 312 ms +- 2 ms ......................................... wide_state_9x1200_checkpoint_sync: Mean +- std dev: 298 ms +- 4 ms

Check notice on line 1 in libs/checkpoint-postgres/langgraph/checkpoint/postgres/__init__.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------+---------+-----------------------+ | Benchmark | main | changes | +=========================================+=========+=======================+ | fanout_to_subgraph_100x_checkpoint | 951 ms | 891 ms: 1.07x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x | 614 ms | 584 ms: 1.05x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_checkpoint | 94.6 ms | 90.8 ms: 1.04x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_sync | 18.0 ms | 17.8 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x_sync | 505 ms | 499 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x_checkpoint | 947 ms | 936 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_sync | 51.9 ms | 51.4 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x | 61.8 ms | 61.1 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x | 344 ms | 341 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x_checkpoint_sync | 933 ms | 925 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_10x_sync | 22.6 ms | 22.4 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_10x_checkpoint | 47.4 ms | 47.0 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x_checkpoint_sync | 839 ms | 834 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_9x1200_sync | 17.9 ms | 17.8 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | react_agent_100x_sync | 272 ms | 270 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600 | 28.3 ms | 28.2 ms: 1.01x faster | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_checkpoint | 481 ms | 479 ms: 1.00x faster | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_checkpoint_sync | 94.1 ms | 93.8 ms: 1.00x faster | +-----------------------------------------+---------+-----------------------+ | Geometric mean | (ref) | 1.01x faster | +-----------------------------------------+---------+-----------------------+ Benchmark hidden because not significant (10): wide_state_25x300, wide_state_9x1200_checkpoint, wide_state_25x300_sync, wide_state_9x1200_checkpoint_sync, wide_state_25x300_checkpoint, wide_state_25x300_checkpoint_sync, wide_state_15x600_checkpoint_sync, wide_state_9x1200, react_agent_10x_checkpoint_sync, react_agent_10x
from contextlib import contextmanager
from typing import Any, Iterator, Optional, Sequence, Union
from typing import Any, Iterator, Optional, Sequence

from langchain_core.runnables import RunnableConfig
from psycopg import Capabilities, Connection, Cursor, Pipeline
Expand All @@ -17,29 +17,17 @@
CheckpointTuple,
get_checkpoint_id,
)
from langgraph.checkpoint.postgres import _internal
from langgraph.checkpoint.postgres.base import BasePostgresSaver
from langgraph.checkpoint.serde.base import SerializerProtocol

Conn = Union[Connection[DictRow], ConnectionPool[Connection[DictRow]]]


@contextmanager
def _get_connection(conn: Conn) -> Iterator[Connection[DictRow]]:
if isinstance(conn, Connection):
yield conn
elif isinstance(conn, ConnectionPool):
with conn.connection() as conn:
yield conn
else:
raise TypeError(f"Invalid connection type: {type(conn)}")


class PostgresSaver(BasePostgresSaver):
lock: threading.Lock

def __init__(
self,
conn: Conn,
conn: _internal.Conn,
pipe: Optional[Pipeline] = None,
serde: Optional[SerializerProtocol] = None,
) -> None:
Expand Down Expand Up @@ -373,7 +361,7 @@ def _cursor(self, *, pipeline: bool = False) -> Iterator[Cursor[DictRow]]:
Will be applied regardless of whether the PostgresSaver instance was initialized with a pipeline.
If pipeline mode is not supported, will fall back to using transaction context manager.
"""
with _get_connection(self.conn) as conn:
with _internal.get_connection(self.conn) as conn:
if self.pipe:
# a connection in pipeline mode can be used concurrently
# in multiple threads/coroutines, but only one cursor can be
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Shared async utility functions for the Postgres checkpoint & storage classes."""

from contextlib import asynccontextmanager
from typing import AsyncIterator, Union

from psycopg import AsyncConnection
from psycopg.rows import DictRow
from psycopg_pool import AsyncConnectionPool

Conn = Union[AsyncConnection[DictRow], AsyncConnectionPool[AsyncConnection[DictRow]]]


@asynccontextmanager
async def get_connection(
conn: Conn,
) -> AsyncIterator[AsyncConnection[DictRow]]:
if isinstance(conn, AsyncConnection):
yield conn
elif isinstance(conn, AsyncConnectionPool):
async with conn.connection() as conn:
yield conn
else:
raise TypeError(f"Invalid connection type: {type(conn)}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Shared utility functions for the Postgres checkpoint & storage classes."""

from contextlib import contextmanager
from typing import Iterator, Union

from psycopg import Connection
from psycopg.rows import DictRow
from psycopg_pool import ConnectionPool

Conn = Union[Connection[DictRow], ConnectionPool[Connection[DictRow]]]


@contextmanager
def get_connection(conn: Conn) -> Iterator[Connection[DictRow]]:
if isinstance(conn, Connection):
yield conn
elif isinstance(conn, ConnectionPool):
with conn.connection() as conn:
yield conn
else:
raise TypeError(f"Invalid connection type: {type(conn)}")
22 changes: 4 additions & 18 deletions libs/checkpoint-postgres/langgraph/checkpoint/postgres/aio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Iterator, Optional, Sequence, Union
from typing import Any, AsyncIterator, Iterator, Optional, Sequence

from langchain_core.runnables import RunnableConfig
from psycopg import AsyncConnection, AsyncCursor, AsyncPipeline, Capabilities
Expand All @@ -17,31 +17,17 @@
CheckpointTuple,
get_checkpoint_id,
)
from langgraph.checkpoint.postgres import _ainternal
from langgraph.checkpoint.postgres.base import BasePostgresSaver
from langgraph.checkpoint.serde.base import SerializerProtocol

Conn = Union[AsyncConnection[DictRow], AsyncConnectionPool[AsyncConnection[DictRow]]]


@asynccontextmanager
async def _get_connection(
conn: Conn,
) -> AsyncIterator[AsyncConnection[DictRow]]:
if isinstance(conn, AsyncConnection):
yield conn
elif isinstance(conn, AsyncConnectionPool):
async with conn.connection() as conn:
yield conn
else:
raise TypeError(f"Invalid connection type: {type(conn)}")


class AsyncPostgresSaver(BasePostgresSaver):
lock: asyncio.Lock

def __init__(
self,
conn: Conn,
conn: _ainternal.Conn,
pipe: Optional[AsyncPipeline] = None,
serde: Optional[SerializerProtocol] = None,
) -> None:
Expand Down Expand Up @@ -331,7 +317,7 @@ async def _cursor(
Will be applied regardless of whether the AsyncPostgresSaver instance was initialized with a pipeline.
If pipeline mode is not supported, will fall back to using transaction context manager.
"""
async with _get_connection(self.conn) as conn:
async with _ainternal.get_connection(self.conn) as conn:
if self.pipe:
# a connection in pipeline mode can be used concurrently
# in multiple threads/coroutines, but only one cursor can be
Expand Down
Loading

0 comments on commit de27a37

Please sign in to comment.