diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 8a76ee0dce..e81c64efbb 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -382,6 +382,7 @@ search: - exception - [BaseExceptionMiddleware](api/faststream/broker/middlewares/exception/BaseExceptionMiddleware.md) - [ExceptionMiddleware](api/faststream/broker/middlewares/exception/ExceptionMiddleware.md) + - [ignore_handler](api/faststream/broker/middlewares/exception/ignore_handler.md) - logging - [CriticalLogMiddleware](api/faststream/broker/middlewares/logging/CriticalLogMiddleware.md) - proto diff --git a/docs/docs/en/api/faststream/broker/middlewares/exception/ignore_handler.md b/docs/docs/en/api/faststream/broker/middlewares/exception/ignore_handler.md new file mode 100644 index 0000000000..425561dcba --- /dev/null +++ b/docs/docs/en/api/faststream/broker/middlewares/exception/ignore_handler.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.broker.middlewares.exception.ignore_handler diff --git a/faststream/broker/middlewares/exception.py b/faststream/broker/middlewares/exception.py index feb1627b68..e307133e99 100644 --- a/faststream/broker/middlewares/exception.py +++ b/faststream/broker/middlewares/exception.py @@ -5,7 +5,10 @@ Callable, ContextManager, Dict, + List, + NoReturn, Optional, + Tuple, Type, Union, overload, @@ -14,6 +17,7 @@ from typing_extensions import Literal, TypeAlias from faststream.broker.middlewares.base import BaseMiddleware +from faststream.exceptions import IgnoredException from faststream.utils import apply_types, context from faststream.utils.functions import sync_fake_context, to_async @@ -29,9 +33,17 @@ CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]] CastedPublishingExceptionHandler: TypeAlias = Callable[..., Awaitable["Any"]] -CastedHandlers: TypeAlias = Dict[Type[Exception], CastedGeneralExceptionHandler] -CastedPublishingHandlers: TypeAlias = Dict[ - Type[Exception], CastedPublishingExceptionHandler +CastedHandlers: TypeAlias = List[ + Tuple[ + Type[Exception], + CastedGeneralExceptionHandler, + ] +] +CastedPublishingHandlers: TypeAlias = List[ + Tuple[ + Type[Exception], + CastedPublishingExceptionHandler, + ] ] @@ -57,7 +69,7 @@ async def consume_scope( except Exception as exc: exc_type = type(exc) - for handler_type, handler in self._publish_handlers.items(): + for handler_type, handler in self._publish_handlers: if issubclass(exc_type, handler_type): return await handler(exc) @@ -70,7 +82,7 @@ async def after_processed( exc_tb: Optional["TracebackType"] = None, ) -> Optional[bool]: if exc_type: - for handler_type, handler in self._handlers.items(): + for handler_type, handler in self._handlers: if issubclass(exc_type, handler_type): # TODO: remove it after context will be moved to middleware # In case parser/decoder error occurred @@ -98,20 +110,34 @@ class ExceptionMiddleware: def __init__( self, - handlers: Optional[Dict[Type[Exception], GeneralExceptionHandler]] = None, + handlers: Optional[ + Dict[ + Type[Exception], + GeneralExceptionHandler, + ] + ] = None, publish_handlers: Optional[ - Dict[Type[Exception], PublishingExceptionHandler] + Dict[ + Type[Exception], + PublishingExceptionHandler, + ] ] = None, ) -> None: - self._handlers = { - exc_type: apply_types(to_async(handler)) - for exc_type, handler in (handlers or {}).items() - } - - self._publish_handlers = { - exc_type: apply_types(to_async(handler)) - for exc_type, handler in (publish_handlers or {}).items() - } + self._handlers: CastedHandlers = [ + (IgnoredException, ignore_handler), + *( + (exc_type, apply_types(to_async(handler))) + for exc_type, handler in (handlers or {}).items() + ), + ] + + self._publish_handlers: CastedPublishingHandlers = [ + (IgnoredException, ignore_handler), + *( + (exc_type, apply_types(to_async(handler))) + for exc_type, handler in (publish_handlers or {}).items() + ), + ] @overload def add_handler( @@ -140,7 +166,12 @@ def add_handler( def pub_wrapper( func: PublishingExceptionHandler, ) -> PublishingExceptionHandler: - self._publish_handlers[exc] = apply_types(to_async(func)) + self._publish_handlers.append( + ( + exc, + apply_types(to_async(func)), + ) + ) return func return pub_wrapper @@ -150,7 +181,12 @@ def pub_wrapper( def default_wrapper( func: GeneralExceptionHandler, ) -> GeneralExceptionHandler: - self._handlers[exc] = apply_types(to_async(func)) + self._handlers.append( + ( + exc, + apply_types(to_async(func)), + ) + ) return func return default_wrapper @@ -162,3 +198,7 @@ def __call__(self, msg: Optional[Any]) -> BaseExceptionMiddleware: publish_handlers=self._publish_handlers, msg=msg, ) + + +async def ignore_handler(exception: IgnoredException) -> NoReturn: + raise exception diff --git a/tests/brokers/base/middlewares.py b/tests/brokers/base/middlewares.py index dc6175198e..cbf9efae38 100644 --- a/tests/brokers/base/middlewares.py +++ b/tests/brokers/base/middlewares.py @@ -526,6 +526,39 @@ async def subscriber2(msg=Context("message")): assert event.is_set() assert mock.call_count == 0 + async def test_exception_middleware_do_not_catch_skip_msg( + self, event: asyncio.Event, queue: str, mock: Mock, raw_broker + ): + mid = ExceptionMiddleware() + + @mid.add_handler(Exception) + async def value_error_handler(exc): + mock() + + broker = self.broker_class(middlewares=(mid,)) + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) + async def subscriber(m): + event.set() + raise SkipMessage + + broker = self.patch_broker(raw_broker, broker) + + async with broker: + await broker.start() + await asyncio.wait( + ( + asyncio.create_task(broker.publish("", queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + await asyncio.sleep(0.001) + + assert event.is_set() + assert mock.call_count == 0 + async def test_exception_middleware_reraise( self, event: asyncio.Event, queue: str, mock: Mock, raw_broker ): @@ -629,7 +662,7 @@ async def value_error_handler(exc): mid2 = ExceptionMiddleware(handlers={ValueError: value_error_handler}) - assert mid1._handlers.keys() == mid2._handlers.keys() + assert [x[0] for x in mid1._handlers] == [x[0] for x in mid2._handlers] async def test_exception_middleware_init_publish_handler_same(self): mid1 = ExceptionMiddleware() @@ -640,7 +673,9 @@ async def value_error_handler(exc): mid2 = ExceptionMiddleware(publish_handlers={ValueError: value_error_handler}) - assert mid1._publish_handlers.keys() == mid2._publish_handlers.keys() + assert [x[0] for x in mid1._publish_handlers] == [ + x[0] for x in mid2._publish_handlers + ] async def test_exception_middleware_decoder_error( self, event: asyncio.Event, queue: str, mock: Mock, raw_broker