Skip to content

Commit

Permalink
Merge pull request #2596 from langchain-ai/dqbd/sdk-cancel-on-disconnect
Browse files Browse the repository at this point in the history
feat(sdk): pass cancel on disconnect when joining stream
  • Loading branch information
dqbd authored Dec 3, 2024
2 parents 63f5f15 + a91bf11 commit c6fe265
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion libs/sdk-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@langchain/langgraph-sdk",
"version": "0.0.29",
"version": "0.0.30",
"description": "Client library for interacting with the LangGraph API",
"type": "module",
"packageManager": "[email protected]",
Expand Down
17 changes: 13 additions & 4 deletions libs/sdk-js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1021,19 +1021,28 @@ export class RunsClient extends BaseClient {
*
* @param threadId The ID of the thread.
* @param runId The ID of the run.
* @param signal An optional abort signal.
* @returns An async generator yielding stream parts.
*/
async *joinStream(
threadId: string,
runId: string,
signal?: AbortSignal,
options?:
| { signal?: AbortSignal; cancelOnDisconnect?: boolean }
| AbortSignal,
): AsyncGenerator<{ event: StreamEvent; data: any }> {
const opts =
typeof options === "object" &&
options != null &&
options instanceof AbortSignal
? { signal: options }
: options;

const response = await this.asyncCaller.fetch(
...this.prepareFetchOptions(`/threads/${threadId}/runs/${runId}/stream`, {
method: "GET",
timeoutMs: null,
signal,
signal: opts?.signal,
params: { cancel_on_disconnect: opts?.cancelOnDisconnect ? "1" : "0" },
}),
);

Expand All @@ -1048,7 +1057,7 @@ export class RunsClient extends BaseClient {
async start(ctrl) {
parser = createParser((event) => {
if (
(signal && signal.aborted) ||
(opts?.signal && opts.signal.aborted) ||
(event.type === "event" && event.data === "[DONE]")
) {
ctrl.terminate();
Expand Down
11 changes: 9 additions & 2 deletions libs/sdk-py/langgraph_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1796,14 +1796,17 @@ async def join(self, thread_id: str, run_id: str) -> dict:
""" # noqa: E501
return await self.http.get(f"/threads/{thread_id}/runs/{run_id}/join")

def join_stream(self, thread_id: str, run_id: str) -> AsyncIterator[StreamPart]:
def join_stream(
self, thread_id: str, run_id: str, *, cancel_on_disconnect: bool = False
) -> AsyncIterator[StreamPart]:
"""Stream output from a run in real-time, until the run is done.
Output is not buffered, so any output produced before this call will
not be received here.
Args:
thread_id: The thread ID to join.
run_id: The run ID to join.
cancel_on_disconnect: Whether to cancel the run when the stream is disconnected.
Returns:
None
Expand All @@ -1816,7 +1819,11 @@ def join_stream(self, thread_id: str, run_id: str) -> AsyncIterator[StreamPart]:
)
""" # noqa: E501
return self.http.stream(f"/threads/{thread_id}/runs/{run_id}/stream", "GET")
return self.http.stream(
f"/threads/{thread_id}/runs/{run_id}/stream",
"GET",
params={"cancel_on_disconnect": cancel_on_disconnect},
)

async def delete(self, thread_id: str, run_id: str) -> None:
"""Delete a run.
Expand Down
2 changes: 1 addition & 1 deletion libs/sdk-py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langgraph-sdk"
version = "0.1.40"
version = "0.1.41"
description = "SDK for interacting with LangGraph API"
authors = []
license = "MIT"
Expand Down

0 comments on commit c6fe265

Please sign in to comment.