From a3dd368476c8f52e4c439a184e8067c4e9073955 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Thu, 2 May 2024 21:43:20 +0300 Subject: [PATCH] refactor: correct security with kwarg params merging --- faststream/confluent/broker/broker.py | 7 +++++-- faststream/kafka/broker/broker.py | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index a462bb2257..316bb6751a 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -439,15 +439,18 @@ async def _connect( # type: ignore[override] **kwargs: Any, ) -> ConsumerConnectionParams: security_params = parse_security(self.security) + kwargs.update(security_params) + producer = AsyncConfluentProducer( **kwargs, - **security_params, client_id=client_id, ) + self._producer = AsyncConfluentFastProducer( producer=producer, ) - return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params}) + + return filter_by_dict(ConsumerConnectionParams, **kwargs) async def start(self) -> None: await super().start() diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index 16df9c7c8c..7ab3bc39c2 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -581,16 +581,19 @@ async def _connect( # type: ignore[override] **kwargs: Any, ) -> ConsumerConnectionParams: security_params = parse_security(self.security) + kwargs.update(security_params) + producer = aiokafka.AIOKafkaProducer( **kwargs, - **security_params, client_id=client_id, ) + await producer.start() self._producer = AioKafkaFastProducer( producer=producer, ) - return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params}) + + return filter_by_dict(ConsumerConnectionParams, **kwargs) async def start(self) -> None: """Connect broker to Kafka and startup all subscribers."""