From 4e509f56048d1f855e1b45204b7627af9c589ae3 Mon Sep 17 00:00:00 2001 From: Jonathan Serafini Date: Wed, 14 Aug 2024 05:18:43 -0400 Subject: [PATCH] resolve missing seek on kafka fakeconsumer (#1682) --- faststream/kafka/message.py | 5 +++++ tests/brokers/kafka/test_test_client.py | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/faststream/kafka/message.py b/faststream/kafka/message.py index 131f0b6bba..02d8bbbfe5 100644 --- a/faststream/kafka/message.py +++ b/faststream/kafka/message.py @@ -20,6 +20,9 @@ class FakeConsumer: async def commit(self) -> None: pass + def seek(self, **kwargs: Any) -> None: + pass + FAKE_CONSUMER = FakeConsumer() @@ -54,10 +57,12 @@ async def nack(self) -> None: self.raw_message.topic, # type: ignore[union-attr] self.raw_message.partition, # type: ignore[union-attr] ) + self.consumer.seek( # type: ignore[attr-defined] partition=topic_partition, offset=self.raw_message.offset, # type: ignore[union-attr] ) + await super().nack() diff --git a/tests/brokers/kafka/test_test_client.py b/tests/brokers/kafka/test_test_client.py index 7940645f98..6842e7fc11 100644 --- a/tests/brokers/kafka/test_test_client.py +++ b/tests/brokers/kafka/test_test_client.py @@ -1,11 +1,15 @@ import asyncio +from unittest.mock import patch import pytest from faststream import BaseMiddleware from faststream.kafka import KafkaBroker, TestKafkaBroker, TopicPartition +from faststream.kafka.annotations import KafkaMessage +from faststream.kafka.message import FAKE_CONSUMER from faststream.kafka.testing import FakeProducer from tests.brokers.base.testclient import BrokerTestclientTestcase +from tests.tools import spy_decorator @pytest.mark.asyncio() @@ -71,6 +75,23 @@ async def m2(msg): assert not m.mock.called m2.mock.assert_called_once_with("hello") + async def test_message_nack_seek( + self, + queue: str, + ): + broker = self.get_broker(apply_types=True) + + @broker.subscriber(queue) + async def m(msg: KafkaMessage): + await msg.nack() + + async with self.patch_broker(broker) as br: + with patch.object( + FAKE_CONSUMER, "seek", spy_decorator(FAKE_CONSUMER.seek) + ) as mocked: + await br.publish("hello", queue) + mocked.mock.assert_called_once() + @pytest.mark.kafka() async def test_with_real_testclient( self,