diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index fdef8a20bc..b06c32adae 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -5,7 +5,7 @@ from aiokafka import AIOKafkaConsumer from faststream.exceptions import AckMessage -from faststream.kafka import KafkaBroker +from faststream.kafka import KafkaBroker, TopicPartition from faststream.kafka.annotations import KafkaMessage from tests.brokers.base.consume import BrokerRealConsumeTestcase from tests.tools import spy_decorator @@ -36,10 +36,10 @@ async def handler(msg): @pytest.mark.asyncio() @pytest.mark.slow() async def test_consume_ack( - self, - queue: str, - full_broker: KafkaBroker, - event: asyncio.Event, + self, + queue: str, + full_broker: KafkaBroker, + event: asyncio.Event, ): @full_broker.subscriber(queue, group_id="test", auto_commit=False) async def handler(msg: KafkaMessage): @@ -49,7 +49,7 @@ async def handler(msg: KafkaMessage): await full_broker.start() with patch.object( - AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) ) as m: await asyncio.wait( ( @@ -67,13 +67,37 @@ async def handler(msg: KafkaMessage): assert event.is_set() + @pytest.mark.asyncio() + async def test_manual_partition_consume( + self, + queue: str, + full_broker: KafkaBroker, + event: asyncio.Event): + tp1 = TopicPartition(queue, partition=0) + + @full_broker.subscriber(partitions=[tp1]) + async def handler_tp1(msg: KafkaMessage): + event.set() + + async with full_broker: + await full_broker.start() + await asyncio.wait( + ( + asyncio.create_task(full_broker.publish("hello", queue, partition=0)), + asyncio.create_task(event.wait()) + ), + timeout=10 + ) + + assert event.is_set() + @pytest.mark.asyncio() @pytest.mark.slow() async def test_consume_ack_manual( - self, - queue: str, - full_broker: KafkaBroker, - event: asyncio.Event, + self, + queue: str, + full_broker: KafkaBroker, + event: asyncio.Event, ): @full_broker.subscriber(queue, group_id="test", auto_commit=False) async def handler(msg: KafkaMessage): @@ -84,7 +108,7 @@ async def handler(msg: KafkaMessage): await full_broker.start() with patch.object( - AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) ) as m: await asyncio.wait( ( @@ -105,10 +129,10 @@ async def handler(msg: KafkaMessage): @pytest.mark.asyncio() @pytest.mark.slow() async def test_consume_ack_raise( - self, - queue: str, - full_broker: KafkaBroker, - event: asyncio.Event, + self, + queue: str, + full_broker: KafkaBroker, + event: asyncio.Event, ): @full_broker.subscriber(queue, group_id="test", auto_commit=False) async def handler(msg: KafkaMessage): @@ -119,7 +143,7 @@ async def handler(msg: KafkaMessage): await full_broker.start() with patch.object( - AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) ) as m: await asyncio.wait( ( @@ -140,10 +164,10 @@ async def handler(msg: KafkaMessage): @pytest.mark.asyncio() @pytest.mark.slow() async def test_nack( - self, - queue: str, - full_broker: KafkaBroker, - event: asyncio.Event, + self, + queue: str, + full_broker: KafkaBroker, + event: asyncio.Event, ): @full_broker.subscriber(queue, group_id="test", auto_commit=False) async def handler(msg: KafkaMessage): @@ -154,7 +178,7 @@ async def handler(msg: KafkaMessage): await full_broker.start() with patch.object( - AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) ) as m: await asyncio.wait( ( @@ -175,10 +199,10 @@ async def handler(msg: KafkaMessage): @pytest.mark.asyncio() @pytest.mark.slow() async def test_consume_no_ack( - self, - queue: str, - full_broker: KafkaBroker, - event: asyncio.Event, + self, + queue: str, + full_broker: KafkaBroker, + event: asyncio.Event, ): @full_broker.subscriber(queue, group_id="test", no_ack=True) async def handler(msg: KafkaMessage): @@ -186,7 +210,7 @@ async def handler(msg: KafkaMessage): await full_broker.start() with patch.object( - AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) ) as m: await asyncio.wait( (