diff --git a/.travis.yml b/.travis.yml
index 22b15f8..7159424 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,24 +1,44 @@
dist: bionic
language: python
-matrix:
+services:
+ - docker
+jobs:
include:
- os: linux
python: 3.6
sudo: true
- env: BUILD=PYPI
+ env:
+ - BUILD=PYPI
+ - DJANGO_VERSION=3.1.*
- os: linux
python: 3.7
sudo: true
+ env: DJANGO_VERSION=3.1.*
- os: linux
python: 3.8
sudo: true
+ env: DJANGO_VERSION=1.11.*
+ - os: linux
+ python: 3.8
+ sudo: true
+ env: DJANGO_VERSION=2.2.*
+ - os: linux
+ python: 3.8
+ sudo: true
+ env:
+ - DJANGO_VERSION=3.1.*
+ - INTEGRATION_TESTS=yes
+ - COMPAT_TESTS=yes
install:
- pip install -r requirements/dev.txt
- pip install -r requirements/test.txt
- pip install pytest-cov
+- pip install django==$DJANGO_VERSION
script:
- flake8
- python setup.py test
+- ./travis_integration_tests.sh
+- ./travis_compat_tests.sh
- sonar-scanner
after_success:
- bash <(curl -s https://codecov.io/bash)
diff --git a/README.md b/README.md
index 265903d..1b9ab64 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
Django CQRS
===========
-![pyversions](https://img.shields.io/pypi/pyversions/django-cqrs.svg) [![PyPi Status](https://img.shields.io/pypi/v/django-cqrs.svg)](https://pypi.org/project/django-cqrs/) [![codecov](https://codecov.io/gh/cloudblue/django-cqrs/branch/master/graph/badge.svg)](https://codecov.io/gh/cloudblue/django-cqrs) [![Build Status](https://travis-ci.org/cloudblue/django-cqrs.svg?branch=master)](https://travis-ci.org/cloudblue/django-cqrs) [![PyPI status](https://img.shields.io/pypi/status/django-cqrs.svg)](https://pypi.python.org/pypi/django-cqrs/) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=alert_status)](https://sonarcloud.io/dashboard?id=django-cqrs)
+![pyversions](https://img.shields.io/pypi/pyversions/django-cqrs.svg) [![PyPi Status](https://img.shields.io/pypi/v/django-cqrs.svg)](https://pypi.org/project/django-cqrs/) [![Docs](https://readthedocs.org/projects/django-cqrs/badge/?version=latest)](https://readthedocs.org/projects/django-cqrs) [![codecov](https://codecov.io/gh/cloudblue/django-cqrs/branch/master/graph/badge.svg)](https://codecov.io/gh/cloudblue/django-cqrs) [![Build Status](https://travis-ci.org/cloudblue/django-cqrs.svg?branch=master)](https://travis-ci.org/cloudblue/django-cqrs) [![PyPI status](https://img.shields.io/pypi/status/django-cqrs.svg)](https://pypi.python.org/pypi/django-cqrs/) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=alert_status)](https://sonarcloud.io/dashboard?id=django-cqrs)
`django-cqrs` is a Django application that implements CQRS data synchronisation between several Django microservices.
@@ -12,6 +12,12 @@ In Connect we have a rather complex Domain Model. There are many microservices,
The pattern that solves this issue is called [CQRS - Command Query Responsibility Segregation](https://microservices.io/patterns/data/cqrs.html). The core idea behind this pattern is that view databases (replicas) are defined for efficient querying and DB joins. Applications keep their replicas up to date by subscribing to [Domain events](https://microservices.io/patterns/data/domain-event.html) published by the service that owns the data. Data is [eventually consistent](https://en.wikipedia.org/wiki/Eventual_consistency) and that's okay for non-critical business transactions.
+Documentation
+=============
+
+Full documentation is available at [https://django-cqrs.readthedocs.org](https://django-cqrs.readthedocs.org).
+
+
Examples
========
diff --git a/dj_cqrs/transport/__init__.py b/dj_cqrs/transport/__init__.py
index dd30ae7..b0eaa44 100644
--- a/dj_cqrs/transport/__init__.py
+++ b/dj_cqrs/transport/__init__.py
@@ -4,6 +4,7 @@
from django.utils.module_loading import import_string
from dj_cqrs.transport.base import BaseTransport
+from dj_cqrs.transport.kombu import KombuTransport
from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
@@ -22,4 +23,4 @@
raise ImportError('Bad CQRS transport class.')
-__all__ = [BaseTransport, RabbitMQTransport, current_transport]
+__all__ = [BaseTransport, KombuTransport, RabbitMQTransport, current_transport]
diff --git a/dj_cqrs/transport/kombu.py b/dj_cqrs/transport/kombu.py
new file mode 100644
index 0000000..c5c474c
--- /dev/null
+++ b/dj_cqrs/transport/kombu.py
@@ -0,0 +1,191 @@
+# Copyright © 2020 Ingram Micro Inc. All rights reserved.
+
+import logging
+
+import ujson
+from django.conf import settings
+from kombu import Connection, Exchange, Producer, Queue
+from kombu.exceptions import KombuError
+from kombu.mixins import ConsumerMixin
+
+
+from dj_cqrs.constants import SignalType
+from dj_cqrs.controller import consumer
+from dj_cqrs.dataclasses import TransportPayload
+from dj_cqrs.registries import ReplicaRegistry
+from dj_cqrs.transport import BaseTransport
+from dj_cqrs.transport.mixins import LoggingMixin
+
+logger = logging.getLogger('django-cqrs')
+
+
+class _KombuConsumer(ConsumerMixin):
+
+ def __init__(self, url, exchange_name, queue_name, prefetch_count, callback):
+ self.connection = Connection(url)
+ self.exchange = Exchange(
+ exchange_name,
+ type='topic',
+ durable=True,
+ )
+ self.queue_name = queue_name
+ self.prefetch_count = prefetch_count
+ self.callback = callback
+ self.queues = []
+ self._init_queues()
+
+ def _init_queues(self):
+ channel = self.connection.channel()
+ for cqrs_id in ReplicaRegistry.models.keys():
+ q = Queue(
+ self.queue_name,
+ exchange=self.exchange,
+ routing_key=cqrs_id,
+ )
+ q.maybe_bind(channel)
+ q.declare()
+ self.queues.append(q)
+
+ sync_q = Queue(
+ self.queue_name,
+ exchange=self.exchange,
+ routing_key='cqrs.{}.{}'.format(self.queue_name, cqrs_id),
+ )
+ sync_q.maybe_bind(channel)
+ sync_q.declare()
+ self.queues.append(sync_q)
+
+ def get_consumers(self, Consumer, channel):
+ return [
+ Consumer(
+ queues=self.queues,
+ callbacks=[self.callback],
+ prefetch_count=self.prefetch_count,
+ auto_declare=True,
+ ),
+ ]
+
+
+class KombuTransport(LoggingMixin, BaseTransport):
+ CONSUMER_RETRY_TIMEOUT = 5
+
+ @classmethod
+ def consume(cls):
+ queue_name, prefetch_count = cls._get_consumer_settings()
+ url, exchange_name = cls._get_common_settings()
+
+ consumer = _KombuConsumer(
+ url,
+ exchange_name,
+ queue_name,
+ prefetch_count,
+ cls._consume_message,
+ )
+ consumer.run()
+
+ @classmethod
+ def produce(cls, payload):
+ url, exchange_name = cls._get_common_settings()
+
+ connection = None
+ try:
+ # Decided not to create a context manager to stay within the class
+ connection, channel = cls._get_producer_kombu_objects(url, exchange_name)
+ exchange = cls._create_exchange(exchange_name)
+ cls._produce_message(channel, exchange, payload)
+ cls.log_produced(payload)
+ except KombuError:
+ logger.error("CQRS couldn't be published: pk = {} ({}).".format(
+ payload.pk, payload.cqrs_id,
+ ))
+ finally:
+ if connection:
+ connection.close()
+
+ @classmethod
+ def _consume_message(cls, body, message):
+ try:
+ dct = ujson.loads(body)
+ for key in ('signal_type', 'cqrs_id', 'instance_data'):
+ if key not in dct:
+ raise ValueError
+
+ if 'instance_pk' not in dct:
+ logger.warning('CQRS deprecated package structure.')
+
+ except ValueError:
+ logger.error("CQRS couldn't be parsed: {}.".format(body))
+ message.reject()
+ return
+
+ payload = TransportPayload(
+ dct['signal_type'], dct['cqrs_id'], dct['instance_data'], dct.get('instance_pk'),
+ previous_data=dct.get('previous_data'),
+ )
+
+ cls.log_consumed(payload)
+ instance = consumer.consume(payload)
+
+ if instance:
+ message.ack()
+ cls.log_consumed_accepted(payload)
+ else:
+ message.reject()
+ cls.log_consumed_denied(payload)
+
+ @classmethod
+ def _produce_message(cls, channel, exchange, payload):
+ routing_key = cls._get_produced_message_routing_key(payload)
+ producer = Producer(
+ channel,
+ exchange=exchange,
+ auto_declare=True,
+ )
+ producer.publish(
+ ujson.dumps(payload.to_dict()),
+ routing_key=routing_key,
+ mandatory=True,
+ content_type='text/plain',
+ delivery_mode=2,
+ )
+
+ @staticmethod
+ def _get_produced_message_routing_key(payload):
+ routing_key = payload.cqrs_id
+
+ if payload.signal_type == SignalType.SYNC and payload.queue:
+ routing_key = 'cqrs.{}.{}'.format(payload.queue, routing_key)
+
+ return routing_key
+
+ @classmethod
+ def _get_producer_kombu_objects(cls, url, exchange_name):
+ connection = Connection(url)
+ channel = connection.channel()
+ return connection, channel
+
+ @staticmethod
+ def _create_exchange(exchange_name):
+ return Exchange(
+ exchange_name,
+ type='topic',
+ durable=True,
+ )
+
+ @staticmethod
+ def _get_common_settings():
+ url = settings.CQRS.get('url', 'amqp://localhost')
+ exchange = settings.CQRS.get('exchange', 'cqrs')
+ return (
+ url,
+ exchange,
+ )
+
+ @staticmethod
+ def _get_consumer_settings():
+ queue_name = settings.CQRS['queue']
+ consumer_prefetch_count = settings.CQRS.get('consumer_prefetch_count', 10)
+ return (
+ queue_name,
+ consumer_prefetch_count,
+ )
diff --git a/dj_cqrs/transport/mixins.py b/dj_cqrs/transport/mixins.py
new file mode 100644
index 0000000..8c76c07
--- /dev/null
+++ b/dj_cqrs/transport/mixins.py
@@ -0,0 +1,38 @@
+import logging
+
+
+logger = logging.getLogger('django-cqrs')
+
+
+class LoggingMixin:
+
+ @staticmethod
+ def log_consumed(payload):
+ """
+ :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
+ """
+ if payload.pk:
+ logger.info('CQRS is received: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
+
+ @staticmethod
+ def log_consumed_accepted(payload):
+ """
+ :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
+ """
+ if payload.pk:
+ logger.info('CQRS is applied: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
+
+ @staticmethod
+ def log_consumed_denied(payload):
+ """
+ :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
+ """
+ if payload.pk:
+ logger.info('CQRS is denied: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
+
+ @staticmethod
+ def log_produced(payload):
+ """
+ :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
+ """
+ logger.info('CQRS is published: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py
index 9c2df21..20c736e 100644
--- a/dj_cqrs/transport/rabbit_mq.py
+++ b/dj_cqrs/transport/rabbit_mq.py
@@ -2,7 +2,10 @@
import logging
import time
+
from socket import gaierror
+from urllib.parse import unquote, urlparse
+
import ujson
from django.conf import settings
@@ -13,11 +16,12 @@
from dj_cqrs.dataclasses import TransportPayload
from dj_cqrs.registries import ReplicaRegistry
from dj_cqrs.transport import BaseTransport
+from dj_cqrs.transport.mixins import LoggingMixin
logger = logging.getLogger('django-cqrs')
-class RabbitMQTransport(BaseTransport):
+class RabbitMQTransport(LoggingMixin, BaseTransport):
CONSUMER_RETRY_TIMEOUT = 5
@classmethod
@@ -53,7 +57,7 @@ def produce(cls, payload):
connection, channel = cls._get_producer_rmq_objects(*rmq_settings)
cls._produce_message(channel, exchange, payload)
- cls._log_produced(payload)
+ cls.log_produced(payload)
except (exceptions.AMQPError, exceptions.ChannelError, exceptions.ReentrancyError):
logger.error("CQRS couldn't be published: pk = {} ({}).".format(
payload.pk, payload.cqrs_id,
@@ -83,15 +87,15 @@ def _consume_message(cls, ch, method, properties, body):
previous_data=dct.get('previous_data'),
)
- cls._log_consumed(payload)
+ cls.log_consumed(payload)
instance = consumer.consume(payload)
if instance:
ch.basic_ack(delivery_tag=method.delivery_tag)
- cls._log_consumed_accepted(payload)
+ cls.log_consumed_accepted(payload)
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
- cls._log_consumed_denied(payload)
+ cls.log_consumed_denied(payload)
@classmethod
def _produce_message(cls, channel, exchange, payload):
@@ -168,11 +172,30 @@ def _declare_exchange(channel, exchange):
)
@staticmethod
- def _get_common_settings():
- host = settings.CQRS.get('host', ConnectionParameters.DEFAULT_HOST)
- port = settings.CQRS.get('port', ConnectionParameters.DEFAULT_PORT)
- user = settings.CQRS.get('user', ConnectionParameters.DEFAULT_USERNAME)
- password = settings.CQRS.get('password', ConnectionParameters.DEFAULT_PASSWORD)
+ def _parse_url(url):
+ scheme = urlparse(url).scheme
+ schemeless = url[len(scheme) + 3:]
+ parts = urlparse('http://' + schemeless)
+ path = parts.path or ''
+ path = path[1:] if path and path[0] == '/' else path
+ assert scheme == 'amqp', \
+ 'Scheme must be "amqp" for RabbitMQTransport.'
+ return (
+ unquote(parts.hostname or '') or ConnectionParameters.DEFAULT_HOST,
+ parts.port or ConnectionParameters.DEFAULT_PORT,
+ unquote(parts.username or '') or ConnectionParameters.DEFAULT_USERNAME,
+ unquote(parts.password or '') or ConnectionParameters.DEFAULT_PASSWORD,
+ )
+
+ @classmethod
+ def _get_common_settings(cls):
+ if 'url' in settings.CQRS:
+ host, port, user, password = cls._parse_url(settings.CQRS.get('url'))
+ else:
+ host = settings.CQRS.get('host', ConnectionParameters.DEFAULT_HOST)
+ port = settings.CQRS.get('port', ConnectionParameters.DEFAULT_PORT)
+ user = settings.CQRS.get('user', ConnectionParameters.DEFAULT_USERNAME)
+ password = settings.CQRS.get('password', ConnectionParameters.DEFAULT_PASSWORD)
exchange = settings.CQRS.get('exchange', 'cqrs')
return (
host,
@@ -189,34 +212,3 @@ def _get_consumer_settings():
queue_name,
consumer_prefetch_count,
)
-
- @staticmethod
- def _log_consumed(payload):
- """
- :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
- """
- if payload.pk:
- logger.info('CQRS is received: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
-
- @staticmethod
- def _log_consumed_accepted(payload):
- """
- :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
- """
- if payload.pk:
- logger.info('CQRS is applied: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
-
- @staticmethod
- def _log_consumed_denied(payload):
- """
- :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
- """
- if payload.pk:
- logger.info('CQRS is denied: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
-
- @staticmethod
- def _log_produced(payload):
- """
- :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
- """
- logger.info('CQRS is published: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
diff --git a/docs/getting_started.rst b/docs/getting_started.rst
index ffd6a71..7d7e513 100644
--- a/docs/getting_started.rst
+++ b/docs/getting_started.rst
@@ -6,6 +6,7 @@ Getting started
This guide assumes that you have at least a single instance of `RabbitMQ `_
up and running.
+ For other messaging brokers/transports please see :ref:`transports`.
@@ -16,6 +17,7 @@ Requirements
* Django >= 1.11.20
* pika 1.1.0
+ * kombu 4.6
* ujson 3.0.0
* django-model-utils 4.0.0
@@ -54,10 +56,7 @@ and add the `django-cqrs` configuration:
CQRS = {
'transport': 'dj_cqrs.transport.RabbitMQTransport',
- 'host': RABBITMQ_HOST,
- 'port': RABBITMQ_PORT,
- 'user': RABBITMQ_USERNAME,
- 'password': RABBITMQ_PASSWORD,
+ 'url': 'amqp://guest:guest@rabbit:5672/'
}
@@ -126,15 +125,12 @@ Add dj_cqrs to Django ``INSTALLED_APPS``:
and add the `django-cqrs` configuration:
.. code-block:: python
- :emphasize-lines: 3
+ :emphasize-lines: 4
CQRS = {
'transport': 'dj_cqrs.transport.RabbitMQTransport',
+ 'url': 'amqp://guest:guest@rabbit:5672/',
'queue': 'my_replica', # Each replica service must have a unique queue.
- 'host': RABBITMQ_HOST,
- 'port': RABBITMQ_PORT,
- 'user': RABBITMQ_USERNAME,
- 'password': RABBITMQ_PASSWORD,
}
diff --git a/docs/index.rst b/docs/index.rst
index 585cfde..ae8e509 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -34,6 +34,7 @@ and that's okay for non-critical business transactions.
getting_started
custom_serialization
track_fields_changes
+ transports
utilities
reference
diff --git a/docs/reference.rst b/docs/reference.rst
index 585f0de..a78eb57 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -43,10 +43,13 @@ Signals
Transports
----------
-.. autoclass:: dj_cqrs.transport.base.BaseTransport
+.. autoclass:: dj_cqrs.transport.BaseTransport
:members:
-.. autoclass:: dj_cqrs.transport.rabbit_mq.RabbitMQTransport
+.. autoclass:: dj_cqrs.transport.RabbitMQTransport
+ :members:
+
+.. autoclass:: dj_cqrs.transport.KombuTransport
:members:
.. autoclass:: dj_cqrs.constants.SignalType
diff --git a/docs/transports.rst b/docs/transports.rst
new file mode 100644
index 0000000..a7ef1ce
--- /dev/null
+++ b/docs/transports.rst
@@ -0,0 +1,53 @@
+.. _transports:
+
+Transports
+==========
+
+`django-cqrs` ships with two transports that allow users to choose the messaging broker
+that best fits their needs.
+
+
+RabbitMQ transport
+------------------
+
+
+The :class:`dj_cqrs.transport.RabbitMQTransport` transport
+is based on the `pika `_ messaging library.
+
+To configure the ``RabbitMQTransport`` you must provide the RabbitMQ connection URL:
+
+.. code-block:: python
+
+ CQRS = {
+ 'transport': 'dj_cqrs.transport.RabbitMQTransport',
+ 'url': 'amqp://guest:guest@rabbit:5672/'
+ }
+
+.. warning::
+
+ Previous versions of the ``RabbitMQTransport`` used the settings
+ ``host``, ``port``, ``user`` and ``password`` to configure the connection
+ to RabbitMQ. These settings are deprecated and will be removed in
+ future versions of `django-cqrs`.
+
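+For reference, a configuration using the deprecated settings looked like this
+(the host, port and credentials below are placeholder values):
+
+.. code-block:: python
+
+    CQRS = {
+        'transport': 'dj_cqrs.transport.RabbitMQTransport',
+        'host': 'rabbit',
+        'port': 5672,
+        'user': 'guest',
+        'password': 'guest',
+    }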
+
+Kombu transport
+---------------
+
+The :class:`dj_cqrs.transport.KombuTransport` transport
+is based on the `kombu `_ messaging library.
+
+Kombu supports several messaging brokers, such as RabbitMQ, Redis, and Amazon SQS.
+
+To configure the ``KombuTransport`` you must provide the broker connection URL:
+
+.. code-block:: python
+
+ CQRS = {
+ 'transport': 'dj_cqrs.transport.KombuTransport',
+ 'url': 'redis://redis:6379/'
+ }
+
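+Kombu selects the broker based on the URL scheme, so, for example, assuming a
+RabbitMQ broker reachable at host ``rabbit``, the same transport can be pointed
+at an AMQP URL instead:
+
+.. code-block:: python
+
+    CQRS = {
+        'transport': 'dj_cqrs.transport.KombuTransport',
+        'url': 'amqp://guest:guest@rabbit:5672/'
+    }
+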
+Please read the Kombu documentation on the `transport comparison <https://kombu.readthedocs.io/en/master/introduction.html#transport-comparison>`_
+and `connection URLs <https://kombu.readthedocs.io/en/master/userguide/connections.html#urls>`_ for
+more information on supported brokers and configuration URLs.
diff --git a/integration_tests/Dockerfile.Master b/integration_tests/Dockerfile.Master
index d989297..d7bc834 100644
--- a/integration_tests/Dockerfile.Master
+++ b/integration_tests/Dockerfile.Master
@@ -1,11 +1,22 @@
FROM python:3.8
+
+ENV DOCKERIZE_VERSION v0.6.1
+RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz
+
+
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH /master
RUN mkdir /master
+COPY ./requirements/ /master/requirements
+
+RUN pip install -r /master/requirements/dev.txt -r /master/requirements/test.txt && pip install psycopg2-binary redis
+
COPY . /master/
ADD integration_tests/setup.cfg /master/
-
-RUN pip install -r /master/requirements/dev.txt -r /master/requirements/test.txt && pip install psycopg2-binary
+ADD integration_tests/run_integration_tests.sh /master/
+RUN chmod +x /master/run_integration_tests.sh
WORKDIR /master/
diff --git a/integration_tests/Dockerfile.MasterV1 b/integration_tests/Dockerfile.MasterV1
index 2eccac1..2cd5cb8 100644
--- a/integration_tests/Dockerfile.MasterV1
+++ b/integration_tests/Dockerfile.MasterV1
@@ -1,15 +1,22 @@
FROM python:3.8
+
+ENV DOCKERIZE_VERSION v0.6.1
+RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz
+
+
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH /master
RUN mkdir /master
+COPY ./requirements/ /master/requirements
-COPY ./requirements /master/requirements
-COPY ./tests /master/tests
-COPY ./integration_tests /master/integration_tests
-ADD integration_tests/setup.cfg /master/
+RUN pip install -r /master/requirements/dev.txt -r /master/requirements/test.txt && pip install psycopg2-binary redis django-cqrs==1.3.1
-RUN pip install -r /master/requirements/dev.txt -r /master/requirements/test.txt \
- && pip install psycopg2-binary django-cqrs==1.3.1
+COPY . /master/
+ADD integration_tests/setup.cfg /master/
+ADD integration_tests/run_integration_tests.sh /master/
+RUN chmod +x /master/run_integration_tests.sh
WORKDIR /master/
diff --git a/integration_tests/Dockerfile.Replica b/integration_tests/Dockerfile.Replica
index ce30279..06982f1 100644
--- a/integration_tests/Dockerfile.Replica
+++ b/integration_tests/Dockerfile.Replica
@@ -1,11 +1,19 @@
FROM python:3.8
+
+ENV DOCKERIZE_VERSION v0.6.1
+RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz
+
+
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH /replica
RUN mkdir /replica
+COPY ./requirements/ /replica/requirements
+RUN pip install -r /replica/requirements/dev.txt -r /replica/requirements/test.txt && pip install psycopg2-binary redis
+
COPY . /replica/
ADD integration_tests/manage.py /replica/
-RUN pip install -r /replica/requirements/dev.txt -r /replica/requirements/test.txt && pip install psycopg2-binary
-
WORKDIR /replica/
diff --git a/integration_tests/Dockerfile.ReplicaV1 b/integration_tests/Dockerfile.ReplicaV1
index 11f9a16..33fbcc1 100644
--- a/integration_tests/Dockerfile.ReplicaV1
+++ b/integration_tests/Dockerfile.ReplicaV1
@@ -1,14 +1,19 @@
FROM python:3.8
+
+ENV DOCKERIZE_VERSION v0.6.1
+RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
+ && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz
+
+
ENV PYTHONUNBUFFERED 1
ENV PYTHONPATH /replica
RUN mkdir /replica
-COPY ./requirements /replica/requirements
-COPY ./tests /replica/tests
-COPY ./integration_tests /replica/integration_tests
-ADD integration_tests/manage.py /replica/
+COPY ./requirements/ /replica/requirements
+RUN pip install -r /replica/requirements/dev.txt -r /replica/requirements/test.txt && pip install psycopg2-binary redis django-cqrs==1.3.1
-RUN pip install -r /replica/requirements/dev.txt -r /replica/requirements/test.txt \
- && pip install psycopg2-binary django-cqrs==1.3.1
+COPY . /replica/
+ADD integration_tests/manage.py /replica/
WORKDIR /replica/
diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index b0a11a2..edf76f2 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -1,13 +1,8 @@
.PHONY: build test
-.DEFAULT_GOAL := current
+.DEFAULT_GOAL := pika
-stop:
- @echo "Stopping running containers..."
- docker-compose -f docker-compose.yml -f masterV1.yml -f replicaV1.yml down --remove-orphans
- @echo "Done!"
-
-build_current:
+build:
docker-compose build
build_master_v1:
@@ -16,13 +11,32 @@ build_master_v1:
build_replica_v1:
docker-compose -f docker-compose.yml -f replicaV1.yml build
-current: build_current
+pika: build
+ @echo "Run PIKA integration tests..."
docker-compose run master
+ @echo "Stopping running containers..."
+ docker-compose down --remove-orphans
+ @echo "Done!"
+
+kombu: build
+ @echo "Run KOMBU integration tests..."
+ docker-compose -f docker-compose.yml -f kombu.yml run master
+ @echo "Stopping running containers..."
+ docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans
+ @echo "Done!"
master_v1: build_master_v1
+ @echo "Run regression tests Master v1.3.1..."
docker-compose -f docker-compose.yml -f masterV1.yml run master
+ @echo "Stopping running containers..."
+ docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans
+ @echo "Done!"
replica_v1: build_replica_v1
+ @echo "Run regression tests Replica v1.3.1..."
docker-compose -f docker-compose.yml -f replicaV1.yml run master
+ @echo "Stopping running containers..."
+ docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans
+ @echo "Done!"
-all: build_master_v1 build_replica_v1 current master_v1 replica_v1
+all: pika kombu master_v1 replica_v1
diff --git a/integration_tests/docker-compose.yml b/integration_tests/docker-compose.yml
index 087fa27..59a485c 100644
--- a/integration_tests/docker-compose.yml
+++ b/integration_tests/docker-compose.yml
@@ -2,19 +2,16 @@ version: '3'
services:
- rabbit:
+ mq:
image: rabbitmq:latest
- expose:
- - '5672'
postgres:
image: postgres:latest
- expose:
- - '5432'
environment:
- POSTGRES_USER: user
- POSTGRES_PASSWORD: pswd
- POSTGRES_DB: replica
+ - POSTGRES_HOST=postgres
+ - POSTGRES_USER=user
+ - POSTGRES_PASSWORD=pswd
+ - POSTGRES_DB=replica
replica:
build:
@@ -23,35 +20,40 @@ services:
restart: always
command: >
bash -c "
- sleep 10 &&
+ dockerize -wait tcp://mq:5672 -wait tcp://postgres:5432 -timeout 60s &&
python manage.py makemigrations --settings=integration_tests.replica_settings &&
python manage.py makemigrations dj_replica --settings=integration_tests.replica_settings &&
python manage.py migrate --settings=integration_tests.replica_settings &&
python manage.py cqrs_consume -w 2 --settings=integration_tests.replica_settings
"
- container_name: django_cqrs_test_replica
depends_on:
- - rabbit
+ - mq
- postgres
volumes:
- ../dj_cqrs:/replica/dj_cqrs
+ environment:
+ - POSTGRES_HOST=postgres
+ - POSTGRES_USER=user
+ - POSTGRES_PASSWORD=pswd
+ - POSTGRES_DB=replica
+ - CQRS_REPLICA_TRANSPORT=tests.dj.transport.RabbitMQTransportWithEvents
+ - CQRS_BROKER_URL=amqp://mq:5672/
master:
build:
context: ..
dockerfile: integration_tests/Dockerfile.Master
command: >
- bash -c "
- sleep 12 &&
- echo '####################################################' &&
- echo 'Running integration tests for current version (v2) ' &&
- echo '####################################################' &&
- pytest integration_tests/
- "
- container_name: django_cqrs_test_master
+ bash -c "
+ dockerize -wait tcp://mq:5672 -wait tcp://postgres:5432 -timeout 60s &&
+ ./run_integration_tests.sh
+ "
depends_on:
- - rabbit
+ - mq
- replica
volumes:
- ./tests/:/master/integration_tests/tests
- ../dj_cqrs:/master/dj_cqrs
+ environment:
+ - CQRS_MASTER_TRANSPORT=dj_cqrs.transport.RabbitMQTransport
+ - CQRS_BROKER_URL=amqp://mq:5672/
\ No newline at end of file
diff --git a/integration_tests/kombu.yml b/integration_tests/kombu.yml
new file mode 100644
index 0000000..be20549
--- /dev/null
+++ b/integration_tests/kombu.yml
@@ -0,0 +1,66 @@
+version: '3'
+
+services:
+
+ mq:
+ image: redis:latest
+ expose:
+ - '6379'
+
+ postgres:
+ image: postgres:latest
+ expose:
+ - '5432'
+ environment:
+ - POSTGRES_HOST=postgres
+ - POSTGRES_USER=user
+ - POSTGRES_PASSWORD=pswd
+ - POSTGRES_DB=replica
+
+ replica:
+ build:
+ context: ..
+ dockerfile: integration_tests/Dockerfile.Replica
+ restart: always
+ command: >
+ bash -c "
+ dockerize -wait tcp://mq:6379 -wait tcp://postgres:5432 -timeout 60s &&
+ python manage.py makemigrations --settings=integration_tests.replica_settings &&
+ python manage.py makemigrations dj_replica --settings=integration_tests.replica_settings &&
+ python manage.py migrate --settings=integration_tests.replica_settings &&
+ python manage.py cqrs_consume -w 2 --settings=integration_tests.replica_settings
+ "
+ container_name: django_cqrs_test_replica
+ depends_on:
+ - mq
+ - postgres
+ volumes:
+ - ../dj_cqrs:/replica/dj_cqrs
+ environment:
+ - POSTGRES_HOST=postgres
+ - POSTGRES_USER=user
+ - POSTGRES_PASSWORD=pswd
+ - POSTGRES_DB=replica
+ - CQRS_REPLICA_TRANSPORT=tests.dj.transport.KombuTransportWithEvents
+ - CQRS_BROKER_URL=redis://mq:6379/
+
+ master:
+ build:
+ context: ..
+ dockerfile: integration_tests/Dockerfile.Master
+ command: >
+ bash -c "
+ dockerize -wait tcp://mq:6379 -wait tcp://postgres:5432 -timeout 60s &&
+ ./run_integration_tests.sh
+ "
+
+ container_name: django_cqrs_test_master
+ depends_on:
+ - mq
+ - replica
+ volumes:
+ - ./tests/:/master/integration_tests/tests
+ - ../dj_cqrs:/master/dj_cqrs
+ environment:
+ - CQRS_MASTER_TRANSPORT=dj_cqrs.transport.KombuTransport
+ - CQRS_BROKER_URL=redis://mq:6379/
\ No newline at end of file
diff --git a/integration_tests/masterV1.yml b/integration_tests/masterV1.yml
index fbeb690..557d95d 100644
--- a/integration_tests/masterV1.yml
+++ b/integration_tests/masterV1.yml
@@ -2,16 +2,24 @@ version: '3'
services:
master:
- build:
- context: ..
- dockerfile: integration_tests/Dockerfile.MasterV1
- image: django_cqrs_test_maste_v1
- container_name: django_cqrs_test_masterV1
- command: >
- bash -c "
- sleep 12 &&
- echo '####################################################' &&
- echo 'Running integration tests Master v1.3.1 - Replica v2' &&
- echo '####################################################' &&
- pytest integration_tests/
- "
+ build:
+ context: ..
+ dockerfile: integration_tests/Dockerfile.MasterV1
+ command: >
+ bash -c "
+ dockerize -wait tcp://mq:5672 -wait tcp://postgres:5432 -timeout 60s &&
+ echo '########################################################' &&
+ echo ' Running compat tests Master v1.3.1 - Replica latest ' &&
+ echo '########################################################' &&
+ ./run_integration_tests.sh
+ "
+ depends_on:
+ - mq
+ - replica
+ volumes:
+ - ./tests/:/master/integration_tests/tests
+ - ../dj_cqrs:/master/dj_cqrs
+ environment:
+ - CQRS_MASTER_TRANSPORT=dj_cqrs.transport.RabbitMQTransport
+ - CQRS_BROKER_URL=amqp://mq:5672/
+
diff --git a/integration_tests/master_settings.py b/integration_tests/master_settings.py
index 2d13d32..abceb4d 100644
--- a/integration_tests/master_settings.py
+++ b/integration_tests/master_settings.py
@@ -53,6 +53,6 @@
USE_TZ = True
CQRS = {
- 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
- 'host': 'rabbit',
+ 'transport': os.getenv('CQRS_MASTER_TRANSPORT'),
+ 'url': os.getenv('CQRS_BROKER_URL'),
}
diff --git a/integration_tests/replicaV1.yml b/integration_tests/replicaV1.yml
index 5c096fc..53cf637 100644
--- a/integration_tests/replicaV1.yml
+++ b/integration_tests/replicaV1.yml
@@ -3,30 +3,49 @@ version: '3'
services:
master:
+ build:
+ context: ..
+ dockerfile: integration_tests/Dockerfile.Master
command: >
bash -c "
- sleep 12 &&
- echo '####################################################' &&
- echo 'Running integration tests Master v2 - Replica v1.3.1' &&
- echo '####################################################' &&
- pytest integration_tests/
+ dockerize -wait tcp://mq:5672 -wait tcp://postgres:5432 -timeout 60s &&
+ echo '########################################################' &&
+ echo ' Running compat tests Master latest - Replica v1.3.1 ' &&
+ echo '########################################################' &&
+ ./run_integration_tests.sh
"
+ depends_on:
+ - mq
+ - replica
+ volumes:
+ - ./tests/:/master/integration_tests/tests
+ - ../dj_cqrs:/master/dj_cqrs
+ environment:
+ - CQRS_MASTER_TRANSPORT=dj_cqrs.transport.RabbitMQTransport
+ - CQRS_BROKER_URL=amqp://mq:5672/
replica:
build:
context: ..
dockerfile: integration_tests/Dockerfile.ReplicaV1
image: django_cqrs_test_replica_v1
- restart: always
command: >
bash -c "
- sleep 10 &&
+ dockerize -wait tcp://mq:5672 -wait tcp://postgres:5432 -timeout 60s &&
python manage.py makemigrations --settings=integration_tests.replica_settings &&
python manage.py makemigrations dj_replica --settings=integration_tests.replica_settings &&
python manage.py migrate --settings=integration_tests.replica_settings &&
python manage.py cqrs_consume -w 2 --settings=integration_tests.replica_settings
"
- container_name: django_cqrs_test_replicaV1
depends_on:
- - rabbit
+ - mq
- postgres
+ volumes:
+ - ../dj_cqrs:/replica/dj_cqrs
+ environment:
+ - POSTGRES_HOST=postgres
+ - POSTGRES_USER=user
+ - POSTGRES_PASSWORD=pswd
+ - POSTGRES_DB=replica
+ - CQRS_REPLICA_TRANSPORT=tests.dj.transport.RabbitMQTransportWithEvents
+ - CQRS_BROKER_URL=amqp://mq:5672/
diff --git a/integration_tests/replica_settings.py b/integration_tests/replica_settings.py
index c9dc7e9..6a13d27 100644
--- a/integration_tests/replica_settings.py
+++ b/integration_tests/replica_settings.py
@@ -38,10 +38,10 @@
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
- 'HOST': 'postgres',
- 'NAME': 'replica',
- 'USER': 'user',
- 'PASSWORD': 'pswd',
+ 'HOST': os.getenv('POSTGRES_HOST', 'postgres'),
+ 'NAME': os.getenv('POSTGRES_DB', 'replica'),
+ 'USER': os.getenv('POSTGRES_USER', 'user'),
+ 'PASSWORD': os.getenv('POSTGRES_PASSWORD', 'pswd'),
}
}
@@ -56,8 +56,8 @@
USE_TZ = True
CQRS = {
- 'transport': 'tests.dj.transport.RabbitMQTransportWithEvents',
- 'host': 'rabbit',
+ 'transport': os.getenv('CQRS_REPLICA_TRANSPORT'),
+ 'url': os.getenv('CQRS_BROKER_URL'),
'consumer_prefetch_count': 2,
'queue': 'replica',
}
diff --git a/integration_tests/run_integration_tests.sh b/integration_tests/run_integration_tests.sh
new file mode 100644
index 0000000..0c7a0f4
--- /dev/null
+++ b/integration_tests/run_integration_tests.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+
+echo "####################################################"
+echo " Running tests: "
+echo
+echo " -> transport: $CQRS_MASTER_TRANSPORT "
+echo " -> url: $CQRS_BROKER_URL "
+echo
+echo "####################################################"
+
+sleep 12
+
+pytest integration_tests/
diff --git a/requirements/dev.txt b/requirements/dev.txt
index 9ca2137..b70bd51 100644
--- a/requirements/dev.txt
+++ b/requirements/dev.txt
@@ -1,4 +1,5 @@
Django >= 1.11.20
pika>=1.0.0
+kombu==4.6.*
ujson==3.0.0
django-model-utils==4.0.0
diff --git a/tests/dj/transport.py b/tests/dj/transport.py
index b442cca..b786cf5 100644
--- a/tests/dj/transport.py
+++ b/tests/dj/transport.py
@@ -4,6 +4,7 @@
from dj_cqrs.controller import consumer
from dj_cqrs.transport.base import BaseTransport
+from dj_cqrs.transport.kombu import KombuTransport
from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
@@ -19,7 +20,18 @@ def consume(payload):
class RabbitMQTransportWithEvents(RabbitMQTransport):
@staticmethod
- def _log_consumed(payload):
+ def log_consumed(payload):
+ from tests.dj_replica.models import Event
+ Event.objects.create(
+ pid=os.getpid(),
+ cqrs_id=payload.cqrs_id,
+ cqrs_revision=int(payload.instance_data['cqrs_revision']),
+ )
+
+
+class KombuTransportWithEvents(KombuTransport):
+ @staticmethod
+ def log_consumed(payload):
from tests.dj_replica.models import Event
Event.objects.create(
pid=os.getpid(),
diff --git a/tests/test_transport/test_kombu.py b/tests/test_transport/test_kombu.py
new file mode 100644
index 0000000..155ff35
--- /dev/null
+++ b/tests/test_transport/test_kombu.py
@@ -0,0 +1,332 @@
+# Copyright © 2020 Ingram Micro Inc. All rights reserved.
+
+import logging
+import ujson
+from importlib import import_module
+
+import pytest
+from kombu.exceptions import KombuError
+from six.moves import reload_module
+
+from dj_cqrs.constants import SignalType
+from dj_cqrs.dataclasses import TransportPayload
+from dj_cqrs.registries import ReplicaRegistry
+from dj_cqrs.transport.kombu import _KombuConsumer, KombuTransport
+
+
+class PublicKombuTransport(KombuTransport):
+ @classmethod
+ def get_common_settings(cls):
+ return cls._get_common_settings()
+
+ @classmethod
+ def get_consumer_settings(cls):
+ return cls._get_consumer_settings()
+
+ @classmethod
+ def consume_message(cls, *args):
+ return cls._consume_message(*args)
+
+ @classmethod
+ def produce_message(cls, *args):
+ return cls._produce_message(*args)
+
+ @classmethod
+ def create_exchange(cls, *args):
+ return cls._create_exchange(*args)
+
+
+def test_default_settings(settings):
+ settings.CQRS = {
+ 'transport': 'dj_cqrs.transport.kombu.KombuTransport',
+ }
+ s = PublicKombuTransport.get_common_settings()
+ assert s[0] == 'amqp://localhost'
+ assert s[1] == 'cqrs'
+
+
+def test_non_default_settings(settings, caplog):
+ settings.CQRS = {
+ 'url': 'redis://localhost:6379',
+ 'exchange': 'exchange',
+ }
+
+ s = PublicKombuTransport.get_common_settings()
+ assert s[0] == 'redis://localhost:6379'
+ assert s[1] == 'exchange'
+
+
+def test_consumer_default_settings():
+ s = PublicKombuTransport.get_consumer_settings()
+ assert s[0] is None
+ assert s[1] == 10
+
+
+def test_consumer_non_default_settings(settings):
+ settings.CQRS = {
+ 'transport': 'dj_cqrs.transport.kombu.KombuTransport',
+ 'queue': 'q',
+ 'consumer_prefetch_count': 2,
+ }
+
+ s = PublicKombuTransport.get_consumer_settings()
+ assert s[0] == 'q'
+ assert s[1] == 2
+
+
+@pytest.fixture
+def kombu_transport(settings):
+ settings.CQRS = {
+ 'transport': 'dj_cqrs.transport.kombu.KombuTransport',
+ 'queue': 'replica',
+ }
+ module = reload_module(import_module('dj_cqrs.transport'))
+ yield module.current_transport
+
+
+def kombu_error(*args, **kwargs):
+ raise KombuError()
+
+
+def test_produce_connection_error(kombu_transport, mocker, caplog):
+ mocker.patch.object(KombuTransport, '_get_producer_kombu_objects', side_effect=kombu_error)
+
+ kombu_transport.produce(
+ TransportPayload(
+ SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1,
+ ),
+ )
+ assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
+
+
+def test_produce_publish_error(kombu_transport, mocker, caplog):
+ mocker.patch.object(
+ KombuTransport, '_get_producer_kombu_objects', return_value=(mocker.MagicMock(), None),
+ )
+ mocker.patch.object(KombuTransport, '_produce_message', side_effect=kombu_error)
+
+ kombu_transport.produce(
+ TransportPayload(
+ SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1,
+ ),
+ )
+ assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
+
+
+def test_produce_ok(kombu_transport, mocker, caplog):
+ caplog.set_level(logging.INFO)
+ mocker.patch.object(
+ KombuTransport, '_get_producer_kombu_objects', return_value=(mocker.MagicMock(), None),
+ )
+ mocker.patch.object(KombuTransport, '_produce_message', return_value=True)
+
+ kombu_transport.produce(
+ TransportPayload(
+ SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1,
+ ),
+ )
+ assert 'CQRS is published: pk = 1 (CQRS_ID).' in caplog.text
+
+
+def test_produce_message_ok(mocker):
+ channel = mocker.MagicMock()
+ payload = TransportPayload(
+ SignalType.SAVE, 'cqrs_id', {}, 'id', previous_data={'e': 'f'},
+ )
+ exchange = PublicKombuTransport.create_exchange('exchange')
+
+ PublicKombuTransport.produce_message(channel, exchange, payload)
+ assert channel.basic_publish.call_count == 1
+
+ prepare_message_args = channel.prepare_message.call_args[0]
+ basic_publish_kwargs = channel.basic_publish.call_args[1]
+
+ assert ujson.loads(prepare_message_args[0]) == \
+ {
+ 'signal_type': SignalType.SAVE,
+ 'cqrs_id': 'cqrs_id',
+ 'instance_data': {},
+ 'instance_pk': 'id',
+ 'previous_data': {'e': 'f'},
+ }
+
+ assert prepare_message_args[2] == 'text/plain'
+ assert prepare_message_args[5]['delivery_mode'] == 2
+
+ assert basic_publish_kwargs['exchange'] == 'exchange'
+ assert basic_publish_kwargs['mandatory']
+ assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
+
+
+def test_produce_sync_message_no_queue(mocker):
+ channel = mocker.MagicMock()
+ payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, None)
+
+ exchange = PublicKombuTransport.create_exchange('exchange')
+
+ PublicKombuTransport.produce_message(channel, exchange, payload)
+
+ prepare_message_args = channel.prepare_message.call_args[0]
+ basic_publish_kwargs = channel.basic_publish.call_args[1]
+
+ assert ujson.loads(prepare_message_args[0]) == \
+ {
+ 'signal_type': SignalType.SYNC,
+ 'cqrs_id': 'cqrs_id',
+ 'instance_data': {},
+ 'instance_pk': None,
+ 'previous_data': None,
+ }
+ assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
+
+
+def test_produce_sync_message_queue(mocker):
+ channel = mocker.MagicMock()
+ payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, 'id', 'queue')
+
+ exchange = PublicKombuTransport.create_exchange('exchange')
+
+ PublicKombuTransport.produce_message(channel, exchange, payload)
+
+ prepare_message_args = channel.prepare_message.call_args[0]
+ basic_publish_kwargs = channel.basic_publish.call_args[1]
+ assert ujson.loads(prepare_message_args[0]) == \
+ {
+ 'signal_type': SignalType.SYNC,
+ 'cqrs_id': 'cqrs_id',
+ 'instance_data': {},
+ 'instance_pk': 'id',
+ 'previous_data': None,
+ }
+ assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id'
+
+
+def test_consume_message_ack(mocker, caplog):
+ caplog.set_level(logging.INFO)
+ consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume')
+ message_mock = mocker.MagicMock()
+
+ PublicKombuTransport.consume_message(
+ '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
+ '"instance_pk":1, "previous_data":{}}',
+ message_mock,
+ )
+
+ assert consumer_mock.call_count == 1
+ assert message_mock.ack.call_count == 1
+
+ payload = consumer_mock.call_args[0][0]
+ assert payload.signal_type == 'signal'
+ assert payload.cqrs_id == 'cqrs_id'
+ assert payload.instance_data == {}
+ assert payload.previous_data == {}
+ assert payload.pk == 1
+
+ assert 'CQRS is received: pk = 1 (cqrs_id).' in caplog.text
+ assert 'CQRS is applied: pk = 1 (cqrs_id).' in caplog.text
+
+
+def test_consume_message_ack_deprecated_structure(mocker, caplog):
+ caplog.set_level(logging.INFO)
+ consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume')
+
+ PublicKombuTransport.consume_message(
+ '{"signal_type":"signal","cqrs_id":"cqrs_id",'
+ '"instance_data":{},"previous_data":null}',
+ mocker.MagicMock(),
+ )
+
+ assert consumer_mock.call_count == 1
+
+ payload = consumer_mock.call_args[0][0]
+ assert payload.signal_type == 'signal'
+ assert payload.cqrs_id == 'cqrs_id'
+ assert payload.instance_data == {}
+ assert payload.pk is None
+
+ assert 'CQRS deprecated package structure.' in caplog.text
+ assert 'CQRS is received: pk = None (cqrs_id).' not in caplog.text
+ assert 'CQRS is applied: pk = None (cqrs_id).' not in caplog.text
+
+
+def test_consume_message_nack(mocker, caplog):
+ caplog.set_level(logging.INFO)
+ mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
+ message_mock = mocker.MagicMock()
+ PublicKombuTransport.consume_message(
+ '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
+ '"instance_pk":1,"previous_data":null}',
+ message_mock,
+ )
+
+ assert message_mock.reject.call_count == 1
+
+ assert 'CQRS is received: pk = 1 (cqrs_id).' in caplog.text
+ assert 'CQRS is denied: pk = 1 (cqrs_id).' in caplog.text
+
+
+def test_consume_message_nack_deprecated_structure(mocker, caplog):
+ caplog.set_level(logging.INFO)
+ mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
+
+ PublicKombuTransport.consume_message(
+ '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{}}',
+ mocker.MagicMock(),
+ )
+
+ assert 'CQRS is received: pk = 1 (cqrs_id).' not in caplog.text
+ assert 'CQRS is denied: pk = 1 (cqrs_id).' not in caplog.text
+
+
+def test_consume_message_json_parsing_error(mocker, caplog):
+ PublicKombuTransport.consume_message(
+ '{bad_payload:',
+ mocker.MagicMock(),
+ )
+
+ assert ": {bad_payload:." in caplog.text
+
+
+def test_consume_message_package_structure_error(mocker, caplog):
+ PublicKombuTransport.consume_message(
+ '{"pk":"1"}',
+ mocker.MagicMock(),
+ )
+
+ assert """CQRS couldn't be parsed: {"pk":"1"}""" in caplog.text
+
+
+def test_consumer_queues(mocker):
+ mocker.patch('dj_cqrs.transport.kombu.Connection')
+
+ def callback(body, message):
+ pass
+
+ c = _KombuConsumer('amqp://localhost', 'cqrs', 'cqrs_queue', 2, callback)
+
+ assert len(c.queues) == len(ReplicaRegistry.models) * 2
+
+
+def test_consumer_consumers(mocker):
+ mocker.patch('dj_cqrs.transport.kombu.Connection')
+
+ def callback(body, message):
+ pass
+
+ c = _KombuConsumer('amqp://localhost', 'cqrs', 'cqrs_queue', 2, callback)
+
+ consumers = c.get_consumers(mocker.MagicMock, None)
+ assert len(consumers) == 1
+ consumer = consumers[0]
+ assert consumer.queues == c.queues
+ assert consumer.callbacks[0] == callback
+ assert consumer.prefetch_count == 2
+
+
+def test_consumer_run(mocker):
+ mocker.patch('dj_cqrs.transport.kombu.Connection')
+ mocked_run = mocker.patch.object(_KombuConsumer, 'run')
+
+ PublicKombuTransport.consume()
+
+ mocked_run.assert_called_once()
diff --git a/tests/test_transport/test_rabbit_mq.py b/tests/test_transport/test_rabbit_mq.py
index 94980f9..2c91b49 100644
--- a/tests/test_transport/test_rabbit_mq.py
+++ b/tests/test_transport/test_rabbit_mq.py
@@ -41,7 +41,7 @@ def test_default_settings():
assert s[3] == 'cqrs'
-def test_non_default_settings(settings):
+def test_non_default_settings(settings, caplog):
settings.CQRS = {
'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
'host': 'rabbit',
@@ -58,6 +58,42 @@ def test_non_default_settings(settings):
assert s[3] == 'exchange'
+def test_default_url_settings(settings):
+ settings.CQRS = {
+ 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
+ 'url': 'amqp://localhost'
+ }
+ s = PublicRabbitMQTransport.get_common_settings()
+ assert s[0] == 'localhost'
+ assert s[1] == 5672
+ assert s[2].username == 'guest' and s[2].password == 'guest'
+ assert s[3] == 'cqrs'
+
+
+def test_non_default_url_settings(settings):
+ settings.CQRS = {
+ 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
+ 'url': 'amqp://usr:pswd@rabbit:8000',
+ 'exchange': 'exchange',
+ }
+ s = PublicRabbitMQTransport.get_common_settings()
+ assert s[0] == 'rabbit'
+ assert s[1] == 8000
+ assert s[2].username == 'usr' and s[2].password == 'pswd'
+ assert s[3] == 'exchange'
+
+
+def test_invalid_url_settings(settings):
+ settings.CQRS = {
+ 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
+ 'url': 'rabbit://localhost'
+ }
+ with pytest.raises(AssertionError) as ei:
+ PublicRabbitMQTransport.get_common_settings()
+
+ assert ei.match('Scheme must be "amqp" for RabbitMQTransport.')
+
+
def test_consumer_default_settings():
s = PublicRabbitMQTransport.get_consumer_settings()
assert s[0] is None
diff --git a/travis_compat_tests.sh b/travis_compat_tests.sh
new file mode 100755
index 0000000..531d6a0
--- /dev/null
+++ b/travis_compat_tests.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+set -e
+
+if [ "$COMPAT_TESTS" == "yes" ]; then
+ echo "Running backward compatibility tests....."
+ cd integration_tests
+ docker-compose -f docker-compose.yml -f masterV1.yml build
+ docker-compose -f docker-compose.yml -f masterV1.yml run master
+ docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans
+
+ docker-compose -f docker-compose.yml -f replicaV1.yml build
+ docker-compose -f docker-compose.yml -f replicaV1.yml run master
+ docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans
+ cd ..
+ echo "Done!"
+else
+ echo "Skip backward compatibility tests..."
+fi
diff --git a/travis_integration_tests.sh b/travis_integration_tests.sh
new file mode 100755
index 0000000..25e794a
--- /dev/null
+++ b/travis_integration_tests.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+set -e
+
+if [ "$INTEGRATION_TESTS" == "yes" ]; then
+ echo "Running integration tests....."
+ cd integration_tests
+ docker-compose build
+ docker-compose run master
+ docker-compose down --remove-orphans
+ docker-compose -f docker-compose.yml -f kombu.yml run master
+ docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans
+ cd ..
+ echo "Done!"
+else
+ echo "Skip integration tests..."
+fi