diff --git a/.env-devel b/.env-devel index e827e442c05..a01261c5cff 100644 --- a/.env-devel +++ b/.env-devel @@ -36,6 +36,7 @@ CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}' CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest CLUSTERS_KEEPER_DASK_NTHREADS=0 +CLUSTERS_KEEPER_DASK_WORKER_SATURATION=inf CLUSTERS_KEEPER_EC2_ACCESS=null CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=5 CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES=null diff --git a/Makefile b/Makefile index 51b1a1dba57..ea01b0a0bae 100644 --- a/Makefile +++ b/Makefile @@ -523,14 +523,13 @@ nodenv: node_modules ## builds node_modules local environ (TODO) pylint: ## python linting # pylint version info - @/bin/bash -c "pylint --version" + @pylint --version # Running linter in packages and services (except director) @folders=$$(find $(CURDIR)/services $(CURDIR)/packages -type d -not -path "*/director/*" -name 'src' -exec dirname {} \; | sort -u); \ exit_status=0; \ for folder in $$folders; do \ - pushd "$$folder"; \ - make pylint || exit_status=1; \ - popd; \ + echo "Linting $$folder"; \ + $(MAKE_C) "$$folder" pylint || exit_status=1; \ done;\ exit $$exit_status # Running linter elsewhere diff --git a/packages/settings-library/src/settings_library/base.py b/packages/settings-library/src/settings_library/base.py index b40f658c944..296b453e26c 100644 --- a/packages/settings-library/src/settings_library/base.py +++ b/packages/settings-library/src/settings_library/base.py @@ -13,8 +13,9 @@ ) from pydantic.error_wrappers import ErrorList, ErrorWrapper from pydantic.fields import ModelField, Undefined +from pydantic.typing import is_literal_type -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) _DEFAULTS_TO_NONE_MSG: Final[ str @@ -39,7 +40,7 @@ def _default_factory(): except ValidationError as err: if field.allow_none: # e.g. Optional[PostgresSettings] would warn if defaults to None - logger.warning( + _logger.warning( _DEFAULTS_TO_NONE_MSG, field.name, ) @@ -101,8 +102,14 @@ def prepare_field(cls, field: ModelField) -> None: is_not_composed = ( get_origin(field_type) is None ) # is not composed as dict[str, Any] or Generic[Base] - - if is_not_composed and issubclass(field_type, BaseCustomSettings): + # avoid literals raising TypeError + is_not_literal = is_literal_type(field.type_) is False + + if ( + is_not_literal + and is_not_composed + and issubclass(field_type, BaseCustomSettings) + ): if auto_default_from_env: assert field.field_info.default is Undefined assert field.field_info.default_factory is None @@ -112,7 +119,11 @@ def prepare_field(cls, field: ModelField) -> None: field.default = None field.required = False # has a default now - elif is_not_composed and issubclass(field_type, BaseSettings): + elif ( + is_not_literal + and is_not_composed + and issubclass(field_type, BaseSettings) + ): msg = f"{cls}.{field.name} of type {field_type} must inherit from BaseCustomSettings" raise ConfigError(msg) diff --git a/packages/settings-library/tests/test_base.py b/packages/settings-library/tests/test_base.py index 55350d3cce3..f6afeeee093 100644 --- a/packages/settings-library/tests/test_base.py +++ b/packages/settings-library/tests/test_base.py @@ -2,6 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable # pylint: disable=too-many-arguments +# pylint: disable=protected-access import inspect import json @@ -196,7 +197,7 @@ def test_auto_default_to_none_logs_a_warning( create_settings_class: Callable[[str], type[BaseCustomSettings]], mocker: MockerFixture, ): - logger_warn = mocker.spy(settings_library.base.logger, "warning") + logger_warn = mocker.spy(settings_library.base._logger, "warning") # noqa: SLF001 S = create_settings_class("S") diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 1c1d7c6b970..a26fb81381c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -115,16 +115,30 @@ async def compute_node_used_resources( app: FastAPI, instance: AssociatedInstance ) -> Resources: try: - num_results_in_memory = await dask.get_worker_still_has_results_in_memory( + resource = await dask.get_worker_used_resources( _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance ) - if num_results_in_memory > 0: - # NOTE: this is a trick to consider the node still useful - return Resources(cpus=0, ram=ByteSize(1024 * 1024 * 1024)) - return await dask.get_worker_used_resources( - _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance + if resource == Resources.create_as_empty(): + num_results_in_memory = ( + await dask.get_worker_still_has_results_in_memory( + _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance + ) + ) + if num_results_in_memory > 0: + _logger.debug( + "found %s for %s", + f"{num_results_in_memory=}", + f"{instance.ec2_instance.id}", + ) + # NOTE: this is a trick to consider the node still useful + return Resources(cpus=0, ram=ByteSize(1024 * 1024 * 1024)) + + _logger.debug( + "found %s for %s", f"{resource=}", f"{instance.ec2_instance.id}" ) + return resource except (DaskWorkerNotFoundError, DaskNoWorkersError): + _logger.debug("no resource found for %s", f"{instance.ec2_instance.id}") return Resources.create_as_empty() @staticmethod diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index 512984a1fdb..269a5b823d0 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -1,3 +1,4 @@ +import collections import contextlib import logging import re @@ -217,30 +218,39 @@ async def get_worker_used_resources( DaskNoWorkersError """ - def _get_worker_used_resources( + def _list_processing_tasks_on_worker( dask_scheduler: distributed.Scheduler, *, worker_url: str - ) -> dict[str, float] | None: - for worker_name, worker_state in dask_scheduler.workers.items(): - if worker_url != worker_name: - continue - if worker_state.status is distributed.Status.closing_gracefully: - # NOTE: when a worker was retired it is in this state - return {} - return dict(worker_state.used_resources) - return None + ) -> list[tuple[DaskTaskId, DaskTaskResources]]: + processing_tasks = [] + for task_key, task_state in dask_scheduler.tasks.items(): + if task_state.processing_on and ( + task_state.processing_on.address == worker_url + ): + processing_tasks.append((task_key, task_state.resource_restrictions)) + return processing_tasks async with _scheduler_client(scheduler_url, authentication) as client: worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance) + _logger.debug("looking for processing tasksfor %s", f"{worker_url=}") + # now get the used resources - worker_used_resources: dict[str, Any] | None = await _wrap_client_async_routine( - client.run_on_scheduler(_get_worker_used_resources, worker_url=worker_url), + worker_processing_tasks: list[ + tuple[DaskTaskId, DaskTaskResources] + ] = await _wrap_client_async_routine( + client.run_on_scheduler( + _list_processing_tasks_on_worker, worker_url=worker_url + ), ) - if worker_used_resources is None: - raise DaskWorkerNotFoundError(worker_host=worker_url, url=scheduler_url) + + total_resources_used: collections.Counter[str] = collections.Counter() + for _, task_resources in worker_processing_tasks: + total_resources_used.update(task_resources) + + _logger.debug("found %s for %s", f"{total_resources_used=}", f"{worker_url=}") return Resources( - cpus=worker_used_resources.get("CPU", 0), - ram=parse_obj_as(ByteSize, worker_used_resources.get("RAM", 0)), + cpus=total_resources_used.get("CPU", 0), + ram=parse_obj_as(ByteSize, total_resources_used.get("RAM", 0)), ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index f5639eac8ce..a70319801d6 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -59,23 +59,41 @@ _PENDING_DOCKER_TASK_MESSAGE: Final[str] = "pending task scheduling" _INSUFFICIENT_RESOURCES_DOCKER_TASK_ERR: Final[str] = "insufficient resources on" _NOT_SATISFIED_SCHEDULING_CONSTRAINTS_TASK_ERR: Final[str] = "no suitable node" +_OSPARC_SERVICE_READY_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( + DockerLabelKey, "io.simcore.osparc-services-ready" +) +_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( + DockerLabelKey, f"{_OSPARC_SERVICE_READY_LABEL_KEY}-last-changed" +) +_OSPARC_SERVICE_READY_LABEL_KEYS: Final[list[DockerLabelKey]] = [ + _OSPARC_SERVICE_READY_LABEL_KEY, + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, +] async def get_monitored_nodes( docker_client: AutoscalingDocker, node_labels: list[DockerLabelKey] ) -> list[Node]: + node_label_filters = [f"{label}=true" for label in node_labels] + [ + f"{label}" for label in _OSPARC_SERVICE_READY_LABEL_KEYS + ] return parse_obj_as( list[Node], - await docker_client.nodes.list( - filters={"node.label": [f"{label}=true" for label in node_labels]} - ), + await docker_client.nodes.list(filters={"node.label": node_label_filters}), ) async def get_worker_nodes(docker_client: AutoscalingDocker) -> list[Node]: return parse_obj_as( list[Node], - await docker_client.nodes.list(filters={"role": ["worker"]}), + await docker_client.nodes.list( + filters={ + "role": ["worker"], + "node.label": [ + f"{label}" for label in _OSPARC_SERVICE_READY_LABEL_KEYS + ], + } + ), ) @@ -550,14 +568,6 @@ def is_node_ready_and_available(node: Node, *, availability: Availability) -> bo ) -_OSPARC_SERVICE_READY_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( - DockerLabelKey, "io.simcore.osparc-services-ready" -) -_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( - DockerLabelKey, f"{_OSPARC_SERVICE_READY_LABEL_KEY}-last-changed" -) - - def is_node_osparc_ready(node: Node) -> bool: if not is_node_ready_and_available(node, availability=Availability.active): return False diff --git a/services/autoscaling/tests/unit/test_utils_docker.py b/services/autoscaling/tests/unit/test_utils_docker.py index d088388af31..e21a720cfca 100644 --- a/services/autoscaling/tests/unit/test_utils_docker.py +++ b/services/autoscaling/tests/unit/test_utils_docker.py @@ -37,6 +37,7 @@ from simcore_service_autoscaling.modules.docker import AutoscalingDocker from simcore_service_autoscaling.utils.utils_docker import ( _OSPARC_SERVICE_READY_LABEL_KEY, + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, Node, _by_created_dt, attach_node, @@ -133,7 +134,13 @@ async def test_get_monitored_nodes_with_valid_label( create_node_labels: Callable[[list[str]], Awaitable[None]], ): labels = faker.pylist(allowed_types=(str,)) - await create_node_labels(labels) + await create_node_labels( + [ + *labels, + _OSPARC_SERVICE_READY_LABEL_KEY, + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, + ] + ) monitored_nodes = await get_monitored_nodes(autoscaling_docker, node_labels=labels) assert len(monitored_nodes) == 1 diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py index 4275e8efa1e..1870f3fe969 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py @@ -1,6 +1,6 @@ import datetime from functools import cached_property -from typing import Any, ClassVar, Final, cast +from typing import Any, ClassVar, Final, Literal, cast from aws_library.ec2.models import EC2InstanceBootSpecific, EC2Tags from fastapi import FastAPI @@ -11,7 +11,14 @@ VersionTag, ) from models_library.clusters import InternalClusterAuthentication -from pydantic import Field, NonNegativeInt, PositiveInt, parse_obj_as, validator +from pydantic import ( + Field, + NonNegativeFloat, + NonNegativeInt, + PositiveInt, + parse_obj_as, + validator, +) from settings_library.base import BaseCustomSettings from settings_library.docker_registry import RegistrySettings from settings_library.ec2 import EC2Settings @@ -270,7 +277,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): description="defines the image tag to use for the computational backend sidecar image (NOTE: it currently defaults to use itisfoundation organisation in Dockerhub)", ) - CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: InternalClusterAuthentication = Field( + CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: ( + InternalClusterAuthentication + ) = Field( ..., description="defines the authentication of the clusters created via clusters-keeper (can be None or TLS)", ) @@ -280,6 +289,12 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): description="overrides the default number of threads in the dask-sidecars, setting it to 0 will use the default (see description in dask-sidecar)", ) + CLUSTERS_KEEPER_DASK_WORKER_SATURATION: NonNegativeFloat | Literal["inf"] = Field( + default="inf", + description="override the dask scheduler 'worker-saturation' field" + ", see https://selectfrom.dev/deep-dive-into-dask-distributed-scheduler-9fdb3b36b7c7", + ) + SWARM_STACK_NAME: str = Field( ..., description="Stack name defined upon deploy (see main Makefile)" ) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml index c2d2f866bb4..1bb0a81e26f 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml @@ -11,6 +11,7 @@ services: DASK_TLS_KEY: ${DASK_TLS_KEY} DASK_SCHEDULER_URL: tls://dask-scheduler:8786 DASK_START_AS_SCHEDULER: 1 + DASK_WORKER_SATURATION: ${DASK_WORKER_SATURATION} LOG_LEVEL: ${LOG_LEVEL} ports: - 8786:8786 # dask-scheduler access @@ -49,6 +50,7 @@ services: DASK_TLS_CA_FILE: ${DASK_TLS_CA_FILE} DASK_TLS_CERT: ${DASK_TLS_CERT} DASK_TLS_KEY: ${DASK_TLS_KEY} + DASK_WORKER_SATURATION: ${DASK_WORKER_SATURATION} LOG_LEVEL: ${LOG_LEVEL} SIDECAR_COMP_SERVICES_SHARED_FOLDER: /home/scu/computational_shared_data SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME: computational_shared_data diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py index 50d0c781df0..0a77d4bee00 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -67,6 +67,7 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str: f"DASK_TLS_CA_FILE={_HOST_TLS_CA_FILE_PATH}", f"DASK_TLS_CERT={_HOST_TLS_CERT_FILE_PATH}", f"DASK_TLS_KEY={_HOST_TLS_KEY_FILE_PATH}", + f"DASK_WORKER_SATURATION={app_settings.CLUSTERS_KEEPER_DASK_WORKER_SATURATION}", f"DOCKER_IMAGE_TAG={app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG}", f"EC2_INSTANCES_NAME_PREFIX={cluster_machines_name_prefix}", f"LOG_LEVEL={app_settings.LOG_LEVEL}", diff --git a/services/clusters-keeper/tests/unit/conftest.py b/services/clusters-keeper/tests/unit/conftest.py index 2cd3249f2b1..59e63d0db09 100644 --- a/services/clusters-keeper/tests/unit/conftest.py +++ b/services/clusters-keeper/tests/unit/conftest.py @@ -102,6 +102,7 @@ def app_environment( "CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES": "{}", "CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX": faker.pystr(), "CLUSTERS_KEEPER_DASK_NTHREADS": f"{faker.pyint(min_value=0)}", + "CLUSTERS_KEEPER_DASK_WORKER_SATURATION": f"{faker.pyfloat(min_value=0.1)}", "CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH": "{}", "PRIMARY_EC2_INSTANCES_KEY_NAME": faker.pystr(), "PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps( diff --git a/services/dask-sidecar/docker/boot.sh b/services/dask-sidecar/docker/boot.sh index 15dccc1f8e3..213377cbf0e 100755 --- a/services/dask-sidecar/docker/boot.sh +++ b/services/dask-sidecar/docker/boot.sh @@ -43,11 +43,19 @@ logging: distributed.scheduler: ${LOG_LEVEL:-warning} EOF -# Check if DASK_TLS_CA_FILE is present +# Define the base configuration for distributed +# the worker-saturation defines how the scheduler loads +# the workers, see https://github.com/dask/distributed/blob/91350ab15c79de973597e319bd36cc8d56e9f999/distributed/scheduler.py +cat >/home/scu/.config/dask/distributed.yaml <>/home/scu/.config/dask/distributed.yaml < None: """This is a special function recognized by the dask worker when starting with flag --preload""" _logger.info("Setting up scheduler...") assert scheduler # nosec + print(f"dask config: {dask.config.config}", flush=True) # noqa: T201 print_dask_scheduler_banner() diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 1908fe9aa46..c504288cd7c 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -258,6 +258,7 @@ def _comp_sidecar_fct( key=job_id, resources=dask_resources, retries=0, + pure=False, ) # NOTE: the callback is running in a secondary thread, and takes a future as arg task_future.add_done_callback(lambda _: callback()) diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 2282e3163c0..a9f2b1eab02 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -156,6 +156,7 @@ services: - CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=${CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG} - CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH=${CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH:-{"type":"tls","tls_ca_file":"${DASK_TLS_CERT}","tls_client_cert":"${DASK_TLS_CERT}","tls_client_key":"${DASK_TLS_KEY}"}} - CLUSTERS_KEEPER_DASK_NTHREADS=${CLUSTERS_KEEPER_DASK_NTHREADS} + - CLUSTERS_KEEPER_DASK_WORKER_SATURATION=${CLUSTERS_KEEPER_DASK_WORKER_SATURATION} - CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=${CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION} - CLUSTERS_KEEPER_TASK_INTERVAL=${CLUSTERS_KEEPER_TASK_INTERVAL} - CLUSTERS_KEEPER_LOGLEVEL=${CLUSTERS_KEEPER_LOGLEVEL:-${LOG_LEVEL:-WARNING}}