Skip to content

Commit

Permalink
Merge pull request #36 from cloudblue/lite-18055-dead-letters-commands
Browse files Browse the repository at this point in the history
LITE-18055 Added dead letters management commands
  • Loading branch information
maxipavlovic authored May 17, 2021
2 parents ca8dee8 + a42b86f commit 57a96c0
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 0 deletions.
127 changes: 127 additions & 0 deletions dj_cqrs/management/commands/cqrs_dead_letters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

import ujson
from django.core.management.base import BaseCommand, CommandError

from dj_cqrs.dataclasses import TransportPayload
from dj_cqrs.registries import ReplicaRegistry
from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
from dj_cqrs.transport import current_transport
from dj_cqrs.utils import get_expires_datetime


class RabbitMQTransportService(RabbitMQTransport):

@classmethod
def get_consumer_settings(cls):
return cls._get_consumer_settings()

@classmethod
def get_common_settings(cls):
return cls._get_common_settings()

@classmethod
def create_connection(cls, host, port, creds, exchange):
return cls._create_connection(host, port, creds, exchange)

@classmethod
def declare_queue(cls, channel, queue_name):
return channel.queue_declare(queue_name, durable=True, exclusive=False)

@classmethod
def nack(cls, channel, delivery_tag, payload=None):
return cls._nack(channel, delivery_tag, payload)


class Command(BaseCommand):
help = 'CQRS dead letters queue management commands'

def add_arguments(self, parser):
command = parser.add_subparsers(dest='command')
command.required = True
command.add_parser('retry', help='Retry all dead letters.')
command.add_parser('dump', help='Dumps all dead letter to stdout.')
command.add_parser('purge', help='Removes all dead letters.')

def handle(self, *args, **options):
self.check_transport()
channel, connection = self.init_broker()

queue_name, dead_letter_queue_name = RabbitMQTransportService.get_consumer_settings()
dead_letters_queue = RabbitMQTransportService.declare_queue(
channel, dead_letter_queue_name,
)
dead_letters_count = dead_letters_queue.method.message_count
consumer_generator = channel.consume(
queue=dead_letter_queue_name,
auto_ack=False,
exclusive=False,
)

command = options['command']
if command == 'retry':
self.handle_retry(channel, consumer_generator, dead_letters_count)
elif command == 'dump':
self.handle_dump(consumer_generator, dead_letters_count)
elif command == 'purge':
self.handle_purge(channel, dead_letter_queue_name, dead_letters_count)

if not connection.is_closed:
connection.close()

def check_transport(self):
if not issubclass(current_transport, RabbitMQTransport):
raise CommandError("Dead letters commands available only for RabbitMQTransport.")

def init_broker(self):
host, port, creds, exchange = RabbitMQTransportService.get_common_settings()
connection, channel = RabbitMQTransportService.create_connection(
host, port, creds, exchange,
)

queue_name, dead_letter_queue_name = RabbitMQTransportService.get_consumer_settings()
RabbitMQTransportService.declare_queue(channel, queue_name)
RabbitMQTransportService.declare_queue(channel, dead_letter_queue_name)
for cqrs_id, replica_model in ReplicaRegistry.models.items():
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=cqrs_id)

# Every service must have specific SYNC or requeue routes
channel.queue_bind(
exchange=exchange,
queue=queue_name,
routing_key='cqrs.{}.{}'.format(queue_name, cqrs_id),
)

return channel, connection

def handle_retry(self, channel, consumer_generator, dead_letters_count):
self.stdout.write("Total dead letters: {}".format(dead_letters_count))
for i in range(1, dead_letters_count + 1):
self.stdout.write("Retrying: {}/{}".format(i, dead_letters_count))
method_frame, properties, body = next(consumer_generator)

dct = ujson.loads(body)
dct['retries'] = 0
if dct.get('expires'):
# Message could expire already
expires = get_expires_datetime()
dct['expires'] = expires.replace(microsecond=0).isoformat()
payload = TransportPayload.from_message(dct)
payload.is_requeue = True

RabbitMQTransportService.produce(payload)
message = ujson.dumps(dct)
self.stdout.write(message)

RabbitMQTransportService.nack(channel, method_frame.delivery_tag)

def handle_dump(self, consumer_generator, dead_letters_count):
for i in range(1, dead_letters_count + 1):
*_, body = next(consumer_generator)
self.stdout.write(body.decode('utf-8'))

def handle_purge(self, channel, dead_letter_queue_name, dead_letter_count):
self.stdout.write("Total dead letters: {}".format(dead_letter_count))
if dead_letter_count > 0:
channel.queue_purge(dead_letter_queue_name)
self.stdout.write("Purged")
32 changes: 32 additions & 0 deletions docs/lifecycle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,35 @@ Expired or failed messages which should not be retried are moved to 'dead letter
},
}
Commands
^^^^^^^^
Dump
""""""""""""
Dumps all dead letters to stdout.

.. code-block:: console
$ python manage.py cqrs_dead_letters dump
{"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":30,"expires":"2021-05-01T11:50:00+00:00"}
Retry
"""""""""""""
Retry all dead letters.
Message body retries and expires fields are downgraded.

.. code-block:: console
$ python manage.py cqrs_dead_letters retry
Total dead letters: 1
Retrying: 1/1
{"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":0,"expires":"2021-05-02T12:30:00+00:00"}
Purge
"""""""""""""
Removes all dead letters.

.. code-block:: console
$ python manage.py cqrs_dead_letters purge
Total dead letters: 1
Purged
135 changes: 135 additions & 0 deletions tests/test_commands/test_dead_letters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

import ujson
from datetime import datetime

import pytest
from django.utils import timezone

from dj_cqrs.constants import SignalType
from django.core.management import call_command, CommandError
from dj_cqrs.management.commands.cqrs_dead_letters import Command, RabbitMQTransport


COMMAND_NAME = 'cqrs_dead_letters'


def test_dump(capsys, mocker):
mocker.patch.object(Command, 'check_transport')
mocker.patch.object(
RabbitMQTransport,
'_get_consumer_settings',
return_value=('queue', 'dead_letters_queue')
)
mocker.patch.object(
RabbitMQTransport,
'_get_common_settings',
return_value=('host', 'port', mocker.MagicMock(), 'exchange')
)

queue = mocker.MagicMock()
queue.method.message_count = 1
message_body = ujson.dumps({'cqrs_id': 'test'}).encode('utf-8')

channel = mocker.MagicMock()
channel.consume = lambda *args, **kwargs: (v for v in [(None, None, message_body)])
channel.queue_declare = lambda *args, **kwargs: queue
mocker.patch.object(
RabbitMQTransport,
'_create_connection',
return_value=(mocker.MagicMock(), channel)
)
mocker.patch.object(RabbitMQTransport, '_nack')

call_command(COMMAND_NAME, 'dump')

captured = capsys.readouterr()
assert captured.out.strip() == message_body.decode('utf-8')


def test_handle_retry(settings, capsys, mocker):
produce_channel = mocker.MagicMock()
mocker.patch.object(
RabbitMQTransport,
'_get_producer_rmq_objects',
return_value=(None, produce_channel)
)

channel = mocker.MagicMock()
method_frame = mocker.MagicMock()
method_frame.delivery_tag = 12

settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 3600
fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
mocker.patch('django.utils.timezone.now', return_value=fake_now)
message = {
'signal_type': SignalType.SAVE,
'cqrs_id': 'test',
'instance_data': {'id': 123},
'instance_pk': 1,
'previous_data': None,
'correlation_id': None,
'expires': '2020-01-01T00:00:00+00:00',
'retries': 30,
}
consumer_generator = (v for v in [(method_frame, None, ujson.dumps(message))])

command = Command()
command.handle_retry(channel, consumer_generator, dead_letters_count=1)

assert produce_channel.basic_publish.call_count == 1

produce_kwargs = produce_channel.basic_publish.call_args[1]
assert produce_kwargs['routing_key'] == 'cqrs.replica.test'

produce_message = ujson.loads(produce_kwargs['body'])
assert produce_message['instance_data'] == message['instance_data']
assert produce_message['expires'] == '2020-01-01T01:00:00+00:00'
assert produce_message['retries'] == 0

captured = capsys.readouterr()
total_msg, retrying_msg, body_msg = captured.out.strip().split('\n')

assert total_msg == 'Total dead letters: 1'
assert retrying_msg == 'Retrying: 1/1'
assert '2020-01-01T01:00:00+00:00' in body_msg

assert channel.basic_nack.call_count == 1
assert channel.basic_nack.call_args[0][0] == 12


def test_handle_purge(capsys, mocker):
channel = mocker.MagicMock()

command = Command()
command.handle_purge(channel, 'dead_letters_test', dead_letter_count=3)

assert channel.queue_purge.call_count == 1
assert channel.queue_purge.call_args[0][0] == 'dead_letters_test'

captured = capsys.readouterr()
total_msg, purged_msg = captured.out.strip().split('\n')

assert total_msg == 'Total dead letters: 3'
assert purged_msg == 'Purged'


def test_handle_purge_empty_queue(capsys, mocker):
channel = mocker.MagicMock()

command = Command()
command.handle_purge(channel, 'dead_letters_test', dead_letter_count=0)

assert channel.queue_purge.call_count == 0

captured = capsys.readouterr()
assert captured.out.strip() == 'Total dead letters: 0'


def test_check_transport(settings):
command = Command()

with pytest.raises(CommandError) as e:
command.check_transport()

assert "Dead letters commands available only for RabbitMQTransport." in str(e)

0 comments on commit 57a96c0

Please sign in to comment.