From 5d05758ede1e9f2b68a9e752e4f075762bb17351 Mon Sep 17 00:00:00 2001 From: EmilioCarrion <54892804+EmilioCarrion@users.noreply.github.com> Date: Wed, 3 Aug 2022 15:30:28 +0200 Subject: [PATCH] [Added] Implement auto restart of the consumption when futures are done or cancelled (#226) * Auto restart consumers on failure * Handle futures * Simplify test --- rele/worker.py | 40 ++++++++++++++++++++++++---------------- tests/test_worker.py | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/rele/worker.py b/rele/worker.py index 5536b55..639dae5 100644 --- a/rele/worker.py +++ b/rele/worker.py @@ -31,7 +31,7 @@ def __init__( threads_per_subscription=None, ): self._subscriber = Subscriber(gc_project_id, credentials, default_ack_deadline) - self._futures = [] + self._futures = {} self._subscriptions = subscriptions self.threads_per_subscription = threads_per_subscription @@ -56,20 +56,7 @@ def start(self): """ run_middleware_hook("pre_worker_start") for subscription in self._subscriptions: - executor_kwargs = { - "thread_name_prefix": "ThreadPoolExecutor-ThreadScheduler" - } - executor = futures.ThreadPoolExecutor( - max_workers=self.threads_per_subscription, **executor_kwargs - ) - scheduler = ThreadScheduler(executor=executor) - self._futures.append( - self._subscriber.consume( - subscription_name=subscription.name, - callback=Callback(subscription), - scheduler=scheduler, - ) - ) + self._boostrap_consumption(subscription) run_middleware_hook("post_worker_start") def run_forever(self, sleep_interval=1): @@ -99,15 +86,36 @@ def stop(self, signal=None, frame=None): :param frame: Needed for `signal.signal `_ # noqa """ run_middleware_hook("pre_worker_stop", self._subscriptions) - for future in self._futures: + for future in self._futures.values(): future.cancel() run_middleware_hook("post_worker_stop") sys.exit(0) + def _boostrap_consumption(self, subscription): + if subscription in self._futures: + self._futures[subscription].cancel() + + executor_kwargs = {"thread_name_prefix": "ThreadPoolExecutor-ThreadScheduler"} + executor = futures.ThreadPoolExecutor( + max_workers=self.threads_per_subscription, **executor_kwargs + ) + scheduler = ThreadScheduler(executor=executor) + + self._futures[subscription] = self._subscriber.consume( + subscription_name=subscription.name, + callback=Callback(subscription), + scheduler=scheduler, + ) + def _wait_forever(self, sleep_interval): logger.info("Consuming subscriptions...") while True: + for subscription, future in self._futures.items(): + if future.cancelled() or future.done(): + logger.info(f"Restarting consumption of {subscription.name}.") + self._boostrap_consumption(subscription) + time.sleep(sleep_interval) diff --git a/tests/test_worker.py b/tests/test_worker.py index 3a63b6f..94d610d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,11 +1,14 @@ +import time from concurrent import futures from unittest.mock import ANY, patch import pytest +from google.cloud import pubsub_v1 from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler from rele import Subscriber, Worker, sub from rele.middleware import register_middleware +from rele.subscription import Callback from rele.worker import create_and_run @@ -27,8 +30,10 @@ def worker(config): @pytest.fixture -def mock_consume(): +def mock_consume(config): with patch.object(Subscriber, "consume") as m: + client = pubsub_v1.SubscriberClient(credentials=config.credentials) + m.return_value = client.subscribe("dummy-subscription", Callback(sub_stub)) yield m @@ -108,6 +113,38 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment( assert worker._subscriber._gc_project_id == "rele-test" +@pytest.mark.usefixtures("mock_create_subscription") +class TestRestartConsumer: + @pytest.fixture(autouse=True) + def mock_sleep(self): + with patch.object(time, "sleep", side_effect=ValueError) as m: + yield m + + def test_does_not_restart_consumption_when_everything_goes_well( + self, worker, mock_consume + ): + with pytest.raises(ValueError): + worker.run_forever() + + assert len(mock_consume.call_args_list) == 1 + + def test_restarts_consumption_when_future_is_cancelled(self, worker, mock_consume): + mock_consume.return_value.cancel() + + with pytest.raises(ValueError): + worker.run_forever() + + assert len(mock_consume.call_args_list) == 2 + + def test_restarts_consumption_when_future_is_done(self, worker, mock_consume): + mock_consume.return_value.set_result(True) + + with pytest.raises(ValueError): + worker.run_forever() + + assert len(mock_consume.call_args_list) == 2 + + class TestCreateAndRun: @pytest.fixture(autouse=True) def worker_wait_forever(self):