
Commit

now prints dask information as well
sanderegg committed Dec 13, 2023
1 parent a35da27 commit be73bd0
Showing 1 changed file with 92 additions and 42 deletions.
134 changes: 92 additions & 42 deletions scripts/maintenance/computational-clusters/osparc_clusters.py
@@ -1,17 +1,19 @@
#! /usr/bin/env python3

import asyncio
+import contextlib
import datetime
import json
import re
from collections import defaultdict, namedtuple
from dataclasses import dataclass, replace
from enum import Enum
from pathlib import Path
-from typing import Final
+from typing import Any, Final

import arrow
import boto3
+import distributed
import paramiko
import parse
import typer
@@ -65,6 +67,10 @@ class ComputationalCluster:
primary: ComputationalInstance
workers: list[ComputationalInstance]

+    scheduler_info: dict[str, Any]
+    datasets: tuple[str, ...]
+    processing_jobs: dict[str, str]
+

def _get_instance_name(instance) -> str:
for tag in instance.tags:
@@ -213,6 +219,8 @@ def _needs_manual_intervention(
]
except (paramiko.AuthenticationException, paramiko.SSHException) as exc:
raise typer.Abort from exc
+    except TimeoutError:
+        return []
finally:
# Close the SSH connection
client.close()
@@ -291,8 +299,10 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
"State",
"UserID",
"WalletID",
"DaskSchedulerUI",
"Dask (UI+scheduler)",
"last heartbeat since",
"known jobs",
"processing jobs",
title="computational clusters",
)

@@ -316,8 +326,10 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
instance_state,
f"{cluster.primary.user_id}",
f"{cluster.primary.wallet_id}",
f"http://{cluster.primary.ec2_instance.public_ip_address}:8787",
f"http://{cluster.primary.ec2_instance.public_ip_address}:8787\ntcp://{cluster.primary.ec2_instance.public_ip_address}:8786",
_timedelta_formatting(time_now - cluster.primary.last_heartbeat),
f"{len(cluster.datasets)}",
json.dumps(cluster.processing_jobs),
)
# now add the workers
for worker in cluster.workers:
@@ -346,51 +358,67 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
print(table)


-def _detect_instances(
-    instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None
-) -> tuple[list[DynamicInstance], list[ComputationalCluster]]:
-    dynamic_instances = []
-    computational_instances = []
-
-    for instance in track(instances, description="Detecting running instances..."):
-        if comp_instance := _parse_computational(instance):
-            computational_instances.append(comp_instance)
-        elif dyn_instance := _parse_dynamic(instance):
-            dynamic_instances.append(dyn_instance)
-
-    if ssh_key_path:
-        # this construction makes the retrieval much faster
-        all_running_services = asyncio.get_event_loop().run_until_complete(
-            asyncio.gather(
-                *(
-                    asyncio.get_event_loop().run_in_executor(
-                        None,
-                        _ssh_and_list_running_dyn_services,
-                        instance.ec2_instance,
-                        "ubuntu",
-                        ssh_key_path,
-                    )
-                    for instance in dynamic_instances
-                )
-            )
-        )
-
-        more_detailed_instances = [
-            replace(
-                instance,
-                running_services=running_services,
-            )
-            for instance, running_services in zip(
-                dynamic_instances, all_running_services, strict=True
-            )
-        ]
-        dynamic_instances = more_detailed_instances
-
-    computational_clusters = [
-        ComputationalCluster(primary=instance, workers=[])
-        for instance in computational_instances
-        if instance.role is InstanceRole.manager
-    ]
+def _analyze_dynamic_instances_running_services(
+    dynamic_instances: list[DynamicInstance], ssh_key_path: Path
+) -> list[DynamicInstance]:
+    # this construction makes the retrieval much faster
+    all_running_services = asyncio.get_event_loop().run_until_complete(
+        asyncio.gather(
+            *(
+                asyncio.get_event_loop().run_in_executor(
+                    None,
+                    _ssh_and_list_running_dyn_services,
+                    instance.ec2_instance,
+                    "ubuntu",
+                    ssh_key_path,
+                )
+                for instance in dynamic_instances
+            )
+        )
+    )
+
+    return [
+        replace(
+            instance,
+            running_services=running_services,
+        )
+        for instance, running_services in zip(
+            dynamic_instances, all_running_services, strict=True
+        )
+    ]
+
+
+def _analyze_computational_instances(
+    computational_instances: list[ComputationalInstance],
+) -> list[ComputationalCluster]:
+    computational_clusters = []
+    for instance in track(
+        computational_instances, description="Collecting computational clusters data..."
+    ):
+        if instance.role is InstanceRole.manager:
+            scheduler_info = {}
+            datasets_on_cluster = ()
+            processing_jobs = {}
+            with contextlib.suppress(TimeoutError, OSError):
+                client = distributed.Client(
+                    f"tcp://{instance.ec2_instance.public_ip_address}:8786", timeout=5
+                )
+                scheduler_info = client.scheduler_info()
+                datasets_on_cluster = client.list_datasets()
+                processing_jobs = client.processing()
+
+            assert isinstance(datasets_on_cluster, tuple)
+            assert isinstance(processing_jobs, dict)
+            computational_clusters.append(
+                ComputationalCluster(
+                    primary=instance,
+                    workers=[],
+                    scheduler_info=scheduler_info,
+                    datasets=datasets_on_cluster,
+                    processing_jobs=processing_jobs,
+                )
+            )
+
    for instance in computational_instances:
        if instance.role is InstanceRole.worker:
            # assign the worker to correct cluster
@@ -401,6 +429,28 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
):
cluster.workers.append(instance)

+    return computational_clusters
+
+
+def _detect_instances(
+    instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None
+) -> tuple[list[DynamicInstance], list[ComputationalCluster]]:
+    dynamic_instances = []
+    computational_instances = []
+
+    for instance in track(instances, description="Detecting running instances..."):
+        if comp_instance := _parse_computational(instance):
+            computational_instances.append(comp_instance)
+        elif dyn_instance := _parse_dynamic(instance):
+            dynamic_instances.append(dyn_instance)
+
+    if ssh_key_path:
+        dynamic_instances = _analyze_dynamic_instances_running_services(
+            dynamic_instances, ssh_key_path
+        )
+
+    computational_clusters = _analyze_computational_instances(computational_instances)
+
    return dynamic_instances, computational_clusters
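
For readers following the refactor: the extracted _analyze_dynamic_instances_running_services keeps the existing trick, noted in the "much faster" comment, of pushing each blocking paramiko SSH probe onto the default thread pool and awaiting them all together. A minimal sketch of that fan-out pattern, with a hypothetical blocking_probe standing in for _ssh_and_list_running_dyn_services (asyncio.run is used here to keep the sketch self-contained):

import asyncio
import time


def blocking_probe(host: str) -> str:
    # hypothetical stand-in for a paramiko SSH round-trip
    time.sleep(1)
    return f"{host}: ok"


async def probe_all(hosts: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) schedules onto the default ThreadPoolExecutor,
    # so the blocking probes overlap instead of running one after the other
    return await asyncio.gather(
        *(loop.run_in_executor(None, blocking_probe, host) for host in hosts)
    )


print(asyncio.run(probe_all(["10.0.0.1", "10.0.0.2", "10.0.0.3"])))  # ~1s, not ~3s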


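The Dask queries behind the new table columns can also be exercised on their own. A standalone sketch, assuming the distributed package is installed and a scheduler is reachable at a hypothetical address (the script builds the address from the primary instance's public IP); scheduler_info(), list_datasets() and processing() are the same distributed.Client calls the commit adds:

import contextlib
import json

import distributed

SCHEDULER_URL = "tcp://127.0.0.1:8786"  # hypothetical address

scheduler_info: dict = {}
datasets: tuple = ()
processing_jobs: dict = {}
with contextlib.suppress(TimeoutError, OSError):
    client = distributed.Client(SCHEDULER_URL, timeout=5)
    scheduler_info = client.scheduler_info()  # scheduler and worker metadata
    datasets = client.list_datasets()  # published dataset names ("known jobs" column)
    processing_jobs = client.processing()  # worker address -> task keys in flight

print(f"workers: {len(scheduler_info.get('workers', {}))}")
print(f"known jobs: {len(datasets)}")
print(f"processing jobs: {json.dumps(processing_jobs)}")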
