From 6d41ea353985581e4b69b662aea6dec43c4f7b77 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 23 Nov 2024 22:10:16 +0100 Subject: [PATCH] Missing middlewares --- faststream/broker/subscriber/call_item.py | 2 +- faststream/broker/subscriber/usecase.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/faststream/broker/subscriber/call_item.py b/faststream/broker/subscriber/call_item.py index c7c32b3609..798a905823 100644 --- a/faststream/broker/subscriber/call_item.py +++ b/faststream/broker/subscriber/call_item.py @@ -157,7 +157,7 @@ async def call( """Execute wrapped handler with consume middlewares.""" call: AsyncFuncAny = self.handler.call_wrapped - for middleware in chain(self.item_middlewares, _extra_middlewares): + for middleware in chain(self.item_middlewares[::-1], _extra_middlewares): call = partial(middleware, call) try: diff --git a/faststream/broker/subscriber/usecase.py b/faststream/broker/subscriber/usecase.py index d9974efe43..e94b547624 100644 --- a/faststream/broker/subscriber/usecase.py +++ b/faststream/broker/subscriber/usecase.py @@ -330,7 +330,7 @@ async def process_message(self, msg: MsgType) -> "Response": # enter all middlewares middlewares: List[BaseMiddleware] = [] - for base_m in self._broker_middlewares: + for base_m in self._broker_middlewares[::-1]: middleware = base_m(msg) middlewares.append(middleware) await middleware.__aenter__() @@ -360,14 +360,14 @@ async def process_message(self, msg: MsgType) -> "Response": stack.enter_context(context.scope("message", message)) # Middlewares should be exited before scope release - for m in middlewares: + for m in middlewares[::-1]: stack.push_async_exit(m.__aexit__) result_msg = ensure_response( await h.call( message=message, # consumer middlewares - _extra_middlewares=(m.consume_scope for m in middlewares), + _extra_middlewares=(m.consume_scope for m in middlewares[::-1]), ) ) @@ -382,7 +382,7 @@ async def process_message(self, msg: MsgType) -> "Response": result_msg.body, **result_msg.as_publish_kwargs(), # publisher middlewares - _extra_middlewares=(m.publish_scope for m in middlewares), + _extra_middlewares=(m.publish_scope for m in middlewares[::-1]), ) # Return data for tests @@ -390,7 +390,7 @@ async def process_message(self, msg: MsgType) -> "Response": # Suitable handler was not found or # parsing/decoding exception occurred - for m in middlewares: + for m in middlewares[::-1]: stack.push_async_exit(m.__aexit__) if parsing_error: