diff --git a/faststream/broker/acknowledgement_watcher.py b/faststream/broker/acknowledgement_watcher.py index 4ecb6ad4b3..4f75b1d66e 100644 --- a/faststream/broker/acknowledgement_watcher.py +++ b/faststream/broker/acknowledgement_watcher.py @@ -126,11 +126,13 @@ def __init__( self, message: "StreamMessage[MsgType]", watcher: BaseWatcher, + logger: Optional["LoggerProto"] = None, **extra_options: Any, ) -> None: self.watcher = watcher self.message = message self.extra_options = extra_options + self.logger = logger async def __aenter__(self) -> None: self.watcher.add(self.message.message_id) @@ -172,15 +174,41 @@ async def __aexit__( return not is_test_env() async def __ack(self) -> None: - await self.message.ack(**self.extra_options) - self.watcher.remove(self.message.message_id) + try: + await self.message.ack(**self.extra_options) + except Exception as er: + if self.logger is not None: + self.logger.log( + logging.ERROR, + er, + exc_info=er + ) + else: + self.watcher.remove(self.message.message_id) async def __nack(self) -> None: - await self.message.nack(**self.extra_options) + try: + await self.message.nack(**self.extra_options) + except Exception as er: + if self.logger is not None: + self.logger.log( + logging.ERROR, + er, + exc_info=er + ) async def __reject(self) -> None: - await self.message.reject(**self.extra_options) - self.watcher.remove(self.message.message_id) + try: + await self.message.reject(**self.extra_options) + except Exception as er: + if self.logger is not None: + self.logger.log( + logging.ERROR, + er, + exc_info=er + ) + else: + self.watcher.remove(self.message.message_id) def get_watcher( diff --git a/faststream/broker/utils.py b/faststream/broker/utils.py index 8ca0585a4c..6903f4c94d 100644 --- a/faststream/broker/utils.py +++ b/faststream/broker/utils.py @@ -50,6 +50,7 @@ def get_watcher_context( return partial( WatcherContext, watcher=get_watcher(logger, retry), + logger=logger, **extra_options, )