This repository has been archived by the owner on Jun 15, 2024. It is now read-only.
forked from tropicoo/yt-dlp-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstart.py
110 lines (83 loc) · 3.07 KB
/
start.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""Over-engineered Python 3.10+ version of bash script with netcat (nc) just for fun.
#!/bin/bash
check_reachability() {
while ! nc -z "$1" "${!2}"
do
echo "Waiting for $3 to be reachable on port ${!2}"
sleep 1
done
echo "Connection to $3 on port ${!2} verified"
return 0
}
wait_for_services_to_be_reachable() {
check_reachability rabbitmq RABBITMQ_PORT RabbitMQ
check_reachability postgres POSTGRES_PORT PostgreSQL
}
wait_for_services_to_be_reachable
exit 0
"""
import asyncio
import logging
import os
import sys
from dataclasses import dataclass, field
from typing import Generator, Type
SOCK_CONNECTED = 0
DEFAULT_PORT = 0
DEFAULT_SLEEP_TIME = 1
class ServiceRegistry(type):
REGISTRY: dict[str, type['BaseService']] = {}
def __new__(
mcs: Type['ServiceRegistry'],
name: str,
bases: tuple[type['BaseService']],
attrs: dict,
) -> type['BaseService']:
service_cls: type['BaseService'] = type.__new__(mcs, name, bases, attrs)
mcs.REGISTRY[service_cls.__name__] = service_cls
return service_cls
@classmethod
def get_registry(mcs) -> dict[str, type['BaseService']]:
return mcs.REGISTRY.copy()
@classmethod
def get_instances(mcs) -> Generator['BaseService', None, None]:
return (service_cls() for service_cls in mcs.REGISTRY.values())
@dataclass
class BaseService:
name: str = field(default='', init=False)
host: str = field(default='', init=False)
port: int = field(default=DEFAULT_PORT, init=False)
def __post_init__(self) -> None:
if self.__class__ is BaseService:
raise TypeError('Cannot instantiate abstract class.')
@dataclass
class RabbitMQService(BaseService, metaclass=ServiceRegistry):
name: str = field(default='RabbitMQ')
host: str = field(default=os.getenv('RABBITMQ_HOST'))
port: int = field(default=int(os.getenv('RABBITMQ_PORT', DEFAULT_PORT)))
@dataclass
class PostgreSQLService(BaseService, metaclass=ServiceRegistry):
name: str = field(default='PostgreSQL')
host: str = field(default=os.getenv('POSTGRES_HOST'))
port: int = field(default=int(os.getenv('POSTGRES_PORT', DEFAULT_PORT)))
async def is_port_open(host: str, port: int) -> bool:
try:
reader, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
return True
except Exception:
return False
async def check_reachability(service: BaseService) -> None:
while True:
print(f'[{service.name}] Waiting to be reachable on port {service.port}')
if await is_port_open(host=service.host, port=service.port):
break
await asyncio.sleep(DEFAULT_SLEEP_TIME)
print(f'[{service.name}] Connection on port {service.port} verified')
async def main() -> None:
logging.getLogger('asyncio').setLevel(logging.ERROR)
coros = [check_reachability(service) for service in ServiceRegistry.get_instances()]
await asyncio.gather(*coros)
if __name__ == '__main__':
sys.exit(asyncio.run(main()))