# fconnch.py: entry-point script for ConnChecker.
import asyncio
import sys
from pathlib import Path

from fconnch.checker import site_is_online, site_is_online_async, statuses
from fconnch.cli import (
    BLD,
    W,
    banner,
    display_check_result,
    no_color,
    read_user_cli_args,
    table,
)

# URLs that responded as online, mapped to their check result.
online_urls = {}
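

# main(): parse the CLI arguments, run the checks (synchronously or with
# asyncio), print a summary table, and write the online URLs to a file.
# The command line below is only a sketch; the real flags are defined by
# fconnch.cli.read_user_cli_args(), which exposes urls, input_file,
# output_file, timeout, verbose, no_color and asynchronous:
#   python fconnch.py example.com -t 5 --asynchronous -o online.txt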
def main():
    """Run ConnChecker."""
    global verbose_mode, user_args
    banner()
    user_args = read_user_cli_args()
    verbose_mode = bool(user_args.verbose)
    if user_args.no_color:
        no_color()
    total_urls, urls = _get_websites_urls(user_args)
    if not urls:
        print("Error: No URL to check.", file=sys.stderr)
        sys.exit(1)
    table()
    if user_args.asynchronous:
        asyncio.run(_asynchronous_check(urls))
    else:
        _synchronous_check(urls)
    show_final_result()
    write_online_urls_to_file(online_urls, user_args.output_file)
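

# Collect the URLs passed on the command line and, if an input file was
# supplied, the URLs read from it; empty entries are filtered out and the
# global total_urls counter is updated.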
def _get_websites_urls(user_args):
    global total_urls
    urls = user_args.urls
    if user_args.input_file:
        urls += _read_urls_from_file(user_args.input_file)
    urls = list(filter(None, urls))
    total_urls = len(urls)
    return total_urls, urls
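

# Read one URL per line from the given file, stripping whitespace; report a
# missing or empty file on stderr and fall back to an empty list.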
def _read_urls_from_file(file):
    file_path = Path(file)
    if file_path.is_file():
        with file_path.open() as urls_file:
            urls = [url.strip() for url in urls_file]
        if urls:
            return urls
        print(f"Error: Empty input file, {file}", file=sys.stderr)
    else:
        print("Error: input file not found!", file=sys.stderr)
    return []
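

# Check all URLs concurrently: one coroutine per URL, run together with
# asyncio.gather; each coroutine prints its own result as soon as its
# check finishes.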
async def _asynchronous_check(urls):
    async def _check(url):
        error = ""
        try:
            response = await site_is_online_async(url, timeout=user_args.timeout)
        except Exception as e:
            response = False
            error = str(e)
        if verbose_mode:
            display_check_result(response, url, error)
        else:
            display_check_result(response, url)
        if response:
            online_urls[url] = response

    await asyncio.gather(*(_check(url) for url in urls))
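

# Check the URLs one at a time with the blocking site_is_online() helper.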
def _synchronous_check(urls):
    for url in urls:
        error = ""
        try:
            response = site_is_online(url, timeout=user_args.timeout)
        except Exception as e:
            response = False
            error = str(e)
        if verbose_mode:
            display_check_result(response, url, error)
        else:
            display_check_result(response, url)
        if response:
            online_urls[url] = response
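

# Persist the results: one "<url>,<status>" line per online URL.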
def write_online_urls_to_file(online_urls, output_file):
    """Write the online URLs to a text file."""
    with open(output_file, "w") as file:
        for url, status in online_urls.items():
            file.write(f"{url},{status}\n")
    print(f"Online URLs written to '{output_file}'.")
def show_final_result():
    offline_urls = total_urls - sum(statuses.values())
    if offline_urls:
        statuses.setdefault("Status(Offline)", offline_urls)
    headers = ["Total URLs", *statuses.keys()]
    print()
    for header in headers:
        print(f"{BLD}{str(header):15}{W}", end="")
    print("\n" + "-" * (15 * len(statuses) + 15))
    for value in (total_urls, *statuses.values()):
        print(f"{str(value):15}", end="")
    print("\n")
    # print(f"\n{B}Total Domains:{W} {total_urls}\n{B}Statuses:{W} {statuses}")
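

# Generic coroutine helpers (not called anywhere in this module): run the
# given awaitables one after another, or concurrently via asyncio.gather.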
async def run_sequence(*functions) -> None:
    for function in functions:
        await function


async def run_parallel(*functions) -> None:
    await asyncio.gather(*functions)


if __name__ == "__main__":
    main()