Skip to content

Commit

Permalink
Fix batch nack (#1689)
Browse files Browse the repository at this point in the history
* Add missing seek in confluent FakeConsumer

* Fix nack for batch consumers in kafka and confluent brokers
  • Loading branch information
kumaranvpl authored Aug 16, 2024
1 parent 4bc5193 commit 16417cc
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
14 changes: 11 additions & 3 deletions faststream/confluent/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class FakeConsumer:
async def commit(self) -> None:
pass

async def seek(self, **kwargs: Any) -> None:
pass


FAKE_CONSUMER = FakeConsumer()

Expand Down Expand Up @@ -56,9 +59,14 @@ async def ack(self) -> None:
async def nack(self) -> None:
"""Reject the Kafka message."""
if self.is_manual and not self.committed:
raw_message = (
self.raw_message[0]
if isinstance(self.raw_message, tuple)
else self.raw_message
)
await self.consumer.seek( # type: ignore[attr-defined]
topic=self.raw_message.topic(), # type: ignore[union-attr]
partition=self.raw_message.partition(), # type: ignore[union-attr]
offset=self.raw_message.offset(), # type: ignore[union-attr]
topic=raw_message.topic(),
partition=raw_message.partition(),
offset=raw_message.offset(),
)
await super().nack()
11 changes: 8 additions & 3 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ def __init__(
async def nack(self) -> None:
"""Reject the Kafka message."""
if not self.committed:
raw_message = (
self.raw_message[0]
if isinstance(self.raw_message, tuple)
else self.raw_message
)
topic_partition = AIOKafkaTopicPartition(
self.raw_message.topic, # type: ignore[union-attr]
self.raw_message.partition, # type: ignore[union-attr]
raw_message.topic,
raw_message.partition,
)

self.consumer.seek( # type: ignore[attr-defined]
partition=topic_partition,
offset=self.raw_message.offset, # type: ignore[union-attr]
offset=raw_message.offset,
)

await super().nack()
Expand Down
27 changes: 27 additions & 0 deletions tests/brokers/confluent/test_test_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import asyncio
from unittest.mock import patch

import pytest

from faststream import BaseMiddleware
from faststream.confluent import KafkaBroker, TestKafkaBroker
from faststream.confluent.annotations import KafkaMessage
from faststream.confluent.message import FAKE_CONSUMER
from faststream.confluent.testing import FakeProducer
from tests.brokers.base.testclient import BrokerTestclientTestcase
from tests.tools import spy_decorator

from .basic import ConfluentTestcaseConfig

Expand All @@ -25,6 +29,29 @@ def patch_broker(self, broker: KafkaBroker) -> TestKafkaBroker:
def get_fake_producer_class(self) -> type:
return FakeProducer

async def test_message_nack_seek(
self,
queue: str,
):
broker = self.get_broker(apply_types=True)

@broker.subscriber(
queue,
group_id=f"{queue}-consume",
auto_commit=False,
auto_offset_reset="earliest",
)
async def m(msg: KafkaMessage):
await msg.nack()

async with self.patch_broker(broker) as br:
with patch.object(
FAKE_CONSUMER, "seek", spy_decorator(FAKE_CONSUMER.seek)
) as mocked:
await br.publish("hello", queue)
m.mock.assert_called_once_with("hello")
mocked.mock.assert_called_once()

@pytest.mark.confluent()
async def test_with_real_testclient(
self,
Expand Down

0 comments on commit 16417cc

Please sign in to comment.