diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index c738279e1f..b96b26459e 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -75,21 +75,24 @@ async def handler(msg: KafkaMessage): @pytest.mark.asyncio() async def test_manual_partition_consume( - self, queue: str, full_broker: KafkaBroker, event: asyncio.Event + self, + queue: str, + event: asyncio.Event, ): + consume_broker = self.get_broker() + tp1 = TopicPartition(queue, partition=0) - @full_broker.subscriber(partitions=[tp1]) - async def handler_tp1(msg: KafkaMessage): + @consume_broker.subscriber(partitions=[tp1]) + async def handler_tp1(msg): event.set() - async with full_broker: - await full_broker.start() + async with self.patch_broker(consume_broker) as br: + await br.start() + await asyncio.wait( ( - asyncio.create_task( - full_broker.publish("hello", queue, partition=0) - ), + asyncio.create_task(br.publish("hello", queue, partition=0)), asyncio.create_task(event.wait()), ), timeout=10,