# slurm_commands.py
import os
import re
import time
from collections import defaultdict
from typing import List

from fabric import Connection
from paramiko import RSAKey

IRIDIS_ADDRESS = "iridis5_a.soton.ac.uk"
USERID = "ii1g17"
#IRIDIS_SSH_KEY = os.path.expanduser("/Users/ioan/.ssh/iridis5")
IRIDIS_SSH_KEY = os.path.expanduser("/home/ii1g17/iridis-light-monitoring/iridis5")
PARTITION_TO_NODE = {"ecsstaff": ["alpha51", "alpha52", "alpha53"],
"ecsall": ["alpha54", "alpha55", "alpha56"],
"gpu": [f"indigo{i}" for i in range(51, 61)],
"gtx1080": [f"pink{i}" for i in range(51, 61)]}
PARTITION_TO_NODE_NAME = {"ecsstaff": "alpha",
'gpu': 'indigo',
'gtx1080': 'pink'}
NODE_NAME_TO_PARTITION = {v: k for k, v in PARTITION_TO_NODE_NAME.items()}
NODE_MAX_RES = {"alpha": {"CPU": 64, "RAM": 371, "GPU": 4},
"indigo": {"CPU": 40, "RAM": 187.5, "GPU": 2},
"pink": {"CPU": 56, "RAM": 125, "GPU": 4}}
PARTITION_MAX_RES = {"ecsstaff": {"CPU": 64 * 3, "RAM": 371 * 3, "GPU": 4 * 3},
"ecsall": {"CPU": 64 * 3, "RAM": 371 * 3, "GPU": 4 * 3},
"gpu": {"CPU": 40 * 10, "RAM": 187.5 * 10, "GPU": 2 * 10},
"gtx1080": {"CPU": 56 * 10, "RAM": 125 * 10, "GPU": 4 * 10}}
class RemoteConnectionManager:
    def __init__(self):
        self.private_key = RSAKey(filename=IRIDIS_SSH_KEY)
        self.connection = self._establish_connection()

    def _establish_connection(self):
        return Connection(host=IRIDIS_ADDRESS, user=USERID, connect_kwargs={'pkey': self.private_key})

    def run_command(self, command):
        try:
            if not self.connection.is_connected:
                # we need to kill the current session so we don't consume all possible connections
                self.connection.close()
                self.connection = self._establish_connection()  # Establish a new connection
            return self.connection.run(command, hide=True).stdout
        except Exception as e:
            print(f"Error: {e}")
            return None
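# Illustrative use (output value is hypothetical): run_command returns the command's
# stdout as a string, or None if the command or SSH session failed, e.g.
#   conn = RemoteConnectionManager()
#   conn.run_command("sinfo --version")  # -> "slurm 21.08.8\n" (example value only)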
def check_node_still_exists(node_name: str, conn_manager: RemoteConnectionManager) -> bool:
    """Check if a node exists on the remote system."""
    output = conn_manager.run_command(f"scontrol show node {node_name}")
    if isinstance(output, str) and "not found" not in output:
        return True
    return False


def filter_existing_nodes(partition_to_node_dict, conn_manager):
    filtered_partition_to_node = {}
    for part_name, node_list in partition_to_node_dict.items():
        filtered_node_list = [node_name for node_name in node_list if check_node_still_exists(node_name, conn_manager)]
        filtered_partition_to_node[part_name] = filtered_node_list
    print("Filtered:", filtered_partition_to_node)
    return filtered_partition_to_node


def parse_scontrol_output(output: str) -> dict:
    return dict(pair.split("=", maxsplit=1) for line in output.split('\n')
                for pair in re.split(' ', line) if "=" in pair)
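# Example (values are illustrative, not real cluster output): a line such as
#   "NodeName=alpha51 CPUAlloc=32 CPUTot=64 RealMemory=380000 Gres=gpu:rtx8000:4"
# is parsed into {"NodeName": "alpha51", "CPUAlloc": "32", "CPUTot": "64", ...}.
# Values that themselves contain spaces (e.g. Reason=...) get split per token.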
def memory_string_to_GB(memory_string: str) -> float:
    """Converts the memory string to GB"""
    if memory_string == "":
        return 0
    memory_string = memory_string.strip().upper()
    multipliers = {'G': 1, 'M': 1e-3, 'K': 1e-6, 'T': 1e3}
    for unit, multiplier in multipliers.items():
        if unit in memory_string:
            return round(float(memory_string.replace(unit, '')) * multiplier, 1)
    # Default to MB if no unit is specified
    return round(float(memory_string) * 1e-3, 1)
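# A few worked conversions (assuming Slurm-style memory strings):
#   "16G" -> 16.0, "500000M" -> 500.0, "187500" (no unit, treated as MB) -> 187.5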
# -------------------------------------------Node Info-------------------------------------------
def get_node_info(node_name: str, conn_manager: RemoteConnectionManager) -> dict:
    """
    Gets the GPU, RAM, CPU etc. information for a node.
    Args:
        node_name: The name of the node to check.
    Returns:
        dict: A dictionary containing the node's information, property->value.
    """
    output = conn_manager.run_command(f"scontrol show node {node_name}")
    return parse_scontrol_output(output)


def get_node_GPU_name(node_info: dict) -> str:
    return node_info["Gres"].split(":")[1]


def get_node_GPU_count(node_info: dict) -> int:
    return int(node_info["Gres"].split(":")[2])


def get_node_CPU_cores_count(node_info: dict) -> int:
    return int(node_info["CPUTot"])


def get_node_RAM_count(node_info: dict) -> float:
    """Returns the total RAM available in GB"""
    return memory_string_to_GB(node_info["RealMemory"])
def get_node_allocated_GPU(node_info: dict, locked_usage: bool = True) -> int:
    """For gtx1080 and gpu partitions, the GPU resources are locked to the user,
    so we can return the number of GPUs locked or used"""
    node_name = re.sub(r'\d', '', node_info["NodeName"])
    if not locked_usage or node_name == "alpha":
        if "gres/gpu=" in node_info["AllocTRES"]:
            return int(node_info["AllocTRES"].split("gres/gpu=")[1])
    else:
        # if there is no owner (user locked to resources) that means the node is free
        if NODE_NAME_TO_PARTITION[node_name] in ["gtx1080", "gpu"] and node_info["Owner"].strip() != 'N/A':
            return NODE_MAX_RES[node_name]["GPU"]
    return 0
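# Sketch of the two paths (field values are made up): on an alpha node with
# AllocTRES="cpu=16,mem=64G,gres/gpu=2" this returns 2; on a pink/indigo node with
# locked_usage=True and Owner set to a username, it returns that node's full GPU count.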
def get_node_allocated_CPU_cores(node_info: dict) -> int:
    return int(node_info["CPUAlloc"])


def get_node_allocated_RAM(node_info: dict) -> float:
    """Returns the allocated RAM in GB"""
    return memory_string_to_GB(node_info["AllocMem"])


def get_node_free_RAM(node_info: dict) -> float:
    """Returns the free RAM in GB"""
    if "N/A" in node_info["FreeMem"]:
        return 0
    return memory_string_to_GB(node_info["FreeMem"])


def get_node_current_power_usage(node_info: dict) -> float:
    return float(node_info["CurrentWatts"])


def get_node_avg_power_usage(node_info: dict) -> float:
    return float(node_info["AverageWatts"])


def get_node_state(node_name: str, conn_manager: RemoteConnectionManager) -> str:
    return conn_manager.run_command(f'sinfo --noheader -n {node_name} --Format StateLong').strip()
# -------------------------------------------Job Info-----------------------------------------------
def get_job_info(job_id: str, conn_manager: RemoteConnectionManager) -> dict:
    """
    Gets all the information about a job and returns it as a dictionary.
    Args:
        job_id: The ID of the job to check.
    Returns:
        dict: A dictionary containing the job's information, property->value.
    """
    output = conn_manager.run_command(f"scontrol show job {job_id}")
    return parse_scontrol_output(output)
def get_job_allocated_GPU(job_info: dict) -> int:
    if "gres/gpu=" not in job_info["AllocTRES"]:
        return 0
    return int(job_info["AllocTRES"].split("gres/gpu=")[1])
# -------------------------------------------User Info----------------------------------------------
def get_user_allocated_nodes_on_partition(partition_name: str,
                                          conn_manager: RemoteConnectionManager,
                                          username: str) -> List[str]:
    nodes = conn_manager.run_command(f"sacct --noheader --partition={partition_name} -u {username} "
                                     f"--state=RUNNING -X --format NodeList")
    return [el for el in set([n.strip() for n in nodes.strip().split("\n")]) if el != ""]
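# sacct prints one NodeList entry per running job, so the set() above de-duplicates
# users running several jobs on the same node; e.g. the output "pink51\npink51\npink53"
# (illustrative) collapses to ['pink51', 'pink53'] in either order.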
def get_user_job_ids_on_partition(partition_name: str,
                                  conn_manager: RemoteConnectionManager,
                                  username: str,
                                  job_state: str = "RUNNING") -> List[str]:
    output = conn_manager.run_command(f"squeue --noheader -p {partition_name} -u {username} -t {job_state} -o %A")
    # drop empty entries so a user with no jobs on this partition yields [] rather than ['']
    return [job_id for job_id in output.strip().split("\n") if job_id.strip() != ""]


def get_user_allocated_CPU_cores_on_partition(partition_name: str,
                                              conn_manager: RemoteConnectionManager,
                                              username: str) -> int:
    cpus = conn_manager.run_command(f"sacct --noheader --partition={partition_name} "
                                    f"-u {username} --state=RUNNING -X --format AllocCPUs")
    if cpus.strip() == "":
        return 0
    return sum([int(cpu.strip()) for cpu in cpus.strip().split("\n")])
def get_user_allocated_GPU_on_partition(partition_name: str,
                                        conn_manager: RemoteConnectionManager,
                                        username: str,
                                        locked_usage: bool) -> int:
    """I can't find a nice way of getting the GPU count. AllocTRES doesn't really return what we want.
    However, the gpu and gtx1080 partitions lock the GPU resources on the node to the user,
    regardless of the number of GPUs requested.
    So, we can just count the number of nodes the user has allocated and multiply by the number of GPUs per node.
    For ecsstaff and ecsall, we can't do this, so we just have to go through all the running jobs.
    """
    if partition_name in ["gpu", "gtx1080"] and locked_usage:
        return len(get_user_allocated_nodes_on_partition(partition_name, conn_manager, username)) * \
               NODE_MAX_RES[PARTITION_TO_NODE_NAME[partition_name]]["GPU"]
    else:
        running_jobs = get_user_job_ids_on_partition(partition_name, conn_manager, username)
        return sum([get_job_allocated_GPU(get_job_info(job_id, conn_manager)) for job_id in running_jobs])
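# Worked example (numbers invented): a user holding two pink nodes has
# gpu_locked = 2 * NODE_MAX_RES["pink"]["GPU"] = 8, while with locked_usage=False the
# count sums gres/gpu over their jobs, e.g. 1 + 1 = 2 if each job requested one GPU.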
def get_user_allocated_RAM_on_partition(partition_name: str,
                                        conn_manager: RemoteConnectionManager,
                                        username: str) -> float:
    """Returns the RAM usage in GB"""
    rams = conn_manager.run_command(f"sacct --noheader --partition={partition_name} "
                                    f"-u {username} --state=RUNNING -X --format ReqMem")
    if rams.strip() == "":
        return 0
    return round(sum([memory_string_to_GB(r.strip()) for r in rams.strip().split("\n")]), 1)
# -------------------------------------------Partition Info-------------------------------------------
def get_partition_queue_job_info(partition_name: str,
                                 conn_manager: RemoteConnectionManager,
                                 attribute: str,
                                 job_state: str = "RUNNING"):
    output = conn_manager.run_command(f'squeue --noheader -p {partition_name} -t {job_state} --Format {attribute}')
    return [o.strip() for o in output.strip().split("\n") if o.strip() != ""]
def get_partition_CPU_cores_info(partition_name: str,
                                 conn_manager: RemoteConnectionManager,
                                 resource_state: str) -> int:
    cpu_info = conn_manager.run_command(f'sinfo --noheader -p {partition_name} --Format CPUsState')
    allocated, idle, other, total = [int(c) if c != "" else 0 for c in cpu_info.strip().split("/")]
    resource_state_map = {
        "ALLOCATED": allocated,
        "IDLE": idle,
        "OTHER": other,
        "TOTAL": total
    }
    if resource_state.upper() not in resource_state_map:
        raise ValueError(f"Invalid resource state: {resource_state}")
    return resource_state_map[resource_state.upper()]
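# `sinfo --Format CPUsState` prints "allocated/idle/other/total", e.g. "96/304/0/400"
# (figures invented), so asking for "idle" here would return 304.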
def get_partition_allocated_GPU(partition_name: str,
                                conn_manager: RemoteConnectionManager,
                                locked_usage: bool = False) -> int:
    """Can't find a way to get this number directly, so we are iterating through all the nodes"""
    return sum([get_node_allocated_GPU(get_node_info(node, conn_manager), locked_usage=locked_usage)
                for node in PARTITION_TO_NODE[partition_name]])


def get_partition_allocated_RAM(partition_name: str,
                                conn_manager: RemoteConnectionManager) -> float:
    """Returns the RAM usage in GB"""
    rams = conn_manager.run_command(f"sacct --noheader --partition={partition_name} "
                                    f"--allusers --state=RUNNING -X --format ReqMem")
    return sum([memory_string_to_GB(r) for r in rams.strip().split("\n")])
# --------------------------------------------info gathering-----------------------------------------
def calculate_percentage(numerator, denominator):
    return round((numerator / denominator) * 100, 1)
def aggregate_partition_info(conn_manager: RemoteConnectionManager, locked_usage: bool = True) -> dict:
    """Gets all the information about all GPU partitions and returns it as a dictionary.
    This is needed to populate the front end"""
    global PARTITION_TO_NODE
    info = defaultdict(dict)
    PARTITION_TO_NODE = filter_existing_nodes(PARTITION_TO_NODE, conn_manager)
    for p in PARTITION_TO_NODE.keys():
        info[p] = {'cpu_free': get_partition_CPU_cores_info(p, conn_manager, "idle"),
                   'cpu_total': get_partition_CPU_cores_info(p, conn_manager, "total"),
                   'ram_free': max(round((PARTITION_MAX_RES[p]['RAM'] - get_partition_allocated_RAM(p, conn_manager)), 1), 0),
                   'ram_total': PARTITION_MAX_RES[p]['RAM'],
                   'gpu_name': get_node_GPU_name(get_node_info(PARTITION_TO_NODE[p][0], conn_manager)),
                   'gpu_free': PARTITION_MAX_RES[p]['GPU'] - get_partition_allocated_GPU(p, conn_manager, locked_usage),
                   'gpu_total': PARTITION_MAX_RES[p]['GPU'],
                   'nodes': {}
                   }
        info[p]['cpu_free_percentage'] = calculate_percentage(info[p]['cpu_free'], info[p]['cpu_total'])
        info[p]['ram_free_percentage'] = calculate_percentage(info[p]['ram_free'], info[p]['ram_total'])
        info[p]['gpu_free_percentage'] = calculate_percentage(info[p]['gpu_free'], info[p]['gpu_total'])
        for n in PARTITION_TO_NODE[p]:
            node_info = get_node_info(n, conn_manager)
            node_name = "".join(x for x in n if x.isalpha())
            info[p]['nodes'][n] = {'state': get_node_state(n, conn_manager),
                                   'cpu_allocated': get_node_allocated_CPU_cores(node_info),
                                   'cpu_total': NODE_MAX_RES[node_name]['CPU'],
                                   # for the gtx1080 and gpu partitions, AllocMem always comes back as 0,
                                   # so derive allocated RAM from FreeMem instead
                                   'ram_allocated': get_node_allocated_RAM(node_info) if p in ['ecsstaff', 'ecsall']
                                   else round(NODE_MAX_RES[node_name]['RAM'] - get_node_free_RAM(node_info), 1),
                                   'ram_total': NODE_MAX_RES[node_name]['RAM'],
                                   'gpu_allocated': get_node_allocated_GPU(node_info, locked_usage=locked_usage),
                                   'gpu_total': NODE_MAX_RES[node_name]['GPU']
                                   }
            info[p]['nodes'][n]['cpu_allocated_percentage'] = calculate_percentage(info[p]['nodes'][n]['cpu_allocated'],
                                                                                   info[p]['nodes'][n]['cpu_total'])
            info[p]['nodes'][n]['ram_allocated_percentage'] = calculate_percentage(info[p]['nodes'][n]['ram_allocated'],
                                                                                   info[p]['nodes'][n]['ram_total'])
            info[p]['nodes'][n]['gpu_allocated_percentage'] = calculate_percentage(info[p]['nodes'][n]['gpu_allocated'],
                                                                                   info[p]['nodes'][n]['gpu_total'])
    return info
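# Rough shape of the result (all values invented):
#   {"gpu": {"cpu_free": 304, "cpu_total": 400, "ram_free": 1500.0, "ram_total": 1875.0,
#            "gpu_name": "v100", "gpu_free": 12, "gpu_total": 20, "gpu_free_percentage": 60.0,
#            ..., "nodes": {"indigo51": {"state": "mixed", "cpu_allocated": 16, ...}, ...}}, ...}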
def aggregate_user_info(conn_manager: RemoteConnectionManager) -> dict:
    """Gets the RAM, CPU and GPU usage for each user that has a running job on the GPU partitions"""
    info = defaultdict(lambda: {'cpu_allocated': 0, 'ram_allocated': 0, 'gpu_allocated': 0, 'gpu_locked': 0})
    user_running_set = set()
    # get all the users that have running jobs on the cluster
    for partition in PARTITION_TO_NODE.keys():
        user_queue_job_info = get_partition_queue_job_info(partition, conn_manager, "UserName")
        user_running_set.update(user_queue_job_info)
    for user in user_running_set:
        for partition in PARTITION_TO_NODE.keys():
            info[user]['cpu_allocated'] += get_user_allocated_CPU_cores_on_partition(partition, conn_manager, user)
            info[user]['ram_allocated'] += get_user_allocated_RAM_on_partition(partition, conn_manager, user)
            info[user]['gpu_allocated'] += get_user_allocated_GPU_on_partition(partition, conn_manager, user, False)
            info[user]['gpu_locked'] += get_user_allocated_GPU_on_partition(partition, conn_manager, user, True)
    # round the accumulated RAM values to 1 decimal place
    for user in info.keys():
        info[user]['ram_allocated'] = round(info[user]['ram_allocated'], 1)
    return info
def filter_users_with_no_GPU_usage(info: dict) -> dict:
    """Keeps only the users that have GPU nodes locked but are using none of the GPUs"""
    return {user: user_info for user, user_info in info.items() if user_info['gpu_allocated'] == 0
            and user_info['gpu_locked'] != 0}


def filter_users_with_partial_GPU_usage(info: dict) -> dict:
    """Keeps only the users that are using some, but not all, of the GPUs they have locked"""
    return {user: user_info for user, user_info in info.items() if user_info['gpu_allocated'] < user_info['gpu_locked']
            and user_info['gpu_allocated'] != 0}
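# E.g. (made-up entries) a user with {'gpu_allocated': 0, 'gpu_locked': 4} is caught by
# filter_users_with_no_GPU_usage, while {'gpu_allocated': 2, 'gpu_locked': 8} is caught
# by filter_users_with_partial_GPU_usage.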
if __name__ == "__main__":
    conn_manager = RemoteConnectionManager()
    import pprint

    while True:
        node_info = get_node_info("pink51", conn_manager)
        a = aggregate_user_info(conn_manager)
        pprint.pprint(a)
        time.sleep(1)  # Wait a second before checking again