Skip to content

Commit

Permalink
Merge pull request #44 from cloudblue/refactoring/LITE-18118
Browse files Browse the repository at this point in the history
LITE-18118 CQRS settings are now validated in one place
  • Loading branch information
net-free authored Aug 13, 2021
2 parents 2f4c4d3 + ee6832f commit c61a423
Show file tree
Hide file tree
Showing 22 changed files with 605 additions and 215 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Django CQRS
===========
![pyversions](https://img.shields.io/pypi/pyversions/django-cqrs.svg) [![PyPi Status](https://img.shields.io/pypi/v/django-cqrs.svg)](https://pypi.org/project/django-cqrs/)) [![Docs](https://readthedocs.org/projects/django-cqrs/badge/?version=latest)](https://readthedocs.org/projects/django-cqrs) [![Coverage](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=coverage)](https://sonarcloud.io/dashboard?id=django-cqrs)
![pyversions](https://img.shields.io/pypi/pyversions/django-cqrs.svg) [![PyPi Status](https://img.shields.io/pypi/v/django-cqrs.svg)](https://pypi.org/project/django-cqrs/) [![Docs](https://readthedocs.org/projects/django-cqrs/badge/?version=latest)](https://readthedocs.org/projects/django-cqrs) [![Coverage](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=coverage)](https://sonarcloud.io/dashboard?id=django-cqrs)
[![Build Status](https://travis-ci.org/cloudblue/django-cqrs.svg?branch=master)](https://travis-ci.org/cloudblue/django-cqrs) [![PyPI status](https://img.shields.io/pypi/status/django-cqrs.svg)](https://pypi.python.org/pypi/django-cqrs/) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=alert_status)](https://sonarcloud.io/dashboard?id=django-cqrs)

`django-cqrs` is an Django application, that implements CQRS data synchronisation between several Django microservices.
Expand Down
4 changes: 3 additions & 1 deletion dj_cqrs/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

default_app_config = 'dj_cqrs.apps.CQRSConfig' # pragma: no cover
181 changes: 181 additions & 0 deletions dj_cqrs/_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

import logging

from dj_cqrs.constants import (
DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
DEFAULT_MASTER_MESSAGE_TTL,
DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
DEFAULT_REPLICA_MAX_RETRIES,
DEFAULT_REPLICA_RETRY_DELAY,
)
from dj_cqrs.registries import MasterRegistry, ReplicaRegistry
from dj_cqrs.transport import BaseTransport

from django.utils.module_loading import import_string


logger = logging.getLogger('django-cqrs')


def validate_settings(settings):
is_master = bool(MasterRegistry.models)
is_replica = bool(ReplicaRegistry.models)
if (not is_master) and (not is_replica): # pragma: no cover
return

assert hasattr(settings, 'CQRS'), 'CQRS configuration must be set in Django project settings.'

cqrs_settings = settings.CQRS
assert isinstance(cqrs_settings, dict), 'CQRS configuration must be dict.'

_validate_transport(cqrs_settings)

if is_master or ('master' in cqrs_settings):
_validate_master(cqrs_settings)

if is_replica or ('replica' in cqrs_settings):
_validate_replica(cqrs_settings)


def _validate_transport(cqrs_settings):
transport_cls_location = cqrs_settings.get('transport')
if not transport_cls_location:
raise AssertionError('CQRS transport is not set.')

transport = import_string(transport_cls_location)
if not issubclass(transport, BaseTransport):
raise AssertionError(
'CQRS transport must be inherited from `dj_cqrs.transport.BaseTransport`.',
)


def _validate_master(cqrs_settings):
default_master_settings = {
'master': {
'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL,
'correlation_function': None,
},
}

if 'master' not in cqrs_settings:
cqrs_settings.update(default_master_settings)
return

master_settings = cqrs_settings['master']
assert isinstance(master_settings, dict), 'CQRS master configuration must be dict.'

_validate_master_auto_update_fields(master_settings)
_validate_master_message_ttl(master_settings)
_validate_master_correlation_func(master_settings)


def _validate_master_auto_update_fields(master_settings):
if 'CQRS_AUTO_UPDATE_FIELDS' in master_settings:
assert isinstance(master_settings['CQRS_AUTO_UPDATE_FIELDS'], bool), (
'CQRS master CQRS_AUTO_UPDATE_FIELDS must be bool.'
)
else:
master_settings['CQRS_AUTO_UPDATE_FIELDS'] = DEFAULT_MASTER_AUTO_UPDATE_FIELDS


def _validate_master_message_ttl(master_settings):
if 'CQRS_MESSAGE_TTL' in master_settings:
min_message_ttl = 1
message_ttl = master_settings['CQRS_MESSAGE_TTL']
if (message_ttl is not None) and (
not isinstance(message_ttl, int) or message_ttl < min_message_ttl
):
# No error is raised for backward compatibility
# TODO: raise error in 2.0.0
logger.warning(
'Settings CQRS_MESSAGE_TTL=%s is invalid, using default %s.',
message_ttl, DEFAULT_MASTER_MESSAGE_TTL,
)
master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL
else:
master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL


def _validate_master_correlation_func(master_settings):
correlation_func = master_settings.get('correlation_function')
if not correlation_func:
master_settings['correlation_function'] = None
elif not callable(correlation_func):
raise AssertionError('CQRS master correlation_function must be callable.')


def _validate_replica(cqrs_settings):
queue = cqrs_settings.get('queue')
assert queue, 'CQRS queue is not set.'
assert isinstance(queue, str), 'CQRS queue must be string.'

default_replica_settings = {
'replica': {
'CQRS_MAX_RETRIES': DEFAULT_REPLICA_MAX_RETRIES,
'CQRS_RETRY_DELAY': DEFAULT_REPLICA_RETRY_DELAY,
'delay_queue_max_size': DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
},
}

if 'replica' not in cqrs_settings:
cqrs_settings.update(default_replica_settings)
return

replica_settings = cqrs_settings['replica']
assert isinstance(replica_settings, dict), 'CQRS replica configuration must be dict.'

_validate_replica_max_retries(replica_settings)
_validate_replica_retry_delay(replica_settings)
_validate_replica_delay_queue_max_size(replica_settings)


def _validate_replica_max_retries(replica_settings):
if 'CQRS_MAX_RETRIES' in replica_settings:
min_retries = 0
max_retries = replica_settings['CQRS_MAX_RETRIES']
if (max_retries is not None) and (
not isinstance(max_retries, int) or max_retries < min_retries
):
# No error is raised for backward compatibility
# TODO: raise error in 2.0.0
logger.warning(
'Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s.',
max_retries, DEFAULT_REPLICA_MAX_RETRIES,
)
replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES
else:
replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES


def _validate_replica_retry_delay(replica_settings):
min_retry_delay = 0
retry_delay = replica_settings.get('CQRS_RETRY_DELAY')
if 'CQRS_RETRY_DELAY' not in replica_settings:
replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY
elif not isinstance(retry_delay, int) or retry_delay < min_retry_delay:
# No error is raised for backward compatibility
# TODO: raise error in 2.0.0
logger.warning(
'Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s.',
retry_delay, DEFAULT_REPLICA_RETRY_DELAY,
)
replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY


def _validate_replica_delay_queue_max_size(replica_settings):
min_qsize = 0
max_qsize = replica_settings.get('delay_queue_max_size')
if 'delay_queue_max_size' not in replica_settings:
max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE
elif (max_qsize is not None) and (not isinstance(max_qsize, int) or max_qsize <= min_qsize):
# No error is raised for backward compatibility
# TODO: raise error in 2.0.0
logger.warning(
'Settings delay_queue_max_size=%s is invalid, using default %s.',
max_qsize, DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
)
max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE

replica_settings['delay_queue_max_size'] = max_qsize
13 changes: 13 additions & 0 deletions dj_cqrs/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright © 2021 Ingram Micro Inc. All rights reserved.

from dj_cqrs._validation import validate_settings

from django.apps import AppConfig
from django.conf import settings


class CQRSConfig(AppConfig):
name = 'dj_cqrs'

def ready(self):
validate_settings(settings)
11 changes: 7 additions & 4 deletions dj_cqrs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ class SignalType:
NO_QUEUE = 'None'

DEFAULT_DEAD_MESSAGE_TTL = 864000 # 10 days
DEFAULT_DELAY_QUEUE_MAX_SIZE = 1000
DEFAULT_CQRS_MESSAGE_TTL = 86400 # 1 day
DEFAULT_CQRS_MAX_RETRIES = 30
DEFAULT_CQRS_RETRY_DELAY = 2 # seconds

DEFAULT_MASTER_AUTO_UPDATE_FIELDS = False
DEFAULT_MASTER_MESSAGE_TTL = 86400 # 1 day

DEFAULT_REPLICA_MAX_RETRIES = 30
DEFAULT_REPLICA_RETRY_DELAY = 2 # seconds
DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE = 1000
10 changes: 3 additions & 7 deletions dj_cqrs/correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
from django.conf import settings


_correlation_function = getattr(settings, 'CQRS', {}).get('master', {}).get('correlation_function')
if _correlation_function and (not callable(_correlation_function)):
raise AttributeError('CQRS correlation_function must be callable.')


def get_correlation_id(signal_type, cqrs_id, instance_pk, queue):
"""
:param signal_type: Type of the signal for this message.
Expand All @@ -18,5 +13,6 @@ def get_correlation_id(signal_type, cqrs_id, instance_pk, queue):
:param queue: Queue to synchronize, defaults to None
:type queue: str, optional
"""
if _correlation_function:
return _correlation_function(signal_type, cqrs_id, instance_pk, queue)
correlation_func = settings.CQRS.get('master', {}).get('correlation_function')
if correlation_func:
return correlation_func(signal_type, cqrs_id, instance_pk, queue)
4 changes: 2 additions & 2 deletions dj_cqrs/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dateutil.parser import parse as dateutil_parse

from dj_cqrs.correlation import get_correlation_id
from dj_cqrs.utils import get_expires_datetime
from dj_cqrs.utils import get_message_expiration_dt

from django.utils import timezone

Expand Down Expand Up @@ -74,7 +74,7 @@ def from_message(cls, dct):
expires = dateutil_parse(dct['expires'])
else:
# Backward compatibility for old messages otherwise they are infinite by default.
expires = get_expires_datetime()
expires = get_message_expiration_dt()

return cls(
dct['signal_type'],
Expand Down
4 changes: 2 additions & 2 deletions dj_cqrs/management/commands/cqrs_dead_letters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dj_cqrs.registries import ReplicaRegistry
from dj_cqrs.transport import current_transport
from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
from dj_cqrs.utils import get_expires_datetime
from dj_cqrs.utils import get_message_expiration_dt

from django.core.management.base import BaseCommand, CommandError

Expand Down Expand Up @@ -105,7 +105,7 @@ def handle_retry(self, channel, consumer_generator, dead_letters_count):
dct['retries'] = 0
if dct.get('expires'):
# Message could expire already
expires = get_expires_datetime()
expires = get_message_expiration_dt()
dct['expires'] = expires.replace(microsecond=0).isoformat()
payload = TransportPayload.from_message(dct)
payload.is_requeue = True
Expand Down
35 changes: 4 additions & 31 deletions dj_cqrs/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

from dj_cqrs.constants import (
ALL_BASIC_FIELDS,
DEFAULT_CQRS_MAX_RETRIES,
DEFAULT_CQRS_RETRY_DELAY,
FIELDS_TRACKER_FIELD_NAME,
TRACKED_FIELDS_ATTR_NAME,
)
Expand Down Expand Up @@ -128,9 +126,7 @@ def save_tracked_fields(self):

@property
def _update_cqrs_fields_default(self):
return bool(
getattr(settings, 'CQRS', {}).get('master', {}).get('CQRS_AUTO_UPDATE_FIELDS', False),
)
return settings.CQRS['master']['CQRS_AUTO_UPDATE_FIELDS']

def to_cqrs_dict(self, using=None, sync=False):
"""CQRS serialization for transport payload.
Expand Down Expand Up @@ -425,20 +421,11 @@ def should_retry_cqrs(current_retry, exception=None):
:return: True if message should be retried, False otherwise.
:rtype: bool
"""
replica_settings = settings.CQRS.get('replica', {})
if 'CQRS_MAX_RETRIES' in replica_settings and replica_settings['CQRS_MAX_RETRIES'] is None:
max_retries = settings.CQRS['replica']['CQRS_MAX_RETRIES']
if max_retries is None:
# Infinite
return True

min_value = 0
max_retries = replica_settings.get('CQRS_MAX_RETRIES', DEFAULT_CQRS_MAX_RETRIES)
if not isinstance(max_retries, int) or max_retries < min_value:
logger.warning(
"Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s",
max_retries, DEFAULT_CQRS_MAX_RETRIES,
)
max_retries = DEFAULT_CQRS_MAX_RETRIES

return current_retry < max_retries

@staticmethod
Expand All @@ -450,18 +437,4 @@ def get_cqrs_retry_delay(current_retry):
:return: Delay in seconds.
:rtype: int
"""
retry_delay = (
settings.CQRS
.get('replica', {})
.get('CQRS_RETRY_DELAY', DEFAULT_CQRS_RETRY_DELAY)
)

min_value = 1
if not isinstance(retry_delay, int) or retry_delay < min_value:
logger.warning(
"Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s",
retry_delay, DEFAULT_CQRS_RETRY_DELAY,
)
retry_delay = DEFAULT_CQRS_RETRY_DELAY

return retry_delay
return settings.CQRS['replica']['CQRS_RETRY_DELAY']
6 changes: 3 additions & 3 deletions dj_cqrs/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dj_cqrs.constants import SignalType
from dj_cqrs.controller import producer
from dj_cqrs.dataclasses import TransportPayload
from dj_cqrs.utils import get_expires_datetime
from dj_cqrs.utils import get_message_expiration_dt

from django.db import models, transaction
from django.dispatch import Signal
Expand Down Expand Up @@ -74,7 +74,7 @@ def post_save(cls, sender, **kwargs):
instance.pk,
queue,
previous_data,
expires=get_expires_datetime(),
expires=get_message_expiration_dt(),
)
producer.produce(payload)

Expand Down Expand Up @@ -114,7 +114,7 @@ def post_delete(cls, sender, **kwargs):
sender.CQRS_ID,
instance_data,
instance.pk,
expires=get_expires_datetime(),
expires=get_message_expiration_dt(),
)
# Delete is always in transaction!
transaction.on_commit(lambda: producer.produce(payload))
Expand Down
14 changes: 3 additions & 11 deletions dj_cqrs/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,10 @@
from django.utils.module_loading import import_string


transport_cls_location = getattr(settings, 'CQRS', {}).get('transport')
if not transport_cls_location:
raise AttributeError('CQRS transport is not set.')

try:
current_transport = import_string(transport_cls_location)

if not issubclass(current_transport, BaseTransport):
raise ValueError

except (ImportError, ValueError):
raise ImportError('Bad CQRS transport class.')
current_transport = import_string(settings.CQRS['transport'])
except (AttributeError, ImportError, KeyError):
current_transport = None


__all__ = ['BaseTransport', 'KombuTransport', 'RabbitMQTransport', current_transport]
Loading

0 comments on commit c61a423

Please sign in to comment.