From 49c84fa2c4dfcfcf422e5df086d425b0c5cb3128 Mon Sep 17 00:00:00 2001 From: Maxim Kolyubyakin Date: Fri, 18 Oct 2024 14:31:57 +0200 Subject: [PATCH 1/4] LITE-31232 Init implementation of `bulk_relate_cqrs_serialization` --- dj_cqrs/mixins.py | 14 ++++++-- dj_cqrs/signals.py | 7 +++- dj_cqrs/state.py | 7 ++++ dj_cqrs/utils.py | 57 ++++++++++++++++++++++++++++++- tests/test_master/test_signals.py | 25 +++++++++++++- 5 files changed, 104 insertions(+), 6 deletions(-) create mode 100644 dj_cqrs/state.py diff --git a/dj_cqrs/mixins.py b/dj_cqrs/mixins.py index 0425b25..4140ff6 100644 --- a/dj_cqrs/mixins.py +++ b/dj_cqrs/mixins.py @@ -1,4 +1,4 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. import logging @@ -20,6 +20,7 @@ from dj_cqrs.managers import MasterManager, ReplicaManager from dj_cqrs.metas import MasterMeta, ReplicaMeta from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update +from dj_cqrs.state import cqrs_state logger = logging.getLogger('django-cqrs') @@ -292,9 +293,16 @@ def _class_serialization(self, using, sync=False): if sync: instance = self else: + instance = None db = using if using is not None else self._state.db - qs = self.__class__._default_manager.using(db) - instance = self.relate_cqrs_serialization(qs).get(pk=self.pk) + + bulk_relate_cm = cqrs_state.bulk_relate_cm + if bulk_relate_cm: + instance = bulk_relate_cm.get_cached_instance(self, db) + + if not instance: + qs = self.__class__._default_manager.using(db) + instance = self.relate_cqrs_serialization(qs).get(pk=self.pk) data = self._cqrs_serializer_cls(instance).data data['cqrs_revision'] = instance.cqrs_revision diff --git a/dj_cqrs/signals.py b/dj_cqrs/signals.py index af436f4..7b02344 100644 --- a/dj_cqrs/signals.py +++ b/dj_cqrs/signals.py @@ -1,4 +1,4 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. import logging @@ -9,6 +9,7 @@ from dj_cqrs.constants import SignalType from dj_cqrs.controller import producer from dj_cqrs.dataclasses import TransportPayload +from dj_cqrs.state import cqrs_state from dj_cqrs.utils import get_message_expiration_dt @@ -64,6 +65,10 @@ def post_save(cls, sender, **kwargs): using = kwargs['using'] + bulk_relate_cm = cqrs_state.bulk_relate_cm + if bulk_relate_cm: + bulk_relate_cm.register(instance, using) + sync = kwargs.get('sync', False) queue = kwargs.get('queue', None) diff --git a/dj_cqrs/state.py b/dj_cqrs/state.py new file mode 100644 index 0000000..0a575af --- /dev/null +++ b/dj_cqrs/state.py @@ -0,0 +1,7 @@ +# Copyright © 2024 Ingram Micro Inc. All rights reserved. + +import threading + + +cqrs_state = threading.local() +cqrs_state.bulk_relate_cm = None diff --git a/dj_cqrs/utils.py b/dj_cqrs/utils.py index ae75690..8bc474b 100644 --- a/dj_cqrs/utils.py +++ b/dj_cqrs/utils.py @@ -1,6 +1,8 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. import logging +from collections import defaultdict +from contextlib import ContextDecorator from datetime import date, datetime, timedelta from uuid import UUID @@ -10,6 +12,7 @@ from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS from dj_cqrs.logger import install_last_query_capturer +from dj_cqrs.state import cqrs_state logger = logging.getLogger('django-cqrs') @@ -80,3 +83,55 @@ def apply_query_timeouts(model_cls): # pragma: no cover cursor.execute(statement, params=(query_timeout,)) install_last_query_capturer(model_cls) + + +class _BulkRelateCM(ContextDecorator): + def __init__(self, cqrs_id=None): + self._cqrs_id = cqrs_id + self._mapping = defaultdict(lambda: defaultdict(list)) + self._cache = {} + + def register(self, instance, using): + instance_cqrs_id = getattr(instance, 'CQRS_ID', None) + if self._cqrs_id and instance_cqrs_id != self._cqrs_id: + return + + self._mapping[instance_cqrs_id][using].append(instance.pk) + + def get_cached_instance(self, instance, using): + instance_cqrs_id = getattr(instance, 'CQRS_ID', None) + if self._cqrs_id and instance_cqrs_id != self._cqrs_id: + return + + instance_pk = instance.pk + cached_instances = self._cache.get(instance_cqrs_id, {}).get(using, {}) + if cached_instances: + return cached_instances.get(instance_pk) + + cached_pks = self._mapping[instance_cqrs_id][using] + if not cached_pks: + return + + qs = instance.__class__._default_manager.using(using) + instances_cache = { + instance.pk: instance + for instance in instance.__class__.relate_cqrs_serialization(qs).filter( + pk__in=cached_pks, + ).order_by().all() + } + self._cache.update({ + instance_cqrs_id: { + using: instances_cache, + }, + }) + return instances_cache.get(instance_pk) + + def __enter__(self): + cqrs_state.bulk_relate_cm = self + + def __exit__(self, exc_type, exc_val, exc_tb): + cqrs_state.bulk_relate_cm = None + + +def bulk_relate_cqrs_serialization(cqrs_id=None): + return _BulkRelateCM(cqrs_id=cqrs_id) diff --git a/tests/test_master/test_signals.py b/tests/test_master/test_signals.py index 05d1c90..c52d48f 100644 --- a/tests/test_master/test_signals.py +++ b/tests/test_master/test_signals.py @@ -1,4 +1,4 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. from datetime import datetime, timezone @@ -8,6 +8,7 @@ from dj_cqrs.constants import SignalType from dj_cqrs.signals import post_bulk_create, post_update +from dj_cqrs.utils import bulk_relate_cqrs_serialization from tests.dj_master import models from tests.utils import assert_is_sub_dict, assert_publisher_once_called_with_args @@ -127,6 +128,28 @@ def test_manual_post_bulk_create(mocker): assert publisher_mock.call_count == 3 +@pytest.mark.django_db(transaction=True) +@pytest.mark.parametrize('count', (1, 3, 5)) +def test_bulk_relate_cqrs_serialization( + django_assert_num_queries, + django_v_trans_q_count_sup, + mocker, + count, +): + mocker.patch('dj_cqrs.controller.producer.produce') + + opt_query_count = count + 2 + django_v_trans_q_count_sup + with django_assert_num_queries(opt_query_count): + with bulk_relate_cqrs_serialization(): + with transaction.atomic(savepoint=False): + [models.Author.objects.create(id=i) for i in range(count)] + + not_opt_query_count = count + count * 2 + django_v_trans_q_count_sup + with django_assert_num_queries(not_opt_query_count): + with transaction.atomic(savepoint=False): + [models.Author.objects.create(id=10 + i) for i in range(count)] + + @pytest.mark.django_db(transaction=True) def test_automatic_post_bulk_create(mocker): publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce') From 396035013d5d3e266f5aad8b860694c4bf7ce7bc Mon Sep 17 00:00:00 2001 From: Maxim Kolyubyakin Date: Mon, 21 Oct 2024 13:02:22 +0200 Subject: [PATCH 2/4] LITE-31232 Units for `bulk_relate_cqrs_serialization` --- dj_cqrs/utils.py | 31 ++++++----- tests/test_utils.py | 128 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 13 deletions(-) diff --git a/dj_cqrs/utils.py b/dj_cqrs/utils.py index 8bc474b..ff80eb0 100644 --- a/dj_cqrs/utils.py +++ b/dj_cqrs/utils.py @@ -88,19 +88,19 @@ def apply_query_timeouts(model_cls): # pragma: no cover class _BulkRelateCM(ContextDecorator): def __init__(self, cqrs_id=None): self._cqrs_id = cqrs_id - self._mapping = defaultdict(lambda: defaultdict(list)) + self._mapping = defaultdict(lambda: defaultdict(set)) self._cache = {} - def register(self, instance, using): + def register(self, instance, using=None): instance_cqrs_id = getattr(instance, 'CQRS_ID', None) - if self._cqrs_id and instance_cqrs_id != self._cqrs_id: + if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id): return - self._mapping[instance_cqrs_id][using].append(instance.pk) + self._mapping[instance_cqrs_id][using].add(instance.pk) - def get_cached_instance(self, instance, using): + def get_cached_instance(self, instance, using=None): instance_cqrs_id = getattr(instance, 'CQRS_ID', None) - if self._cqrs_id and instance_cqrs_id != self._cqrs_id: + if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id): return instance_pk = instance.pk @@ -115,15 +115,20 @@ def get_cached_instance(self, instance, using): qs = instance.__class__._default_manager.using(using) instances_cache = { instance.pk: instance - for instance in instance.__class__.relate_cqrs_serialization(qs).filter( + for instance in instance.__class__.relate_cqrs_serialization(qs) + .filter( pk__in=cached_pks, - ).order_by().all() + ) + .order_by() + .all() } - self._cache.update({ - instance_cqrs_id: { - using: instances_cache, - }, - }) + self._cache.update( + { + instance_cqrs_id: { + using: instances_cache, + }, + } + ) return instances_cache.get(instance_pk) def __enter__(self): diff --git a/tests/test_utils.py b/tests/test_utils.py index 34920ab..0c7c838 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -10,14 +10,18 @@ from uuid import UUID import pytest +from django.db import transaction +from dj_cqrs.state import cqrs_state from dj_cqrs.utils import ( apply_query_timeouts, + bulk_relate_cqrs_serialization, get_delay_queue_max_size, get_json_valid_value, get_message_expiration_dt, get_messages_prefetch_count_per_worker, ) +from tests.dj_master import models as master_models from tests.dj_replica import models @@ -109,3 +113,127 @@ def test_apply_query_timeouts(settings, engine, p_count): assert apply_query_timeouts(models.BasicFieldsModelRef) is None assert p.call_count == p_count + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_simple_model(mocker): + produce_mock = mocker.patch('dj_cqrs.controller.producer.produce') + + @bulk_relate_cqrs_serialization() + def func(): + assert cqrs_state.bulk_relate_cm + + instance = master_models.SimplestModel(id=1) + instance.save() + + assert cqrs_state.bulk_relate_cm is None + func() + + assert master_models.SimplestModel.objects.count() == 1 + assert produce_mock.call_count == 1 + assert cqrs_state.bulk_relate_cm is None + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_serialized_model(mocker): + produce_mock = mocker.patch('dj_cqrs.controller.producer.produce') + + assert cqrs_state.bulk_relate_cm is None + with bulk_relate_cqrs_serialization(cqrs_id=master_models.Author.CQRS_ID): + bulk_relate_cm = cqrs_state.bulk_relate_cm + + with transaction.atomic(savepoint=False): + master_models.Author.objects.create(id=1) + + assert bulk_relate_cm + assert bulk_relate_cm._mapping + assert not bulk_relate_cm._cache + + assert bulk_relate_cm._cache + + assert master_models.Author.objects.count() == 1 + assert produce_mock.call_count == 1 + assert cqrs_state.bulk_relate_cm is None + + +def test_bulk_relate_cqrs_serialization_error(): + assert cqrs_state.bulk_relate_cm is None + + try: + with bulk_relate_cqrs_serialization(cqrs_id=master_models.Author.CQRS_ID): + assert cqrs_state.bulk_relate_cm + raise ValueError + except ValueError: + pass + + assert cqrs_state.bulk_relate_cm is None + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_register(): + author1 = master_models.Author(id=1) + author2 = master_models.Author(id=2) + + with bulk_relate_cqrs_serialization(cqrs_id=master_models.Author.CQRS_ID): + bulk_relate_cm = cqrs_state.bulk_relate_cm + bulk_relate_cm.register(ValueError) + bulk_relate_cm.register(master_models.FilteredSimplestModel()) + bulk_relate_cm.register(author1, 'default') + bulk_relate_cm.register(author1, 'default') + bulk_relate_cm.register(author1, 'other') + bulk_relate_cm.register(author2, 'other') + bulk_relate_cm.register(author2) + + assert bulk_relate_cm._mapping == { + master_models.Author.CQRS_ID: { + 'default': {1}, + 'other': {1, 2}, + None: {2}, + }, + } + + assert cqrs_state.bulk_relate_cm is None + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_get_cached_instance(mocker, django_assert_num_queries): + produce_mock = mocker.patch('dj_cqrs.controller.producer.produce') + + simple = master_models.SimplestModel.objects.create(id=1) + + with bulk_relate_cqrs_serialization(): + bulk_relate_cm = cqrs_state.bulk_relate_cm + + with transaction.atomic(): + author1 = master_models.Author.objects.create(id=1) + author1.name = 'new' + author1.save() + author2 = master_models.Author.objects.create(id=2) + + af = master_models.AutoFieldsModel.objects.using('default').create() + publisher = master_models.Publisher.objects.create(id=3) + + assert produce_mock.call_count == 4 + assert bulk_relate_cm._cache == { + master_models.Author.CQRS_ID: { + 'default': { + 1: author1, + 2: author2, + }, + }, + } + + assert bulk_relate_cm.get_cached_instance(publisher) is None + assert bulk_relate_cm.get_cached_instance(ValueError, 'test') is None + + with django_assert_num_queries(0): + assert bulk_relate_cm.get_cached_instance(simple) is None + assert bulk_relate_cm.get_cached_instance(author1, 'default') == author1 + assert bulk_relate_cm.get_cached_instance(author1, 'default') == author1 + assert bulk_relate_cm.get_cached_instance(author1, 'other') is None + assert bulk_relate_cm.get_cached_instance(author2, 'default') == author2 + assert bulk_relate_cm.get_cached_instance(author2) is None + assert bulk_relate_cm.get_cached_instance(master_models.Author(id=3)) is None + assert bulk_relate_cm.get_cached_instance(af) is None + + assert cqrs_state.bulk_relate_cm is None From 901aac34a841647d19bdb9b706f8646a4c6902d3 Mon Sep 17 00:00:00 2001 From: Maxim Kolyubyakin Date: Mon, 21 Oct 2024 15:06:55 +0200 Subject: [PATCH 3/4] LITE-31232 Fixed `test_bulk_relate_cqrs_serialization` for diff Python/DJ versions --- tests/test_master/test_signals.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/test_master/test_signals.py b/tests/test_master/test_signals.py index c52d48f..d27ca9f 100644 --- a/tests/test_master/test_signals.py +++ b/tests/test_master/test_signals.py @@ -135,16 +135,22 @@ def test_bulk_relate_cqrs_serialization( django_v_trans_q_count_sup, mocker, count, + settings, ): mocker.patch('dj_cqrs.controller.producer.produce') - opt_query_count = count + 2 + django_v_trans_q_count_sup + if settings.DB_ENGINE == 'sqlite' and django_v_trans_q_count_sup == 0: + suppl = 1 + else: + suppl = django_v_trans_q_count_sup + + opt_query_count = count + 2 + suppl with django_assert_num_queries(opt_query_count): with bulk_relate_cqrs_serialization(): with transaction.atomic(savepoint=False): [models.Author.objects.create(id=i) for i in range(count)] - not_opt_query_count = count + count * 2 + django_v_trans_q_count_sup + not_opt_query_count = count + count * 2 + suppl with django_assert_num_queries(not_opt_query_count): with transaction.atomic(savepoint=False): [models.Author.objects.create(id=10 + i) for i in range(count)] From 9610845c813dff7ffc704f45e61ef2a99005c5f5 Mon Sep 17 00:00:00 2001 From: Maxim Kolyubyakin Date: Mon, 21 Oct 2024 15:34:19 +0200 Subject: [PATCH 4/4] LITE-31232 `docker compose` is now used instead of `docker-compose` --- README.md | 8 ++++---- examples/demo_project/README.md | 12 ++++++------ integration_tests/Makefile | 22 +++++++++++----------- travis_compat_tests.sh | 12 ++++++------ travis_integration_tests.sh | 16 ++++++++-------- 5 files changed, 35 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 748615a..373b225 100644 --- a/README.md +++ b/README.md @@ -228,8 +228,8 @@ Unit testing Run tests with various RDBMS: - `cd integration_tests` -- `DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test` -- `DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test` +- `DB=postgres docker compose -f docker-compose.yml -f rdbms.yml run app_test` +- `DB=mysql docker compose -f docker-compose.yml -f rdbms.yml run app_test` Check code style: `flake8` Run tests: `pytest` @@ -244,6 +244,6 @@ To generate HTML coverage reports use: Integrational testing ------ -1. docker-compose +1. docker compose 2. `cd integration_tests` -3. `docker-compose run master` +3. `docker compose run master` diff --git a/examples/demo_project/README.md b/examples/demo_project/README.md index f39af4e..a15c5d1 100644 --- a/examples/demo_project/README.md +++ b/examples/demo_project/README.md @@ -8,12 +8,12 @@ It's a simple demo project contains 2 services: ## Start project: ``` -docker-compose up -d db_pgsql db_mysql -docker-compose run master ./manage.py migrate -docker-compose run replica ./manage.py migrate -docker-compose up -d -docker-compose run master ./manage.py cqrs_sync --cqrs-id=user -f={} -docker-compose run master ./manage.py cqrs_sync --cqrs-id=product -f={} +docker compose up -d db_pgsql db_mysql +docker compose run master ./manage.py migrate +docker compose run replica ./manage.py migrate +docker compose up -d +docker compose run master ./manage.py cqrs_sync --cqrs-id=user -f={} +docker compose run master ./manage.py cqrs_sync --cqrs-id=product -f={} ``` It starts master WEB app on [http://127.0.0.1:8000](http://127.0.0.1:8000) and replica on [http://127.0.0.1:8001](http://127.0.0.1:8001) diff --git a/integration_tests/Makefile b/integration_tests/Makefile index edf76f2..0c32303 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -3,40 +3,40 @@ .DEFAULT_GOAL := pika build: - docker-compose build + docker compose build build_master_v1: - docker-compose -f docker-compose.yml -f masterV1.yml build + docker compose -f docker-compose.yml -f masterV1.yml build build_replica_v1: - docker-compose -f docker-compose.yml -f replicaV1.yml build + docker compose -f docker-compose.yml -f replicaV1.yml build pika: build @echo "Run PIKA integration tests..." - docker-compose run master + docker compose run master @echo "Stopping running containers..." - docker-compose down --remove-orphans + docker compose down --remove-orphans @echo "Done!" kombu: build @echo "Run KOMBU integration tests..." - docker-compose -f docker-compose.yml -f kombu.yml run master + docker compose -f docker-compose.yml -f kombu.yml run master @echo "Stopping running containers..." - docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans + docker compose -f docker-compose.yml -f kombu.yml down --remove-orphans @echo "Done!" master_v1: build_master_v1 @echo "Run regression tests Master v1.3.1..." - docker-compose -f docker-compose.yml -f masterV1.yml run master + docker compose -f docker-compose.yml -f masterV1.yml run master @echo "Stopping running containers..." - docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f masterV1.yml down --remove-orphans @echo "Done!" replica_v1: build_replica_v1 @echo "Run regression tests Replica v1.3.1..." - docker-compose -f docker-compose.yml -f replicaV1.yml run master + docker compose -f docker-compose.yml -f replicaV1.yml run master @echo "Stopping running containers..." - docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans @echo "Done!" all: pika kombu master_v1 replica_v1 diff --git a/travis_compat_tests.sh b/travis_compat_tests.sh index fa6f7ed..6e2e70a 100755 --- a/travis_compat_tests.sh +++ b/travis_compat_tests.sh @@ -6,13 +6,13 @@ if [ "$COMPAT_TESTS" == "yes" ]; then echo "Running backward compatibility tests....." cd integration_tests echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - docker-compose -f docker-compose.yml -f masterV1.yml build - docker-compose -f docker-compose.yml -f masterV1.yml run master - docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f masterV1.yml build + docker compose -f docker-compose.yml -f masterV1.yml run master + docker compose -f docker-compose.yml -f masterV1.yml down --remove-orphans - docker-compose -f docker-compose.yml -f replicaV1.yml build - docker-compose -f docker-compose.yml -f replicaV1.yml run master - docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f replicaV1.yml build + docker compose -f docker-compose.yml -f replicaV1.yml run master + docker compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans cd .. echo "Done!" else diff --git a/travis_integration_tests.sh b/travis_integration_tests.sh index e315720..63ea8a9 100755 --- a/travis_integration_tests.sh +++ b/travis_integration_tests.sh @@ -6,14 +6,14 @@ if [ "$INTEGRATION_TESTS" == "yes" ]; then echo "Running integration tests....." echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin cd integration_tests - docker-compose build - docker-compose run master - docker-compose down --remove-orphans - docker-compose -f docker-compose.yml -f kombu.yml run master - docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans - DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test - DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test - docker-compose -f docker-compose.yml -f rdbms.yml down --remove-orphans + docker compose build + docker compose run master + docker compose down --remove-orphans + docker compose -f docker-compose.yml -f kombu.yml run master + docker compose -f docker-compose.yml -f kombu.yml down --remove-orphans + DB=postgres docker compose -f docker-compose.yml -f rdbms.yml run app_test + DB=mysql docker compose -f docker-compose.yml -f rdbms.yml run app_test + docker compose -f docker-compose.yml -f rdbms.yml down --remove-orphans cd .. echo "Done!"