Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent confluent kafka #1961

Merged
merged 10 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,14 @@ search:
- subscriber
- asyncapi
- [AsyncAPIBatchSubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPIBatchSubscriber.md)
- [AsyncAPIConcurrentDefaultSubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md)
- [AsyncAPIDefaultSubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPIDefaultSubscriber.md)
- [AsyncAPISubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPISubscriber.md)
- factory
- [create_subscriber](api/faststream/confluent/subscriber/factory/create_subscriber.md)
- usecase
- [BatchSubscriber](api/faststream/confluent/subscriber/usecase/BatchSubscriber.md)
- [ConcurrentDefaultSubscriber](api/faststream/confluent/subscriber/usecase/ConcurrentDefaultSubscriber.md)
- [DefaultSubscriber](api/faststream/confluent/subscriber/usecase/DefaultSubscriber.md)
- [LogicSubscriber](api/faststream/confluent/subscriber/usecase/LogicSubscriber.md)
- testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.subscriber.asyncapi.AsyncAPIConcurrentDefaultSubscriber
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.subscriber.usecase.ConcurrentDefaultSubscriber
14 changes: 8 additions & 6 deletions faststream/broker/subscriber/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
TYPE_CHECKING,
Any,
Coroutine,
Generic,
List,
)

import anyio

from faststream.broker.types import MsgType

from .usecase import SubscriberUsecase

if TYPE_CHECKING:
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from nats.aio.msg import Msg


class TasksMixin(SubscriberUsecase[Any]):
Expand All @@ -34,9 +36,9 @@ async def close(self) -> None:
self.tasks = []


class ConcurrentMixin(TasksMixin):
send_stream: "MemoryObjectSendStream[Msg]"
receive_stream: "MemoryObjectReceiveStream[Msg]"
class ConcurrentMixin(TasksMixin, Generic[MsgType]):
send_stream: "MemoryObjectSendStream[MsgType]"
receive_stream: "MemoryObjectReceiveStream[MsgType]"

def __init__(
self,
Expand Down Expand Up @@ -69,13 +71,13 @@ async def _serve_consume_queue(

async def _consume_msg(
self,
msg: "Msg",
msg: "MsgType",
) -> None:
"""Proxy method to call `self.consume` with semaphore block."""
async with self.limiter:
await self.consume(msg)

async def _put_msg(self, msg: "Msg") -> None:
async def _put_msg(self, msg: "MsgType") -> None:
"""Proxy method to put msg into in-memory queue with semaphore block."""
async with self.limiter:
await self.send_stream.send(msg)
23 changes: 19 additions & 4 deletions faststream/confluent/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from faststream.confluent.schemas import TopicPartition
from faststream.confluent.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIConcurrentDefaultSubscriber,
AsyncAPIDefaultSubscriber,
)

Expand All @@ -52,8 +53,13 @@ class KafkaRegistrator(
):
"""Includable to KafkaBroker router."""

_subscribers: Dict[ # type: ignore[assignment]
int, Union["AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber"]
_subscribers: Dict[
int,
Union[
"AsyncAPIBatchSubscriber",
"AsyncAPIDefaultSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
],
]
_publishers: Dict[ # type: ignore[assignment]
int, Union["AsyncAPIBatchPublisher", "AsyncAPIDefaultPublisher"]
Expand Down Expand Up @@ -1188,15 +1194,21 @@ def subscriber(
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
] = 1,
) -> Union[
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
]:
if not auto_commit and not group_id:
raise SetupError("You should install `group_id` with manual commit mode")

subscriber = create_subscriber(
*topics,
max_workers=max_workers,
polling_interval=polling_interval,
partitions=partitions,
batch=batch,
Expand Down Expand Up @@ -1234,9 +1246,12 @@ def subscriber(
if batch:
subscriber = cast("AsyncAPIBatchSubscriber", subscriber)
else:
subscriber = cast("AsyncAPIDefaultSubscriber", subscriber)
if max_workers > 1:
subscriber = cast("AsyncAPIConcurrentDefaultSubscriber", subscriber)
else:
subscriber = cast("AsyncAPIDefaultSubscriber", subscriber)

subscriber = super().subscriber(subscriber) # type: ignore[arg-type,assignment]
subscriber = super().subscriber(subscriber) # type: ignore[assignment]

return subscriber.add_call(
filter_=filter,
Expand Down
12 changes: 11 additions & 1 deletion faststream/confluent/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from faststream.confluent.schemas import TopicPartition
from faststream.confluent.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIConcurrentDefaultSubscriber,
AsyncAPIDefaultSubscriber,
)
from faststream.security import BaseSecurity
Expand Down Expand Up @@ -2205,13 +2206,19 @@ def subscriber(
"""
),
] = False,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
] = 1,
) -> Union[
"AsyncAPIBatchSubscriber",
"AsyncAPIDefaultSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
]:
subscriber = super().subscriber(
*topics,
polling_interval=polling_interval,
max_workers=max_workers,
partitions=partitions,
group_id=group_id,
group_instance_id=group_instance_id,
Expand Down Expand Up @@ -2255,7 +2262,10 @@ def subscriber(
if batch:
return cast("AsyncAPIBatchSubscriber", subscriber)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber)
if max_workers > 1:
return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber)

@overload # type: ignore[override]
def publisher(
Expand Down
5 changes: 5 additions & 0 deletions faststream/confluent/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,16 @@ def __init__(
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
] = 1,
) -> None:
super().__init__(
call,
*topics,
publishers=publishers,
max_workers=max_workers,
partitions=partitions,
polling_interval=polling_interval,
group_id=group_id,
Expand Down
8 changes: 8 additions & 0 deletions faststream/confluent/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from faststream.broker.types import MsgType
from faststream.confluent.subscriber.usecase import (
BatchSubscriber,
ConcurrentDefaultSubscriber,
DefaultSubscriber,
LogicSubscriber,
)
Expand Down Expand Up @@ -72,3 +73,10 @@ class AsyncAPIBatchSubscriber(
AsyncAPISubscriber[Tuple["ConfluentMsg", ...]],
):
pass


class AsyncAPIConcurrentDefaultSubscriber(
ConcurrentDefaultSubscriber,
AsyncAPISubscriber["ConfluentMsg"],
):
pass
76 changes: 56 additions & 20 deletions faststream/confluent/subscriber/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

from faststream.confluent.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIConcurrentDefaultSubscriber,
AsyncAPIDefaultSubscriber,
)
from faststream.exceptions import SetupError

if TYPE_CHECKING:
from confluent_kafka import Message as ConfluentMsg
Expand All @@ -37,6 +39,7 @@ def create_subscriber(
is_manual: bool,
# Subscriber args
no_ack: bool,
max_workers: int,
no_reply: bool,
retry: bool,
broker_dependencies: Iterable["Depends"],
Expand All @@ -61,6 +64,7 @@ def create_subscriber(
is_manual: bool,
# Subscriber args
no_ack: bool,
max_workers: int,
no_reply: bool,
retry: bool,
broker_dependencies: Iterable["Depends"],
Expand All @@ -69,7 +73,10 @@ def create_subscriber(
title_: Optional[str],
description_: Optional[str],
include_in_schema: bool,
) -> "AsyncAPIDefaultSubscriber": ...
) -> Union[
"AsyncAPIDefaultSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
]: ...


@overload
Expand All @@ -85,6 +92,7 @@ def create_subscriber(
is_manual: bool,
# Subscriber args
no_ack: bool,
max_workers: int,
no_reply: bool,
retry: bool,
broker_dependencies: Iterable["Depends"],
Expand All @@ -99,6 +107,7 @@ def create_subscriber(
) -> Union[
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
]: ...


Expand All @@ -114,6 +123,7 @@ def create_subscriber(
is_manual: bool,
# Subscriber args
no_ack: bool,
max_workers: int,
no_reply: bool,
retry: bool,
broker_dependencies: Iterable["Depends"],
Expand All @@ -128,7 +138,11 @@ def create_subscriber(
) -> Union[
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
]:
if is_manual and max_workers > 1:
raise SetupError("Max workers not work with manual commit mode.")

if batch:
return AsyncAPIBatchSubscriber(
*topics,
Expand All @@ -151,22 +165,44 @@ def create_subscriber(
include_in_schema=include_in_schema,
)
else:
return AsyncAPIDefaultSubscriber(
*topics,
partitions=partitions,
polling_interval=polling_interval,
group_id=group_id,
connection_data=connection_data,
is_manual=is_manual,
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=cast(
Sequence["BrokerMiddleware[ConfluentMsg]"],
broker_middlewares,
),
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
if max_workers > 1:
return AsyncAPIConcurrentDefaultSubscriber(
*topics,
max_workers=max_workers,
partitions=partitions,
polling_interval=polling_interval,
group_id=group_id,
connection_data=connection_data,
is_manual=is_manual,
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=cast(
Sequence["BrokerMiddleware[ConfluentMsg]"],
broker_middlewares,
),
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
else:
return AsyncAPIDefaultSubscriber(
*topics,
partitions=partitions,
polling_interval=polling_interval,
group_id=group_id,
connection_data=connection_data,
is_manual=is_manual,
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=cast(
Sequence["BrokerMiddleware[ConfluentMsg]"],
broker_middlewares,
),
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
Loading
Loading