Skip to content

Commit

Permalink
refactor: correct security with kwarg params merging
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed May 2, 2024
1 parent ed270ad commit a3dd368
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
7 changes: 5 additions & 2 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,18 @@ async def _connect( # type: ignore[override]
**kwargs: Any,
) -> ConsumerConnectionParams:
security_params = parse_security(self.security)
kwargs.update(security_params)

producer = AsyncConfluentProducer(
**kwargs,
**security_params,
client_id=client_id,
)

self._producer = AsyncConfluentFastProducer(
producer=producer,
)
return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params})

return filter_by_dict(ConsumerConnectionParams, **kwargs)

async def start(self) -> None:
await super().start()
Expand Down
7 changes: 5 additions & 2 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,16 +581,19 @@ async def _connect( # type: ignore[override]
**kwargs: Any,
) -> ConsumerConnectionParams:
security_params = parse_security(self.security)
kwargs.update(security_params)

producer = aiokafka.AIOKafkaProducer(
**kwargs,
**security_params,
client_id=client_id,
)

await producer.start()
self._producer = AioKafkaFastProducer(
producer=producer,
)
return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params})

return filter_by_dict(ConsumerConnectionParams, **kwargs)

async def start(self) -> None:
"""Connect broker to Kafka and startup all subscribers."""
Expand Down

0 comments on commit a3dd368

Please sign in to comment.