Skip to content

Commit

Permalink
Improve auto reload for cqrs_consume
Browse files Browse the repository at this point in the history
  • Loading branch information
bdjilka committed Jan 30, 2023
1 parent 72c48b0 commit 602a073
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 63 deletions.
181 changes: 142 additions & 39 deletions dj_cqrs/management/commands/cqrs_consume.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,116 @@
# Copyright © 2022 Ingram Micro Inc. All rights reserved.
import multiprocessing
import logging
import signal
import threading
from pathlib import Path

from django.core.management.base import BaseCommand, CommandError
from watchfiles import watch
from watchfiles.filters import PythonFilter
from watchfiles.run import start_process

from dj_cqrs.registries import ReplicaRegistry
from dj_cqrs.transport import current_transport


logger = logging.getLogger('django_cqrs.cqrs_consume')


def consume(**kwargs):
import django
django.setup()

from dj_cqrs.transport import current_transport
current_transport.consume(**kwargs)


class WorkersManager:

def __init__(self, options, transport, consume_kwargs):
def __init__(
self,
consume_kwargs,
workers=1,
reload=False,
ignore_paths=None,
sigint_timeout=5,
sigkill_timeout=1,
):
self.pool = []
self.options = options
self.transport = transport
self.workers = workers
self.reload = reload
self.consume_kwargs = consume_kwargs
self.stop_event = threading.Event()
self.sigint_timeout = sigint_timeout
self.sigkill_timeout = sigkill_timeout

if self.reload:
self.watch_filter = PythonFilter(ignore_paths=ignore_paths)
self.watcher = watch(
Path.cwd(),
watch_filter=self.watch_filter,
stop_event=self.stop_event,
yield_on_timeout=True,
)

def handle_signal(self, *args, **kwargs):
self.stop_event.set()

def run(self):
for sig in [signal.SIGINT, signal.SIGTERM]:
signal.signal(sig, self.handle_signal)
if self.reload:
signal.signal(signal.SIGHUP, self.restart)

self.start()

if self.reload:
for files_changed in self:
if files_changed:
self.restart()
else:
self.stop_event.wait()

self.terminate()

def start(self):
for i in range(self.options['workers'] or 1):
process = multiprocessing.Process(
name=f'cqrs-consumer-{i}',
target=self.transport.consume,
kwargs=self.consume_kwargs,
for _ in range(self.workers):
process = start_process(
consume,
'function',
(),
self.consume_kwargs,
)
self.pool.append(process)
process.start()

for process in self.pool:
process.join()

def terminate(self, *args, **kwargs):
while self.pool:
p = self.pool.pop()
p.terminate()
p.join()
process = self.pool.pop()
process.stop(sigint_timeout=self.sigint_timeout, sigkill_timeout=self.sigkill_timeout)

def reload(self, *args, **kwargs):
def restart(self, *args, **kwargs):
self.terminate()
self.start()

def __iter__(self):
return self

def __next__(self):
changes = next(self.watcher)
if changes:
return list({Path(c[1]) for c in changes})
return None


class Command(BaseCommand):
help = 'Starts CQRS worker, which consumes messages from message queue.'

def add_arguments(self, parser):
parser.add_argument('--workers', '-w', help='Number of workers', type=int, default=0)
parser.add_argument(
'--workers',
'-w',
help='Number of workers',
type=int,
default=1,
)
parser.add_argument(
'--cqrs-id',
'-cid',
Expand All @@ -53,35 +119,72 @@ def add_arguments(self, parser):
help='Choose model(s) by CQRS_ID for consuming',
)
parser.add_argument(
'--reload', '-r', help='Enable reload signal SIGHUP', action='store_true',
'--reload',
'-r',
help=(
'Enable reload signal SIGHUP and autoreload '
'on file changes'
),
action='store_true',
default=False,
)
parser.add_argument(
'--ignore-paths',
nargs='?',
type=str,
help=(
'Specify directories to ignore, '
'to ignore multiple paths use a comma as separator, '
'e.g. "env" or "env,node_modules"'
),
)
parser.add_argument(
'--sigint-timeout',
nargs='?',
type=int,
default=5,
help='How long to wait for the sigint timeout before sending sigkill.',
)
parser.add_argument(
'--sigkill-timeout',
nargs='?',
type=int,
default=1,
help='How long to wait for the sigkill timeout before issuing a timeout exception.',
)

def handle(self, *args, **options):
if not options['workers'] and not options['reload']:
current_transport.consume(**self.get_consume_kwargs(options))
return

self.start_workers_pool(options)
def handle(
self,
*args,
workers=1,
cqrs_id=None,
reload=False,
ignore_paths=None,
sigint_timeout=5,
sigkill_timeout=1,
**options,
):

paths_to_ignore = None
if ignore_paths:
paths_to_ignore = [Path(p).resolve() for p in ignore_paths.split(',')]

def start_workers_pool(self, options):
workers_manager = WorkersManager(
options, current_transport, self.get_consume_kwargs(options),
workers=workers,
consume_kwargs=self.get_consume_kwargs(cqrs_id),
reload=reload,
ignore_paths=paths_to_ignore,
sigint_timeout=sigint_timeout,
sigkill_timeout=sigkill_timeout,
)
if options['reload']:
try:
multiprocessing.set_start_method('spawn')
except RuntimeError:
pass

signal.signal(signal.SIGHUP, workers_manager.reload)

workers_manager.start()
workers_manager.run()

def get_consume_kwargs(self, options):
def get_consume_kwargs(self, ids_list):
consume_kwargs = {}
if options.get('cqrs_id'):
if ids_list:
cqrs_ids = set()
for cqrs_id in options['cqrs_id']:
for cqrs_id in ids_list:
model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
if not model:
raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ services:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pswd
- POSTGRES_DB=replica
- POSTGRES_HOST_AUTH_METHOD=md5
- POSTGRES_INITDB_ARGS=--auth-host=md5

replica:
build:
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/kombu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ services:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pswd
- POSTGRES_DB=replica
- POSTGRES_HOST_AUTH_METHOD=md5
- POSTGRES_INITDB_ARGS=--auth-host=md5

replica:
build:
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/rdbms.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ services:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pswd
- POSTGRES_DB=django_cqrs
- POSTGRES_HOST_AUTH_METHOD=md5
- POSTGRES_INITDB_ARGS=--auth-host=md5

mysql:
image: mysql:8.0
Expand Down
1 change: 1 addition & 0 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ kombu>=4.6.*
ujson>=5.4.0
django-model-utils>=4.0.0
python-dateutil>=2.4
watchfiles>=0.18.1
Loading

0 comments on commit 602a073

Please sign in to comment.