diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md
index 9063516d5e..479c5e49af 100644
--- a/docs/docs/SUMMARY.md
+++ b/docs/docs/SUMMARY.md
@@ -134,6 +134,7 @@ search:
            - [KafkaRouter](public_api/faststream/kafka/KafkaRouter.md)
            - [TestApp](public_api/faststream/kafka/TestApp.md)
            - [TestKafkaBroker](public_api/faststream/kafka/TestKafkaBroker.md)
+            - [TopicPartition](public_api/faststream/kafka/TopicPartition.md)
        - nats
            - [AckPolicy](public_api/faststream/nats/AckPolicy.md)
            - [ConsumerConfig](public_api/faststream/nats/ConsumerConfig.md)
@@ -504,6 +505,7 @@ search:
            - [KafkaRouter](api/faststream/kafka/KafkaRouter.md)
            - [TestApp](api/faststream/kafka/TestApp.md)
            - [TestKafkaBroker](api/faststream/kafka/TestKafkaBroker.md)
+            - [TopicPartition](api/faststream/kafka/TopicPartition.md)
            - broker
                - [KafkaBroker](api/faststream/kafka/broker/KafkaBroker.md)
                - broker
diff --git a/docs/docs/en/api/faststream/kafka/TopicPartition.md b/docs/docs/en/api/faststream/kafka/TopicPartition.md
new file mode 100644
index 0000000000..41fbd7f624
--- /dev/null
+++ b/docs/docs/en/api/faststream/kafka/TopicPartition.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: aiokafka.structs.TopicPartition
diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md
index 10da34ba0f..8c97b8dc44 100644
--- a/docs/docs/en/release.md
+++ b/docs/docs/en/release.md
@@ -20,7 +20,7 @@ hide:
 * fix (#1415): raise SetupError if rpc and reply_to are using in TestCL… by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1419](https://github.com/airtai/faststream/pull/1419){.external-link target="_blank"}
 * Chore/update deps2 by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1418](https://github.com/airtai/faststream/pull/1418){.external-link target="_blank"}
 * refactor: correct security with kwarg params merging by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1417](https://github.com/airtai/faststream/pull/1417){.external-link target="_blank"}
-* fix (#1414): correct Messag.ack error processing by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1420](https://github.com/airtai/faststream/pull/1420){.external-link target="_blank"}
+* fix (#1414): correct Message.ack error processing by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1420](https://github.com/airtai/faststream/pull/1420){.external-link target="_blank"}
 
 **Full Changelog**: [#0.5.3...0.5.4](https://github.com/airtai/faststream/compare/0.5.3...0.5.4){.external-link target="_blank"}
diff --git a/faststream/broker/acknowledgement_watcher.py b/faststream/broker/acknowledgement_watcher.py
index 4f75b1d66e..dabc6eb87f 100644
--- a/faststream/broker/acknowledgement_watcher.py
+++ b/faststream/broker/acknowledgement_watcher.py
@@ -178,11 +178,7 @@ async def __ack(self) -> None:
             await self.message.ack(**self.extra_options)
         except Exception as er:
             if self.logger is not None:
-                self.logger.log(
-                    logging.ERROR,
-                    er,
-                    exc_info=er
-                )
+                self.logger.log(logging.ERROR, er, exc_info=er)
         else:
             self.watcher.remove(self.message.message_id)
 
@@ -191,22 +187,14 @@ async def __nack(self) -> None:
             await self.message.nack(**self.extra_options)
         except Exception as er:
             if self.logger is not None:
-                self.logger.log(
-                    logging.ERROR,
-                    er,
-                    exc_info=er
-                )
+                self.logger.log(logging.ERROR, er, exc_info=er)
 
     async def __reject(self) -> None:
         try:
             await self.message.reject(**self.extra_options)
         except Exception as er:
             if self.logger is not None:
-                self.logger.log(
-                    logging.ERROR,
-                    er,
-                    exc_info=er
-                )
+                self.logger.log(logging.ERROR, er, exc_info=er)
         else:
             self.watcher.remove(self.message.message_id)
diff --git a/faststream/kafka/__init__.py b/faststream/kafka/__init__.py
index eb83bd8b01..c81b617033 100644
--- a/faststream/kafka/__init__.py
+++ b/faststream/kafka/__init__.py
@@ -1,3 +1,5 @@
+from aiokafka import TopicPartition
+
 from faststream.kafka.annotations import KafkaMessage
 from faststream.kafka.broker import KafkaBroker
 from faststream.kafka.router import KafkaPublisher, KafkaRoute, KafkaRouter
@@ -12,4 +14,5 @@
     "KafkaPublisher",
     "TestKafkaBroker",
     "TestApp",
+    "TopicPartition",
 )
diff --git a/faststream/kafka/broker/registrator.py b/faststream/kafka/broker/registrator.py
index bed606870a..899e5828d5 100644
--- a/faststream/kafka/broker/registrator.py
+++ b/faststream/kafka/broker/registrator.py
@@ -25,7 +25,7 @@
 from faststream.kafka.subscriber.asyncapi import AsyncAPISubscriber
 
 if TYPE_CHECKING:
-    from aiokafka import ConsumerRecord
+    from aiokafka import ConsumerRecord, TopicPartition
     from aiokafka.abc import ConsumerRebalanceListener
     from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
     from fast_depends.dependencies import Depends
@@ -336,6 +336,13 @@ def subscriber(
             Pattern to match available topics. You must provide either topics or pattern, but not both.
             """),
         ] = None,
+        partitions: Annotated[
+            Iterable["TopicPartition"],
+            Doc("""
+            An explicit list of partitions to assign.
+            You can't use 'topics' and 'partitions' at the same time.
+            """),
+        ] = (),
         # broker args
         dependencies: Annotated[
             Iterable["Depends"],
@@ -660,6 +667,13 @@ def subscriber(
             Pattern to match available topics. You must provide either topics or pattern, but not both.
             """),
         ] = None,
+        partitions: Annotated[
+            Iterable["TopicPartition"],
+            Doc("""
+            An explicit list of partitions to assign.
+            You can't use 'topics' and 'partitions' at the same time.
+            """),
+        ] = (),
         # broker args
         dependencies: Annotated[
             Iterable["Depends"],
@@ -984,6 +998,13 @@ def subscriber(
             Pattern to match available topics. You must provide either topics or pattern, but not both.
             """),
         ] = None,
+        partitions: Annotated[
+            Iterable["TopicPartition"],
+            Doc("""
+            An explicit list of partitions to assign.
+            You can't use 'topics' and 'partitions' at the same time.
+            """),
+        ] = (),
         # broker args
         dependencies: Annotated[
             Iterable["Depends"],
@@ -1311,6 +1332,13 @@ def subscriber(
             Pattern to match available topics. You must provide either topics or pattern, but not both.
             """),
         ] = None,
+        partitions: Annotated[
+            Iterable["TopicPartition"],
+            Doc("""
+            An explicit list of partitions to assign.
+            You can't use 'topics' and 'partitions' at the same time.
+ """), + ] = (), # broker args dependencies: Annotated[ Iterable["Depends"], @@ -1402,6 +1430,7 @@ def subscriber( group_id=group_id, listener=listener, pattern=pattern, + partitions=partitions, builder=builder, is_manual=not auto_commit, # subscriber args diff --git a/faststream/kafka/fastapi/fastapi.py b/faststream/kafka/fastapi/fastapi.py index ce988aa329..541940d79e 100644 --- a/faststream/kafka/fastapi/fastapi.py +++ b/faststream/kafka/fastapi/fastapi.py @@ -38,6 +38,7 @@ from asyncio import AbstractEventLoop from enum import Enum + from aiokafka import TopicPartition from aiokafka.abc import AbstractTokenProvider, ConsumerRebalanceListener from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor from fastapi import params @@ -919,6 +920,13 @@ def subscriber( Pattern to match available topics. You must provide either topics or pattern, but not both. """), ] = None, + partitions: Annotated[ + Iterable["TopicPartition"], + Doc(""" + An explicit partitions list to assign. + You can't use 'topics' and 'partitions' in the same time. + """), + ] = (), # broker args dependencies: Annotated[ Iterable["params.Depends"], @@ -1401,6 +1409,13 @@ def subscriber( Pattern to match available topics. You must provide either topics or pattern, but not both. """), ] = None, + partitions: Annotated[ + Iterable["TopicPartition"], + Doc(""" + An explicit partitions list to assign. + You can't use 'topics' and 'partitions' in the same time. + """), + ] = (), # broker args dependencies: Annotated[ Iterable["params.Depends"], @@ -1883,6 +1898,13 @@ def subscriber( Pattern to match available topics. You must provide either topics or pattern, but not both. """), ] = None, + partitions: Annotated[ + Iterable["TopicPartition"], + Doc(""" + An explicit partitions list to assign. + You can't use 'topics' and 'partitions' in the same time. + """), + ] = (), # broker args dependencies: Annotated[ Iterable["params.Depends"], @@ -2368,6 +2390,13 @@ def subscriber( Pattern to match available topics. You must provide either topics or pattern, but not both. """), ] = None, + partitions: Annotated[ + Iterable["TopicPartition"], + Doc(""" + An explicit partitions list to assign. + You can't use 'topics' and 'partitions' in the same time. + """), + ] = (), # broker args dependencies: Annotated[ Iterable["params.Depends"], @@ -2575,6 +2604,7 @@ def subscriber( batch_timeout_ms=batch_timeout_ms, listener=listener, pattern=pattern, + partitions=partitions, # broker args dependencies=dependencies, parser=parser, diff --git a/faststream/kafka/router.py b/faststream/kafka/router.py index 7bcbd1a48d..98383512a5 100644 --- a/faststream/kafka/router.py +++ b/faststream/kafka/router.py @@ -19,7 +19,8 @@ from faststream.kafka.broker.registrator import KafkaRegistrator if TYPE_CHECKING: - from aiokafka import ConsumerRecord + from aiokafka import ConsumerRecord, TopicPartition + from aiokafka.abc import ConsumerRebalanceListener from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor from fast_depends.dependencies import Depends @@ -376,6 +377,44 @@ def __init__( Optional[int], Doc("Number of messages to consume as one batch."), ] = None, + listener: Annotated[ + Optional["ConsumerRebalanceListener"], + Doc(""" + Optionally include listener + callback, which will be called before and after each rebalance + operation. 
+            As part of group management, the consumer will keep track of
+            the list of consumers that belong to a particular group and
+            will trigger a rebalance operation if one of the following
+            events occurs:
+
+            * Number of partitions change for any of the subscribed topics
+            * Topic is created or deleted
+            * An existing member of the consumer group dies
+            * A new member is added to the consumer group
+
+            When any of these events are triggered, the provided listener
+            will be invoked first to indicate that the consumer's
+            assignment has been revoked, and then again when the new
+            assignment has been received. Note that this listener will
+            immediately override any listener set in a previous call
+            to subscribe. It is guaranteed, however, that the partitions
+            revoked/assigned
+            through this interface are from topics subscribed in this call.
+            """),
+        ] = None,
+        pattern: Annotated[
+            Optional[str],
+            Doc("""
+            Pattern to match available topics. You must provide either topics or pattern, but not both.
+            """),
+        ] = None,
+        partitions: Annotated[
+            Optional[Iterable["TopicPartition"]],
+            Doc("""
+            An explicit list of partitions to assign. You can't use 'topics' and 'partitions' at the same time.
+            """),
+        ] = (),
         # broker args
         dependencies: Annotated[
             Iterable["Depends"],
@@ -456,6 +495,9 @@ def __init__(
             max_records=max_records,
             batch_timeout_ms=batch_timeout_ms,
             batch=batch,
+            listener=listener,
+            pattern=pattern,
+            partitions=partitions,
             # basic args
             dependencies=dependencies,
             parser=parser,
diff --git a/faststream/kafka/subscriber/asyncapi.py b/faststream/kafka/subscriber/asyncapi.py
index 4453690cc1..f2897d3fdf 100644
--- a/faststream/kafka/subscriber/asyncapi.py
+++ b/faststream/kafka/subscriber/asyncapi.py
@@ -22,6 +22,7 @@
 from faststream.asyncapi.schema.bindings import kafka
 from faststream.asyncapi.utils import resolve_payloads
 from faststream.broker.types import MsgType
+from faststream.exceptions import SetupError
 from faststream.kafka.subscriber.usecase import (
     BatchSubscriber,
     DefaultSubscriber,
 )
 
 if TYPE_CHECKING:
-    from aiokafka import AIOKafkaConsumer, ConsumerRecord
+    from aiokafka import AIOKafkaConsumer, ConsumerRecord, TopicPartition
     from aiokafka.abc import ConsumerRebalanceListener
     from fast_depends.dependencies import Depends
@@ -79,6 +80,7 @@ def create(
         group_id: Optional[str],
         listener: Optional["ConsumerRebalanceListener"],
         pattern: Optional[str],
+        partitions: Iterable["TopicPartition"],
         builder: Callable[..., "AIOKafkaConsumer"],
         is_manual: bool,
         # Subscriber args
@@ -103,6 +105,7 @@ def create(
         group_id: Optional[str],
         listener: Optional["ConsumerRebalanceListener"],
         pattern: Optional[str],
+        partitions: Iterable["TopicPartition"],
         builder: Callable[..., "AIOKafkaConsumer"],
         is_manual: bool,
         # Subscriber args
@@ -127,6 +130,7 @@ def create(
         group_id: Optional[str],
         listener: Optional["ConsumerRebalanceListener"],
         pattern: Optional[str],
+        partitions: Iterable["TopicPartition"],
         builder: Callable[..., "AIOKafkaConsumer"],
         is_manual: bool,
         # Subscriber args
@@ -156,6 +160,7 @@ def create(
         group_id: Optional[str],
         listener: Optional["ConsumerRebalanceListener"],
         pattern: Optional[str],
+        partitions: Iterable["TopicPartition"],
         builder: Callable[..., "AIOKafkaConsumer"],
         is_manual: bool,
         # Subscriber args
@@ -173,6 +178,17 @@ def create(
         "AsyncAPIDefaultSubscriber",
         "AsyncAPIBatchSubscriber",
     ]:
+        if not topics and not partitions and not pattern:
+            raise SetupError(
+                "You should provide either `topics` or `partitions` or `pattern`."
+            )
+        elif topics and partitions:
+            raise SetupError("You can't provide both `topics` and `partitions`.")
+        elif topics and pattern:
+            raise SetupError("You can't provide both `topics` and `pattern`.")
+        elif pattern and partitions:
+            raise SetupError("You can't provide both `pattern` and `partitions`.")
+
         if batch:
             return AsyncAPIBatchSubscriber(
                 *topics,
@@ -181,6 +197,7 @@
                 group_id=group_id,
                 listener=listener,
                 pattern=pattern,
+                partitions=partitions,
                 builder=builder,
                 is_manual=is_manual,
                 no_ack=no_ack,
@@ -197,6 +214,7 @@
                 group_id=group_id,
                 listener=listener,
                 pattern=pattern,
+                partitions=partitions,
                 builder=builder,
                 is_manual=is_manual,
                 no_ack=no_ack,
diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py
index a84eb883db..67b3909d22 100644
--- a/faststream/kafka/subscriber/usecase.py
+++ b/faststream/kafka/subscriber/usecase.py
@@ -7,12 +7,14 @@
     Callable,
     Dict,
     Iterable,
+    List,
     Optional,
     Sequence,
     Tuple,
 )
 
 import anyio
+from aiokafka import TopicPartition
 from aiokafka.errors import ConsumerStoppedError, KafkaError
 from typing_extensions import override
 
@@ -56,6 +58,7 @@ def __init__(
         builder: Callable[..., "AIOKafkaConsumer"],
         listener: Optional["ConsumerRebalanceListener"],
         pattern: Optional[str],
+        partitions: Iterable["TopicPartition"],
         is_manual: bool,
         # Subscriber args
         default_parser: "AsyncCallable",
@@ -94,6 +97,7 @@ def __init__(
         self.client_id = ""
         self.__pattern = pattern
         self.__listener = listener
+        self.partitions = partitions
         self.__connection_args: "ConsumerConnectionParams" = {}
 
     @override
@@ -139,13 +143,18 @@ async def start(self) -> None:
             client_id=self.client_id,
             **self.__connection_args,
         )
-        consumer.subscribe(
-            topics=self.topics,
-            pattern=self.__pattern,
-            listener=self.__listener,
-        )
 
-        await consumer.start()
+        if self.topics:
+            consumer.subscribe(
+                topics=self.topics,
+                pattern=self.__pattern,
+                listener=self.__listener,
+            )
+
+        elif self.partitions:
+            consumer.assign(partitions=self.partitions)
+
+        await consumer.start()
         await super().start()
 
         self.task = asyncio.create_task(self._consume())
@@ -213,9 +222,18 @@ def get_routing_hash(
     ) -> int:
         return hash("".join((*topics, group_id or "")))
 
+    @property
+    def topic_names(self) -> List[str]:
+        if self.__pattern:
+            return [self.__pattern]
+        elif self.topics:
+            return list(self.topics)
+        else:
+            return [f"{p.topic}-{p.partition}" for p in self.partitions]
+
     def __hash__(self) -> int:
         return self.get_routing_hash(
-            topics=(*self.topics, self.__pattern or ""),
+            topics=self.topic_names,
             group_id=self.group_id,
         )
 
@@ -236,7 +254,7 @@ def get_log_context(
         message: Optional["StreamMessage[ConsumerRecord]"],
     ) -> Dict[str, str]:
         if message is None:
-            topic = ",".join(self.topics)
+            topic = ",".join(self.topic_names)
         elif isinstance(message.raw_message, Sequence):
             topic = message.raw_message[0].topic
         else:
             topic = message.raw_message.topic
@@ -251,6 +269,14 @@ def get_log_context(
 
     def add_prefix(self, prefix: str) -> None:
         self.topics = tuple("".join((prefix, t)) for t in self.topics)
+
+        self.partitions = [
+            TopicPartition(
+                topic="".join((prefix, p.topic)),
+                partition=p.partition,
+            )
+            for p in self.partitions
+        ]
 
 
 class DefaultSubscriber(LogicSubscriber["ConsumerRecord"]):
     def __init__(
@@ -260,6 +286,7 @@ def __init__(
         group_id: Optional[str],
         listener: Optional["ConsumerRebalanceListener"],
         pattern: Optional[str],
+        partitions: Iterable["TopicPartition"],
         builder: Callable[..., "AIOKafkaConsumer"],
         is_manual: bool,
         # Subscriber args
@@ -277,6 +304,7 @@ def __init__(
             group_id=group_id,
             listener=listener,
             pattern=pattern,
+            partitions=partitions,
             builder=builder,
             is_manual=is_manual,
             # subscriber args
@@ -308,6 +336,7 @@ def __init__(
         group_id: Optional[str],
         listener: Optional["ConsumerRebalanceListener"],
         pattern: Optional[str],
+        partitions: Iterable["TopicPartition"],
         builder: Callable[..., "AIOKafkaConsumer"],
         is_manual: bool,
         # Subscriber args
@@ -330,6 +359,7 @@ def __init__(
             group_id=group_id,
             listener=listener,
             pattern=pattern,
+            partitions=partitions,
             builder=builder,
             is_manual=is_manual,
             # subscriber args
diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py
old mode 100644
new mode 100755
index fb9e71417f..e28056edf6
--- a/faststream/kafka/testing.py
+++ b/faststream/kafka/testing.py
@@ -5,6 +5,7 @@
 from typing_extensions import override
 
 from faststream.broker.message import encode_message, gen_cor_id
+from faststream.kafka import TopicPartition
 from faststream.kafka.broker import KafkaBroker
 from faststream.kafka.publisher.asyncapi import AsyncAPIBatchPublisher
 from faststream.kafka.publisher.producer import AioKafkaFastProducer
@@ -31,10 +32,17 @@ def create_publisher_fake_subscriber(
         broker: KafkaBroker,
         publisher: "AsyncAPIPublisher[Any]",
     ) -> "HandlerCallWrapper[Any, Any, Any]":
-        sub = broker.subscriber(
-            publisher.topic,
-            batch=isinstance(publisher, AsyncAPIBatchPublisher),
-        )
+        if publisher.partition:
+            tp = TopicPartition(topic=publisher.topic, partition=publisher.partition)
+            sub = broker.subscriber(
+                partitions=[tp],
+                batch=isinstance(publisher, AsyncAPIBatchPublisher),
+            )
+        else:
+            sub = broker.subscriber(
+                publisher.topic,
+                batch=isinstance(publisher, AsyncAPIBatchPublisher),
+            )
 
         if not sub.calls:
@@ -92,7 +100,16 @@ async def publish(  # type: ignore[override]
         )
 
         for handler in self.broker._subscribers.values():  # pragma: no branch
-            if topic in handler.topics:
+            call: bool = False
+
+            for p in handler.partitions:
+                if p.topic == topic and (partition is None or p.partition == partition):
+                    call = True
+
+            if not call and topic in handler.topics:
+                call = True
+
+            if call:
                 return await call_handler(
                     handler=handler,
                     message=[incoming]
diff --git a/pyproject.toml b/pyproject.toml
index 29ea79f8c3..5356223f23 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -79,13 +79,13 @@ telemetry = ["opentelemetry-sdk>=1.24.0,<2.0.0"]
 optionals = ["faststream[rabbit,kafka,confluent,nats,redis,telemetry]"]
 
 devdocs = [
-    "mkdocs-material==9.5.18",
+    "mkdocs-material==9.5.21",
     "mkdocs-static-i18n==1.2.2",
     "mdx-include==1.4.2",
     "mkdocstrings[python]==0.25.0",
     "mkdocs-literate-nav==0.6.1",
-    "mkdocs-git-revision-date-localized-plugin==1.2.4",
-    "mike==2.0.0", # versioning
+    "mkdocs-git-revision-date-localized-plugin==1.2.5",
+    "mike==2.1.1", # versioning
     "mkdocs-minify-plugin==0.8.0",
     "mkdocs-macros-plugin==1.0.5", # includes with variables
     "mkdocs-glightbox==0.3.7", # img zoom
@@ -110,7 +110,7 @@ types = [
 
 lint = [
     "faststream[types]",
-    "ruff==0.4.2",
+    "ruff==0.4.3",
     "bandit==1.7.8",
     "semgrep==1.70.0",
     "codespell==2.2.6",
@@ -125,7 +125,7 @@ test-core = [
 
 testing = [
     "faststream[test-core]",
-    "fastapi==0.110.2",
+    "fastapi==0.111.0",
     "pydantic-settings>=2.0.0,<3.0.0",
     "httpx==0.27.0",
     "PyYAML==6.0.1",
diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py
index 52711b1c39..c738279e1f 100644
--- a/tests/brokers/kafka/test_consume.py
+++ b/tests/brokers/kafka/test_consume.py
@@ -5,7 +5,7 @@
 from aiokafka import AIOKafkaConsumer
 
 from faststream.exceptions import AckMessage
-from faststream.kafka import KafkaBroker
+from faststream.kafka import KafkaBroker, TopicPartition
 from faststream.kafka.annotations import KafkaMessage
 from tests.brokers.base.consume import BrokerRealConsumeTestcase
 from tests.tools import spy_decorator
@@ -73,6 +73,30 @@ async def handler(msg: KafkaMessage):
 
         assert event.is_set()
 
+    @pytest.mark.asyncio()
+    async def test_manual_partition_consume(
+        self, queue: str, full_broker: KafkaBroker, event: asyncio.Event
+    ):
+        tp1 = TopicPartition(queue, partition=0)
+
+        @full_broker.subscriber(partitions=[tp1])
+        async def handler_tp1(msg: KafkaMessage):
+            event.set()
+
+        async with full_broker:
+            await full_broker.start()
+            await asyncio.wait(
+                (
+                    asyncio.create_task(
+                        full_broker.publish("hello", queue, partition=0)
+                    ),
+                    asyncio.create_task(event.wait()),
+                ),
+                timeout=10,
+            )
+
+        assert event.is_set()
+
     @pytest.mark.asyncio()
     @pytest.mark.slow()
     async def test_consume_ack_manual(
diff --git a/tests/brokers/kafka/test_test_client.py b/tests/brokers/kafka/test_test_client.py
index f6722b7005..b3219cf79f 100644
--- a/tests/brokers/kafka/test_test_client.py
+++ b/tests/brokers/kafka/test_test_client.py
@@ -3,7 +3,7 @@
 import pytest
 
 from faststream import BaseMiddleware
-from faststream.kafka import KafkaBroker, TestKafkaBroker
+from faststream.kafka import KafkaBroker, TestKafkaBroker, TopicPartition
 from tests.brokers.base.testclient import BrokerTestclientTestcase
 
 
@@ -17,6 +17,56 @@ def get_broker(self, apply_types: bool = False):
     def patch_broker(self, broker: KafkaBroker) -> TestKafkaBroker:
         return TestKafkaBroker(broker)
 
+    async def test_partition_match(
+        self,
+        queue: str,
+    ):
+        broker = self.get_broker()
+
+        @broker.subscriber(partitions=[TopicPartition(queue, 1)])
+        async def m():
+            pass
+
+        async with self.patch_broker(broker) as br:
+            await br.publish("hello", queue)
+
+        m.mock.assert_called_once_with("hello")
+
+    async def test_partition_match_exact(
+        self,
+        queue: str,
+    ):
+        broker = self.get_broker()
+
+        @broker.subscriber(partitions=[TopicPartition(queue, 1)])
+        async def m():
+            pass
+
+        async with self.patch_broker(broker) as br:
+            await br.publish("hello", queue, partition=1)
+
+        m.mock.assert_called_once_with("hello")
+
+    async def test_partition_mismatch(
+        self,
+        queue: str,
+    ):
+        broker = self.get_broker()
+
+        @broker.subscriber(partitions=[TopicPartition(queue, 1)])
+        async def m():
+            pass
+
+        @broker.subscriber(queue)
+        async def m2():
+            pass
+
+        async with self.patch_broker(broker) as br:
+            await br.publish("hello", queue, partition=2)
+
+        assert not m.mock.called
+        m2.mock.assert_called_once_with("hello")
+
     @pytest.mark.kafka()
     async def test_with_real_testclient(
         self,