diff --git a/dj_cqrs/management/commands/cqrs_dead_letters.py b/dj_cqrs/management/commands/cqrs_dead_letters.py new file mode 100644 index 0000000..4d0c501 --- /dev/null +++ b/dj_cqrs/management/commands/cqrs_dead_letters.py @@ -0,0 +1,127 @@ +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +import ujson +from django.core.management.base import BaseCommand, CommandError + +from dj_cqrs.dataclasses import TransportPayload +from dj_cqrs.registries import ReplicaRegistry +from dj_cqrs.transport.rabbit_mq import RabbitMQTransport +from dj_cqrs.transport import current_transport +from dj_cqrs.utils import get_expires_datetime + + +class RabbitMQTransportService(RabbitMQTransport): + + @classmethod + def get_consumer_settings(cls): + return cls._get_consumer_settings() + + @classmethod + def get_common_settings(cls): + return cls._get_common_settings() + + @classmethod + def create_connection(cls, host, port, creds, exchange): + return cls._create_connection(host, port, creds, exchange) + + @classmethod + def declare_queue(cls, channel, queue_name): + return channel.queue_declare(queue_name, durable=True, exclusive=False) + + @classmethod + def nack(cls, channel, delivery_tag, payload=None): + return cls._nack(channel, delivery_tag, payload) + + +class Command(BaseCommand): + help = 'CQRS dead letters queue management commands' + + def add_arguments(self, parser): + command = parser.add_subparsers(dest='command') + command.required = True + command.add_parser('retry', help='Retry all dead letters.') + command.add_parser('dump', help='Dumps all dead letter to stdout.') + command.add_parser('purge', help='Removes all dead letters.') + + def handle(self, *args, **options): + self.check_transport() + channel, connection = self.init_broker() + + queue_name, dead_letter_queue_name = RabbitMQTransportService.get_consumer_settings() + dead_letters_queue = RabbitMQTransportService.declare_queue( + channel, dead_letter_queue_name, + ) + dead_letters_count = dead_letters_queue.method.message_count + consumer_generator = channel.consume( + queue=dead_letter_queue_name, + auto_ack=False, + exclusive=False, + ) + + command = options['command'] + if command == 'retry': + self.handle_retry(channel, consumer_generator, dead_letters_count) + elif command == 'dump': + self.handle_dump(consumer_generator, dead_letters_count) + elif command == 'purge': + self.handle_purge(channel, dead_letter_queue_name, dead_letters_count) + + if not connection.is_closed: + connection.close() + + def check_transport(self): + if not issubclass(current_transport, RabbitMQTransport): + raise CommandError("Dead letters commands available only for RabbitMQTransport.") + + def init_broker(self): + host, port, creds, exchange = RabbitMQTransportService.get_common_settings() + connection, channel = RabbitMQTransportService.create_connection( + host, port, creds, exchange, + ) + + queue_name, dead_letter_queue_name = RabbitMQTransportService.get_consumer_settings() + RabbitMQTransportService.declare_queue(channel, queue_name) + RabbitMQTransportService.declare_queue(channel, dead_letter_queue_name) + for cqrs_id, replica_model in ReplicaRegistry.models.items(): + channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=cqrs_id) + + # Every service must have specific SYNC or requeue routes + channel.queue_bind( + exchange=exchange, + queue=queue_name, + routing_key='cqrs.{}.{}'.format(queue_name, cqrs_id), + ) + + return channel, connection + + def handle_retry(self, channel, consumer_generator, dead_letters_count): + self.stdout.write("Total dead letters: {}".format(dead_letters_count)) + for i in range(1, dead_letters_count + 1): + self.stdout.write("Retrying: {}/{}".format(i, dead_letters_count)) + method_frame, properties, body = next(consumer_generator) + + dct = ujson.loads(body) + dct['retries'] = 0 + if dct.get('expires'): + # Message could expire already + expires = get_expires_datetime() + dct['expires'] = expires.replace(microsecond=0).isoformat() + payload = TransportPayload.from_message(dct) + payload.is_requeue = True + + RabbitMQTransportService.produce(payload) + message = ujson.dumps(dct) + self.stdout.write(message) + + RabbitMQTransportService.nack(channel, method_frame.delivery_tag) + + def handle_dump(self, consumer_generator, dead_letters_count): + for i in range(1, dead_letters_count + 1): + *_, body = next(consumer_generator) + self.stdout.write(body.decode('utf-8')) + + def handle_purge(self, channel, dead_letter_queue_name, dead_letter_count): + self.stdout.write("Total dead letters: {}".format(dead_letter_count)) + if dead_letter_count > 0: + channel.queue_purge(dead_letter_queue_name) + self.stdout.write("Purged") diff --git a/docs/lifecycle.rst b/docs/lifecycle.rst index dca8c2d..83e5119 100644 --- a/docs/lifecycle.rst +++ b/docs/lifecycle.rst @@ -126,3 +126,35 @@ Expired or failed messages which should not be retried are moved to 'dead letter }, } +Commands +^^^^^^^^ +Dump +"""""""""""" +Dumps all dead letters to stdout. + +.. code-block:: console + + $ python manage.py cqrs_dead_letters dump + {"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":30,"expires":"2021-05-01T11:50:00+00:00"} + +Retry +""""""""""""" +Retry all dead letters. +Message body retries and expires fields are downgraded. + +.. code-block:: console + + $ python manage.py cqrs_dead_letters retry + Total dead letters: 1 + Retrying: 1/1 + {"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":0,"expires":"2021-05-02T12:30:00+00:00"} + +Purge +""""""""""""" +Removes all dead letters. + +.. code-block:: console + + $ python manage.py cqrs_dead_letters purge + Total dead letters: 1 + Purged diff --git a/tests/test_commands/test_dead_letters.py b/tests/test_commands/test_dead_letters.py new file mode 100644 index 0000000..519f326 --- /dev/null +++ b/tests/test_commands/test_dead_letters.py @@ -0,0 +1,135 @@ +# Copyright © 2021 Ingram Micro Inc. All rights reserved. + +import ujson +from datetime import datetime + +import pytest +from django.utils import timezone + +from dj_cqrs.constants import SignalType +from django.core.management import call_command, CommandError +from dj_cqrs.management.commands.cqrs_dead_letters import Command, RabbitMQTransport + + +COMMAND_NAME = 'cqrs_dead_letters' + + +def test_dump(capsys, mocker): + mocker.patch.object(Command, 'check_transport') + mocker.patch.object( + RabbitMQTransport, + '_get_consumer_settings', + return_value=('queue', 'dead_letters_queue') + ) + mocker.patch.object( + RabbitMQTransport, + '_get_common_settings', + return_value=('host', 'port', mocker.MagicMock(), 'exchange') + ) + + queue = mocker.MagicMock() + queue.method.message_count = 1 + message_body = ujson.dumps({'cqrs_id': 'test'}).encode('utf-8') + + channel = mocker.MagicMock() + channel.consume = lambda *args, **kwargs: (v for v in [(None, None, message_body)]) + channel.queue_declare = lambda *args, **kwargs: queue + mocker.patch.object( + RabbitMQTransport, + '_create_connection', + return_value=(mocker.MagicMock(), channel) + ) + mocker.patch.object(RabbitMQTransport, '_nack') + + call_command(COMMAND_NAME, 'dump') + + captured = capsys.readouterr() + assert captured.out.strip() == message_body.decode('utf-8') + + +def test_handle_retry(settings, capsys, mocker): + produce_channel = mocker.MagicMock() + mocker.patch.object( + RabbitMQTransport, + '_get_producer_rmq_objects', + return_value=(None, produce_channel) + ) + + channel = mocker.MagicMock() + method_frame = mocker.MagicMock() + method_frame.delivery_tag = 12 + + settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 3600 + fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc) + mocker.patch('django.utils.timezone.now', return_value=fake_now) + message = { + 'signal_type': SignalType.SAVE, + 'cqrs_id': 'test', + 'instance_data': {'id': 123}, + 'instance_pk': 1, + 'previous_data': None, + 'correlation_id': None, + 'expires': '2020-01-01T00:00:00+00:00', + 'retries': 30, + } + consumer_generator = (v for v in [(method_frame, None, ujson.dumps(message))]) + + command = Command() + command.handle_retry(channel, consumer_generator, dead_letters_count=1) + + assert produce_channel.basic_publish.call_count == 1 + + produce_kwargs = produce_channel.basic_publish.call_args[1] + assert produce_kwargs['routing_key'] == 'cqrs.replica.test' + + produce_message = ujson.loads(produce_kwargs['body']) + assert produce_message['instance_data'] == message['instance_data'] + assert produce_message['expires'] == '2020-01-01T01:00:00+00:00' + assert produce_message['retries'] == 0 + + captured = capsys.readouterr() + total_msg, retrying_msg, body_msg = captured.out.strip().split('\n') + + assert total_msg == 'Total dead letters: 1' + assert retrying_msg == 'Retrying: 1/1' + assert '2020-01-01T01:00:00+00:00' in body_msg + + assert channel.basic_nack.call_count == 1 + assert channel.basic_nack.call_args[0][0] == 12 + + +def test_handle_purge(capsys, mocker): + channel = mocker.MagicMock() + + command = Command() + command.handle_purge(channel, 'dead_letters_test', dead_letter_count=3) + + assert channel.queue_purge.call_count == 1 + assert channel.queue_purge.call_args[0][0] == 'dead_letters_test' + + captured = capsys.readouterr() + total_msg, purged_msg = captured.out.strip().split('\n') + + assert total_msg == 'Total dead letters: 3' + assert purged_msg == 'Purged' + + +def test_handle_purge_empty_queue(capsys, mocker): + channel = mocker.MagicMock() + + command = Command() + command.handle_purge(channel, 'dead_letters_test', dead_letter_count=0) + + assert channel.queue_purge.call_count == 0 + + captured = capsys.readouterr() + assert captured.out.strip() == 'Total dead letters: 0' + + +def test_check_transport(settings): + command = Command() + + with pytest.raises(CommandError) as e: + command.check_transport() + + assert "Dead letters commands available only for RabbitMQTransport." in str(e)