diff --git a/faststream/confluent/router.py b/faststream/confluent/router.py index cdb7a6772a..14dcb9b943 100644 --- a/faststream/confluent/router.py +++ b/faststream/confluent/router.py @@ -422,11 +422,16 @@ def __init__( bool, Doc("Whetever to include operation in AsyncAPI schema or not."), ] = True, + max_workers: Annotated[ + int, + Doc("Number of workers to process messages concurrently."), + ] = 1, ) -> None: super().__init__( call, *topics, publishers=publishers, + max_workers=max_workers, partitions=partitions, polling_interval=polling_interval, group_id=group_id, diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index 1ef4a40504..5b6a3fffa9 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -221,7 +221,7 @@ async def _consume(self) -> None: connected = True if msg is not None: - await self.consume(msg) + await self.consume_one(msg) @property def topic_names(self) -> List[str]: