Skip to content

Commit

Permalink
resolve missing seek on kafka fakeconsumer (#1682)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanSerafini authored Aug 14, 2024
1 parent f2d6178 commit 4e509f5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
5 changes: 5 additions & 0 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class FakeConsumer:
async def commit(self) -> None:
pass

def seek(self, **kwargs: Any) -> None:
pass


FAKE_CONSUMER = FakeConsumer()

Expand Down Expand Up @@ -54,10 +57,12 @@ async def nack(self) -> None:
self.raw_message.topic, # type: ignore[union-attr]
self.raw_message.partition, # type: ignore[union-attr]
)

self.consumer.seek( # type: ignore[attr-defined]
partition=topic_partition,
offset=self.raw_message.offset, # type: ignore[union-attr]
)

await super().nack()


Expand Down
21 changes: 21 additions & 0 deletions tests/brokers/kafka/test_test_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import asyncio
from unittest.mock import patch

import pytest

from faststream import BaseMiddleware
from faststream.kafka import KafkaBroker, TestKafkaBroker, TopicPartition
from faststream.kafka.annotations import KafkaMessage
from faststream.kafka.message import FAKE_CONSUMER
from faststream.kafka.testing import FakeProducer
from tests.brokers.base.testclient import BrokerTestclientTestcase
from tests.tools import spy_decorator


@pytest.mark.asyncio()
Expand Down Expand Up @@ -71,6 +75,23 @@ async def m2(msg):
assert not m.mock.called
m2.mock.assert_called_once_with("hello")

async def test_message_nack_seek(
self,
queue: str,
):
broker = self.get_broker(apply_types=True)

@broker.subscriber(queue)
async def m(msg: KafkaMessage):
await msg.nack()

async with self.patch_broker(broker) as br:
with patch.object(
FAKE_CONSUMER, "seek", spy_decorator(FAKE_CONSUMER.seek)
) as mocked:
await br.publish("hello", queue)
mocked.mock.assert_called_once()

@pytest.mark.kafka()
async def test_with_real_testclient(
self,
Expand Down

0 comments on commit 4e509f5

Please sign in to comment.