Skip to content

Commit

Permalink
feat: add tests for manual partition consume
Browse files Browse the repository at this point in the history
  • Loading branch information
spataphore1337 committed May 7, 2024
1 parent 500fb60 commit bf21bfc
Showing 1 changed file with 50 additions and 26 deletions.
76 changes: 50 additions & 26 deletions tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from aiokafka import AIOKafkaConsumer

from faststream.exceptions import AckMessage
from faststream.kafka import KafkaBroker
from faststream.kafka import KafkaBroker, TopicPartition
from faststream.kafka.annotations import KafkaMessage
from tests.brokers.base.consume import BrokerRealConsumeTestcase
from tests.tools import spy_decorator
Expand Down Expand Up @@ -36,10 +36,10 @@ async def handler(msg):
@pytest.mark.asyncio()
@pytest.mark.slow()
async def test_consume_ack(
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
):
@full_broker.subscriber(queue, group_id="test", auto_commit=False)
async def handler(msg: KafkaMessage):
Expand All @@ -49,7 +49,7 @@ async def handler(msg: KafkaMessage):
await full_broker.start()

with patch.object(
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
) as m:
await asyncio.wait(
(
Expand All @@ -67,13 +67,37 @@ async def handler(msg: KafkaMessage):

assert event.is_set()

@pytest.mark.asyncio()
async def test_manual_partition_consume(
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event):
tp1 = TopicPartition(queue, partition=0)

@full_broker.subscriber(partitions=[tp1])
async def handler_tp1(msg: KafkaMessage):
event.set()

async with full_broker:
await full_broker.start()
await asyncio.wait(
(
asyncio.create_task(full_broker.publish("hello", queue, partition=0)),
asyncio.create_task(event.wait())
),
timeout=10
)

assert event.is_set()

@pytest.mark.asyncio()
@pytest.mark.slow()
async def test_consume_ack_manual(
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
):
@full_broker.subscriber(queue, group_id="test", auto_commit=False)
async def handler(msg: KafkaMessage):
Expand All @@ -84,7 +108,7 @@ async def handler(msg: KafkaMessage):
await full_broker.start()

with patch.object(
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
) as m:
await asyncio.wait(
(
Expand All @@ -105,10 +129,10 @@ async def handler(msg: KafkaMessage):
@pytest.mark.asyncio()
@pytest.mark.slow()
async def test_consume_ack_raise(
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
):
@full_broker.subscriber(queue, group_id="test", auto_commit=False)
async def handler(msg: KafkaMessage):
Expand All @@ -119,7 +143,7 @@ async def handler(msg: KafkaMessage):
await full_broker.start()

with patch.object(
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
) as m:
await asyncio.wait(
(
Expand All @@ -140,10 +164,10 @@ async def handler(msg: KafkaMessage):
@pytest.mark.asyncio()
@pytest.mark.slow()
async def test_nack(
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
):
@full_broker.subscriber(queue, group_id="test", auto_commit=False)
async def handler(msg: KafkaMessage):
Expand All @@ -154,7 +178,7 @@ async def handler(msg: KafkaMessage):
await full_broker.start()

with patch.object(
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
) as m:
await asyncio.wait(
(
Expand All @@ -175,18 +199,18 @@ async def handler(msg: KafkaMessage):
@pytest.mark.asyncio()
@pytest.mark.slow()
async def test_consume_no_ack(
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
self,
queue: str,
full_broker: KafkaBroker,
event: asyncio.Event,
):
@full_broker.subscriber(queue, group_id="test", no_ack=True)
async def handler(msg: KafkaMessage):
event.set()

await full_broker.start()
with patch.object(
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit)
) as m:
await asyncio.wait(
(
Expand Down

0 comments on commit bf21bfc

Please sign in to comment.