diff --git a/faststream/kafka/broker/registrator.py b/faststream/kafka/broker/registrator.py index 393581c898..d7febc98d2 100644 --- a/faststream/kafka/broker/registrator.py +++ b/faststream/kafka/broker/registrator.py @@ -58,7 +58,11 @@ class KafkaRegistrator( _subscribers: Dict[ int, - Union["AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber", "AsyncAPIConcurrentDefaultSubscriber"], + Union[ + "AsyncAPIBatchSubscriber", + "AsyncAPIDefaultSubscriber", + "AsyncAPIConcurrentDefaultSubscriber", + ], ] _publishers: Dict[ int, @@ -1662,7 +1666,7 @@ def subscriber( decoder_=decoder or self._decoder, dependencies_=dependencies, middlewares_=middlewares, - max_workers=max_workers + max_workers=max_workers, ) else: return cast("AsyncAPIDefaultSubscriber", subscriber).add_call( diff --git a/faststream/kafka/fastapi/fastapi.py b/faststream/kafka/fastapi/fastapi.py index fa07ace3e2..5bad796902 100644 --- a/faststream/kafka/fastapi/fastapi.py +++ b/faststream/kafka/fastapi/fastapi.py @@ -2626,7 +2626,7 @@ def subscriber( ) -> Union[ "AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber", - "AsyncAPIConcurrentDefaultSubscriber" + "AsyncAPIConcurrentDefaultSubscriber", ]: subscriber = super().subscriber( *topics, diff --git a/faststream/kafka/subscriber/factory.py b/faststream/kafka/subscriber/factory.py index 22adc72428..9837bdf738 100644 --- a/faststream/kafka/subscriber/factory.py +++ b/faststream/kafka/subscriber/factory.py @@ -135,7 +135,7 @@ def create_subscriber( ) -> Union[ "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", - "AsyncAPIConcurrentDefaultSubscriber" + "AsyncAPIConcurrentDefaultSubscriber", ]: if is_manual and not group_id: raise SetupError("You must use `group_id` with manual commit mode.") diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index 9a29a15911..81c920c9c0 100644 --- a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -473,10 +473,8 @@ def get_log_context( group_id=self.group_id, ) -class ConcurrentDefaultSubscriber( - ConcurrentMixin, - DefaultSubscriber["ConsumerRecord"] -): + +class ConcurrentDefaultSubscriber(ConcurrentMixin, DefaultSubscriber["ConsumerRecord"]): def __init__( self, *topics: str,