Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add RMQ channels options, support for prefix for routing_key, a… #1448

Merged
merged 7 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.5"
__version__ = "0.5.6"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
13 changes: 13 additions & 0 deletions faststream/broker/core/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ def __init__(
self._parser = parser
self._decoder = decoder

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
"""Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.
"""
self._middlewares = (*self._middlewares, middleware)

for sub in self._subscribers.values():
sub.add_middleware(middleware)

for pub in self._publishers.values():
pub.add_middleware(middleware)

@abstractmethod
def subscriber(
self,
Expand Down
2 changes: 2 additions & 0 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
TYPE_CHECKING,
Any,
Generic,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -38,6 +39,7 @@ class StreamMessage(Generic[MsgType]):

body: Union[bytes, Any]
headers: "AnyDict" = field(default_factory=dict)
batch_headers: List["AnyDict"] = field(default_factory=list)
path: "AnyDict" = field(default_factory=dict)

content_type: Optional[str] = None
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/publisher/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class PublisherProto(
_middlewares: Iterable["PublisherMiddleware"]
_producer: Optional["ProducerProto"]

@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

@staticmethod
@abstractmethod
def create() -> "PublisherProto[MsgType]":
Expand Down
10 changes: 9 additions & 1 deletion faststream/broker/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
from faststream.asyncapi.message import get_response_schema
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.publisher.proto import PublisherProto
from faststream.broker.types import MsgType, P_HandlerParams, T_HandlerReturn
from faststream.broker.types import (
BrokerMiddleware,
MsgType,
P_HandlerParams,
T_HandlerReturn,
)
from faststream.broker.wrapper.call import HandlerCallWrapper

if TYPE_CHECKING:
Expand Down Expand Up @@ -87,6 +92,9 @@ def __init__(
self.include_in_schema = include_in_schema
self.schema_ = schema_

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)

@override
def setup( # type: ignore[override]
self,
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/subscriber/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class SubscriberProto(
_broker_middlewares: Iterable["BrokerMiddleware[MsgType]"]
_producer: Optional["ProducerProto"]

@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

@staticmethod
@abstractmethod
def create() -> "SubscriberProto[MsgType]":
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def __init__(
self.description_ = description_
self.include_in_schema = include_in_schema

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)

@override
def setup( # type: ignore[override]
self,
Expand Down
6 changes: 3 additions & 3 deletions faststream/cli/docs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def serve(
),
),
is_factory: bool = typer.Option(
False,
"--factory", help="Treat APP as an application factory"
False, "--factory", help="Treat APP as an application factory"
),
) -> None:
"""Serve project AsyncAPI schema."""
Expand Down Expand Up @@ -110,7 +109,8 @@ def gen(
),
is_factory: bool = typer.Option(
False,
"--factory", help="Treat APP as an application factory"
"--factory",
help="Treat APP as an application factory",
),
) -> None:
"""Generate project AsyncAPI schema."""
Expand Down
4 changes: 3 additions & 1 deletion faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def publish(
rpc: bool = typer.Option(False, help="Enable RPC mode and system output"),
is_factory: bool = typer.Option(
False,
"--factory", help="Treat APP as an application factory"
"--factory",
is_flag=True,
help="Treat APP as an application factory",
),
) -> None:
"""Publish a message using the specified broker in a FastStream application.
Expand Down
41 changes: 22 additions & 19 deletions faststream/confluent/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, Union

from faststream.broker.message import decode_message, gen_cor_id
from faststream.confluent.message import FAKE_CONSUMER, KafkaMessage
Expand All @@ -20,18 +20,14 @@ async def parse_message(
message: "Message",
) -> "StreamMessage[Message]":
"""Parses a Kafka message."""
headers = {}
if message.headers() is not None:
for i, j in message.headers(): # type: ignore[union-attr]
if isinstance(j, str):
headers[i] = j
else:
headers[i] = j.decode()
headers = _parse_msg_headers(message.headers())

body = message.value()
offset = message.offset()
_, timestamp = message.timestamp()

handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")

return KafkaMessage(
body=body,
headers=headers,
Expand All @@ -49,28 +45,29 @@ async def parse_message_batch(
message: Tuple["Message", ...],
) -> "StreamMessage[Tuple[Message, ...]]":
"""Parses a batch of messages from a Kafka consumer."""
body: List[Any] = []
batch_headers: List[Dict[str, str]] = []

first = message[0]
last = message[-1]

headers = {}
if first.headers() is not None:
for i, j in first.headers(): # type: ignore[union-attr]
if isinstance(j, str):
headers[i] = j
else:
headers[i] = j.decode()
body = [m.value() for m in message]
first_offset = first.offset()
last_offset = last.offset()
for m in message:
body.append(m.value)
batch_headers.append(_parse_msg_headers(m.headers()))

headers = next(iter(batch_headers), {})

_, first_timestamp = first.timestamp()

handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")

return KafkaMessage(
body=body,
headers=headers,
batch_headers=batch_headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
message_id=f"{first_offset}-{last_offset}-{first_timestamp}",
message_id=f"{first.offset()}-{last.offset()}-{first_timestamp}",
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
Expand All @@ -91,3 +88,9 @@ async def decode_message_batch(
) -> "DecodedMessage":
"""Decode a batch of messages."""
return [decode_message(await cls.parse_message(m)) for m in msg.raw_message]


def _parse_msg_headers(
headers: Sequence[Tuple[str, Union[bytes, str]]],
) -> Dict[str, str]:
return {i: j if isinstance(j, str) else j.decode() for i, j in headers}
17 changes: 14 additions & 3 deletions faststream/kafka/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from faststream.broker.message import decode_message, gen_cor_id
from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage
Expand Down Expand Up @@ -39,13 +39,24 @@ async def parse_message_batch(
message: Tuple["ConsumerRecord", ...],
) -> "StreamMessage[Tuple[ConsumerRecord, ...]]":
"""Parses a batch of messages from a Kafka consumer."""
body: List[Any] = []
batch_headers: List[Dict[str, str]] = []

first = message[0]
last = message[-1]
headers = {i: j.decode() for i, j in first.headers}

for m in message:
body.append(m.value)
batch_headers.append({i: j.decode() for i, j in m.headers})

headers = next(iter(batch_headers), {})

handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")

return KafkaMessage(
body=[m.value for m in message],
body=body,
headers=headers,
batch_headers=batch_headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
message_id=f"{first.offset}-{last.offset}-{first.timestamp}",
Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,12 +623,12 @@ async def start(self) -> None:
)

except BadRequestError as e:
old_config = (await self.stream.stream_info(stream.name)).config

if (
e.description
== "stream name already in use with a different configuration"
):
old_config = (await self.stream.stream_info(stream.name)).config

self._log(str(e), logging.WARNING, log_context)
await self.stream.update_stream(
config=stream.config,
Expand Down
20 changes: 16 additions & 4 deletions faststream/nats/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional

from faststream.broker.message import StreamMessage, decode_message, gen_cor_id
from faststream.nats.message import NatsBatchMessage, NatsMessage
Expand Down Expand Up @@ -102,15 +102,27 @@ async def parse_batch(
self,
message: List["Msg"],
) -> "StreamMessage[List[Msg]]":
if first_msg := next(iter(message), None):
path = self.get_path(first_msg.subject)
body: List[bytes] = []
batch_headers: List[Dict[str, str]] = []

if message:
path = self.get_path(message[0].subject)

for m in message:
batch_headers.append(m.headers or {})
body.append(m.data)

else:
path = None

headers = next(iter(batch_headers), {})

return NatsBatchMessage(
raw_message=message,
body=[m.data for m in message],
body=body,
path=path or {},
headers=headers,
batch_headers=batch_headers,
)

async def decode_batch(
Expand Down
1 change: 1 addition & 0 deletions faststream/rabbit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
"ReplyConfig",
"RabbitExchange",
"RabbitQueue",
# Annotations
"RabbitMessage",
)
13 changes: 13 additions & 0 deletions faststream/rabbit/annotations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from aio_pika import RobustChannel, RobustConnection
from typing_extensions import Annotated

from faststream.annotations import ContextRepo, Logger, NoCast
Expand All @@ -13,8 +14,20 @@
"RabbitMessage",
"RabbitBroker",
"RabbitProducer",
"Channel",
"Connection",
)

RabbitMessage = Annotated[RM, Context("message")]
RabbitBroker = Annotated[RB, Context("broker")]
RabbitProducer = Annotated[AioPikaFastProducer, Context("broker._producer")]

Channel = Annotated[RobustChannel, Context("broker._channel")]
Connection = Annotated[RobustConnection, Context("broker._connection")]

# NOTE: transaction is not for the public usage yet
# async def _get_transaction(connection: Connection) -> RabbitTransaction:
# async with connection.channel(publisher_confirms=False) as channel:
# yield channel.transaction()

# Transaction = Annotated[RabbitTransaction, Depends(_get_transaction)]
Loading