Skip to content

Commit

Permalink
Fix recover due a loss of network connectivity (#267)
Browse files Browse the repository at this point in the history
* Add prints

* Add prints

* wip

* reformat print and cheeck connecti

* Set alfa version 1.13.0a1

* Revert client logs

* Linting

* Linting

* Alfa version

* linting

---------

Co-authored-by: jsansalo <[email protected]>
  • Loading branch information
jBonoraW and jsansaloni authored Jan 19, 2024
1 parent e24e9ed commit e1a6c6c
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 4 deletions.
2 changes: 1 addition & 1 deletion rele/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "1.13.0"
__version__ = "1.13.1a1"

try:
import django
Expand Down
96 changes: 93 additions & 3 deletions rele/worker.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
import signal
import socket
import sys
import threading
import time
from concurrent import futures
from datetime import datetime
from typing import Dict

from google.cloud.pubsub_v1.futures import Future
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

from .client import Subscriber
Expand All @@ -13,6 +18,25 @@
logger = logging.getLogger(__name__)


def check_internet_connection():
print("Checking connection")
remote_server = "www.google.com"
port = 80
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
sock.connect((remote_server, port))
return True
except socket.error:
return False
finally:
sock.close()


class NotConnectionError(BaseException):
pass


class Worker:
"""A Worker manages the subscriptions which consume Google PubSub messages.
Expand All @@ -39,7 +63,7 @@ def __init__(
default_ack_deadline,
default_retry_policy,
)
self._futures = {}
self._futures: Dict[str, Future] = {}
self._subscriptions = subscriptions
self.threads_per_subscription = threads_per_subscription

Expand All @@ -49,8 +73,10 @@ def setup(self):
If the subscription already exists, the subscription will not be
re-created. Therefore, it is idempotent.
"""
print(f"[{datetime.now()}][{threading.get_ident()}][start] start setup")
for subscription in self._subscriptions:
self._subscriber.update_or_create_subscription(subscription)
print(f"[{datetime.now()}][{threading.get_ident()}][setup] end setup")

def start(self):
"""Begin consuming all subscriptions.
Expand All @@ -62,19 +88,25 @@ def start(self):
The futures are stored so that they can be cancelled later on
for a graceful shutdown of the worker.
"""
print(f"[{datetime.now()}][{threading.get_ident()}][start] start start")
run_middleware_hook("pre_worker_start")
for subscription in self._subscriptions:
self._boostrap_consumption(subscription)
run_middleware_hook("post_worker_start")
print(f"[{datetime.now()}][{threading.get_ident()}][start] end start")

def run_forever(self, sleep_interval=1):
"""Shortcut for calling setup, start, and _wait_forever.
:param sleep_interval: Number of seconds to sleep in the ``while True`` loop
"""
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] setup")
self.setup()
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] start")
self.start()
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] wait for ever")
self._wait_forever(sleep_interval=sleep_interval)
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] finish")

def stop(self, signal=None, frame=None):
"""Manage the shutdown process of the worker.
Expand All @@ -101,8 +133,36 @@ def stop(self, signal=None, frame=None):
sys.exit(0)

def _boostrap_consumption(self, subscription):
print(
f"[{datetime.now()}][{threading.get_ident()}][_boostrap_consumption][0] "
f"subscription {subscription.name}"
)

if subscription in self._futures:
print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"[_boostrap_consumption][1] subscription {subscription.name} "
f"futures in [{self._futures[subscription]._state}]"
)
self._futures[subscription].cancel()
print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"[_boostrap_consumption][2] subscription {subscription.name} "
"future cancelled"
)
self._futures[subscription].result()
print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"[_boostrap_consumption][3] subscription {subscription.name} "
"future cancelled and result"
)

if not check_internet_connection():
print(
f"[{datetime.now()}][{threading.get_ident()}] Not internet "
f"connection when boostrap a consumption for {subscription}"
)
raise NotConnectionError

executor_kwargs = {"thread_name_prefix": "ThreadPoolExecutor-ThreadScheduler"}
executor = futures.ThreadPoolExecutor(
Expand All @@ -115,15 +175,42 @@ def _boostrap_consumption(self, subscription):
callback=Callback(subscription),
scheduler=scheduler,
)
print(
f"[{datetime.now()}][{threading.get_ident()}][_boostrap_consumption][3] "
f"subscription {subscription.name} future in "
f"[{self._futures[subscription]._state}]"
)

def _wait_forever(self, sleep_interval):
logger.info("Consuming subscriptions...")
while True:
print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"[_wait_forever][0] Futures: {self._futures.values()}"
)

if datetime.now().timestamp() % 50 < 1 and not check_internet_connection():
print(
f"[{datetime.now()}][{threading.get_ident()}] "
"Not internet connection, raising an Exception"
)
raise NotConnectionError

for subscription, future in self._futures.items():
if future.cancelled() or future.done():
print(
f"[{datetime.now()}][{threading.get_ident()}]"
"[_wait_forever][1] Restarting consumption "
f"of {subscription.name}."
)
logger.info(f"Restarting consumption of {subscription.name}.")
self._boostrap_consumption(subscription)

print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"[_wait_forever][2] Sleep {sleep_interval} "
f"second(s) with futures: {self._futures.values()}"
)
time.sleep(sleep_interval)


Expand Down Expand Up @@ -151,9 +238,12 @@ def create_and_run(subs, config):
:param subs: List :class:`~rele.subscription.Subscription`
:param config: :class:`~rele.config.Config`
"""
print(f"Configuring worker with {len(subs)} subscription(s)...")
print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"Configuring worker with {len(subs)} subscription(s)..."
)
for sub in subs:
print(f" {sub}")
print(f"[{datetime.now()}][{threading.get_ident()}] {sub}")
worker = Worker(
subs,
config.gc_project_id,
Expand Down

0 comments on commit e1a6c6c

Please sign in to comment.