Skip to content

Commit

Permalink
Merge pull request #2032 from langchain-ai/nc/7oct/kafka-loop-arg
Browse files Browse the repository at this point in the history
scheduler-kafka: Pass loop arg to default async consumer and producer
  • Loading branch information
nfcampos authored Oct 7, 2024
2 parents debfd85 + e5b4cd2 commit 4685dc1
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
3 changes: 3 additions & 0 deletions libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
self.retry_policy = retry_policy

async def __aenter__(self) -> Self:
loop = asyncio.get_running_loop()
self.subgraphs = {
k: v async for k, v in self.graph.aget_subgraphs(recurse=True)
}
Expand All @@ -81,6 +82,7 @@ async def __aenter__(self) -> Self:
auto_offset_reset="earliest",
group_id="executor",
enable_auto_commit=False,
loop=loop,
**self.kwargs,
)
)
Expand All @@ -89,6 +91,7 @@ async def __aenter__(self) -> Self:

self.producer = await self.stack.enter_async_context(
DefaultAsyncProducer(
loop=loop,
**self.kwargs,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(
self.retry_policy = retry_policy

async def __aenter__(self) -> Self:
loop = asyncio.get_running_loop()
self.subgraphs = {
k: v async for k, v in self.graph.aget_subgraphs(recurse=True)
}
Expand All @@ -79,6 +80,7 @@ async def __aenter__(self) -> Self:
auto_offset_reset="earliest",
group_id="orchestrator",
enable_auto_commit=False,
loop=loop,
**self.kwargs,
)
)
Expand All @@ -87,6 +89,7 @@ async def __aenter__(self) -> Self:

self.producer = await self.stack.enter_async_context(
DefaultAsyncProducer(
loop=loop,
**self.kwargs,
)
)
Expand Down

0 comments on commit 4685dc1

Please sign in to comment.