Skip to content

Commit

Permalink
fix: include NatsRouter streams to original broker (#1509)
Browse files Browse the repository at this point in the history
* fix: includer NatsRouter streams to original broker

* chore: limit typing-extensions version only for tests

* docs: generate API References

* fix: remove debug message

* chore: update ruff

* chore: use GHA concurency to cancel previous run at push

* chore: test GHA cancelation

* chore: add GHA concurency to generating API CI

---------

Co-authored-by: Lancetnik <[email protected]>
  • Loading branch information
Lancetnik and Lancetnik authored Jun 7, 2024
1 parent 4ebaa65 commit a3c353f
Show file tree
Hide file tree
Showing 23 changed files with 140 additions and 57 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/docs_update-references.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:
paths:
- faststream/**

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

permissions:
contents: write

Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ on:
schedule:
- cron: '39 20 * * 0'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
analyze:
if: github.event.pull_request.draft == false
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_dependency-review.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ on:
paths:
- pyproject.toml

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

permissions:
contents: read

Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ on:
types:
- checks_requested

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
static_analysis:
if: github.event.pull_request.draft == false
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ search:
- message
- [ConsumerProtocol](api/faststream/kafka/message/ConsumerProtocol.md)
- [FakeConsumer](api/faststream/kafka/message/FakeConsumer.md)
- [KafkaAckableMessage](api/faststream/kafka/message/KafkaAckableMessage.md)
- [KafkaMessage](api/faststream/kafka/message/KafkaMessage.md)
- opentelemetry
- [KafkaTelemetryMiddleware](api/faststream/kafka/opentelemetry/KafkaTelemetryMiddleware.md)
Expand All @@ -551,6 +552,7 @@ search:
- [KafkaTelemetrySettingsProvider](api/faststream/kafka/opentelemetry/provider/KafkaTelemetrySettingsProvider.md)
- [telemetry_attributes_provider_factory](api/faststream/kafka/opentelemetry/provider/telemetry_attributes_provider_factory.md)
- parser
- [AioKafkaBatchParser](api/faststream/kafka/parser/AioKafkaBatchParser.md)
- [AioKafkaParser](api/faststream/kafka/parser/AioKafkaParser.md)
- publisher
- asyncapi
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/message/KafkaAckableMessage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.message.KafkaAckableMessage
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/parser/AioKafkaBatchParser.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.parser.AioKafkaBatchParser
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.10"
__version__ = "0.5.11"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
6 changes: 3 additions & 3 deletions faststream/confluent/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def get_publish_attrs_from_kwargs(

return attrs

@staticmethod
def get_publish_destination_name(
self,
kwargs: "AnyDict",
) -> str:
return cast(str, kwargs["topic"])
Expand Down Expand Up @@ -66,8 +66,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[Message]",
) -> str:
return cast(str, msg.raw_message.topic())
Expand Down Expand Up @@ -95,8 +95,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[Tuple[Message, ...]]",
) -> str:
return cast(str, msg.raw_message[0].topic())
Expand Down
2 changes: 2 additions & 0 deletions faststream/kafka/annotations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from aiokafka import AIOKafkaConsumer
from typing_extensions import Annotated

from faststream.annotations import ContextRepo, Logger, NoCast
Expand All @@ -15,6 +16,7 @@
"KafkaProducer",
)

Consumer = Annotated[AIOKafkaConsumer, Context("handler_.consumer")]
KafkaMessage = Annotated[KM, Context("message")]
KafkaBroker = Annotated[KB, Context("broker")]
KafkaProducer = Annotated[AioKafkaFastProducer, Context("broker._producer")]
6 changes: 3 additions & 3 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ def __init__(
self,
*args: Any,
consumer: ConsumerProtocol,
is_manual: bool = False,
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)

self.is_manual = is_manual
self.consumer = consumer


class KafkaAckableMessage(KafkaMessage):
async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if self.is_manual and not self.committed:
if not self.committed:
await self.consumer.commit()
await super().ack()
6 changes: 3 additions & 3 deletions faststream/kafka/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def get_publish_attrs_from_kwargs(

return attrs

@staticmethod
def get_publish_destination_name(
self,
kwargs: "AnyDict",
) -> str:
return cast(str, kwargs["topic"])
Expand Down Expand Up @@ -66,8 +66,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[ConsumerRecord]",
) -> str:
return cast(str, msg.raw_message.topic)
Expand Down Expand Up @@ -96,8 +96,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
) -> str:
return cast(str, msg.raw_message[0].topic)
Expand Down
43 changes: 25 additions & 18 deletions faststream/kafka/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type

from faststream.broker.message import decode_message, gen_cor_id
from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage
Expand All @@ -15,14 +15,17 @@
class AioKafkaParser:
"""A class to parse Kafka messages."""

@staticmethod
def __init__(self, msg_class: Type[KafkaMessage]) -> None:
self.msg_class = msg_class

async def parse_message(
self,
message: "ConsumerRecord",
) -> "StreamMessage[ConsumerRecord]":
"""Parses a Kafka message."""
headers = {i: j.decode() for i, j in message.headers}
handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")
return KafkaMessage(
handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")
return self.msg_class(
body=message.value,
headers=headers,
reply_to=headers.get("reply_to", ""),
Expand All @@ -31,11 +34,19 @@ async def parse_message(
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
is_manual=getattr(handler, "is_manual", True),
)

@staticmethod
async def parse_message_batch(
async def decode_message(
self,
msg: "StreamMessage[ConsumerRecord]",
) -> "DecodedMessage":
"""Decodes a message."""
return decode_message(msg)


class AioKafkaBatchParser(AioKafkaParser):
async def parse_message(
self,
message: Tuple["ConsumerRecord", ...],
) -> "StreamMessage[Tuple[ConsumerRecord, ...]]":
"""Parses a batch of messages from a Kafka consumer."""
Expand All @@ -53,7 +64,7 @@ async def parse_message_batch(

handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

return KafkaMessage(
return self.msg_class(
body=body,
headers=headers,
batch_headers=batch_headers,
Expand All @@ -63,18 +74,14 @@ async def parse_message_batch(
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
is_manual=getattr(handler, "is_manual", True),
)

@staticmethod
async def decode_message(msg: "StreamMessage[ConsumerRecord]") -> "DecodedMessage":
"""Decodes a message."""
return decode_message(msg)

@classmethod
async def decode_message_batch(
cls,
async def decode_message(
self,
msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
) -> "DecodedMessage":
"""Decode a batch of messages."""
return [decode_message(await cls.parse_message(m)) for m in msg.raw_message]
return [
decode_message(await super(AioKafkaBatchParser, self).parse_message(m))
for m in msg.raw_message
]
23 changes: 14 additions & 9 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
CustomCallable,
MsgType,
)
from faststream.kafka.parser import AioKafkaParser
from faststream.kafka.message import KafkaAckableMessage, KafkaMessage
from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer, ConsumerRecord
Expand Down Expand Up @@ -60,7 +61,6 @@ def __init__(
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Iterable["TopicPartition"],
is_manual: bool,
# Subscriber args
default_parser: "AsyncCallable",
default_decoder: "AsyncCallable",
Expand Down Expand Up @@ -93,7 +93,6 @@ def __init__(
self.partitions = partitions
self.group_id = group_id

self.is_manual = is_manual
self.builder = None
self.consumer = None
self.task = None
Expand Down Expand Up @@ -306,17 +305,20 @@ def __init__(
description_: Optional[str],
include_in_schema: bool,
) -> None:
parser = AioKafkaParser(
msg_class=KafkaAckableMessage if is_manual else KafkaMessage
)

super().__init__(
*topics,
group_id=group_id,
listener=listener,
pattern=pattern,
connection_args=connection_args,
partitions=partitions,
is_manual=is_manual,
# subscriber args
default_parser=AioKafkaParser.parse_message,
default_decoder=AioKafkaParser.decode_message,
default_parser=parser.parse_message,
default_decoder=parser.decode_message,
# Propagated args
no_ack=no_ack,
no_reply=no_reply,
Expand Down Expand Up @@ -363,17 +365,20 @@ def __init__(
self.batch_timeout_ms = batch_timeout_ms
self.max_records = max_records

parser = AioKafkaBatchParser(
msg_class=KafkaAckableMessage if is_manual else KafkaMessage
)

super().__init__(
*topics,
group_id=group_id,
listener=listener,
pattern=pattern,
connection_args=connection_args,
partitions=partitions,
is_manual=is_manual,
# subscriber args
default_parser=AioKafkaParser.parse_message_batch,
default_decoder=AioKafkaParser.decode_message_batch,
default_parser=parser.parse_message,
default_decoder=parser.decode_message,
# Propagated args
no_ack=no_ack,
no_reply=no_reply,
Expand Down
23 changes: 22 additions & 1 deletion faststream/nats/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@

if TYPE_CHECKING:
from fast_depends.dependencies import Depends
from nats.aio.msg import Msg # noqa: F401
from nats.aio.msg import Msg

from faststream.broker.types import (
BrokerMiddleware,
CustomCallable,
Filter,
PublisherMiddleware,
Expand Down Expand Up @@ -348,3 +349,23 @@ def publisher( # type: ignore[override]
),
)
return publisher

@override
def include_router( # type: ignore[override]
self,
router: "NatsRegistrator",
*,
prefix: str = "",
dependencies: Iterable["Depends"] = (),
middlewares: Iterable["BrokerMiddleware[Msg]"] = (),
include_in_schema: Optional[bool] = None,
) -> None:
self._stream_builder.objects.update(router._stream_builder.objects)

return super().include_router(
router,
prefix=prefix,
dependencies=dependencies,
middlewares=middlewares,
include_in_schema=include_in_schema,
)
Loading

0 comments on commit a3c353f

Please sign in to comment.