From e1a6c6cc0b87d91e0730016e9db7914b8111fac7 Mon Sep 17 00:00:00 2001 From: Jose Bonora Soriano <55445530+jBonoraW@users.noreply.github.com> Date: Fri, 19 Jan 2024 09:36:19 +0100 Subject: [PATCH] Fix recover due a loss of network connectivity (#267) * Add prints * Add prints * wip * reformat print and cheeck connecti * Set alfa version 1.13.0a1 * Revert client logs * Linting * Linting * Alfa version * linting --------- Co-authored-by: jsansalo --- rele/__init__.py | 2 +- rele/worker.py | 96 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/rele/__init__.py b/rele/__init__.py index 189c80d..3857a55 100644 --- a/rele/__init__.py +++ b/rele/__init__.py @@ -1,4 +1,4 @@ -__version__ = "1.13.0" +__version__ = "1.13.1a1" try: import django diff --git a/rele/worker.py b/rele/worker.py index b1a775f..30d9d05 100644 --- a/rele/worker.py +++ b/rele/worker.py @@ -1,9 +1,14 @@ import logging import signal +import socket import sys +import threading import time from concurrent import futures +from datetime import datetime +from typing import Dict +from google.cloud.pubsub_v1.futures import Future from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler from .client import Subscriber @@ -13,6 +18,25 @@ logger = logging.getLogger(__name__) +def check_internet_connection(): + print("Checking connection") + remote_server = "www.google.com" + port = 80 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + try: + sock.connect((remote_server, port)) + return True + except socket.error: + return False + finally: + sock.close() + + +class NotConnectionError(BaseException): + pass + + class Worker: """A Worker manages the subscriptions which consume Google PubSub messages. @@ -39,7 +63,7 @@ def __init__( default_ack_deadline, default_retry_policy, ) - self._futures = {} + self._futures: Dict[str, Future] = {} self._subscriptions = subscriptions self.threads_per_subscription = threads_per_subscription @@ -49,8 +73,10 @@ def setup(self): If the subscription already exists, the subscription will not be re-created. Therefore, it is idempotent. """ + print(f"[{datetime.now()}][{threading.get_ident()}][start] start setup") for subscription in self._subscriptions: self._subscriber.update_or_create_subscription(subscription) + print(f"[{datetime.now()}][{threading.get_ident()}][setup] end setup") def start(self): """Begin consuming all subscriptions. @@ -62,19 +88,25 @@ def start(self): The futures are stored so that they can be cancelled later on for a graceful shutdown of the worker. """ + print(f"[{datetime.now()}][{threading.get_ident()}][start] start start") run_middleware_hook("pre_worker_start") for subscription in self._subscriptions: self._boostrap_consumption(subscription) run_middleware_hook("post_worker_start") + print(f"[{datetime.now()}][{threading.get_ident()}][start] end start") def run_forever(self, sleep_interval=1): """Shortcut for calling setup, start, and _wait_forever. :param sleep_interval: Number of seconds to sleep in the ``while True`` loop """ + print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] setup") self.setup() + print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] start") self.start() + print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] wait for ever") self._wait_forever(sleep_interval=sleep_interval) + print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] finish") def stop(self, signal=None, frame=None): """Manage the shutdown process of the worker. @@ -101,8 +133,36 @@ def stop(self, signal=None, frame=None): sys.exit(0) def _boostrap_consumption(self, subscription): + print( + f"[{datetime.now()}][{threading.get_ident()}][_boostrap_consumption][0] " + f"subscription {subscription.name}" + ) + if subscription in self._futures: + print( + f"[{datetime.now()}][{threading.get_ident()}]" + f"[_boostrap_consumption][1] subscription {subscription.name} " + f"futures in [{self._futures[subscription]._state}]" + ) self._futures[subscription].cancel() + print( + f"[{datetime.now()}][{threading.get_ident()}]" + f"[_boostrap_consumption][2] subscription {subscription.name} " + "future cancelled" + ) + self._futures[subscription].result() + print( + f"[{datetime.now()}][{threading.get_ident()}]" + f"[_boostrap_consumption][3] subscription {subscription.name} " + "future cancelled and result" + ) + + if not check_internet_connection(): + print( + f"[{datetime.now()}][{threading.get_ident()}] Not internet " + f"connection when boostrap a consumption for {subscription}" + ) + raise NotConnectionError executor_kwargs = {"thread_name_prefix": "ThreadPoolExecutor-ThreadScheduler"} executor = futures.ThreadPoolExecutor( @@ -115,15 +175,42 @@ def _boostrap_consumption(self, subscription): callback=Callback(subscription), scheduler=scheduler, ) + print( + f"[{datetime.now()}][{threading.get_ident()}][_boostrap_consumption][3] " + f"subscription {subscription.name} future in " + f"[{self._futures[subscription]._state}]" + ) def _wait_forever(self, sleep_interval): logger.info("Consuming subscriptions...") while True: + print( + f"[{datetime.now()}][{threading.get_ident()}]" + f"[_wait_forever][0] Futures: {self._futures.values()}" + ) + + if datetime.now().timestamp() % 50 < 1 and not check_internet_connection(): + print( + f"[{datetime.now()}][{threading.get_ident()}] " + "Not internet connection, raising an Exception" + ) + raise NotConnectionError + for subscription, future in self._futures.items(): if future.cancelled() or future.done(): + print( + f"[{datetime.now()}][{threading.get_ident()}]" + "[_wait_forever][1] Restarting consumption " + f"of {subscription.name}." + ) logger.info(f"Restarting consumption of {subscription.name}.") self._boostrap_consumption(subscription) + print( + f"[{datetime.now()}][{threading.get_ident()}]" + f"[_wait_forever][2] Sleep {sleep_interval} " + f"second(s) with futures: {self._futures.values()}" + ) time.sleep(sleep_interval) @@ -151,9 +238,12 @@ def create_and_run(subs, config): :param subs: List :class:`~rele.subscription.Subscription` :param config: :class:`~rele.config.Config` """ - print(f"Configuring worker with {len(subs)} subscription(s)...") + print( + f"[{datetime.now()}][{threading.get_ident()}]" + f"Configuring worker with {len(subs)} subscription(s)..." + ) for sub in subs: - print(f" {sub}") + print(f"[{datetime.now()}][{threading.get_ident()}] {sub}") worker = Worker( subs, config.gc_project_id,