-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
148 lines (111 loc) · 4.16 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import glob
import json
import os
from datetime import datetime, timedelta, timezone
from urllib.parse import unquote
from better_proxy import Proxy
from bot.config import settings
from bot.exceptions import MissingTelegramAPIException
def calculate_real_number(number):
    """Scale ``number`` down by a factor of one billion (10**9).

    :param number: Numeric value to scale.
    :return: ``number`` divided by 1,000,000,000 using true division.
    """
    scale = 1_000_000_000
    return number / scale
def update_or_add_dict(list_of_dicts, key, key_value, new_data):
    """
    Update the matching dictionary in a list of dictionaries, or append a new one.

    An entry matches when ``entry.get(key) == key_value``. A matching entry is
    updated in place with ``new_data`` only when that is an upgrade, i.e. its
    ``"upgrade_lvl"`` is lower than the one in ``new_data`` (or one of the two
    levels is missing, in which case they cannot be compared and the update is
    applied). A matching entry that is already at the same or a higher level
    is left untouched. ``new_data`` is appended only when no entry matches.

    :param list_of_dicts: List of dictionaries to update or add to (mutated in place).
    :param key: The key to check for existence in each dictionary.
    :param key_value: The value of the key to match.
    :param new_data: The data to update or add to the dictionary.
    :return: None.
    """
    new_lvl = new_data.get("upgrade_lvl")
    for dictionary in list_of_dicts:
        if dictionary.get(key) != key_value:
            continue
        current_lvl = dictionary.get("upgrade_lvl")
        # Guard against comparing None with a number, which raises TypeError.
        if current_lvl is None or new_lvl is None or current_lvl < new_lvl:
            dictionary.update(new_data)
        # The entry exists: never fall through to the append below, otherwise
        # a duplicate would be added for every non-upgrading call.
        return
    # No entry carries key_value yet, so add the new dictionary.
    list_of_dicts.append(new_data)
def get_session_names() -> list[str]:
    """Return the names of all ``*.session`` files in the ``sessions`` directory.

    The directory part and the file extension are removed, so
    ``sessions/alice.session`` is reported as ``"alice"``. The path is
    relative to the current working directory.
    """
    names = []
    for path in glob.glob("sessions/*.session"):
        stem, _extension = os.path.splitext(os.path.basename(path))
        names.append(stem)
    return names
def check_telegram_api():
    """Verify that both Telegram API credentials are present in the settings.

    :raises MissingTelegramAPIException: if ``settings.API_ID`` or
        ``settings.API_HASH`` is falsy (unset, empty, or zero).
    """
    api_id, api_hash = settings.API_ID, settings.API_HASH
    if api_id and api_hash:
        return
    raise MissingTelegramAPIException(
        "API_ID and API_HASH is missing, please check your .env file!"
    )
def get_proxies() -> list[Proxy]:
    """Load proxies from ``bot/config/proxies.txt`` when file proxies are enabled.

    The file is read only when ``settings.USE_PROXY_FROM_FILE`` equals
    ``"true"`` (case-insensitive); otherwise an empty list is returned.
    Each non-blank row is parsed with ``Proxy.from_str`` and its ``as_url``
    value is collected.

    NOTE(review): the elements are ``Proxy.as_url`` values, not ``Proxy``
    instances, so the return annotation looks inaccurate - confirm against
    ``better_proxy`` before changing it.

    :return: List of proxy URLs, or an empty list when disabled.
    :raises OSError: if the proxies file cannot be opened.
    """
    if settings.USE_PROXY_FROM_FILE.lower() != "true":
        return []

    # "utf-8-sig" transparently drops a BOM at the start of the file.
    with open(file="bot/config/proxies.txt", encoding="utf-8-sig") as file:
        stripped_rows = (row.strip() for row in file)
        # Skip blank / whitespace-only rows instead of handing an empty
        # string to the proxy parser.
        return [Proxy.from_str(proxy=row).as_url for row in stripped_rows if row]
def convert_datetime_str_to_utc(datetime_str):
    """Parse an ISO-8601 style timestamp and return it as a UTC-aware datetime.

    Fractional seconds are normalised to exactly three digits (milliseconds):
    longer fractions are truncated, shorter ones are zero-padded. Anything
    after the fractional digits (such as a ``Z`` suffix or an offset) is
    dropped, and a trailing ``Z`` on a timestamp without a fraction is removed
    so that parsing does not depend on the Python version.

    The wall-clock value is labelled as UTC via ``replace(tzinfo=...)``; it is
    not shifted, so any offset present in the input is ignored.

    :param datetime_str: Timestamp string, e.g. ``"2024-07-01T12:34:56.789123Z"``.
    :return: ``datetime`` with ``tzinfo`` set to ``timezone.utc``.
    :raises ValueError: if the remaining string is not a valid ISO format.
    """
    whole, dot, fraction = datetime_str.partition(".")
    if dot:
        # Count only the leading digits of the fraction; a blind slice would
        # capture non-digit characters when fewer than three digits exist.
        digit_count = 0
        while digit_count < len(fraction) and fraction[digit_count] in "0123456789":
            digit_count += 1
        millis = fraction[:digit_count][:3].ljust(3, "0")
        datetime_str = f"{whole}.{millis}"
    elif datetime_str[-1:] in ("Z", "z"):
        # datetime.fromisoformat only accepts the "Z" designator on 3.11+.
        datetime_str = datetime_str[:-1]
    return datetime.fromisoformat(datetime_str).replace(tzinfo=timezone.utc)
def format_duration(seconds):
    """Render a duration given in seconds as a human-readable string.

    The duration is broken down into days, hours, minutes and seconds; units
    whose value is zero are omitted, and each value is truncated to an
    integer for display. Unit labels are always plural (``"1 days"``). A
    duration of zero produces an empty string.

    :param seconds: Duration in seconds (int or float).
    :return: String such as ``"2 days 3 hours 4 minutes 5 seconds"``.
    """
    total_seconds = timedelta(seconds=seconds).total_seconds()
    days, day_remainder = divmod(total_seconds, 86400)
    hours, hour_remainder = divmod(day_remainder, 3600)
    minutes, leftover_seconds = divmod(hour_remainder, 60)

    components = (
        (days, "days"),
        (hours, "hours"),
        # Label is plural like its siblings (previously the odd one out).
        (minutes, "minutes"),
        (leftover_seconds, "seconds"),
    )
    return " ".join(f"{int(amount)} {unit}" for amount, unit in components if amount)
def mapping_role_color(role):
    """Wrap a known role name in bracketed colour tags.

    ``"admin"`` is wrapped in ``[cyan]...[/cyan]`` and ``"premium"`` in
    ``[magenta]...[/magenta]``; any other value is returned unchanged.

    :param role: Role name to decorate.
    :return: The tagged role string, or ``role`` itself when it is not recognised.
    """
    for known_role, colour in (("admin", "cyan"), ("premium", "magenta")):
        if role == known_role:
            return f"[{colour}]{role}[/{colour}]"
    return role
def decode_query_id(query_id):
    """Decode a query-id string into a dictionary of its parameters.

    Two input shapes are handled:

    * a string containing ``tgWebAppData=...``: the text between
      ``tgWebAppData=`` and ``&tgWebAppVersion`` (or the end of the string)
      is extracted and URL-decoded once to obtain the query string;
    * anything else is treated as the query string itself.

    The query string is split on ``&``; each ``name=value`` pair is split on
    the first ``=`` only, so values that themselves contain ``=`` are kept
    whole, and the value is URL-decoded. Segments without ``=`` (for example
    the empty segment left by a trailing ``&``) are skipped. Finally the
    result is passed to ``reassign`` before being returned.

    :param query_id: Raw query-id string or a URL/fragment embedding it.
    :return: Dictionary mapping parameter names to decoded values.
    """
    query_string = query_id
    # Require the "=" as well, so the split below always yields two parts.
    if "tgWebAppData=" in query_id:
        payload = query_id.split("tgWebAppData=", maxsplit=1)[1]
        payload = payload.split("&tgWebAppVersion", maxsplit=1)[0]
        query_string = unquote(string=payload)

    result = {}
    for param in query_string.split("&"):
        name, separator, raw_value = param.partition("=")
        if not separator:
            # Not a name=value pair; nothing to decode.
            continue
        result[name] = unquote(raw_value)

    reassign(result)
    return result
def reassign(d):
    """
    Replace string values that hold a JSON object with the parsed dict, in place.

    Only string values starting with ``{`` or ``[`` are handed to
    ``json.loads``. A value is replaced only when parsing succeeds and yields
    a ``dict``; JSON arrays, invalid JSON, empty strings and non-string
    values are left untouched.

    :param d: Dictionary to mutate.
    :return: None.
    """
    # Iterate over a snapshot so assigning into ``d`` never interferes with
    # the iteration.
    for k, v in list(d.items()):
        # startswith() is safe on empty strings, unlike indexing v[0].
        if not isinstance(v, str) or not v.startswith(("{", "[")):
            continue
        try:
            evald = json.loads(v)
        except ValueError:
            # Looks like JSON but is not; keep the raw string (best effort).
            continue
        if isinstance(evald, dict):
            d[k] = evald
async def get_query_ids():
    """Read ``query_ids.txt`` and return its lines with surrounding whitespace removed.

    The file is opened relative to the current working directory. Every line
    is kept, including blank ones (which become empty strings).

    :return: List of stripped lines, in file order.
    """
    with open("query_ids.txt", "r") as file:
        return [raw_line.strip() for raw_line in file]
def get_tele_user_obj_from_query_id(query_id):
    """Return the ``"user"`` entry of a decoded query id.

    :param query_id: Query-id string accepted by ``decode_query_id``.
    :return: The value stored under ``"user"``, or an empty dict when absent.
    """
    return decode_query_id(query_id).get("user", {})