Skip to content

Commit

Permalink
[Added] Implement auto restart of the consumption when futures are do…
Browse files Browse the repository at this point in the history
…ne or cancelled (#226)

* Auto restart consumers on failure

* Handle futures

* Simplify test
  • Loading branch information
EmilioCarrion authored Aug 3, 2022
1 parent 123a604 commit 5d05758
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 17 deletions.
40 changes: 24 additions & 16 deletions rele/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
threads_per_subscription=None,
):
self._subscriber = Subscriber(gc_project_id, credentials, default_ack_deadline)
self._futures = []
self._futures = {}
self._subscriptions = subscriptions
self.threads_per_subscription = threads_per_subscription

Expand All @@ -56,20 +56,7 @@ def start(self):
"""
run_middleware_hook("pre_worker_start")
for subscription in self._subscriptions:
executor_kwargs = {
"thread_name_prefix": "ThreadPoolExecutor-ThreadScheduler"
}
executor = futures.ThreadPoolExecutor(
max_workers=self.threads_per_subscription, **executor_kwargs
)
scheduler = ThreadScheduler(executor=executor)
self._futures.append(
self._subscriber.consume(
subscription_name=subscription.name,
callback=Callback(subscription),
scheduler=scheduler,
)
)
self._boostrap_consumption(subscription)
run_middleware_hook("post_worker_start")

def run_forever(self, sleep_interval=1):
Expand Down Expand Up @@ -99,15 +86,36 @@ def stop(self, signal=None, frame=None):
:param frame: Needed for `signal.signal <https://docs.python.org/3/library/signal.html#signal.signal>`_ # noqa
"""
run_middleware_hook("pre_worker_stop", self._subscriptions)
for future in self._futures:
for future in self._futures.values():
future.cancel()

run_middleware_hook("post_worker_stop")
sys.exit(0)

def _boostrap_consumption(self, subscription):
if subscription in self._futures:
self._futures[subscription].cancel()

executor_kwargs = {"thread_name_prefix": "ThreadPoolExecutor-ThreadScheduler"}
executor = futures.ThreadPoolExecutor(
max_workers=self.threads_per_subscription, **executor_kwargs
)
scheduler = ThreadScheduler(executor=executor)

self._futures[subscription] = self._subscriber.consume(
subscription_name=subscription.name,
callback=Callback(subscription),
scheduler=scheduler,
)

def _wait_forever(self, sleep_interval):
logger.info("Consuming subscriptions...")
while True:
for subscription, future in self._futures.items():
if future.cancelled() or future.done():
logger.info(f"Restarting consumption of {subscription.name}.")
self._boostrap_consumption(subscription)

time.sleep(sleep_interval)


Expand Down
39 changes: 38 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import time
from concurrent import futures
from unittest.mock import ANY, patch

import pytest
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

from rele import Subscriber, Worker, sub
from rele.middleware import register_middleware
from rele.subscription import Callback
from rele.worker import create_and_run


Expand All @@ -27,8 +30,10 @@ def worker(config):


@pytest.fixture
def mock_consume():
def mock_consume(config):
with patch.object(Subscriber, "consume") as m:
client = pubsub_v1.SubscriberClient(credentials=config.credentials)
m.return_value = client.subscribe("dummy-subscription", Callback(sub_stub))
yield m


Expand Down Expand Up @@ -108,6 +113,38 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment(
assert worker._subscriber._gc_project_id == "rele-test"


@pytest.mark.usefixtures("mock_create_subscription")
class TestRestartConsumer:
@pytest.fixture(autouse=True)
def mock_sleep(self):
with patch.object(time, "sleep", side_effect=ValueError) as m:
yield m

def test_does_not_restart_consumption_when_everything_goes_well(
self, worker, mock_consume
):
with pytest.raises(ValueError):
worker.run_forever()

assert len(mock_consume.call_args_list) == 1

def test_restarts_consumption_when_future_is_cancelled(self, worker, mock_consume):
mock_consume.return_value.cancel()

with pytest.raises(ValueError):
worker.run_forever()

assert len(mock_consume.call_args_list) == 2

def test_restarts_consumption_when_future_is_done(self, worker, mock_consume):
mock_consume.return_value.set_result(True)

with pytest.raises(ValueError):
worker.run_forever()

assert len(mock_consume.call_args_list) == 2


class TestCreateAndRun:
@pytest.fixture(autouse=True)
def worker_wait_forever(self):
Expand Down

0 comments on commit 5d05758

Please sign in to comment.