From be73bd05284b7fead53629c59ea5e4dddda1d6af Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Wed, 13 Dec 2023 08:36:34 +0100 Subject: [PATCH] now prints dask information as well --- .../computational-clusters/osparc_clusters.py | 134 ++++++++++++------ 1 file changed, 92 insertions(+), 42 deletions(-) diff --git a/scripts/maintenance/computational-clusters/osparc_clusters.py b/scripts/maintenance/computational-clusters/osparc_clusters.py index 8ab772b447ca..e0d7b530d081 100755 --- a/scripts/maintenance/computational-clusters/osparc_clusters.py +++ b/scripts/maintenance/computational-clusters/osparc_clusters.py @@ -1,6 +1,7 @@ #! /usr/bin/env python3 import asyncio +import contextlib import datetime import json import re @@ -8,10 +9,11 @@ from dataclasses import dataclass, replace from enum import Enum from pathlib import Path -from typing import Final +from typing import Any, Final import arrow import boto3 +import distributed import paramiko import parse import typer @@ -65,6 +67,10 @@ class ComputationalCluster: primary: ComputationalInstance workers: list[ComputationalInstance] + scheduler_info: dict[str, Any] + datasets: tuple[str, ...] + processing_jobs: dict[str, str] + def _get_instance_name(instance) -> str: for tag in instance.tags: @@ -213,6 +219,8 @@ def _needs_manual_intervention( ] except (paramiko.AuthenticationException, paramiko.SSHException) as exc: raise typer.Abort from exc + except TimeoutError: + return [] finally: # Close the SSH connection client.close() @@ -291,8 +299,10 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: "State", "UserID", "WalletID", - "DaskSchedulerUI", + "Dask (UI+scheduler)", "last heartbeat since", + "known jobs", + "processing jobs", title="computational clusters", ) @@ -316,8 +326,10 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: instance_state, f"{cluster.primary.user_id}", f"{cluster.primary.wallet_id}", - f"http://{cluster.primary.ec2_instance.public_ip_address}:8787", + f"http://{cluster.primary.ec2_instance.public_ip_address}:8787\ntcp://{cluster.primary.ec2_instance.public_ip_address}:8786", _timedelta_formatting(time_now - cluster.primary.last_heartbeat), + f"{len(cluster.datasets)}", + json.dumps(cluster.processing_jobs), ) # now add the workers for worker in cluster.workers: @@ -346,51 +358,67 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: print(table) -def _detect_instances( - instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None -) -> tuple[list[DynamicInstance], list[ComputationalCluster]]: - dynamic_instances = [] - computational_instances = [] - - for instance in track(instances, description="Detecting running instances..."): - if comp_instance := _parse_computational(instance): - computational_instances.append(comp_instance) - elif dyn_instance := _parse_dynamic(instance): - dynamic_instances.append(dyn_instance) - - if ssh_key_path: - # this construction makes the retrieval much faster - all_running_services = asyncio.get_event_loop().run_until_complete( - asyncio.gather( - *( - asyncio.get_event_loop().run_in_executor( - None, - _ssh_and_list_running_dyn_services, - instance.ec2_instance, - "ubuntu", - ssh_key_path, - ) - for instance in dynamic_instances +def _analyze_dynamic_instances_running_services( + dynamic_instances: list[DynamicInstance], ssh_key_path: Path +) -> list[DynamicInstance]: + # this construction makes the retrieval much faster + all_running_services = asyncio.get_event_loop().run_until_complete( + asyncio.gather( + *( + asyncio.get_event_loop().run_in_executor( + None, + _ssh_and_list_running_dyn_services, + instance.ec2_instance, + "ubuntu", + ssh_key_path, ) + for instance in dynamic_instances ) ) + ) - more_detailed_instances = [ - replace( - instance, - running_services=running_services, - ) - for instance, running_services in zip( - dynamic_instances, all_running_services, strict=True + return [ + replace( + instance, + running_services=running_services, + ) + for instance, running_services in zip( + dynamic_instances, all_running_services, strict=True + ) + ] + + +def _analyze_computational_instances( + computational_instances: list[ComputationalInstance], +) -> list[ComputationalCluster]: + computational_clusters = [] + for instance in track( + computational_instances, description="Collecting computational clusters data..." + ): + if instance.role is InstanceRole.manager: + scheduler_info = {} + datasets_on_cluster = () + processing_jobs = {} + with contextlib.suppress(TimeoutError, OSError): + client = distributed.Client( + f"tcp://{instance.ec2_instance.public_ip_address}:8786", timeout=5 + ) + scheduler_info = client.scheduler_info() + datasets_on_cluster = client.list_datasets() + processing_jobs = client.processing() + + assert isinstance(datasets_on_cluster, tuple) + assert isinstance(processing_jobs, dict) + computational_clusters.append( + ComputationalCluster( + primary=instance, + workers=[], + scheduler_info=scheduler_info, + datasets=datasets_on_cluster, + processing_jobs=processing_jobs, + ) ) - ] - dynamic_instances = more_detailed_instances - computational_clusters = [ - ComputationalCluster(primary=instance, workers=[]) - for instance in computational_instances - if instance.role is InstanceRole.manager - ] for instance in computational_instances: if instance.role is InstanceRole.worker: # assign the worker to correct cluster @@ -401,6 +429,28 @@ def _detect_instances( ): cluster.workers.append(instance) + return computational_clusters + + +def _detect_instances( + instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None +) -> tuple[list[DynamicInstance], list[ComputationalCluster]]: + dynamic_instances = [] + computational_instances = [] + + for instance in track(instances, description="Detecting running instances..."): + if comp_instance := _parse_computational(instance): + computational_instances.append(comp_instance) + elif dyn_instance := _parse_dynamic(instance): + dynamic_instances.append(dyn_instance) + + if ssh_key_path: + dynamic_instances = _analyze_dynamic_instances_running_services( + dynamic_instances, ssh_key_path + ) + + computational_clusters = _analyze_computational_instances(computational_instances) + return dynamic_instances, computational_clusters