From 65d01a799f03346a9bda987cd065a7efc1d8d817 Mon Sep 17 00:00:00 2001 From: Maxim Kolyubyakin Date: Wed, 15 Mar 2023 20:37:58 +0100 Subject: [PATCH 1/2] LITE-26790 Better Logging in RabbitMQ producer * Retryable reconnects are logged as WARNING * Non-retryable exceptions are logged as ERROR --- dj_cqrs/transport/rabbit_mq.py | 22 ++++++++---- tests/test_transport/test_rabbit_mq.py | 50 ++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 10 deletions(-) diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index 2d7837c..dddb5f0 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -109,18 +109,26 @@ def _produce_with_retries(cls, payload, retries): cls._produce_message(channel, exchange, payload) cls.log_produced(payload) except ( - exceptions.AMQPError, exceptions.ChannelError, exceptions.ReentrancyError, + exceptions.AMQPError, + exceptions.ChannelError, + exceptions.ReentrancyError, AMQPConnectorException, - ): - logger.exception("CQRS couldn't be published: pk = {0} ({1}).{2}".format( - payload.pk, payload.cqrs_id, " Reconnect..." if retries else "", + ) as e: + base_log_message = "CQRS couldn't be published: pk = {0} ({1}).".format( + payload.pk, payload.cqrs_id, + ) + + if not retries: + logger.exception(base_log_message) + return + + logger.warning('{0} Error: {1}. Reconnect...'.format( + base_log_message, e.__class__.__name__, )) # in case of any error - close connection and try to reconnect cls.clean_connection() - - if retries: - cls._produce_with_retries(payload, retries - 1) + cls._produce_with_retries(payload, retries - 1) @classmethod def _consume_message(cls, ch, method, properties, body, delay_queue): diff --git a/tests/test_transport/test_rabbit_mq.py b/tests/test_transport/test_rabbit_mq.py index 471fe68..e7c3ca1 100644 --- a/tests/test_transport/test_rabbit_mq.py +++ b/tests/test_transport/test_rabbit_mq.py @@ -8,7 +8,12 @@ import ujson from django.db import DatabaseError from pika.adapters.utils.connection_workflow import AMQPConnectorException -from pika.exceptions import AMQPError, ChannelError, ReentrancyError +from pika.exceptions import ( + AMQPError, + ChannelError, + ReentrancyError, + StreamLostError, +) from dj_cqrs.constants import ( DEFAULT_MASTER_AUTO_UPDATE_FIELDS, @@ -223,8 +228,47 @@ def test_produce_retry_on_error(rabbit_transport, mocker, caplog): SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1, ), ) - assert "CQRS couldn't be published: pk = 1 (CQRS_ID). Reconnect..." in caplog.text - assert 'CQRS is published: pk = 1 (CQRS_ID)' in caplog.text + + assert caplog.record_tuples == [ + ( + 'django-cqrs', + logging.WARNING, + "CQRS couldn't be published: pk = 1 (CQRS_ID)." + " Error: AMQPConnectorException. Reconnect...", + ), + ( + 'django-cqrs', + logging.INFO, + 'CQRS is published: pk = 1 (CQRS_ID), correlation_id = None.', + ), + ] + + +def test_produce_retry_on_error_1(rabbit_transport, mocker, caplog): + mocker.patch.object(RabbitMQTransport, '_get_producer_rmq_objects', side_effect=[ + StreamLostError, + StreamLostError, + ]) + mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True) + + rabbit_transport.produce( + TransportPayload( + SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1, + ), + ) + + assert caplog.record_tuples == [ + ( + 'django-cqrs', + logging.WARNING, + "CQRS couldn't be published: pk = 1 (CQRS_ID). Error: StreamLostError. Reconnect...", + ), + ( + 'django-cqrs', + logging.ERROR, + "CQRS couldn't be published: pk = 1 (CQRS_ID).", + ), + ] def test_produce_message_ok(mocker): From bf5c760a9569ba07ae5c88f2c21eeb3d8c5580be Mon Sep 17 00:00:00 2001 From: Maxim Kolyubyakin Date: Mon, 20 Mar 2023 10:08:53 +0100 Subject: [PATCH 2/2] LITE-26790 RMQ Connection is cleaned in any error case --- dj_cqrs/transport/rabbit_mq.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index dddb5f0..478763e 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -114,10 +114,12 @@ def _produce_with_retries(cls, payload, retries): exceptions.ReentrancyError, AMQPConnectorException, ) as e: + # in case of any error - close connection and try to reconnect + cls.clean_connection() + base_log_message = "CQRS couldn't be published: pk = {0} ({1}).".format( payload.pk, payload.cqrs_id, ) - if not retries: logger.exception(base_log_message) return @@ -126,8 +128,6 @@ def _produce_with_retries(cls, payload, retries): base_log_message, e.__class__.__name__, )) - # in case of any error - close connection and try to reconnect - cls.clean_connection() cls._produce_with_retries(payload, retries - 1) @classmethod