diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index a462bb2257..8f9de15b09 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -439,15 +439,18 @@ async def _connect( # type: ignore[override] **kwargs: Any, ) -> ConsumerConnectionParams: security_params = parse_security(self.security) + kwargs.update(security_params) + producer = AsyncConfluentProducer( **kwargs, - **security_params, client_id=client_id, ) + self._producer = AsyncConfluentFastProducer( producer=producer, ) - return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params}) + + return filter_by_dict(ConsumerConnectionParams, kwargs) async def start(self) -> None: await super().start() diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index 16df9c7c8c..59d6e733d6 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -581,16 +581,19 @@ async def _connect( # type: ignore[override] **kwargs: Any, ) -> ConsumerConnectionParams: security_params = parse_security(self.security) + kwargs.update(security_params) + producer = aiokafka.AIOKafkaProducer( **kwargs, - **security_params, client_id=client_id, ) + await producer.start() self._producer = AioKafkaFastProducer( producer=producer, ) - return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params}) + + return filter_by_dict(ConsumerConnectionParams, kwargs) async def start(self) -> None: """Connect broker to Kafka and startup all subscribers."""