From ffbb92d5144af68b7835872a7a893f7215acaf3e Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Sun, 1 Dec 2024 21:33:55 +0700 Subject: [PATCH 1/9] init --- faststream/confluent/broker/registrator.py | 8 ++- faststream/confluent/fastapi/fastapi.py | 17 ++++- faststream/confluent/subscriber/asyncapi.py | 8 +++ faststream/confluent/subscriber/factory.py | 66 ++++++++++++++------ faststream/confluent/subscriber/usecase.py | 69 ++++++++++++++++++--- 5 files changed, 137 insertions(+), 31 deletions(-) diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index 3d4301f7a4..ddece2a6a6 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -38,6 +38,7 @@ from faststream.confluent.schemas import TopicPartition from faststream.confluent.subscriber.asyncapi import ( AsyncAPIBatchSubscriber, + AsyncAPIConcurrentDefaultSubscriber, AsyncAPIDefaultSubscriber, ) @@ -1188,9 +1189,11 @@ def subscriber( bool, Doc("Whetever to include operation in AsyncAPI schema or not."), ] = True, + max_workers: int, ) -> Union[ "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", ]: if not auto_commit and not group_id: raise SetupError("You should install `group_id` with manual commit mode") @@ -1234,7 +1237,10 @@ def subscriber( if batch: subscriber = cast("AsyncAPIBatchSubscriber", subscriber) else: - subscriber = cast("AsyncAPIDefaultSubscriber", subscriber) + if max_workers > 1: + subscriber = cast("AsyncAPIConcurrentDefaultSubscriber", subscriber) + else: + subscriber = cast("AsyncAPIDefaultSubscriber", subscriber) subscriber = super().subscriber(subscriber) # type: ignore[arg-type,assignment] diff --git a/faststream/confluent/fastapi/fastapi.py b/faststream/confluent/fastapi/fastapi.py index 88e3b970c0..d6b85e92a9 100644 --- a/faststream/confluent/fastapi/fastapi.py +++ b/faststream/confluent/fastapi/fastapi.py @@ -55,6 +55,7 @@ from faststream.confluent.schemas import TopicPartition from faststream.confluent.subscriber.asyncapi import ( AsyncAPIBatchSubscriber, + AsyncAPIConcurrentDefaultSubscriber, AsyncAPIDefaultSubscriber, ) from faststream.security import BaseSecurity @@ -542,10 +543,15 @@ def __init__( """ ), ] = Default(generate_unique_id), + max_workers: Annotated[ + int, + Doc("Number of workers to process messages concurrently."), + ] = 1, ) -> None: super().__init__( bootstrap_servers=bootstrap_servers, client_id=client_id, + max_workers=max_workers, request_timeout_ms=request_timeout_ms, retry_backoff_ms=retry_backoff_ms, metadata_max_age_ms=metadata_max_age_ms, @@ -2205,13 +2211,19 @@ def subscriber( """ ), ] = False, + max_workers: Annotated[ + int, + Doc("Number of workers to process messages concurrently."), + ] = 1, ) -> Union[ "AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", ]: subscriber = super().subscriber( *topics, polling_interval=polling_interval, + max_workers=max_workers, partitions=partitions, group_id=group_id, group_instance_id=group_instance_id, @@ -2255,7 +2267,10 @@ def subscriber( if batch: return cast("AsyncAPIBatchSubscriber", subscriber) else: - return cast("AsyncAPIDefaultSubscriber", subscriber) + if max_workers > 1: + return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber) + else: + return cast("AsyncAPIDefaultSubscriber", subscriber) @overload # type: ignore[override] def publisher( diff --git a/faststream/confluent/subscriber/asyncapi.py b/faststream/confluent/subscriber/asyncapi.py index ee87c0e0a0..bb0d592f76 100644 --- a/faststream/confluent/subscriber/asyncapi.py +++ b/faststream/confluent/subscriber/asyncapi.py @@ -17,6 +17,7 @@ from faststream.broker.types import MsgType from faststream.confluent.subscriber.usecase import ( BatchSubscriber, + ConcurrentDefaultSubscriber, DefaultSubscriber, LogicSubscriber, ) @@ -72,3 +73,10 @@ class AsyncAPIBatchSubscriber( AsyncAPISubscriber[Tuple["ConfluentMsg", ...]], ): pass + + +class AsyncAPIConcurrentDefaultSubscriber( + ConcurrentDefaultSubscriber, + AsyncAPISubscriber["ConfluentMsg"], +): + pass diff --git a/faststream/confluent/subscriber/factory.py b/faststream/confluent/subscriber/factory.py index e2585d76b2..32d5ffcd66 100644 --- a/faststream/confluent/subscriber/factory.py +++ b/faststream/confluent/subscriber/factory.py @@ -12,8 +12,10 @@ from faststream.confluent.subscriber.asyncapi import ( AsyncAPIBatchSubscriber, + AsyncAPIConcurrentDefaultSubscriber, AsyncAPIDefaultSubscriber, ) +from faststream.exceptions import SetupError if TYPE_CHECKING: from confluent_kafka import Message as ConfluentMsg @@ -114,6 +116,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + max_workers: int, no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], @@ -129,6 +132,9 @@ def create_subscriber( "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", ]: + if is_manual and max_workers > 1: + raise SetupError("Max workers not work with manual commit mode.") + if batch: return AsyncAPIBatchSubscriber( *topics, @@ -151,22 +157,44 @@ def create_subscriber( include_in_schema=include_in_schema, ) else: - return AsyncAPIDefaultSubscriber( - *topics, - partitions=partitions, - polling_interval=polling_interval, - group_id=group_id, - connection_data=connection_data, - is_manual=is_manual, - no_ack=no_ack, - no_reply=no_reply, - retry=retry, - broker_dependencies=broker_dependencies, - broker_middlewares=cast( - Sequence["BrokerMiddleware[ConfluentMsg]"], - broker_middlewares, - ), - title_=title_, - description_=description_, - include_in_schema=include_in_schema, - ) + if max_workers > 1: + return AsyncAPIConcurrentDefaultSubscriber( + *topics, + max_workers=max_workers, + partitions=partitions, + polling_interval=polling_interval, + group_id=group_id, + connection_data=connection_data, + is_manual=is_manual, + no_ack=no_ack, + no_reply=no_reply, + retry=retry, + broker_dependencies=broker_dependencies, + broker_middlewares=cast( + Sequence["BrokerMiddleware[ConfluentMsg]"], + broker_middlewares, + ), + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + ) + else: + return AsyncAPIDefaultSubscriber( + *topics, + partitions=partitions, + polling_interval=polling_interval, + group_id=group_id, + connection_data=connection_data, + is_manual=is_manual, + no_ack=no_ack, + no_reply=no_reply, + retry=retry, + broker_dependencies=broker_dependencies, + broker_middlewares=cast( + Sequence["BrokerMiddleware[ConfluentMsg]"], + broker_middlewares, + ), + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + ) diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index 0045163938..d8ffd09d12 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -1,4 +1,3 @@ -import asyncio from abc import ABC, abstractmethod from typing import ( TYPE_CHECKING, @@ -17,6 +16,7 @@ from typing_extensions import override from faststream.broker.publisher.fake import FakePublisher +from faststream.broker.subscriber.mixins import ConcurrentMixin, TasksMixin from faststream.broker.subscriber.usecase import SubscriberUsecase from faststream.broker.types import MsgType from faststream.broker.utils import process_msg @@ -37,7 +37,7 @@ from faststream.types import AnyDict, Decorator, LoggerProto -class LogicSubscriber(ABC, SubscriberUsecase[MsgType]): +class LogicSubscriber(ABC, TasksMixin, SubscriberUsecase[MsgType]): """A class to handle logic for consuming messages from Kafka.""" topics: Sequence[str] @@ -46,7 +46,6 @@ class LogicSubscriber(ABC, SubscriberUsecase[MsgType]): builder: Optional[Callable[..., "AsyncConfluentConsumer"]] consumer: Optional["AsyncConfluentConsumer"] - task: Optional["asyncio.Task[None]"] client_id: Optional[str] def __init__( @@ -94,7 +93,6 @@ def __init__( self.is_manual = is_manual self.consumer = None - self.task = None self.polling_interval = polling_interval # Setup it later @@ -154,7 +152,7 @@ async def start(self) -> None: await super().start() if self.calls: - self.task = asyncio.create_task(self._consume()) + self.add_task(self._consume()) async def close(self) -> None: await super().close() @@ -163,11 +161,6 @@ async def close(self) -> None: await self.consumer.stop() self.consumer = None - if self.task is not None and not self.task.done(): - self.task.cancel() - - self.task = None - @override async def get_one( self, @@ -204,6 +197,9 @@ def _make_response_publisher( ), ) + async def consume_one(self, msg: MsgType) -> None: + await self.consume(msg) + @abstractmethod async def get_msg(self) -> Optional[MsgType]: raise NotImplementedError() @@ -408,3 +404,56 @@ def get_log_context( topic=topic, group_id=self.group_id, ) + + +class ConcurrentDefaultSubscriber(ConcurrentMixin, DefaultSubscriber): + def __init__( + self, + *topics: str, + # Kafka information + partitions: Sequence["TopicPartition"], + polling_interval: float, + group_id: Optional[str], + connection_data: "AnyDict", + is_manual: bool, + # Subscriber args + max_workers: int, + no_ack: bool, + no_reply: bool, + retry: bool, + broker_dependencies: Iterable["Depends"], + broker_middlewares: Sequence["BrokerMiddleware[Message]"], + # AsyncAPI args + title_: Optional[str], + description_: Optional[str], + include_in_schema: bool, + ) -> None: + super().__init__( + *topics, + partitions=partitions, + polling_interval=polling_interval, + group_id=group_id, + connection_data=connection_data, + is_manual=is_manual, + # subscriber args + default_parser=AsyncConfluentParser.parse_message, + default_decoder=AsyncConfluentParser.decode_message, + # Propagated args + no_ack=no_ack, + no_reply=no_reply, + retry=retry, + broker_middlewares=broker_middlewares, + broker_dependencies=broker_dependencies, + # AsyncAPI args + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + max_workers=max_workers, + ) + + async def start(self) -> None: + await super().start() + self.start_consume_task() + + async def consume_one(self, msg: "Message") -> None: + await self._put_msg(msg) From a9d848be3bc1ef2d251e0c6c2df69eab8eec7e94 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Sun, 1 Dec 2024 21:44:23 +0700 Subject: [PATCH 2/9] Feat: add tests --- tests/brokers/confluent/test_consume.py | 39 +++++++++++++++++++- tests/brokers/confluent/test_misconfigure.py | 11 ++++++ 2 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 tests/brokers/confluent/test_misconfigure.py diff --git a/tests/brokers/confluent/test_consume.py b/tests/brokers/confluent/test_consume.py index f3eb5774cd..7dff51fbcb 100644 --- a/tests/brokers/confluent/test_consume.py +++ b/tests/brokers/confluent/test_consume.py @@ -1,5 +1,5 @@ import asyncio -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -322,3 +322,40 @@ async def subscriber_with_auto_commit(m): assert event.is_set() assert event2.is_set() + + @pytest.mark.asyncio + @pytest.mark.slow + async def test_concurrent_consume(self, queue: str, mock: MagicMock): + event = asyncio.Event() + event2 = asyncio.Event() + + consume_broker = self.get_broker() + + args, kwargs = self.get_subscriber_params(queue, max_workers=2) + + @consume_broker.subscriber(*args, **kwargs) + async def handler(msg): + mock() + if event.is_set(): + event2.set() + else: + event.set() + await asyncio.sleep(0.1) + + async with self.patch_broker(consume_broker) as br: + await br.start() + + for i in range(5): + await br.publish(i, queue) + + await asyncio.wait( + ( + asyncio.create_task(event.wait()), + asyncio.create_task(event2.wait()), + ), + timeout=3, + ) + + assert event.is_set() + assert event2.is_set() + assert mock.call_count == 2, mock.call_count diff --git a/tests/brokers/confluent/test_misconfigure.py b/tests/brokers/confluent/test_misconfigure.py new file mode 100644 index 0000000000..bcc115fee0 --- /dev/null +++ b/tests/brokers/confluent/test_misconfigure.py @@ -0,0 +1,11 @@ +import pytest + +from faststream.confluent import KafkaBroker +from faststream.exceptions import SetupError + + +def test_max_workers_with_manual(queue: str) -> None: + broker = KafkaBroker() + + with pytest.raises(SetupError): + broker.subscriber(queue, max_workers=3, auto_commit=False) From 26f01125f036b18d13525356095f80812da68b5a Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Mon, 2 Dec 2024 22:49:15 +0700 Subject: [PATCH 3/9] Fix: typo, lint --- faststream/confluent/broker/registrator.py | 6 +++++- faststream/confluent/fastapi/fastapi.py | 5 ----- faststream/confluent/subscriber/usecase.py | 4 +--- tests/brokers/confluent/test_consume.py | 2 +- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index ddece2a6a6..1831364867 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -1189,7 +1189,10 @@ def subscriber( bool, Doc("Whetever to include operation in AsyncAPI schema or not."), ] = True, - max_workers: int, + max_workers: Annotated[ + int, + Doc("Number of workers to process messages concurrently."), + ] = 1, ) -> Union[ "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", @@ -1200,6 +1203,7 @@ def subscriber( subscriber = create_subscriber( *topics, + max_workers=max_workers, polling_interval=polling_interval, partitions=partitions, batch=batch, diff --git a/faststream/confluent/fastapi/fastapi.py b/faststream/confluent/fastapi/fastapi.py index d6b85e92a9..dd4178543e 100644 --- a/faststream/confluent/fastapi/fastapi.py +++ b/faststream/confluent/fastapi/fastapi.py @@ -543,15 +543,10 @@ def __init__( """ ), ] = Default(generate_unique_id), - max_workers: Annotated[ - int, - Doc("Number of workers to process messages concurrently."), - ] = 1, ) -> None: super().__init__( bootstrap_servers=bootstrap_servers, client_id=client_id, - max_workers=max_workers, request_timeout_ms=request_timeout_ms, retry_backoff_ms=retry_backoff_ms, metadata_max_age_ms=metadata_max_age_ms, diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index d8ffd09d12..1ef4a40504 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -436,8 +436,7 @@ def __init__( connection_data=connection_data, is_manual=is_manual, # subscriber args - default_parser=AsyncConfluentParser.parse_message, - default_decoder=AsyncConfluentParser.decode_message, + max_workers=max_workers, # Propagated args no_ack=no_ack, no_reply=no_reply, @@ -448,7 +447,6 @@ def __init__( title_=title_, description_=description_, include_in_schema=include_in_schema, - max_workers=max_workers, ) async def start(self) -> None: diff --git a/tests/brokers/confluent/test_consume.py b/tests/brokers/confluent/test_consume.py index 7dff51fbcb..49a20a867d 100644 --- a/tests/brokers/confluent/test_consume.py +++ b/tests/brokers/confluent/test_consume.py @@ -331,7 +331,7 @@ async def test_concurrent_consume(self, queue: str, mock: MagicMock): consume_broker = self.get_broker() - args, kwargs = self.get_subscriber_params(queue, max_workers=2) + args, kwargs = self.get_subscriber_params(queue, group_id="test", max_workers=2) @consume_broker.subscriber(*args, **kwargs) async def handler(msg): From 794f39b306cfbac4f149d51bbe13d8d21c3e61d5 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Tue, 3 Dec 2024 23:40:03 +0700 Subject: [PATCH 4/9] fix --- tests/brokers/confluent/test_consume.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/brokers/confluent/test_consume.py b/tests/brokers/confluent/test_consume.py index 49a20a867d..7dff51fbcb 100644 --- a/tests/brokers/confluent/test_consume.py +++ b/tests/brokers/confluent/test_consume.py @@ -331,7 +331,7 @@ async def test_concurrent_consume(self, queue: str, mock: MagicMock): consume_broker = self.get_broker() - args, kwargs = self.get_subscriber_params(queue, group_id="test", max_workers=2) + args, kwargs = self.get_subscriber_params(queue, max_workers=2) @consume_broker.subscriber(*args, **kwargs) async def handler(msg): From 9788188485dbb7abf6df6b4f3f2ea79de74c194e Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Wed, 4 Dec 2024 00:06:34 +0700 Subject: [PATCH 5/9] Fix: add consume one & max_workers to router --- faststream/confluent/router.py | 5 +++++ faststream/confluent/subscriber/usecase.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/faststream/confluent/router.py b/faststream/confluent/router.py index cdb7a6772a..14dcb9b943 100644 --- a/faststream/confluent/router.py +++ b/faststream/confluent/router.py @@ -422,11 +422,16 @@ def __init__( bool, Doc("Whetever to include operation in AsyncAPI schema or not."), ] = True, + max_workers: Annotated[ + int, + Doc("Number of workers to process messages concurrently."), + ] = 1, ) -> None: super().__init__( call, *topics, publishers=publishers, + max_workers=max_workers, partitions=partitions, polling_interval=polling_interval, group_id=group_id, diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index 1ef4a40504..5b6a3fffa9 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -221,7 +221,7 @@ async def _consume(self) -> None: connected = True if msg is not None: - await self.consume(msg) + await self.consume_one(msg) @property def topic_names(self) -> List[str]: From 55c9c90620d9f94cd1dd99ee8ee6f91547b9555d Mon Sep 17 00:00:00 2001 From: Flosckow Date: Tue, 3 Dec 2024 17:21:42 +0000 Subject: [PATCH 6/9] docs: generate API References --- docs/docs/SUMMARY.md | 2 ++ .../asyncapi/AsyncAPIConcurrentDefaultSubscriber.md | 11 +++++++++++ .../subscriber/usecase/ConcurrentDefaultSubscriber.md | 11 +++++++++++ 3 files changed, 24 insertions(+) create mode 100644 docs/docs/en/api/faststream/confluent/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md create mode 100644 docs/docs/en/api/faststream/confluent/subscriber/usecase/ConcurrentDefaultSubscriber.md diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index dd3f9c90ca..58d0f9c3f7 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -567,12 +567,14 @@ search: - subscriber - asyncapi - [AsyncAPIBatchSubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPIBatchSubscriber.md) + - [AsyncAPIConcurrentDefaultSubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md) - [AsyncAPIDefaultSubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPIDefaultSubscriber.md) - [AsyncAPISubscriber](api/faststream/confluent/subscriber/asyncapi/AsyncAPISubscriber.md) - factory - [create_subscriber](api/faststream/confluent/subscriber/factory/create_subscriber.md) - usecase - [BatchSubscriber](api/faststream/confluent/subscriber/usecase/BatchSubscriber.md) + - [ConcurrentDefaultSubscriber](api/faststream/confluent/subscriber/usecase/ConcurrentDefaultSubscriber.md) - [DefaultSubscriber](api/faststream/confluent/subscriber/usecase/DefaultSubscriber.md) - [LogicSubscriber](api/faststream/confluent/subscriber/usecase/LogicSubscriber.md) - testing diff --git a/docs/docs/en/api/faststream/confluent/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md b/docs/docs/en/api/faststream/confluent/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md new file mode 100644 index 0000000000..372b29b571 --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.subscriber.asyncapi.AsyncAPIConcurrentDefaultSubscriber diff --git a/docs/docs/en/api/faststream/confluent/subscriber/usecase/ConcurrentDefaultSubscriber.md b/docs/docs/en/api/faststream/confluent/subscriber/usecase/ConcurrentDefaultSubscriber.md new file mode 100644 index 0000000000..13d0f308c1 --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/subscriber/usecase/ConcurrentDefaultSubscriber.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.subscriber.usecase.ConcurrentDefaultSubscriber From 3fb95a3b734b2deb2e156a7d640f26be3c3f68a8 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Wed, 4 Dec 2024 22:59:12 +0700 Subject: [PATCH 7/9] Fix: typing --- faststream/confluent/broker/registrator.py | 2 +- faststream/confluent/subscriber/factory.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index 1831364867..6d74078476 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -54,7 +54,7 @@ class KafkaRegistrator( """Includable to KafkaBroker router.""" _subscribers: Dict[ # type: ignore[assignment] - int, Union["AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber"] + int, Union["AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber", "AsyncAPIConcurrentDefaultSubscriber"] ] _publishers: Dict[ # type: ignore[assignment] int, Union["AsyncAPIBatchPublisher", "AsyncAPIDefaultPublisher"] diff --git a/faststream/confluent/subscriber/factory.py b/faststream/confluent/subscriber/factory.py index 32d5ffcd66..a0b0f3b239 100644 --- a/faststream/confluent/subscriber/factory.py +++ b/faststream/confluent/subscriber/factory.py @@ -39,6 +39,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + max_workers: int, no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], @@ -63,6 +64,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + max_workers: int, no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], @@ -87,6 +89,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + max_workers: int, no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], @@ -131,6 +134,7 @@ def create_subscriber( ) -> Union[ "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", + "AsyncAPIConcurrentDefaultSubscriber" ]: if is_manual and max_workers > 1: raise SetupError("Max workers not work with manual commit mode.") From b3de0bd4ae8f60ed1b13597f4951e93baa13b9cd Mon Sep 17 00:00:00 2001 From: Flosckow Date: Wed, 4 Dec 2024 16:08:48 +0000 Subject: [PATCH 8/9] docs: generate API References --- faststream/confluent/broker/registrator.py | 7 ++++++- faststream/confluent/subscriber/factory.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index 6d74078476..62f3ddc70d 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -54,7 +54,12 @@ class KafkaRegistrator( """Includable to KafkaBroker router.""" _subscribers: Dict[ # type: ignore[assignment] - int, Union["AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber", "AsyncAPIConcurrentDefaultSubscriber"] + int, + Union[ + "AsyncAPIBatchSubscriber", + "AsyncAPIDefaultSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", + ], ] _publishers: Dict[ # type: ignore[assignment] int, Union["AsyncAPIBatchPublisher", "AsyncAPIDefaultPublisher"] diff --git a/faststream/confluent/subscriber/factory.py b/faststream/confluent/subscriber/factory.py index a0b0f3b239..99a1c53d2c 100644 --- a/faststream/confluent/subscriber/factory.py +++ b/faststream/confluent/subscriber/factory.py @@ -134,7 +134,7 @@ def create_subscriber( ) -> Union[ "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", - "AsyncAPIConcurrentDefaultSubscriber" + "AsyncAPIConcurrentDefaultSubscriber", ]: if is_manual and max_workers > 1: raise SetupError("Max workers not work with manual commit mode.") From eb26ef145ac1d2eea7d477cd804840ac5719dce5 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Wed, 4 Dec 2024 19:23:34 +0300 Subject: [PATCH 9/9] lint: fix mypy --- faststream/broker/subscriber/mixins.py | 14 ++++++++------ faststream/confluent/broker/registrator.py | 4 ++-- faststream/confluent/subscriber/factory.py | 6 +++++- faststream/confluent/subscriber/usecase.py | 6 +++--- faststream/kafka/subscriber/factory.py | 6 +++++- faststream/kafka/subscriber/usecase.py | 6 +++--- faststream/nats/subscriber/usecase.py | 13 +++++-------- 7 files changed, 31 insertions(+), 24 deletions(-) diff --git a/faststream/broker/subscriber/mixins.py b/faststream/broker/subscriber/mixins.py index f1e2274a3b..2043d8b1ae 100644 --- a/faststream/broker/subscriber/mixins.py +++ b/faststream/broker/subscriber/mixins.py @@ -3,16 +3,18 @@ TYPE_CHECKING, Any, Coroutine, + Generic, List, ) import anyio +from faststream.broker.types import MsgType + from .usecase import SubscriberUsecase if TYPE_CHECKING: from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream - from nats.aio.msg import Msg class TasksMixin(SubscriberUsecase[Any]): @@ -34,9 +36,9 @@ async def close(self) -> None: self.tasks = [] -class ConcurrentMixin(TasksMixin): - send_stream: "MemoryObjectSendStream[Msg]" - receive_stream: "MemoryObjectReceiveStream[Msg]" +class ConcurrentMixin(TasksMixin, Generic[MsgType]): + send_stream: "MemoryObjectSendStream[MsgType]" + receive_stream: "MemoryObjectReceiveStream[MsgType]" def __init__( self, @@ -69,13 +71,13 @@ async def _serve_consume_queue( async def _consume_msg( self, - msg: "Msg", + msg: "MsgType", ) -> None: """Proxy method to call `self.consume` with semaphore block.""" async with self.limiter: await self.consume(msg) - async def _put_msg(self, msg: "Msg") -> None: + async def _put_msg(self, msg: "MsgType") -> None: """Proxy method to put msg into in-memory queue with semaphore block.""" async with self.limiter: await self.send_stream.send(msg) diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index 62f3ddc70d..55edd17846 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -53,7 +53,7 @@ class KafkaRegistrator( ): """Includable to KafkaBroker router.""" - _subscribers: Dict[ # type: ignore[assignment] + _subscribers: Dict[ int, Union[ "AsyncAPIBatchSubscriber", @@ -1251,7 +1251,7 @@ def subscriber( else: subscriber = cast("AsyncAPIDefaultSubscriber", subscriber) - subscriber = super().subscriber(subscriber) # type: ignore[arg-type,assignment] + subscriber = super().subscriber(subscriber) # type: ignore[assignment] return subscriber.add_call( filter_=filter, diff --git a/faststream/confluent/subscriber/factory.py b/faststream/confluent/subscriber/factory.py index 99a1c53d2c..ae6b907c28 100644 --- a/faststream/confluent/subscriber/factory.py +++ b/faststream/confluent/subscriber/factory.py @@ -73,7 +73,10 @@ def create_subscriber( title_: Optional[str], description_: Optional[str], include_in_schema: bool, -) -> "AsyncAPIDefaultSubscriber": ... +) -> Union[ + "AsyncAPIDefaultSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", +]: ... @overload @@ -104,6 +107,7 @@ def create_subscriber( ) -> Union[ "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", ]: ... diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index 5b6a3fffa9..b435f35433 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import ( TYPE_CHECKING, Any, @@ -37,7 +37,7 @@ from faststream.types import AnyDict, Decorator, LoggerProto -class LogicSubscriber(ABC, TasksMixin, SubscriberUsecase[MsgType]): +class LogicSubscriber(TasksMixin, SubscriberUsecase[MsgType]): """A class to handle logic for consuming messages from Kafka.""" topics: Sequence[str] @@ -406,7 +406,7 @@ def get_log_context( ) -class ConcurrentDefaultSubscriber(ConcurrentMixin, DefaultSubscriber): +class ConcurrentDefaultSubscriber(ConcurrentMixin[Message], DefaultSubscriber): def __init__( self, *topics: str, diff --git a/faststream/kafka/subscriber/factory.py b/faststream/kafka/subscriber/factory.py index 74c3f78dbe..cdc2b35a7d 100644 --- a/faststream/kafka/subscriber/factory.py +++ b/faststream/kafka/subscriber/factory.py @@ -76,7 +76,10 @@ def create_subscriber( title_: Optional[str], description_: Optional[str], include_in_schema: bool, -) -> "AsyncAPIDefaultSubscriber": ... +) -> Union[ + "AsyncAPIDefaultSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", +]: ... @overload @@ -108,6 +111,7 @@ def create_subscriber( ) -> Union[ "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", ]: ... diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index ea9c3e7b0f..9b682f21ae 100644 --- a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -13,7 +13,7 @@ ) import anyio -from aiokafka import TopicPartition +from aiokafka import ConsumerRecord, TopicPartition from aiokafka.errors import ConsumerStoppedError, KafkaError from typing_extensions import override @@ -32,7 +32,7 @@ from faststream.utils.path import compile_path if TYPE_CHECKING: - from aiokafka import AIOKafkaConsumer, ConsumerRecord + from aiokafka import AIOKafkaConsumer from aiokafka.abc import ConsumerRebalanceListener from fast_depends.dependencies import Depends @@ -468,7 +468,7 @@ def get_log_context( ) -class ConcurrentDefaultSubscriber(ConcurrentMixin, DefaultSubscriber): +class ConcurrentDefaultSubscriber(ConcurrentMixin[ConsumerRecord], DefaultSubscriber): def __init__( self, *topics: str, diff --git a/faststream/nats/subscriber/usecase.py b/faststream/nats/subscriber/usecase.py index 8b960a5a74..af90d57d16 100644 --- a/faststream/nats/subscriber/usecase.py +++ b/faststream/nats/subscriber/usecase.py @@ -18,6 +18,7 @@ import anyio from fast_depends.dependencies import Depends +from nats.aio.msg import Msg from nats.errors import ConnectionClosedError, TimeoutError from nats.js.api import ConsumerConfig, ObjectInfo from typing_extensions import Annotated, Doc, override @@ -45,7 +46,6 @@ if TYPE_CHECKING: from nats.aio.client import Client - from nats.aio.msg import Msg from nats.aio.subscription import Subscription from nats.js import JetStreamContext from nats.js.kv import KeyValue @@ -431,7 +431,7 @@ def get_log_context( class ConcurrentCoreSubscriber( - ConcurrentMixin, + ConcurrentMixin[Msg], CoreSubscriber, ): def __init__( @@ -624,7 +624,7 @@ async def _create_subscription( class ConcurrentPushStreamSubscriber( - ConcurrentMixin, + ConcurrentMixin[Msg], _StreamSubscriber, ): subscription: Optional["JetStreamContext.PushSubscription"] @@ -691,10 +691,7 @@ async def _create_subscription( ) -class PullStreamSubscriber( - TasksMixin, - _StreamSubscriber, -): +class PullStreamSubscriber(TasksMixin, _StreamSubscriber): subscription: Optional["JetStreamContext.PullSubscription"] def __init__( @@ -777,7 +774,7 @@ async def _consume_pull( class ConcurrentPullStreamSubscriber( - ConcurrentMixin, + ConcurrentMixin[Msg], PullStreamSubscriber, ): def __init__(