From a173bb53e5c8c38eb6595331d16f1d3a8fc66b5a Mon Sep 17 00:00:00 2001 From: sheldy <85823514+sheldygg@users.noreply.github.com> Date: Fri, 29 Nov 2024 16:29:44 +0100 Subject: [PATCH] Fix middlewares order (#1935) * Fix middlewares order for publishing scope * Missing middlewares * ruff * add tests * ruff tests * pre-commit * Add test for `aenter`, `aexit` * pre-commit * Update tests * Fix tests * Fix tests again * lint: fix mypy * Update tests * pre-commit * Fix tests * Update tests, again :) * format tests * Remove logger from tests * format * fix: correct NATS Publisher middlewares order * fix: correct All Publisher middlewares order * Try to fix types * format * chore: fix CI * fix: correct confluent middlewares order * tests: make order middlewares tests in-memory --------- Co-authored-by: Nikita Pastukhov Co-authored-by: Pastukhov Nikita --- faststream/broker/core/abc.py | 3 +- faststream/broker/core/usecase.py | 8 +- faststream/broker/fastapi/router.py | 2 +- faststream/broker/publisher/fake.py | 8 +- faststream/broker/publisher/proto.py | 18 +- faststream/broker/publisher/usecase.py | 6 +- faststream/broker/router.py | 3 +- faststream/broker/subscriber/call_item.py | 5 +- faststream/broker/subscriber/proto.py | 15 +- faststream/broker/subscriber/usecase.py | 27 +- faststream/broker/utils.py | 6 +- faststream/broker/wrapper/proto.py | 7 +- faststream/confluent/broker/broker.py | 2 +- faststream/confluent/broker/registrator.py | 16 +- faststream/confluent/fastapi/fastapi.py | 18 +- faststream/confluent/publisher/asyncapi.py | 26 +- faststream/confluent/publisher/usecase.py | 23 +- faststream/confluent/router.py | 6 +- faststream/confluent/subscriber/factory.py | 17 +- faststream/confluent/subscriber/usecase.py | 6 +- faststream/kafka/broker/broker.py | 5 +- faststream/kafka/broker/registrator.py | 16 +- faststream/kafka/fastapi/fastapi.py | 12 +- faststream/kafka/publisher/asyncapi.py | 18 +- faststream/kafka/publisher/usecase.py | 23 +- faststream/kafka/router.py | 6 +- faststream/kafka/subscriber/factory.py | 9 +- faststream/kafka/subscriber/usecase.py | 8 +- faststream/nats/broker/broker.py | 3 +- faststream/nats/broker/registrator.py | 8 +- faststream/nats/fastapi/fastapi.py | 6 +- faststream/nats/publisher/asyncapi.py | 6 +- faststream/nats/publisher/usecase.py | 15 +- faststream/nats/router.py | 7 +- faststream/nats/subscriber/factory.py | 4 +- faststream/nats/subscriber/usecase.py | 22 +- faststream/rabbit/broker/broker.py | 3 +- faststream/rabbit/broker/registrator.py | 6 +- faststream/rabbit/fastapi/router.py | 4 +- faststream/rabbit/publisher/asyncapi.py | 6 +- faststream/rabbit/publisher/usecase.py | 15 +- faststream/rabbit/router.py | 17 +- faststream/rabbit/subscriber/factory.py | 4 +- faststream/rabbit/subscriber/usecase.py | 2 +- faststream/redis/broker/broker.py | 5 +- faststream/redis/broker/registrator.py | 6 +- faststream/redis/fastapi/fastapi.py | 6 +- faststream/redis/publisher/asyncapi.py | 6 +- faststream/redis/publisher/usecase.py | 52 +-- faststream/redis/router.py | 17 +- faststream/redis/subscriber/factory.py | 4 +- faststream/redis/subscriber/usecase.py | 16 +- tests/brokers/base/middlewares.py | 342 +++++++++++++++++++- tests/brokers/confluent/test_middlewares.py | 10 +- tests/brokers/kafka/test_middlewares.py | 10 +- tests/brokers/nats/test_middlewares.py | 10 +- tests/brokers/rabbit/test_middlewares.py | 10 +- tests/brokers/redis/test_middlewares.py | 10 +- 58 files changed, 685 insertions(+), 266 deletions(-) diff --git a/faststream/broker/core/abc.py b/faststream/broker/core/abc.py index eb1a49bb7b..c514814b96 100644 --- a/faststream/broker/core/abc.py +++ b/faststream/broker/core/abc.py @@ -6,6 +6,7 @@ Iterable, Mapping, Optional, + Sequence, ) from faststream.broker.types import MsgType @@ -30,7 +31,7 @@ def __init__( *, prefix: str, dependencies: Iterable["Depends"], - middlewares: Iterable["BrokerMiddleware[MsgType]"], + middlewares: Sequence["BrokerMiddleware[MsgType]"], parser: Optional["CustomCallable"], decoder: Optional["CustomCallable"], include_in_schema: Optional[bool], diff --git a/faststream/broker/core/usecase.py b/faststream/broker/core/usecase.py index 7bcf35f708..6da370b38d 100644 --- a/faststream/broker/core/usecase.py +++ b/faststream/broker/core/usecase.py @@ -75,7 +75,7 @@ def __init__( Doc("Dependencies to apply to all broker subscribers."), ], middlewares: Annotated[ - Iterable["BrokerMiddleware[MsgType]"], + Sequence["BrokerMiddleware[MsgType]"], Doc("Middlewares to apply to all broker publishers/subscribers."), ], graceful_timeout: Annotated[ @@ -342,7 +342,7 @@ async def publish( publish = producer.publish - for m in self._middlewares: + for m in self._middlewares[::-1]: publish = partial(m(None).publish_scope, publish) return await publish(msg, correlation_id=correlation_id, **kwargs) @@ -359,7 +359,7 @@ async def request( assert producer, NOT_CONNECTED_YET # nosec B101 request = producer.request - for m in self._middlewares: + for m in self._middlewares[::-1]: request = partial(m(None).publish_scope, request) published_msg = await request( @@ -370,7 +370,7 @@ async def request( async with AsyncExitStack() as stack: return_msg = return_input - for m in self._middlewares: + for m in self._middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) diff --git a/faststream/broker/fastapi/router.py b/faststream/broker/fastapi/router.py index b7249491ff..6df6a85825 100644 --- a/faststream/broker/fastapi/router.py +++ b/faststream/broker/fastapi/router.py @@ -105,7 +105,7 @@ class StreamRouter( def __init__( self, *connection_args: Any, - middlewares: Iterable["BrokerMiddleware[MsgType]"] = (), + middlewares: Sequence["BrokerMiddleware[MsgType]"] = (), prefix: str = "", tags: Optional[List[Union[str, Enum]]] = None, dependencies: Optional[Sequence["params.Depends"]] = None, diff --git a/faststream/broker/publisher/fake.py b/faststream/broker/publisher/fake.py index 7ef19903d3..83d681726b 100644 --- a/faststream/broker/publisher/fake.py +++ b/faststream/broker/publisher/fake.py @@ -1,6 +1,6 @@ from functools import partial from itertools import chain -from typing import TYPE_CHECKING, Any, Iterable, Optional +from typing import TYPE_CHECKING, Any, Optional, Sequence from faststream.broker.publisher.proto import BasePublisherProto @@ -17,7 +17,7 @@ def __init__( method: "AsyncFunc", *, publish_kwargs: "AnyDict", - middlewares: Iterable["PublisherMiddleware"] = (), + middlewares: Sequence["PublisherMiddleware"] = (), ) -> None: """Initialize an object.""" self.method = method @@ -29,7 +29,7 @@ async def publish( message: "SendableMessage", *, correlation_id: Optional[str] = None, - _extra_middlewares: Iterable["PublisherMiddleware"] = (), + _extra_middlewares: Sequence["PublisherMiddleware"] = (), **kwargs: Any, ) -> Any: """Publish a message.""" @@ -51,7 +51,7 @@ async def request( /, *, correlation_id: Optional[str] = None, - _extra_middlewares: Iterable["PublisherMiddleware"] = (), + _extra_middlewares: Sequence["PublisherMiddleware"] = (), ) -> Any: raise NotImplementedError( "`FakePublisher` can be used only to publish " diff --git a/faststream/broker/publisher/proto.py b/faststream/broker/publisher/proto.py index 67ef329f19..9bb1a7be97 100644 --- a/faststream/broker/publisher/proto.py +++ b/faststream/broker/publisher/proto.py @@ -1,5 +1,13 @@ from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Optional, Protocol +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Generic, + Optional, + Protocol, + Sequence, +) from typing_extensions import override @@ -53,7 +61,7 @@ async def publish( /, *, correlation_id: Optional[str] = None, - _extra_middlewares: Iterable["PublisherMiddleware"] = (), + _extra_middlewares: Sequence["PublisherMiddleware"] = (), ) -> Optional[Any]: """Publishes a message asynchronously.""" ... @@ -65,7 +73,7 @@ async def request( /, *, correlation_id: Optional[str] = None, - _extra_middlewares: Iterable["PublisherMiddleware"] = (), + _extra_middlewares: Sequence["PublisherMiddleware"] = (), ) -> Optional[Any]: """Publishes a message synchronously.""" ... @@ -79,8 +87,8 @@ class PublisherProto( ): schema_: Any - _broker_middlewares: Iterable["BrokerMiddleware[MsgType]"] - _middlewares: Iterable["PublisherMiddleware"] + _broker_middlewares: Sequence["BrokerMiddleware[MsgType]"] + _middlewares: Sequence["PublisherMiddleware"] _producer: Optional["ProducerProto"] @abstractmethod diff --git a/faststream/broker/publisher/usecase.py b/faststream/broker/publisher/usecase.py index 34a192670d..6ac51053c9 100644 --- a/faststream/broker/publisher/usecase.py +++ b/faststream/broker/publisher/usecase.py @@ -4,9 +4,9 @@ TYPE_CHECKING, Any, Callable, - Iterable, List, Optional, + Sequence, Tuple, ) from unittest.mock import MagicMock @@ -49,11 +49,11 @@ def __init__( self, *, broker_middlewares: Annotated[ - Iterable["BrokerMiddleware[MsgType]"], + Sequence["BrokerMiddleware[MsgType]"], Doc("Top-level middlewares to use in direct `.publish` call."), ], middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares."), ], # AsyncAPI args diff --git a/faststream/broker/router.py b/faststream/broker/router.py index dfab000974..e9dcb399d0 100644 --- a/faststream/broker/router.py +++ b/faststream/broker/router.py @@ -4,6 +4,7 @@ Callable, Iterable, Optional, + Sequence, ) from faststream.broker.core.abc import ABCBroker @@ -64,7 +65,7 @@ def __init__( # base options prefix: str, dependencies: Iterable["Depends"], - middlewares: Iterable["BrokerMiddleware[MsgType]"], + middlewares: Sequence["BrokerMiddleware[MsgType]"], parser: Optional["CustomCallable"], decoder: Optional["CustomCallable"], include_in_schema: Optional[bool], diff --git a/faststream/broker/subscriber/call_item.py b/faststream/broker/subscriber/call_item.py index c7c32b3609..a00d7e790c 100644 --- a/faststream/broker/subscriber/call_item.py +++ b/faststream/broker/subscriber/call_item.py @@ -9,6 +9,7 @@ Generic, Iterable, Optional, + Sequence, cast, ) @@ -54,7 +55,7 @@ def __init__( filter: "AsyncFilter[StreamMessage[MsgType]]", item_parser: Optional["CustomCallable"], item_decoder: Optional["CustomCallable"], - item_middlewares: Iterable["SubscriberMiddleware[StreamMessage[MsgType]]"], + item_middlewares: Sequence["SubscriberMiddleware[StreamMessage[MsgType]]"], dependencies: Iterable["Depends"], ) -> None: self.handler = handler @@ -157,7 +158,7 @@ async def call( """Execute wrapped handler with consume middlewares.""" call: AsyncFuncAny = self.handler.call_wrapped - for middleware in chain(self.item_middlewares, _extra_middlewares): + for middleware in chain(self.item_middlewares[::-1], _extra_middlewares): call = partial(middleware, call) try: diff --git a/faststream/broker/subscriber/proto.py b/faststream/broker/subscriber/proto.py index faae744ef0..be296eec17 100644 --- a/faststream/broker/subscriber/proto.py +++ b/faststream/broker/subscriber/proto.py @@ -1,5 +1,14 @@ from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Sequence, +) from typing_extensions import Self, override @@ -33,7 +42,7 @@ class SubscriberProto( running: bool _broker_dependencies: Iterable["Depends"] - _broker_middlewares: Iterable["BrokerMiddleware[MsgType]"] + _broker_middlewares: Sequence["BrokerMiddleware[MsgType]"] _producer: Optional["ProducerProto"] @abstractmethod @@ -98,6 +107,6 @@ def add_call( filter_: "Filter[Any]", parser_: "CustomCallable", decoder_: "CustomCallable", - middlewares_: Iterable["SubscriberMiddleware[Any]"], + middlewares_: Sequence["SubscriberMiddleware[Any]"], dependencies_: Iterable["Depends"], ) -> Self: ... diff --git a/faststream/broker/subscriber/usecase.py b/faststream/broker/subscriber/usecase.py index d9974efe43..9641a99513 100644 --- a/faststream/broker/subscriber/usecase.py +++ b/faststream/broker/subscriber/usecase.py @@ -10,6 +10,7 @@ Iterable, List, Optional, + Sequence, Tuple, Union, overload, @@ -53,11 +54,11 @@ class _CallOptions: __slots__ = ( - "filter", - "parser", "decoder", - "middlewares", "dependencies", + "filter", + "middlewares", + "parser", ) def __init__( @@ -66,7 +67,7 @@ def __init__( filter: "Filter[Any]", parser: Optional["CustomCallable"], decoder: Optional["CustomCallable"], - middlewares: Iterable["SubscriberMiddleware[Any]"], + middlewares: Sequence["SubscriberMiddleware[Any]"], dependencies: Iterable["Depends"], ) -> None: self.filter = filter @@ -98,7 +99,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], + broker_middlewares: Sequence["BrokerMiddleware[MsgType]"], default_parser: "AsyncCallable", default_decoder: "AsyncCallable", # AsyncAPI information @@ -211,7 +212,7 @@ def add_call( filter_: "Filter[Any]", parser_: Optional["CustomCallable"], decoder_: Optional["CustomCallable"], - middlewares_: Iterable["SubscriberMiddleware[Any]"], + middlewares_: Sequence["SubscriberMiddleware[Any]"], dependencies_: Iterable["Depends"], ) -> Self: self._call_options = _CallOptions( @@ -231,7 +232,7 @@ def __call__( filter: Optional["Filter[Any]"] = None, parser: Optional["CustomCallable"] = None, decoder: Optional["CustomCallable"] = None, - middlewares: Iterable["SubscriberMiddleware[Any]"] = (), + middlewares: Sequence["SubscriberMiddleware[Any]"] = (), dependencies: Iterable["Depends"] = (), ) -> Callable[ [Callable[P_HandlerParams, T_HandlerReturn]], @@ -246,7 +247,7 @@ def __call__( filter: Optional["Filter[Any]"] = None, parser: Optional["CustomCallable"] = None, decoder: Optional["CustomCallable"] = None, - middlewares: Iterable["SubscriberMiddleware[Any]"] = (), + middlewares: Sequence["SubscriberMiddleware[Any]"] = (), dependencies: Iterable["Depends"] = (), ) -> "HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]": ... @@ -257,7 +258,7 @@ def __call__( filter: Optional["Filter[Any]"] = None, parser: Optional["CustomCallable"] = None, decoder: Optional["CustomCallable"] = None, - middlewares: Iterable["SubscriberMiddleware[Any]"] = (), + middlewares: Sequence["SubscriberMiddleware[Any]"] = (), dependencies: Iterable["Depends"] = (), ) -> Any: if (options := self._call_options) is None: @@ -367,7 +368,9 @@ async def process_message(self, msg: MsgType) -> "Response": await h.call( message=message, # consumer middlewares - _extra_middlewares=(m.consume_scope for m in middlewares), + _extra_middlewares=( + m.consume_scope for m in middlewares[::-1] + ), ) ) @@ -382,7 +385,9 @@ async def process_message(self, msg: MsgType) -> "Response": result_msg.body, **result_msg.as_publish_kwargs(), # publisher middlewares - _extra_middlewares=(m.publish_scope for m in middlewares), + _extra_middlewares=[ + m.publish_scope for m in middlewares[::-1] + ], ) # Return data for tests diff --git a/faststream/broker/utils.py b/faststream/broker/utils.py index 067446de40..f605fc1d20 100644 --- a/faststream/broker/utils.py +++ b/faststream/broker/utils.py @@ -8,8 +8,8 @@ AsyncContextManager, Awaitable, Callable, - Iterable, Optional, + Sequence, Type, Union, cast, @@ -37,7 +37,7 @@ async def process_msg( msg: Optional[MsgType], - middlewares: Iterable["BrokerMiddleware[MsgType]"], + middlewares: Sequence["BrokerMiddleware[MsgType]"], parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]], decoder: Callable[["StreamMessage[MsgType]"], "Any"], ) -> Optional["StreamMessage[MsgType]"]: @@ -50,7 +50,7 @@ async def process_msg( Awaitable[StreamMessage[MsgType]], ] = return_input - for m in middlewares: + for m in middlewares[::-1]: mid = m(msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) diff --git a/faststream/broker/wrapper/proto.py b/faststream/broker/wrapper/proto.py index 725e31ceee..9dfcb98af5 100644 --- a/faststream/broker/wrapper/proto.py +++ b/faststream/broker/wrapper/proto.py @@ -5,6 +5,7 @@ Iterable, Optional, Protocol, + Sequence, Union, overload, ) @@ -35,7 +36,7 @@ def __call__( filter: Optional["Filter[Any]"] = None, parser: Optional["CustomCallable"] = None, decoder: Optional["CustomCallable"] = None, - middlewares: Iterable["SubscriberMiddleware[Any]"] = (), + middlewares: Sequence["SubscriberMiddleware[Any]"] = (), dependencies: Iterable["Depends"] = (), ) -> Callable[ [Callable[P_HandlerParams, T_HandlerReturn]], @@ -53,7 +54,7 @@ def __call__( filter: Optional["Filter[Any]"] = None, parser: Optional["CustomCallable"] = None, decoder: Optional["CustomCallable"] = None, - middlewares: Iterable["SubscriberMiddleware[Any]"] = (), + middlewares: Sequence["SubscriberMiddleware[Any]"] = (), dependencies: Iterable["Depends"] = (), ) -> "HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]": ... @@ -68,7 +69,7 @@ def __call__( filter: Optional["Filter[Any]"] = None, parser: Optional["CustomCallable"] = None, decoder: Optional["CustomCallable"] = None, - middlewares: Iterable["SubscriberMiddleware[Any]"] = (), + middlewares: Sequence["SubscriberMiddleware[Any]"] = (), dependencies: Iterable["Depends"] = (), ) -> Union[ "HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]", diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 329a9440de..201249349e 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -544,7 +544,7 @@ async def publish_batch( correlation_id = correlation_id or gen_cor_id() call: AsyncFunc = self._producer.publish_batch - for m in self._middlewares: + for m in self._middlewares[::-1]: call = partial(m(None).publish_scope, call) await call( diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index 38e99fc877..3d4301f7a4 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -295,7 +295,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -577,7 +577,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -859,7 +859,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -1144,7 +1144,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -1295,7 +1295,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -1369,7 +1369,7 @@ def publisher( ], # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -1443,7 +1443,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -1520,7 +1520,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args diff --git a/faststream/confluent/fastapi/fastapi.py b/faststream/confluent/fastapi/fastapi.py index bc2c2a1d71..88e3b970c0 100644 --- a/faststream/confluent/fastapi/fastapi.py +++ b/faststream/confluent/fastapi/fastapi.py @@ -267,7 +267,7 @@ def __init__( Doc("Custom parser object."), ] = None, middlewares: Annotated[ - Iterable[ + Sequence[ Union[ "BrokerMiddleware[Message]", "BrokerMiddleware[Tuple[Message, ...]]", @@ -834,7 +834,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -1239,7 +1239,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -1630,7 +1630,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -2038,7 +2038,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -2306,7 +2306,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -2380,7 +2380,7 @@ def publisher( ], # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -2454,7 +2454,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -2531,7 +2531,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args diff --git a/faststream/confluent/publisher/asyncapi.py b/faststream/confluent/publisher/asyncapi.py index f41834b9c2..552b792601 100644 --- a/faststream/confluent/publisher/asyncapi.py +++ b/faststream/confluent/publisher/asyncapi.py @@ -2,9 +2,9 @@ TYPE_CHECKING, Any, Dict, - Iterable, Literal, Optional, + Sequence, Tuple, Union, cast, @@ -72,8 +72,8 @@ def create( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[ConfluentMsg]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[ConfluentMsg]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -92,8 +92,8 @@ def create( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -113,10 +113,10 @@ def create( reply_to: str, # Publisher args broker_middlewares: Union[ - Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], - Iterable["BrokerMiddleware[ConfluentMsg]"], + Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + Sequence["BrokerMiddleware[ConfluentMsg]"], ], - middlewares: Iterable["PublisherMiddleware"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -139,10 +139,10 @@ def create( reply_to: str, # Publisher args broker_middlewares: Union[ - Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], - Iterable["BrokerMiddleware[ConfluentMsg]"], + Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + Sequence["BrokerMiddleware[ConfluentMsg]"], ], - middlewares: Iterable["PublisherMiddleware"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -162,7 +162,7 @@ def create( headers=headers, reply_to=reply_to, broker_middlewares=cast( - Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], broker_middlewares, ), middlewares=middlewares, @@ -180,7 +180,7 @@ def create( headers=headers, reply_to=reply_to, broker_middlewares=cast( - Iterable["BrokerMiddleware[ConfluentMsg]"], broker_middlewares + Sequence["BrokerMiddleware[ConfluentMsg]"], broker_middlewares ), middlewares=middlewares, schema_=schema_, diff --git a/faststream/confluent/publisher/usecase.py b/faststream/confluent/publisher/usecase.py index 0f7712139f..3601b2a018 100644 --- a/faststream/confluent/publisher/usecase.py +++ b/faststream/confluent/publisher/usecase.py @@ -9,6 +9,7 @@ Dict, Iterable, Optional, + Sequence, Tuple, Union, cast, @@ -43,8 +44,8 @@ def __init__( headers: Optional[Dict[str, str]], reply_to: Optional[str], # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[MsgType]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -105,11 +106,11 @@ async def request( request: AsyncFunc = self._producer.request for pub_m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): request = partial(pub_m, request) @@ -117,7 +118,7 @@ async def request( async with AsyncExitStack() as stack: return_msg: Callable[[KafkaMessage], Awaitable[KafkaMessage]] = return_input - for m in self._broker_middlewares: + for m in self._broker_middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) @@ -140,8 +141,8 @@ def __init__( headers: Optional[Dict[str, str]], reply_to: Optional[str], # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[Message]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[Message]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -198,11 +199,11 @@ async def publish( call: AsyncFunc = self._producer.publish for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -273,11 +274,11 @@ async def publish( call: AsyncFunc = self._producer.publish_batch for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) diff --git a/faststream/confluent/router.py b/faststream/confluent/router.py index 76364390ea..cdb7a6772a 100644 --- a/faststream/confluent/router.py +++ b/faststream/confluent/router.py @@ -88,7 +88,7 @@ def __init__( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -378,7 +378,7 @@ def __init__( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -492,7 +492,7 @@ def __init__( ), ] = (), middlewares: Annotated[ - Iterable[ + Sequence[ Union[ "BrokerMiddleware[Message]", "BrokerMiddleware[Tuple[Message, ...]]", diff --git a/faststream/confluent/subscriber/factory.py b/faststream/confluent/subscriber/factory.py index b0c72deb8d..e2585d76b2 100644 --- a/faststream/confluent/subscriber/factory.py +++ b/faststream/confluent/subscriber/factory.py @@ -40,7 +40,7 @@ def create_subscriber( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + broker_middlewares: Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -64,7 +64,7 @@ def create_subscriber( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[ConfluentMsg]"], + broker_middlewares: Sequence["BrokerMiddleware[ConfluentMsg]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -89,8 +89,8 @@ def create_subscriber( retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Union[ - Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], - Iterable["BrokerMiddleware[ConfluentMsg]"], + Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + Sequence["BrokerMiddleware[ConfluentMsg]"], ], # AsyncAPI args title_: Optional[str], @@ -118,8 +118,8 @@ def create_subscriber( retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Union[ - Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], - Iterable["BrokerMiddleware[ConfluentMsg]"], + Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + Sequence["BrokerMiddleware[ConfluentMsg]"], ], # AsyncAPI args title_: Optional[str], @@ -143,7 +143,7 @@ def create_subscriber( retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=cast( - Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], + Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], broker_middlewares, ), title_=title_, @@ -163,7 +163,8 @@ def create_subscriber( retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=cast( - Iterable["BrokerMiddleware[ConfluentMsg]"], broker_middlewares + Sequence["BrokerMiddleware[ConfluentMsg]"], + broker_middlewares, ), title_=title_, description_=description_, diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index 2e11b9851a..0045163938 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -65,7 +65,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], + broker_middlewares: Sequence["BrokerMiddleware[MsgType]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -286,7 +286,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[Message]"], + broker_middlewares: Sequence["BrokerMiddleware[Message]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -350,7 +350,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[Tuple[Message, ...]]"], + broker_middlewares: Sequence["BrokerMiddleware[Tuple[Message, ...]]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index e3a5ec4e67..86123612f5 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -9,6 +9,7 @@ List, Literal, Optional, + Sequence, Tuple, Type, TypeVar, @@ -440,7 +441,7 @@ def __init__( Doc("Dependencies to apply to all broker subscribers."), ] = (), middlewares: Annotated[ - Iterable[ + Sequence[ Union[ "BrokerMiddleware[ConsumerRecord]", "BrokerMiddleware[Tuple[ConsumerRecord, ...]]", @@ -877,7 +878,7 @@ async def publish_batch( call: AsyncFunc = self._producer.publish_batch - for m in self._middlewares: + for m in self._middlewares[::-1]: call = partial(m(None).publish_scope, call) await call( diff --git a/faststream/kafka/broker/registrator.py b/faststream/kafka/broker/registrator.py index 0c2c3e1ce2..45ef5b5a64 100644 --- a/faststream/kafka/broker/registrator.py +++ b/faststream/kafka/broker/registrator.py @@ -404,7 +404,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -785,7 +785,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -1166,7 +1166,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -1550,7 +1550,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), max_workers: Annotated[ @@ -1725,7 +1725,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -1799,7 +1799,7 @@ def publisher( ], # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -1873,7 +1873,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -1950,7 +1950,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args diff --git a/faststream/kafka/fastapi/fastapi.py b/faststream/kafka/fastapi/fastapi.py index 5bad796902..0ce8e3ac3a 100644 --- a/faststream/kafka/fastapi/fastapi.py +++ b/faststream/kafka/fastapi/fastapi.py @@ -276,7 +276,7 @@ def __init__( Doc("Custom parser object."), ] = None, middlewares: Annotated[ - Iterable[ + Sequence[ Union[ "BrokerMiddleware[ConsumerRecord]", "BrokerMiddleware[Tuple[ConsumerRecord, ...]]", @@ -946,7 +946,7 @@ def subscriber( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -2736,7 +2736,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -2810,7 +2810,7 @@ def publisher( ], # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -2884,7 +2884,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -2961,7 +2961,7 @@ def publisher( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args diff --git a/faststream/kafka/publisher/asyncapi.py b/faststream/kafka/publisher/asyncapi.py index 623f9a2761..7830c01807 100644 --- a/faststream/kafka/publisher/asyncapi.py +++ b/faststream/kafka/publisher/asyncapi.py @@ -2,9 +2,9 @@ TYPE_CHECKING, Any, Dict, - Iterable, Literal, Optional, + Sequence, Tuple, Union, overload, @@ -71,8 +71,8 @@ def create( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[Tuple[ConsumerRecord, ...]]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[Tuple[ConsumerRecord, ...]]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -91,8 +91,8 @@ def create( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[ConsumerRecord]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -111,10 +111,10 @@ def create( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable[ + broker_middlewares: Sequence[ "BrokerMiddleware[Union[Tuple[ConsumerRecord, ...], ConsumerRecord]]" ], - middlewares: Iterable["PublisherMiddleware"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -136,10 +136,10 @@ def create( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable[ + broker_middlewares: Sequence[ "BrokerMiddleware[Union[Tuple[ConsumerRecord, ...], ConsumerRecord]]" ], - middlewares: Iterable["PublisherMiddleware"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], diff --git a/faststream/kafka/publisher/usecase.py b/faststream/kafka/publisher/usecase.py index aa95525254..5c5f34efe1 100644 --- a/faststream/kafka/publisher/usecase.py +++ b/faststream/kafka/publisher/usecase.py @@ -9,6 +9,7 @@ Dict, Iterable, Optional, + Sequence, Tuple, Union, cast, @@ -43,8 +44,8 @@ def __init__( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[MsgType]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -149,11 +150,11 @@ async def request( request: AsyncFunc = self._producer.request for pub_m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): request = partial(pub_m, request) @@ -170,7 +171,7 @@ async def request( async with AsyncExitStack() as stack: return_msg: Callable[[KafkaMessage], Awaitable[KafkaMessage]] = return_input - for m in self._broker_middlewares: + for m in self._broker_middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) @@ -193,8 +194,8 @@ def __init__( headers: Optional[Dict[str, str]], reply_to: str, # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[ConsumerRecord]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -299,11 +300,11 @@ async def publish( call: AsyncFunc = self._producer.publish for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -473,11 +474,11 @@ async def publish( call: AsyncFunc = self._producer.publish_batch for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) diff --git a/faststream/kafka/router.py b/faststream/kafka/router.py index 102240e2ca..71638c140e 100644 --- a/faststream/kafka/router.py +++ b/faststream/kafka/router.py @@ -90,7 +90,7 @@ def __init__( ] = False, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -481,7 +481,7 @@ def __init__( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[KafkaMessage]"], + Sequence["SubscriberMiddleware[KafkaMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -607,7 +607,7 @@ def __init__( ), ] = (), middlewares: Annotated[ - Iterable[ + Sequence[ Union[ "BrokerMiddleware[ConsumerRecord]", "BrokerMiddleware[Tuple[ConsumerRecord, ...]]", diff --git a/faststream/kafka/subscriber/factory.py b/faststream/kafka/subscriber/factory.py index 162866cf39..74c3f78dbe 100644 --- a/faststream/kafka/subscriber/factory.py +++ b/faststream/kafka/subscriber/factory.py @@ -3,6 +3,7 @@ Iterable, Literal, Optional, + Sequence, Tuple, Union, overload, @@ -43,7 +44,7 @@ def create_subscriber( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[Tuple[ConsumerRecord, ...]]"], + broker_middlewares: Sequence["BrokerMiddleware[Tuple[ConsumerRecord, ...]]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -70,7 +71,7 @@ def create_subscriber( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[ConsumerRecord]"], + broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -97,7 +98,7 @@ def create_subscriber( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable[ + broker_middlewares: Sequence[ "BrokerMiddleware[Union[ConsumerRecord, Tuple[ConsumerRecord, ...]]]" ], # AsyncAPI args @@ -128,7 +129,7 @@ def create_subscriber( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable[ + broker_middlewares: Sequence[ "BrokerMiddleware[Union[ConsumerRecord, Tuple[ConsumerRecord, ...]]]" ], # AsyncAPI args diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index 2bd4d7162e..ea9c3e7b0f 100644 --- a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -69,7 +69,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], + broker_middlewares: Sequence["BrokerMiddleware[MsgType]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -309,7 +309,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[ConsumerRecord]"], + broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -390,7 +390,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable[ + broker_middlewares: Sequence[ "BrokerMiddleware[Sequence[Tuple[ConsumerRecord, ...]]]" ], # AsyncAPI args @@ -485,7 +485,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[ConsumerRecord]"], + broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], diff --git a/faststream/nats/broker/broker.py b/faststream/nats/broker/broker.py index 3653727da2..bbf718f232 100644 --- a/faststream/nats/broker/broker.py +++ b/faststream/nats/broker/broker.py @@ -8,6 +8,7 @@ Iterable, List, Optional, + Sequence, Type, Union, ) @@ -394,7 +395,7 @@ def __init__( Doc("Dependencies to apply to all broker subscribers."), ] = (), middlewares: Annotated[ - Iterable["BrokerMiddleware[Msg]"], + Sequence["BrokerMiddleware[Msg]"], Doc("Middlewares to apply to all broker publishers/subscribers."), ] = (), # AsyncAPI args diff --git a/faststream/nats/broker/registrator.py b/faststream/nats/broker/registrator.py index 8be4f57803..ad9cee7404 100644 --- a/faststream/nats/broker/registrator.py +++ b/faststream/nats/broker/registrator.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Sequence, Union, cast from nats.js import api from typing_extensions import Annotated, Doc, deprecated, override @@ -153,7 +153,7 @@ def subscriber( # type: ignore[override] Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[NatsMessage]"], + Sequence["SubscriberMiddleware[NatsMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -294,7 +294,7 @@ def publisher( # type: ignore[override] ] = None, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information @@ -362,7 +362,7 @@ def include_router( # type: ignore[override] *, prefix: str = "", dependencies: Iterable["Depends"] = (), - middlewares: Iterable["BrokerMiddleware[Msg]"] = (), + middlewares: Sequence["BrokerMiddleware[Msg]"] = (), include_in_schema: Optional[bool] = None, ) -> None: sub_streams = router._stream_builder.objects.copy() diff --git a/faststream/nats/fastapi/fastapi.py b/faststream/nats/fastapi/fastapi.py index 6701ef3697..d1a8ce5846 100644 --- a/faststream/nats/fastapi/fastapi.py +++ b/faststream/nats/fastapi/fastapi.py @@ -237,7 +237,7 @@ def __init__( Doc("Custom parser object."), ] = None, middlewares: Annotated[ - Iterable["BrokerMiddleware[Msg]"], + Sequence["BrokerMiddleware[Msg]"], Doc("Middlewares to apply to all broker publishers/subscribers."), ] = (), # AsyncAPI args @@ -693,7 +693,7 @@ def subscriber( # type: ignore[override] Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[NatsMessage]"], + Sequence["SubscriberMiddleware[NatsMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -945,7 +945,7 @@ def publisher( # type: ignore[override] ] = None, # specific middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information diff --git a/faststream/nats/publisher/asyncapi.py b/faststream/nats/publisher/asyncapi.py index 1546b675f8..7ce50295d7 100644 --- a/faststream/nats/publisher/asyncapi.py +++ b/faststream/nats/publisher/asyncapi.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence from typing_extensions import override @@ -60,8 +60,8 @@ def create( # type: ignore[override] stream: Optional["JStream"], timeout: Optional[float], # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], diff --git a/faststream/nats/publisher/usecase.py b/faststream/nats/publisher/usecase.py index 8d74bbbe4b..eaa014ce75 100644 --- a/faststream/nats/publisher/usecase.py +++ b/faststream/nats/publisher/usecase.py @@ -9,6 +9,7 @@ Dict, Iterable, Optional, + Sequence, Union, ) @@ -42,8 +43,8 @@ def __init__( stream: Optional["JStream"], timeout: Optional[float], # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -129,11 +130,11 @@ async def publish( call: AsyncFunc = self._producer.publish for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -190,11 +191,11 @@ async def request( request: AsyncFunc = self._producer.request for pub_m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): request = partial(pub_m, request) @@ -205,7 +206,7 @@ async def request( async with AsyncExitStack() as stack: return_msg: Callable[[NatsMessage], Awaitable[NatsMessage]] = return_input - for m in self._broker_middlewares: + for m in self._broker_middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) diff --git a/faststream/nats/router.py b/faststream/nats/router.py index ed838f133a..b9e029c594 100644 --- a/faststream/nats/router.py +++ b/faststream/nats/router.py @@ -6,6 +6,7 @@ Dict, Iterable, Optional, + Sequence, Union, ) @@ -71,7 +72,7 @@ def __init__( ] = None, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information @@ -242,7 +243,7 @@ def __init__( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[NatsMessage]"], + Sequence["SubscriberMiddleware[NatsMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -354,7 +355,7 @@ def __init__( ), ] = (), middlewares: Annotated[ - Iterable["BrokerMiddleware[Msg]"], + Sequence["BrokerMiddleware[Msg]"], Doc("Router middlewares to apply to all routers' publishers/subscribers."), ] = (), parser: Annotated[ diff --git a/faststream/nats/subscriber/factory.py b/faststream/nats/subscriber/factory.py index 5adf4af55b..4d42041743 100644 --- a/faststream/nats/subscriber/factory.py +++ b/faststream/nats/subscriber/factory.py @@ -1,5 +1,5 @@ import warnings -from typing import TYPE_CHECKING, Any, Iterable, Optional, Union +from typing import TYPE_CHECKING, Any, Iterable, Optional, Sequence, Union from nats.aio.subscription import ( DEFAULT_SUB_PENDING_BYTES_LIMIT, @@ -63,7 +63,7 @@ def create_subscriber( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[Any]"], + broker_middlewares: Sequence["BrokerMiddleware[Any]"], # AsyncAPI information title_: Optional[str], description_: Optional[str], diff --git a/faststream/nats/subscriber/usecase.py b/faststream/nats/subscriber/usecase.py index 5d16299f14..8b960a5a74 100644 --- a/faststream/nats/subscriber/usecase.py +++ b/faststream/nats/subscriber/usecase.py @@ -88,7 +88,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], + broker_middlewares: Sequence["BrokerMiddleware[MsgType]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -263,7 +263,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], + broker_middlewares: Sequence["BrokerMiddleware[MsgType]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -336,7 +336,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -448,7 +448,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -510,7 +510,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -644,7 +644,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -711,7 +711,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -795,7 +795,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[Msg]"], + broker_middlewares: Sequence["BrokerMiddleware[Msg]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -864,7 +864,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[List[Msg]]"], + broker_middlewares: Sequence["BrokerMiddleware[List[Msg]]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -979,7 +979,7 @@ def __init__( config: "ConsumerConfig", kv_watch: "KvWatch", broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[KeyValue.Entry]"], + broker_middlewares: Sequence["BrokerMiddleware[KeyValue.Entry]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -1135,7 +1135,7 @@ def __init__( config: "ConsumerConfig", obj_watch: "ObjWatch", broker_dependencies: Iterable[Depends], - broker_middlewares: Iterable["BrokerMiddleware[List[Msg]]"], + broker_middlewares: Sequence["BrokerMiddleware[List[Msg]]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 38daf8336d..2e650372d2 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -5,6 +5,7 @@ Callable, Iterable, Optional, + Sequence, Type, Union, cast, @@ -165,7 +166,7 @@ def __init__( Doc("Dependencies to apply to all broker subscribers."), ] = (), middlewares: Annotated[ - Iterable["BrokerMiddleware[IncomingMessage]"], + Sequence["BrokerMiddleware[IncomingMessage]"], Doc("Middlewares to apply to all broker publishers/subscribers."), ] = (), # AsyncAPI args diff --git a/faststream/rabbit/broker/registrator.py b/faststream/rabbit/broker/registrator.py index 8c6b0ba99c..6c6ff357d6 100644 --- a/faststream/rabbit/broker/registrator.py +++ b/faststream/rabbit/broker/registrator.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Sequence, Union, cast from typing_extensions import Annotated, Doc, deprecated, override @@ -81,7 +81,7 @@ def subscriber( # type: ignore[override] Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[RabbitMessage]"], + Sequence["SubscriberMiddleware[RabbitMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -209,7 +209,7 @@ def publisher( # type: ignore[override] ] = None, # specific middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information diff --git a/faststream/rabbit/fastapi/router.py b/faststream/rabbit/fastapi/router.py index b634c2738f..030b460907 100644 --- a/faststream/rabbit/fastapi/router.py +++ b/faststream/rabbit/fastapi/router.py @@ -154,7 +154,7 @@ def __init__( Doc("Custom parser object."), ] = None, middlewares: Annotated[ - Iterable["BrokerMiddleware[IncomingMessage]"], + Sequence["BrokerMiddleware[IncomingMessage]"], Doc("Middlewares to apply to all broker publishers/subscribers."), ] = (), # AsyncAPI args @@ -773,7 +773,7 @@ def publisher( ] = None, # specific middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information diff --git a/faststream/rabbit/publisher/asyncapi.py b/faststream/rabbit/publisher/asyncapi.py index 415365481d..d8328ab05c 100644 --- a/faststream/rabbit/publisher/asyncapi.py +++ b/faststream/rabbit/publisher/asyncapi.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence from typing_extensions import override @@ -113,8 +113,8 @@ def create( # type: ignore[override] exchange: "RabbitExchange", message_kwargs: "PublishKwargs", # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[IncomingMessage]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], diff --git a/faststream/rabbit/publisher/usecase.py b/faststream/rabbit/publisher/usecase.py index 041991542a..303b71503d 100644 --- a/faststream/rabbit/publisher/usecase.py +++ b/faststream/rabbit/publisher/usecase.py @@ -9,6 +9,7 @@ Callable, Iterable, Optional, + Sequence, Union, ) @@ -126,8 +127,8 @@ def __init__( exchange: "RabbitExchange", message_kwargs: "PublishKwargs", # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[IncomingMessage]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -277,11 +278,11 @@ async def publish( call: AsyncFunc = self._producer.publish for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -349,11 +350,11 @@ async def request( request: AsyncFunc = self._producer.request for pub_m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): request = partial(pub_m, request) @@ -366,7 +367,7 @@ async def request( return_msg: Callable[[RabbitMessage], Awaitable[RabbitMessage]] = ( return_input ) - for m in self._broker_middlewares: + for m in self._broker_middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) diff --git a/faststream/rabbit/router.py b/faststream/rabbit/router.py index 7e1986c8c6..8a9fdd87e6 100644 --- a/faststream/rabbit/router.py +++ b/faststream/rabbit/router.py @@ -1,4 +1,13 @@ -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, Optional, Union +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Iterable, + Optional, + Sequence, + Union, +) from typing_extensions import Annotated, Doc, deprecated @@ -86,7 +95,7 @@ def __init__( ] = None, # basic args middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI args @@ -233,7 +242,7 @@ def __init__( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[RabbitMessage]"], + Sequence["SubscriberMiddleware[RabbitMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -323,7 +332,7 @@ def __init__( ), ] = (), middlewares: Annotated[ - Iterable["BrokerMiddleware[IncomingMessage]"], + Sequence["BrokerMiddleware[IncomingMessage]"], Doc("Router middlewares to apply to all routers' publishers/subscribers."), ] = (), parser: Annotated[ diff --git a/faststream/rabbit/subscriber/factory.py b/faststream/rabbit/subscriber/factory.py index e3a79bb3a7..abf1cd8351 100644 --- a/faststream/rabbit/subscriber/factory.py +++ b/faststream/rabbit/subscriber/factory.py @@ -1,5 +1,5 @@ import warnings -from typing import TYPE_CHECKING, Iterable, Optional, Union +from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber @@ -23,7 +23,7 @@ def create_subscriber( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], + broker_middlewares: Sequence["BrokerMiddleware[IncomingMessage]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], diff --git a/faststream/rabbit/subscriber/usecase.py b/faststream/rabbit/subscriber/usecase.py index 5ed53edf24..0401879237 100644 --- a/faststream/rabbit/subscriber/usecase.py +++ b/faststream/rabbit/subscriber/usecase.py @@ -62,7 +62,7 @@ def __init__( no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], + broker_middlewares: Sequence["BrokerMiddleware[IncomingMessage]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index fdafa04c02..cd6661c282 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -7,6 +7,7 @@ Iterable, Mapping, Optional, + Sequence, Type, Union, ) @@ -134,7 +135,7 @@ def __init__( Doc("Dependencies to apply to all broker subscribers."), ] = (), middlewares: Annotated[ - Iterable["BrokerMiddleware[BaseMessage]"], + Sequence["BrokerMiddleware[BaseMessage]"], Doc("Middlewares to apply to all broker publishers/subscribers."), ] = (), # AsyncAPI args @@ -509,7 +510,7 @@ async def publish_batch( call: AsyncFunc = self._producer.publish_batch - for m in self._middlewares: + for m in self._middlewares[::-1]: call = partial(m(None).publish_scope, call) await call( diff --git a/faststream/redis/broker/registrator.py b/faststream/redis/broker/registrator.py index d02d1f1762..ef4161ea40 100644 --- a/faststream/redis/broker/registrator.py +++ b/faststream/redis/broker/registrator.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Sequence, Union, cast from typing_extensions import Annotated, Doc, deprecated, override @@ -62,7 +62,7 @@ def subscriber( # type: ignore[override] Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[UnifyRedisMessage]"], + Sequence["SubscriberMiddleware[UnifyRedisMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -164,7 +164,7 @@ def publisher( # type: ignore[override] Doc("Reply message destination PubSub object name."), ] = "", middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information diff --git a/faststream/redis/fastapi/fastapi.py b/faststream/redis/fastapi/fastapi.py index 85ce2bf1e7..cef4084784 100644 --- a/faststream/redis/fastapi/fastapi.py +++ b/faststream/redis/fastapi/fastapi.py @@ -103,7 +103,7 @@ def __init__( Doc("Custom parser object."), ] = None, middlewares: Annotated[ - Iterable["BrokerMiddleware[UnifyRedisDict]"], + Sequence["BrokerMiddleware[UnifyRedisDict]"], Doc("Middlewares to apply to all broker publishers/subscribers."), ] = (), # AsyncAPI args @@ -464,7 +464,7 @@ def subscriber( # type: ignore[override] Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[UnifyRedisMessage]"], + Sequence["SubscriberMiddleware[UnifyRedisMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -687,7 +687,7 @@ def publisher( Doc("Reply message destination PubSub object name."), ] = "", middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information diff --git a/faststream/redis/publisher/asyncapi.py b/faststream/redis/publisher/asyncapi.py index f79709667c..fe1d4d7a90 100644 --- a/faststream/redis/publisher/asyncapi.py +++ b/faststream/redis/publisher/asyncapi.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union from typing_extensions import TypeAlias, override @@ -68,8 +68,8 @@ def create( # type: ignore[override] stream: Union["StreamSub", str, None], headers: Optional["AnyDict"], reply_to: str, - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args title_: Optional[str], description_: Optional[str], diff --git a/faststream/redis/publisher/usecase.py b/faststream/redis/publisher/usecase.py index f517dbee5f..130f5b4f6b 100644 --- a/faststream/redis/publisher/usecase.py +++ b/faststream/redis/publisher/usecase.py @@ -3,7 +3,7 @@ from copy import deepcopy from functools import partial from itertools import chain -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, Optional +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, Optional, Sequence from typing_extensions import Annotated, Doc, deprecated, override @@ -32,8 +32,8 @@ def __init__( reply_to: str, headers: Optional["AnyDict"], # Publisher args - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI args schema_: Optional[Any], title_: Optional[str], @@ -68,8 +68,8 @@ def __init__( reply_to: str, headers: Optional["AnyDict"], # Regular publisher options - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI options schema_: Optional[Any], title_: Optional[str], @@ -180,11 +180,11 @@ async def publish( call: AsyncFunc = self._producer.publish for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -246,11 +246,11 @@ async def request( request: AsyncFunc = self._producer.request for pub_m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): request = partial(pub_m, request) @@ -261,7 +261,7 @@ async def request( async with AsyncExitStack() as stack: return_msg: Callable[[RedisMessage], Awaitable[RedisMessage]] = return_input - for m in self._broker_middlewares: + for m in self._broker_middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) @@ -282,8 +282,8 @@ def __init__( reply_to: str, headers: Optional["AnyDict"], # Regular publisher options - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI options schema_: Optional[Any], title_: Optional[str], @@ -393,11 +393,11 @@ async def publish( call: AsyncFunc = self._producer.publish for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -460,11 +460,11 @@ async def request( request: AsyncFunc = self._producer.request for pub_m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): request = partial(pub_m, request) @@ -475,7 +475,7 @@ async def request( async with AsyncExitStack() as stack: return_msg: Callable[[RedisMessage], Awaitable[RedisMessage]] = return_input - for m in self._broker_middlewares: + for m in self._broker_middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) @@ -524,11 +524,11 @@ async def publish( # type: ignore[override] call: AsyncFunc = self._producer.publish_batch for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -548,8 +548,8 @@ def __init__( reply_to: str, headers: Optional["AnyDict"], # Regular publisher options - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], - middlewares: Iterable["PublisherMiddleware"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], + middlewares: Sequence["PublisherMiddleware"], # AsyncAPI options schema_: Optional[Any], title_: Optional[str], @@ -667,11 +667,11 @@ async def publish( call: AsyncFunc = self._producer.publish for m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): call = partial(m, call) @@ -742,11 +742,11 @@ async def request( request: AsyncFunc = self._producer.request for pub_m in chain( + self._middlewares[::-1], ( _extra_middlewares - or (m(None).publish_scope for m in self._broker_middlewares) + or (m(None).publish_scope for m in self._broker_middlewares[::-1]) ), - self._middlewares, ): request = partial(pub_m, request) @@ -757,7 +757,7 @@ async def request( async with AsyncExitStack() as stack: return_msg: Callable[[RedisMessage], Awaitable[RedisMessage]] = return_input - for m in self._broker_middlewares: + for m in self._broker_middlewares[::-1]: mid = m(published_msg) await stack.enter_async_context(mid) return_msg = partial(mid.consume_scope, return_msg) diff --git a/faststream/redis/router.py b/faststream/redis/router.py index 38964c2c59..ab625f8711 100644 --- a/faststream/redis/router.py +++ b/faststream/redis/router.py @@ -1,4 +1,13 @@ -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, Optional, Union +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Iterable, + Optional, + Sequence, + Union, +) from typing_extensions import Annotated, Doc, deprecated @@ -55,7 +64,7 @@ def __init__( Doc("Reply message destination PubSub object name."), ] = "", middlewares: Annotated[ - Iterable["PublisherMiddleware"], + Sequence["PublisherMiddleware"], Doc("Publisher middlewares to wrap outgoing messages."), ] = (), # AsyncAPI information @@ -141,7 +150,7 @@ def __init__( Doc("Function to decode FastStream msg bytes body to python objects."), ] = None, middlewares: Annotated[ - Iterable["SubscriberMiddleware[UnifyRedisMessage]"], + Sequence["SubscriberMiddleware[UnifyRedisMessage]"], Doc("Subscriber middlewares to wrap incoming message processing."), ] = (), filter: Annotated[ @@ -230,7 +239,7 @@ def __init__( ), ] = (), middlewares: Annotated[ - Iterable["BrokerMiddleware[BaseMessage]"], + Sequence["BrokerMiddleware[BaseMessage]"], Doc("Router middlewares to apply to all routers' publishers/subscribers."), ] = (), parser: Annotated[ diff --git a/faststream/redis/subscriber/factory.py b/faststream/redis/subscriber/factory.py index ee0ae84c9b..9a43d054e8 100644 --- a/faststream/redis/subscriber/factory.py +++ b/faststream/redis/subscriber/factory.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Iterable, Optional, Union +from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union from typing_extensions import TypeAlias @@ -38,7 +38,7 @@ def create_subscriber( no_reply: bool = False, retry: bool = False, broker_dependencies: Iterable["Depends"] = (), - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"] = (), + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"] = (), # AsyncAPI args title_: Optional[str] = None, description_: Optional[str] = None, diff --git a/faststream/redis/subscriber/usecase.py b/faststream/redis/subscriber/usecase.py index a67095e986..5215838a49 100644 --- a/faststream/redis/subscriber/usecase.py +++ b/faststream/redis/subscriber/usecase.py @@ -77,7 +77,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -229,7 +229,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -355,7 +355,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -460,7 +460,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -509,7 +509,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -563,7 +563,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -752,7 +752,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], @@ -821,7 +821,7 @@ def __init__( no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], - broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], + broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"], # AsyncAPI args title_: Optional[str], description_: Optional[str], diff --git a/tests/brokers/base/middlewares.py b/tests/brokers/base/middlewares.py index 07560708ef..94a8cf5afb 100644 --- a/tests/brokers/base/middlewares.py +++ b/tests/brokers/base/middlewares.py @@ -13,6 +13,319 @@ from .basic import BaseTestcaseConfig +@pytest.mark.asyncio +class MiddlewaresOrderTestcase(BaseTestcaseConfig): + broker_class: Type[BrokerUsecase] + + def patch_broker(self, broker: BrokerUsecase) -> BrokerUsecase: + return broker + + async def test_broker_middleware_order( + self, event: asyncio.Event, queue: str, mock: Mock + ): + class InnerMiddleware(BaseMiddleware): + async def __aenter__(self): + mock.enter_inner() + mock.enter("inner") + + async def __aexit__(self, *args): + mock.exit_inner() + mock.exit("inner") + + async def consume_scope(self, call_next, msg): + mock.consume_inner() + mock.sub("inner") + return await call_next(msg) + + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_inner() + mock.pub("inner") + return await call_next(msg, *args, **kwargs) + + class OuterMiddleware(BaseMiddleware): + async def __aenter__(self): + mock.enter_outer() + mock.enter("outer") + + async def __aexit__(self, *args): + mock.exit_outer() + mock.exit("outer") + + async def consume_scope(self, call_next, msg): + mock.consume_outer() + mock.sub("outer") + return await call_next(msg) + + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_outer() + mock.pub("outer") + return await call_next(msg, *args, **kwargs) + + broker = self.broker_class( + middlewares=[OuterMiddleware, InnerMiddleware], + ) + + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) + async def handler(msg): + event.set() + + async with self.patch_broker(broker) as br: + await br.start() + await asyncio.wait( + ( + asyncio.create_task(broker.publish("start", queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + + assert event.is_set() + mock.consume_inner.assert_called_once() + mock.consume_outer.assert_called_once() + mock.publish_inner.assert_called_once() + mock.publish_outer.assert_called_once() + mock.enter_inner.assert_called_once() + mock.enter_outer.assert_called_once() + mock.exit_inner.assert_called_once() + mock.exit_outer.assert_called_once() + + assert [c.args[0] for c in mock.sub.call_args_list] == [ + "outer", + "inner", + ], mock.sub.call_args_list + assert [c.args[0] for c in mock.pub.call_args_list] == [ + "outer", + "inner", + ], mock.pub.call_args_list + assert [c.args[0] for c in mock.enter.call_args_list] == [ + "outer", + "inner", + ], mock.enter.call_args_list + assert [c.args[0] for c in mock.exit.call_args_list] == [ + "inner", + "outer", + ], mock.exit.call_args_list + + async def test_publisher_middleware_order( + self, event: asyncio.Event, queue: str, mock: Mock + ): + class InnerMiddleware(BaseMiddleware): + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_inner() + mock("inner") + return await call_next(msg, *args, **kwargs) + + class MiddleMiddleware(BaseMiddleware): + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_middle() + mock("middle") + return await call_next(msg, *args, **kwargs) + + class OuterMiddleware(BaseMiddleware): + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_outer() + mock("outer") + return await call_next(msg, *args, **kwargs) + + broker = self.broker_class( + middlewares=[OuterMiddleware], + ) + publisher = broker.publisher( + queue, + middlewares=[ + MiddleMiddleware(None).publish_scope, + InnerMiddleware(None).publish_scope, + ], + ) + + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) + async def handler(msg): + event.set() + + async with self.patch_broker(broker) as br: + await br.start() + await publisher.publish(None, queue) + + mock.publish_inner.assert_called_once() + mock.publish_middle.assert_called_once() + mock.publish_outer.assert_called_once() + assert [c.args[0] for c in mock.call_args_list] == [ + "outer", + "middle", + "inner", + ], mock.call_args_list + + async def test_publisher_with_router_middleware_order( + self, + event: asyncio.Event, + queue: str, + mock: Mock, + ): + class InnerMiddleware(BaseMiddleware): + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_inner() + mock("inner") + return await call_next(msg, *args, **kwargs) + + class MiddleMiddleware(BaseMiddleware): + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_middle() + mock("middle") + return await call_next(msg, *args, **kwargs) + + class OuterMiddleware(BaseMiddleware): + async def publish_scope(self, call_next, msg, *args, **kwargs): + mock.publish_outer() + mock("outer") + return await call_next(msg, *args, **kwargs) + + broker = self.broker_class( + middlewares=[OuterMiddleware], + ) + router = self.broker_class(middlewares=[MiddleMiddleware]) + router2 = self.broker_class(middlewares=[InnerMiddleware]) + + publisher = router2.publisher(queue) + + args, kwargs = self.get_subscriber_params(queue) + + @router2.subscriber(*args, **kwargs) + async def handler(msg): + event.set() + + router.include_router(router2) + broker.include_router(router) + + async with self.patch_broker(broker) as br: + await br.start() + await publisher.publish(None, queue) + + mock.publish_inner.assert_called_once() + mock.publish_middle.assert_called_once() + mock.publish_outer.assert_called_once() + + assert [c.args[0] for c in mock.call_args_list] == [ + "outer", + "middle", + "inner", + ], mock.call_args_list + + async def test_consume_middleware_order( + self, event: asyncio.Event, queue: str, mock: Mock + ): + class InnerMiddleware(BaseMiddleware): + async def consume_scope(self, call_next, msg): + mock.consume_inner() + mock("inner") + return await call_next(msg) + + class MiddleMiddleware(BaseMiddleware): + async def consume_scope(self, call_next, msg): + mock.consume_middle() + mock("middle") + return await call_next(msg) + + class OuterMiddleware(BaseMiddleware): + async def consume_scope(self, call_next, msg): + mock.consume_outer() + mock("outer") + return await call_next(msg) + + broker = self.broker_class(middlewares=[OuterMiddleware]) + + args, kwargs = self.get_subscriber_params( + queue, + middlewares=[ + MiddleMiddleware(None).consume_scope, + InnerMiddleware(None).consume_scope, + ], + ) + + @broker.subscriber(*args, **kwargs) + async def handler(msg): + event.set() + + async with self.patch_broker(broker) as br: + await br.start() + await asyncio.wait( + ( + asyncio.create_task(broker.publish("start", queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + + assert event.is_set() + mock.consume_inner.assert_called_once() + mock.consume_middle.assert_called_once() + mock.consume_outer.assert_called_once() + + assert [c.args[0] for c in mock.call_args_list] == [ + "outer", + "middle", + "inner", + ], mock.call_args_list + + async def test_consume_with_middleware_order( + self, event: asyncio.Event, queue: str, mock: Mock + ): + class InnerMiddleware(BaseMiddleware): + async def consume_scope(self, call_next, cmd): + mock.consume_inner() + mock("inner") + return await call_next(cmd) + + class MiddleMiddleware(BaseMiddleware): + async def consume_scope(self, call_next, cmd): + mock.consume_middle() + mock("middle") + return await call_next(cmd) + + class OuterMiddleware(BaseMiddleware): + async def consume_scope(self, call_next, cmd): + mock.consume_outer() + mock("outer") + return await call_next(cmd) + + broker = self.broker_class(middlewares=[OuterMiddleware]) + router = self.broker_class(middlewares=[MiddleMiddleware]) + router2 = self.broker_class(middlewares=[InnerMiddleware]) + + args, kwargs = self.get_subscriber_params(queue) + + @router2.subscriber(*args, **kwargs) + async def handler(msg): + event.set() + + router.include_router(router2) + broker.include_router(router) + + async with self.patch_broker(broker) as br: + await br.start() + await asyncio.wait( + ( + asyncio.create_task(broker.publish("start", queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + + mock.consume_inner.assert_called_once() + mock.consume_middle.assert_called_once() + mock.consume_outer.assert_called_once() + + assert event.is_set() + assert [c.args[0] for c in mock.call_args_list] == [ + "outer", + "middle", + "inner", + ], mock.call_args_list + + @pytest.mark.asyncio class LocalMiddlewareTestcase(BaseTestcaseConfig): broker_class: Type[BrokerUsecase] @@ -262,9 +575,7 @@ async def after_processed(self, exc_type, exc_val, exc_tb): mock.end() return await super().after_processed(exc_type, exc_val, exc_tb) - broker = self.broker_class( - middlewares=(mid,), - ) + broker = self.broker_class(middlewares=(mid,)) args, kwargs = self.get_subscriber_params(queue) @@ -351,7 +662,9 @@ class Mid(BaseMiddleware): async def on_publish(self, msg: str, *args, **kwargs) -> str: return msg * 2 - broker = self.broker_class(middlewares=(Mid,)) + broker = self.broker_class( + middlewares=(Mid,), + ) args, kwargs = self.get_subscriber_params(queue) @@ -497,7 +810,9 @@ async def value_error_handler(exc): event.set() raise SkipMessage() - broker = self.broker_class(middlewares=(mid,)) + broker = self.broker_class( + middlewares=(mid,), + ) args, kwargs = self.get_subscriber_params(queue) @broker.subscriber(*args, **kwargs) @@ -535,7 +850,9 @@ async def test_exception_middleware_do_not_catch_skip_msg( async def value_error_handler(exc): mock() - broker = self.broker_class(middlewares=(mid,)) + broker = self.broker_class( + middlewares=(mid,), + ) args, kwargs = self.get_subscriber_params(queue) @broker.subscriber(*args, **kwargs) @@ -569,7 +886,9 @@ async def value_error_handler(exc): event.set() raise exc - broker = self.broker_class(middlewares=(mid,)) + broker = self.broker_class( + middlewares=(mid,), + ) args, kwargs = self.get_subscriber_params(queue) @broker.subscriber(*args, **kwargs) @@ -611,7 +930,9 @@ async def zero_error_handler(exc): async def value_error_handler(exc): return "value" - broker = self.broker_class(middlewares=(mid,)) + broker = self.broker_class( + middlewares=(mid,), + ) args, kwargs = self.get_subscriber_params(queue) publisher = broker.publisher(queue + "2") @@ -692,7 +1013,10 @@ async def decoder( async def value_error_handler(exc): event.set() - broker = self.broker_class(middlewares=(mid,), decoder=decoder) + broker = self.broker_class( + middlewares=(mid,), + decoder=decoder, + ) args, kwargs = self.get_subscriber_params(queue) diff --git a/tests/brokers/confluent/test_middlewares.py b/tests/brokers/confluent/test_middlewares.py index 5ff72e0955..17e203879c 100644 --- a/tests/brokers/confluent/test_middlewares.py +++ b/tests/brokers/confluent/test_middlewares.py @@ -1,9 +1,10 @@ import pytest -from faststream.confluent import KafkaBroker +from faststream.confluent import KafkaBroker, TestKafkaBroker from tests.brokers.base.middlewares import ( ExceptionMiddlewareTestcase, MiddlewareTestcase, + MiddlewaresOrderTestcase, ) from .basic import ConfluentTestcaseConfig @@ -17,3 +18,10 @@ class TestMiddlewares(ConfluentTestcaseConfig, MiddlewareTestcase): @pytest.mark.confluent class TestExceptionMiddlewares(ConfluentTestcaseConfig, ExceptionMiddlewareTestcase): broker_class = KafkaBroker + + +class TestMiddlewaresOrder(MiddlewaresOrderTestcase): + broker_class = KafkaBroker + + def patch_broker(self, broker: KafkaBroker) -> TestKafkaBroker: + return TestKafkaBroker(broker) diff --git a/tests/brokers/kafka/test_middlewares.py b/tests/brokers/kafka/test_middlewares.py index ccfd657c8f..13f7e79349 100644 --- a/tests/brokers/kafka/test_middlewares.py +++ b/tests/brokers/kafka/test_middlewares.py @@ -1,9 +1,10 @@ import pytest -from faststream.kafka import KafkaBroker +from faststream.kafka import KafkaBroker, TestKafkaBroker from tests.brokers.base.middlewares import ( ExceptionMiddlewareTestcase, MiddlewareTestcase, + MiddlewaresOrderTestcase, ) @@ -15,3 +16,10 @@ class TestMiddlewares(MiddlewareTestcase): @pytest.mark.kafka class TestExceptionMiddlewares(ExceptionMiddlewareTestcase): broker_class = KafkaBroker + + +class TestMiddlewaresOrder(MiddlewaresOrderTestcase): + broker_class = KafkaBroker + + def patch_broker(self, broker: KafkaBroker) -> TestKafkaBroker: + return TestKafkaBroker(broker) diff --git a/tests/brokers/nats/test_middlewares.py b/tests/brokers/nats/test_middlewares.py index e1768dba88..d9dbca9224 100644 --- a/tests/brokers/nats/test_middlewares.py +++ b/tests/brokers/nats/test_middlewares.py @@ -1,9 +1,10 @@ import pytest -from faststream.nats import NatsBroker +from faststream.nats import NatsBroker, TestNatsBroker from tests.brokers.base.middlewares import ( ExceptionMiddlewareTestcase, MiddlewareTestcase, + MiddlewaresOrderTestcase, ) @@ -15,3 +16,10 @@ class TestMiddlewares(MiddlewareTestcase): @pytest.mark.nats class TestExceptionMiddlewares(ExceptionMiddlewareTestcase): broker_class = NatsBroker + + +class TestMiddlewaresOrder(MiddlewaresOrderTestcase): + broker_class = NatsBroker + + def patch_broker(self, broker: NatsBroker) -> TestNatsBroker: + return TestNatsBroker(broker) diff --git a/tests/brokers/rabbit/test_middlewares.py b/tests/brokers/rabbit/test_middlewares.py index 72e48958ee..050d98d173 100644 --- a/tests/brokers/rabbit/test_middlewares.py +++ b/tests/brokers/rabbit/test_middlewares.py @@ -1,9 +1,10 @@ import pytest -from faststream.rabbit import RabbitBroker +from faststream.rabbit import RabbitBroker, TestRabbitBroker from tests.brokers.base.middlewares import ( ExceptionMiddlewareTestcase, MiddlewareTestcase, + MiddlewaresOrderTestcase, ) @@ -15,3 +16,10 @@ class TestMiddlewares(MiddlewareTestcase): @pytest.mark.rabbit class TestExceptionMiddlewares(ExceptionMiddlewareTestcase): broker_class = RabbitBroker + + +class TestMiddlewaresOrder(MiddlewaresOrderTestcase): + broker_class = RabbitBroker + + def patch_broker(self, broker: RabbitBroker) -> TestRabbitBroker: + return TestRabbitBroker(broker) diff --git a/tests/brokers/redis/test_middlewares.py b/tests/brokers/redis/test_middlewares.py index 22f6894c3b..2c11d0db0c 100644 --- a/tests/brokers/redis/test_middlewares.py +++ b/tests/brokers/redis/test_middlewares.py @@ -1,9 +1,10 @@ import pytest -from faststream.redis import RedisBroker +from faststream.redis import RedisBroker, TestRedisBroker from tests.brokers.base.middlewares import ( ExceptionMiddlewareTestcase, MiddlewareTestcase, + MiddlewaresOrderTestcase, ) @@ -15,3 +16,10 @@ class TestMiddlewares(MiddlewareTestcase): @pytest.mark.redis class TestExceptionMiddlewares(ExceptionMiddlewareTestcase): broker_class = RedisBroker + + +class TestMiddlewaresOrder(MiddlewaresOrderTestcase): + broker_class = RedisBroker + + def patch_broker(self, broker: RedisBroker) -> TestRedisBroker: + return TestRedisBroker(broker)