Skip to content

Commit

Permalink
refactor: correct security with kwarg params merging (#1417)
Browse files Browse the repository at this point in the history
* refactor: correct security with kwarg params merging

* fix: correct filter_by_dict usage

---------

Co-authored-by: Kumaran Rajendhiran <[email protected]>
  • Loading branch information
Lancetnik and kumaranvpl authored May 3, 2024
1 parent 669647f commit 312dd55
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
7 changes: 5 additions & 2 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,18 @@ async def _connect( # type: ignore[override]
**kwargs: Any,
) -> ConsumerConnectionParams:
security_params = parse_security(self.security)
kwargs.update(security_params)

producer = AsyncConfluentProducer(
**kwargs,
**security_params,
client_id=client_id,
)

self._producer = AsyncConfluentFastProducer(
producer=producer,
)
return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params})

return filter_by_dict(ConsumerConnectionParams, kwargs)

async def start(self) -> None:
await super().start()
Expand Down
7 changes: 5 additions & 2 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,16 +581,19 @@ async def _connect( # type: ignore[override]
**kwargs: Any,
) -> ConsumerConnectionParams:
security_params = parse_security(self.security)
kwargs.update(security_params)

producer = aiokafka.AIOKafkaProducer(
**kwargs,
**security_params,
client_id=client_id,
)

await producer.start()
self._producer = AioKafkaFastProducer(
producer=producer,
)
return filter_by_dict(ConsumerConnectionParams, {**kwargs, **security_params})

return filter_by_dict(ConsumerConnectionParams, kwargs)

async def start(self) -> None:
"""Connect broker to Kafka and startup all subscribers."""
Expand Down

0 comments on commit 312dd55

Please sign in to comment.