-
Notifications
You must be signed in to change notification settings - Fork 8
/
inbound.py
140 lines (111 loc) · 4.92 KB
/
inbound.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import requests
import logging
# --- Marzban connection settings -------------------------------------------
# Host name used to build every API URL (replace with your actual domain).
DOMAIN = "domain.com"
# Port used to build every API URL (replace with the actual port number).
PORT = 1234
# Credentials posted to /api/admin/token (replace with your own).
username = "user"
password = "pass"

# --- Inbound settings -------------------------------------------------------
# Protocol key whose proxy/inbound entries are edited for every user.
protocol = "vless"
# Inbound tags filtered out of each user's list for `protocol`.
inbounds_to_remove = ["Vless TCP Inbound"]
# Inbound tags added to each user's list for `protocol`.
inbounds_to_add = ["Vless TCP Inbound"]
# Stored as the 'flow' of a newly created proxy entry (vless only).
flow = "xtls-rprx-vision"
def get_access_token(username, password, timeout=30):
    """Request an API access token from the panel's admin token endpoint.

    Posts the credentials as form data to ``/api/admin/token`` on the host
    and port given by the module-level ``DOMAIN`` and ``PORT``.

    Args:
        username: Account name sent in the form body.
        password: Password sent in the form body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The value of ``access_token`` from the JSON response, or ``None`` if
        the request failed or the response did not have the expected shape.
        Failures are written to the log rather than raised.
    """
    url = f'https://{DOMAIN}:{PORT}/api/admin/token'
    data = {
        'username': username,
        'password': password,
    }
    try:
        response = requests.post(url, data=data, timeout=timeout)
        response.raise_for_status()
        return response.json()['access_token']
    except requests.exceptions.RequestException as e:
        logging.error(f'Error occurred while obtaining access token: {e}')
    except (KeyError, TypeError, ValueError) as e:
        # A 2xx answer whose body is not JSON, or lacks 'access_token', is
        # still a failed login from the caller's point of view.
        logging.error(f'Unexpected response while obtaining access token: {e!r}')
    return None
def get_users_list(access_token, timeout=30):
    """Fetch the users listing from ``/api/users``.

    Args:
        access_token: Token sent as ``Authorization: Bearer <token>``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body of the response (the caller in this file reads
        its ``'users'`` entry), or ``None`` if the request failed or the body
        was not valid JSON. Failures are written to the log rather than
        raised.
    """
    url = f'https://{DOMAIN}:{PORT}/api/users'
    headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f'Error occurred while retrieving users list: {e}')
    except ValueError as e:
        # Older requests versions raise a plain ValueError for a non-JSON body.
        logging.error(f'Users list response was not valid JSON: {e!r}')
    return None
def create_empty_proxy_inbound(user_details, protocol):
    """Reset the proxy settings and inbound list of ``protocol`` in place.

    Any existing ``user_details['proxies'][protocol]`` and
    ``user_details['inbounds'][protocol]`` entries are overwritten.

    Args:
        user_details: Dict that already contains ``'proxies'`` and
            ``'inbounds'`` mappings; it is mutated.
        protocol: Protocol key to (re)create. For ``"vless"`` the new proxy
            entry carries the module-level ``flow`` value; every other
            protocol gets an empty settings dict.

    Returns:
        None.
    """
    # Create an empty proxy for the specified protocol
    if protocol == "vless":
        user_details['proxies'][protocol] = {
            'flow': flow
        }
    else:
        user_details['proxies'][protocol] = {}

    # Create an empty inbound list for the specified protocol. This runs for
    # every protocol (vless included): the caller extends this list next.
    user_details['inbounds'][protocol] = []
def update_inbounds_for_protocol(username, protocol, timeout=30):
    """Rewrite one user's inbound list for ``protocol`` and save it.

    Fetches ``/api/user/<username>``, removes every tag listed in the
    module-level ``inbounds_to_remove`` from the user's inbounds for
    ``protocol``, adds the tags in ``inbounds_to_add`` (skipping ones that
    are already present), drops protocols left without any inbound, and PUTs
    the modified record back to the same URL.

    The bearer token is read from the module-level ``access_token``, so that
    name must be assigned before this function is called.

    Args:
        username: User name inserted into the API URL.
        protocol: Protocol key to edit inside ``inbounds`` / ``proxies``.
        timeout: Seconds to wait on each HTTP request. Without a timeout
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        True if the record was modified and the PUT succeeded. False if the
        fetched record has no ``'inbounds'`` key, if a request failed, or if
        the record did not have the expected shape (failures are logged).
    """
    url = f'https://{DOMAIN}:{PORT}/api/user/{username}'
    headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        user_details = response.json()

        if 'inbounds' not in user_details:
            return False

        inbounds = user_details['inbounds'].get(protocol, [])
        if inbounds_to_remove:
            user_details['inbounds'][protocol] = [
                inbound for inbound in inbounds
                if inbound not in inbounds_to_remove
            ]

        if protocol not in user_details['proxies'] or protocol not in user_details['inbounds']:
            # If the protocol is not present, create an empty proxy and inbound
            create_empty_proxy_inbound(user_details, protocol)

        if inbounds_to_add:
            assigned = user_details['inbounds'][protocol]
            # Append one by one so a tag the user already has (or one that is
            # repeated in inbounds_to_add) is not stored twice.
            for inbound in inbounds_to_add:
                if inbound not in assigned:
                    assigned.append(inbound)

        # NOTE(review): presumably the modify endpoint only accepts
        # active/disabled/on_hold, hence any other status is sent as
        # "active" - confirm against the API.
        if user_details['status'] not in ["disabled", "on_hold"]:
            user_details['status'] = "active"

        # Collect the keys first; popping while iterating the dict would fail.
        keys_to_remove = [
            inbound_protocol
            for inbound_protocol, tags in user_details['inbounds'].items()
            if not tags
        ]
        for key in keys_to_remove:
            user_details['inbounds'].pop(key, None)
            user_details['proxies'].pop(key, None)

        # Blank 'links' and 'subscription_url' before sending the record back
        user_details['links'] = []
        user_details['subscription_url'] = ""

        response = requests.put(url, json=user_details, headers=headers, timeout=timeout)
        response.raise_for_status()
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f'Error occurred while removing inbounds for protocol: {e}')
        return False
    except (KeyError, TypeError, ValueError) as e:
        # Non-JSON body or a record missing 'proxies'/'status': skip this
        # user instead of aborting the whole run.
        logging.error(f'Unexpected user record for {username}: {e!r}')
        return False
# Configure logging settings: errors only, appended to script_log.log
logging.basicConfig(
    filename='script_log.log',
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Driver: log in, list the users, then update every user that has an
# 'inbounds' entry. access_token must remain a module-level name because
# update_inbounds_for_protocol reads it as a global.
access_token = get_access_token(username, password)
if access_token:
    users_list = get_users_list(access_token)
    if users_list:
        not_updated = 0
        for user in users_list['users']:
            # Users without an 'inbounds' entry are skipped silently
            if 'inbounds' not in user:
                continue
            if update_inbounds_for_protocol(user['username'], protocol):
                print(f"Inbounds updated successfully for user {user['username']} and protocol {protocol}.")
            else:
                not_updated += 1
                print(f"No specified inbounds found for user {user['username']} and protocol {protocol}.")
        # Only claim full success when every attempted update returned True.
        if not_updated:
            print(f"Finished, but {not_updated} user(s) were not updated.")
        else:
            print("All users modified successfully.")
    else:
        print("Failed to retrieve the users list.")
else:
    print("Failed to obtain the access token.")