Skip to content

Commit

Permalink
Feat: Add kafka concurrent subscriber (#1912)
Browse files Browse the repository at this point in the history
* Feat: stage 1 add typing, and mock class for concurrent subscriber

* Fix: lint

* Feat: stage 2 add concurrent consume

* Fix: lint

* Feat: change consume to put

* Fix: topics, typo

* Feat: add tests

* docs: generate API References

* chore: polish PR

* chore: update python version in precommit

---------

Co-authored-by: Daniil Dumchenko <[email protected]>
Co-authored-by: Flosckow <[email protected]>
Co-authored-by: Nikita Pastukhov <[email protected]>
  • Loading branch information
4 people authored Nov 22, 2024
1 parent a1f7ebd commit 9bc7a05
Show file tree
Hide file tree
Showing 24 changed files with 342 additions and 157 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: |
3.8
3.9
3.10
python-version: "3.12"
- name: Set $PY environment variable
run: echo "PY=$(python -VV | sha256sum | cut -d' ' -f1)" >> $GITHUB_ENV
- uses: actions/cache@v4
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,14 @@ search:
- subscriber
- asyncapi
- [AsyncAPIBatchSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIBatchSubscriber.md)
- [AsyncAPIConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md)
- [AsyncAPIDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIDefaultSubscriber.md)
- [AsyncAPISubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPISubscriber.md)
- factory
- [create_subscriber](api/faststream/kafka/subscriber/factory/create_subscriber.md)
- usecase
- [BatchSubscriber](api/faststream/kafka/subscriber/usecase/BatchSubscriber.md)
- [ConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentDefaultSubscriber.md)
- [DefaultSubscriber](api/faststream/kafka/subscriber/usecase/DefaultSubscriber.md)
- [LogicSubscriber](api/faststream/kafka/subscriber/usecase/LogicSubscriber.md)
- testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.asyncapi.AsyncAPIConcurrentDefaultSubscriber
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.usecase.ConcurrentDefaultSubscriber
4 changes: 2 additions & 2 deletions faststream/broker/fastapi/get_dependant.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
lambda x: isinstance(x, FieldInfo),
p.field_info.metadata or (),
),
Field(**field_data), # type: ignore[pydantic-field]
Field(**field_data),
)

else:
Expand All @@ -109,7 +109,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
"le": info.field_info.le,
}
)
f = Field(**field_data) # type: ignore[pydantic-field]
f = Field(**field_data)

params_unique[p.name] = (
info.annotation,
Expand Down
8 changes: 4 additions & 4 deletions faststream/broker/subscriber/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@


class TasksMixin(SubscriberUsecase[Any]):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.tasks: List[asyncio.Task[Any]] = []

def add_task(self, coro: Coroutine[Any, Any, Any]) -> None:
Expand All @@ -40,7 +40,7 @@ class ConcurrentMixin(TasksMixin):

def __init__(
self,
*,
*args: Any,
max_workers: int,
**kwargs: Any,
) -> None:
Expand All @@ -51,7 +51,7 @@ def __init__(
)
self.limiter = anyio.Semaphore(max_workers)

super().__init__(**kwargs)
super().__init__(*args, **kwargs)

def start_consume_task(self) -> None:
self.add_task(self._serve_consume_queue())
Expand Down
20 changes: 1 addition & 19 deletions faststream/broker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)

import anyio
from typing_extensions import Literal, Self, overload
from typing_extensions import Self

from faststream.broker.acknowledgement_watcher import WatcherContext, get_watcher
from faststream.broker.types import MsgType
Expand All @@ -35,24 +35,6 @@
from faststream.types import LoggerProto


@overload
async def process_msg(
msg: Literal[None],
middlewares: Iterable["BrokerMiddleware[MsgType]"],
parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
decoder: Callable[["StreamMessage[MsgType]"], "Any"],
) -> None: ...


@overload
async def process_msg(
msg: MsgType,
middlewares: Iterable["BrokerMiddleware[MsgType]"],
parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
decoder: Callable[["StreamMessage[MsgType]"], "Any"],
) -> "StreamMessage[MsgType]": ...


async def process_msg(
msg: Optional[MsgType],
middlewares: Iterable["BrokerMiddleware[MsgType]"],
Expand Down
2 changes: 1 addition & 1 deletion faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
Partition = TypeVar("Partition")


class KafkaBroker(
class KafkaBroker( # type: ignore[misc]
KafkaRegistrator,
KafkaLoggingBroker,
):
Expand Down
108 changes: 54 additions & 54 deletions faststream/confluent/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ class KafkaRegistrator(
):
"""Includable to KafkaBroker router."""

_subscribers: Dict[
_subscribers: Dict[ # type: ignore[assignment]
int, Union["AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber"]
]
_publishers: Dict[int, Union["AsyncAPIBatchPublisher", "AsyncAPIDefaultPublisher"]]
_publishers: Dict[ # type: ignore[assignment]
int, Union["AsyncAPIBatchPublisher", "AsyncAPIDefaultPublisher"]
]

@overload # type: ignore[override]
def subscriber(
Expand Down Expand Up @@ -1193,60 +1195,56 @@ def subscriber(
if not auto_commit and not group_id:
raise SetupError("You should install `group_id` with manual commit mode")

subscriber = super().subscriber(
create_subscriber(
*topics,
polling_interval=polling_interval,
partitions=partitions,
batch=batch,
max_records=max_records,
group_id=group_id,
connection_data={
"group_instance_id": group_instance_id,
"fetch_max_wait_ms": fetch_max_wait_ms,
"fetch_max_bytes": fetch_max_bytes,
"fetch_min_bytes": fetch_min_bytes,
"max_partition_fetch_bytes": max_partition_fetch_bytes,
"auto_offset_reset": auto_offset_reset,
"enable_auto_commit": auto_commit,
"auto_commit_interval_ms": auto_commit_interval_ms,
"check_crcs": check_crcs,
"partition_assignment_strategy": partition_assignment_strategy,
"max_poll_interval_ms": max_poll_interval_ms,
"session_timeout_ms": session_timeout_ms,
"heartbeat_interval_ms": heartbeat_interval_ms,
"isolation_level": isolation_level,
},
is_manual=not auto_commit,
# subscriber args
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_middlewares=self._middlewares,
broker_dependencies=self._dependencies,
# AsyncAPI
title_=title,
description_=description,
include_in_schema=self._solve_include_in_schema(include_in_schema),
)
subscriber = create_subscriber(
*topics,
polling_interval=polling_interval,
partitions=partitions,
batch=batch,
max_records=max_records,
group_id=group_id,
connection_data={
"group_instance_id": group_instance_id,
"fetch_max_wait_ms": fetch_max_wait_ms,
"fetch_max_bytes": fetch_max_bytes,
"fetch_min_bytes": fetch_min_bytes,
"max_partition_fetch_bytes": max_partition_fetch_bytes,
"auto_offset_reset": auto_offset_reset,
"enable_auto_commit": auto_commit,
"auto_commit_interval_ms": auto_commit_interval_ms,
"check_crcs": check_crcs,
"partition_assignment_strategy": partition_assignment_strategy,
"max_poll_interval_ms": max_poll_interval_ms,
"session_timeout_ms": session_timeout_ms,
"heartbeat_interval_ms": heartbeat_interval_ms,
"isolation_level": isolation_level,
},
is_manual=not auto_commit,
# subscriber args
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_middlewares=self._middlewares,
broker_dependencies=self._dependencies,
# AsyncAPI
title_=title,
description_=description,
include_in_schema=self._solve_include_in_schema(include_in_schema),
)

if batch:
return cast("AsyncAPIBatchSubscriber", subscriber).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
subscriber = cast("AsyncAPIBatchSubscriber", subscriber)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
subscriber = cast("AsyncAPIDefaultSubscriber", subscriber)

subscriber = super().subscriber(subscriber) # type: ignore[arg-type,assignment]

return subscriber.add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)

@overload # type: ignore[override]
def publisher(
Expand Down Expand Up @@ -1577,6 +1575,8 @@ def publisher(
)

if batch:
return cast("AsyncAPIBatchPublisher", super().publisher(publisher))
publisher = cast("AsyncAPIBatchPublisher", publisher)
else:
return cast("AsyncAPIDefaultPublisher", super().publisher(publisher))
publisher = cast("AsyncAPIDefaultPublisher", publisher)

return super().publisher(publisher) # type: ignore[return-value,arg-type]
6 changes: 3 additions & 3 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__(
}
)

self.producer = Producer(final_config, logger=self.logger)
self.producer = Producer(final_config)

self.__running = True
self._poll_task = asyncio.create_task(self._poll_loop())
Expand Down Expand Up @@ -312,7 +312,7 @@ def __init__(
)

self.config = final_config
self.consumer = Consumer(final_config, logger=self.logger)
self.consumer = Consumer(final_config)

@property
def topics_to_create(self) -> List[str]:
Expand Down Expand Up @@ -381,7 +381,7 @@ async def getmany(
) -> Tuple[Message, ...]:
"""Consumes a batch of messages from Kafka and groups them by topic and partition."""
raw_messages: List[Optional[Message]] = await call_or_await(
self.consumer.consume,
self.consumer.consume, # type: ignore[arg-type]
num_messages=max_records or 10,
timeout=timeout,
)
Expand Down
2 changes: 1 addition & 1 deletion faststream/confluent/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ def __init__(
graceful_timeout=graceful_timeout,
decoder=decoder,
parser=parser,
middlewares=middlewares,
middlewares=middlewares, # type: ignore[arg-type]
schema_url=schema_url,
setup_state=setup_state,
# logger options
Expand Down
Loading

0 comments on commit 9bc7a05

Please sign in to comment.