Skip to content

Commit

Permalink
fix #1967: correct empty kafka message body processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Dec 5, 2024
1 parent 93e80af commit c565d84
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 5 deletions.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.32"
__version__ = "0.5.33"

SERVICE_NAME = f"faststream-{__version__}"
4 changes: 2 additions & 2 deletions faststream/confluent/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def parse_message(
"""Parses a Kafka message."""
headers = _parse_msg_headers(message.headers() or ())

body = message.value()
body = message.value() or b""
offset = message.offset()
_, timestamp = message.timestamp()

Expand Down Expand Up @@ -52,7 +52,7 @@ async def parse_message_batch(
last = message[-1]

for m in message:
body.append(m.value())
body.append(m.value() or b"")
batch_headers.append(_parse_msg_headers(m.headers() or ()))

headers = next(iter(batch_headers), {})
Expand Down
4 changes: 2 additions & 2 deletions faststream/kafka/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def parse_message(
handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

return self.msg_class(
body=message.value,
body=message.value or b"",
headers=headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
Expand Down Expand Up @@ -72,7 +72,7 @@ async def parse_message(
last = message[-1]

for m in message:
body.append(m.value)
body.append(m.value or b"")
batch_headers.append({i: j.decode() for i, j in m.headers})

headers = next(iter(batch_headers), {})
Expand Down
54 changes: 54 additions & 0 deletions tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,57 @@ async def handler(msg):
assert event.is_set()
assert event2.is_set()
assert mock.call_count == 2, mock.call_count

@pytest.mark.asyncio
async def test_consume_without_value(
self,
mock: MagicMock,
queue: str,
event: asyncio.Event,
) -> None:
consume_broker = self.get_broker()

@consume_broker.subscriber(queue)
async def handler(msg):
event.set()
mock(msg)

async with self.patch_broker(consume_broker) as br:
await br.start()

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.send(queue, key=b"")),
asyncio.create_task(event.wait()),
),
timeout=3,
)

mock.assert_called_once_with(b"")

@pytest.mark.asyncio
async def test_consume_batch_without_value(
self,
mock: MagicMock,
queue: str,
event: asyncio.Event,
) -> None:
consume_broker = self.get_broker()

@consume_broker.subscriber(queue, batch=True)
async def handler(msg):
event.set()
mock(msg)

async with self.patch_broker(consume_broker) as br:
await br.start()

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.send(queue, key=b"")),
asyncio.create_task(event.wait()),
),
timeout=3,
)

mock.assert_called_once_with([b""])

0 comments on commit c565d84

Please sign in to comment.