From 0e2a8b9e7568d213f7f8fef6724b54f9cd3eb2f2 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Wed, 8 Nov 2023 09:47:25 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8Computational=20autoscaling:=20find=20?= =?UTF-8?q?out=20which=20EC2=20type=20is=20necessary=20(#4975)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/dask_task_models_library/constants.py | 3 + .../src/models_library/docker.py | 5 + services/autoscaling/requirements/_tools.txt | 4 +- services/autoscaling/requirements/ci.txt | 1 + services/autoscaling/requirements/dev.txt | 1 + services/autoscaling/requirements/prod.txt | 1 + services/autoscaling/setup.cfg | 4 + .../core/errors.py | 4 + .../core/settings.py | 2 +- .../src/simcore_service_autoscaling/models.py | 3 + .../modules/auto_scaling_core.py | 178 ++++++----- .../modules/auto_scaling_mode_base.py | 19 +- .../auto_scaling_mode_computational.py | 40 ++- .../modules/auto_scaling_mode_dynamic.py | 39 ++- .../modules/ec2.py | 2 +- .../utils/auto_scaling_core.py | 96 +++++- .../utils/computational_scaling.py | 20 +- .../utils/dynamic_scaling.py | 14 +- .../utils/utils_docker.py | 74 ++++- .../utils/{ec2.py => utils_ec2.py} | 0 services/autoscaling/tests/unit/conftest.py | 53 ++-- ...test_modules_auto_scaling_computational.py | 231 +++++++++++++- .../unit/test_modules_auto_scaling_dynamic.py | 289 ++++++++---------- .../tests/unit/test_modules_ec2.py | 4 +- .../unit/test_utils_computational_scaling.py | 10 +- .../autoscaling/tests/unit/test_utils_ec2.py | 2 +- .../clusters-keeper/tests/unit/conftest.py | 2 +- .../tests/unit/test_modules_clusters.py | 2 +- .../test_modules_clusters_management_core.py | 2 +- .../tests/unit/test_modules_ec2.py | 4 +- .../tests/unit/test_rpc_clusters.py | 2 +- .../tests/unit/test_rpc_ec2_instances.py | 2 +- .../modules/dask_client.py | 3 +- 33 files changed, 779 insertions(+), 337 deletions(-) create mode 100644 packages/dask-task-models-library/src/dask_task_models_library/constants.py rename services/autoscaling/src/simcore_service_autoscaling/utils/{ec2.py => utils_ec2.py} (100%) diff --git a/packages/dask-task-models-library/src/dask_task_models_library/constants.py b/packages/dask-task-models-library/src/dask_task_models_library/constants.py new file mode 100644 index 00000000000..4c9db9d83c2 --- /dev/null +++ b/packages/dask-task-models-library/src/dask_task_models_library/constants.py @@ -0,0 +1,3 @@ +from typing import Final + +DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY: Final[str] = "EC2-INSTANCE-TYPE" diff --git a/packages/models-library/src/models_library/docker.py b/packages/models-library/src/models_library/docker.py index 44446a07e1a..e120f715b5a 100644 --- a/packages/models-library/src/models_library/docker.py +++ b/packages/models-library/src/models_library/docker.py @@ -51,6 +51,11 @@ class DockerGenericTag(ConstrainedStr): _UNDEFINED_LABEL_VALUE_INT: Final[str] = "0" +DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: Final[ + DockerLabelKey +] = parse_obj_as(DockerLabelKey, "ec2-instance-type") + + def to_simcore_runtime_docker_label_key(key: str) -> DockerLabelKey: return DockerLabelKey( f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}{key.replace('_', '-').lower()}" diff --git a/services/autoscaling/requirements/_tools.txt b/services/autoscaling/requirements/_tools.txt index da673edf4b3..4a11ab2e63d 100644 --- a/services/autoscaling/requirements/_tools.txt +++ b/services/autoscaling/requirements/_tools.txt @@ -76,9 +76,9 @@ tomli==2.0.1 # 
pip-tools # pylint # pyproject-hooks -tomlkit==0.12.1 +tomlkit==0.12.2 # via pylint -typing-extensions==4.5.0 +typing-extensions==4.8.0 # via # -c requirements/_base.txt # -c requirements/_test.txt diff --git a/services/autoscaling/requirements/ci.txt b/services/autoscaling/requirements/ci.txt index 7d9d3f2fe6e..d2b1c3498e4 100644 --- a/services/autoscaling/requirements/ci.txt +++ b/services/autoscaling/requirements/ci.txt @@ -15,6 +15,7 @@ ../../packages/pytest-simcore ../../packages/service-library[fastapi] ../../packages/settings-library +../../packages/dask-task-models-library # installs current package . diff --git a/services/autoscaling/requirements/dev.txt b/services/autoscaling/requirements/dev.txt index cf5e4d8a9fa..8cbfdbe8e3f 100644 --- a/services/autoscaling/requirements/dev.txt +++ b/services/autoscaling/requirements/dev.txt @@ -16,6 +16,7 @@ --editable ../../packages/pytest-simcore --editable ../../packages/service-library[fastapi] --editable ../../packages/settings-library +--editable ../../packages/dask-task-models-library # installs current package --editable . diff --git a/services/autoscaling/requirements/prod.txt b/services/autoscaling/requirements/prod.txt index 31bf2dd55ec..7d635391e63 100644 --- a/services/autoscaling/requirements/prod.txt +++ b/services/autoscaling/requirements/prod.txt @@ -13,5 +13,6 @@ ../../packages/models-library ../../packages/service-library[fastapi] ../../packages/settings-library +../../packages/dask-task-models-library # installs current package . diff --git a/services/autoscaling/setup.cfg b/services/autoscaling/setup.cfg index 48d4cf22b1c..d50d8fe2677 100644 --- a/services/autoscaling/setup.cfg +++ b/services/autoscaling/setup.cfg @@ -9,3 +9,7 @@ commit_args = --no-verify [tool:pytest] asyncio_mode = auto +markers = + slow: marks tests as slow (deselect with '-m "not slow"') + acceptance_test: "marks tests as 'acceptance tests' i.e. does the system do what the user expects? Typically those are workflows." + testit: "marks test to run during development" diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py index 522cd6638f8..92006fa12f7 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py @@ -17,6 +17,10 @@ class Ec2InstanceNotFoundError(AutoscalingRuntimeError): msg_template: str = "EC2 instance was not found" +class Ec2InstanceInvalidError(AutoscalingRuntimeError): + msg_template: str = "Invalid EC2 defined: {msg}" + + class Ec2TooManyInstancesError(AutoscalingRuntimeError): msg_template: str = ( "The maximum amount of instances {num_instances} is already reached!" 
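    )


For orientation, the new `Ec2InstanceInvalidError` follows the same `msg_template` convention as its siblings above: callers fill the `{msg}` placeholder via a keyword argument. A minimal, hypothetical usage sketch (the message text is illustrative; `find_selected_instance_type_for_task` in utils/auto_scaling_core.py raises it this way further down in this patch):

    # illustrative only: raising the new error with a formatted message
    msg = "Asked for 'x1e.32xlarge', authorized types are ['t2.xlarge']. Please check!"
    raise Ec2InstanceInvalidError(msg=msg)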
diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py
index dc669e98907..057d59aa912 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py
@@ -105,7 +105,7 @@ class EC2InstancesSettings(BaseCustomSettings):
     )
     EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field(
         default=datetime.timedelta(minutes=1),
-        description="Time after which an EC2 instance may be terminated (repeat every hour, min 0, max 59 minutes)"
+        description="Time after which an EC2 instance may be terminated (0<=T<=59 minutes, is automatically capped)"
        "(defaults to seconds; see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)",
     )
 
diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py
index f14c30026f1..a231fdc5446 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/models.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/models.py
@@ -18,6 +18,9 @@ def create_as_empty(cls) -> "Resources":
     def __ge__(self, other: "Resources") -> bool:
         return self.cpus >= other.cpus and self.ram >= other.ram
 
+    def __gt__(self, other: "Resources") -> bool:
+        return self.cpus > other.cpus or self.ram > other.ram
+
     def __add__(self, other: "Resources") -> "Resources":
         return Resources.construct(
             **{
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
index 4657a218838..81b177a42df 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
@@ -6,6 +6,7 @@ import logging
 from typing import cast
 
+import arrow
 from fastapi import FastAPI
 from models_library.generated_models.docker_rest_api import (
     Availability,
@@ -17,6 +18,8 @@ from types_aiobotocore_ec2.literals import InstanceTypeType
 
 from ..core.errors import (
+    DaskWorkerNotFoundError,
+    Ec2InstanceInvalidError,
     Ec2InstanceNotFoundError,
     Ec2InvalidDnsNameError,
     Ec2TooManyInstancesError,
@@ -29,10 +32,12 @@
     EC2InstanceType,
     Resources,
 )
-from ..utils import ec2, utils_docker
+from ..utils import utils_docker, utils_ec2
 from ..utils.auto_scaling_core import (
     associate_ec2_instances_with_nodes,
     ec2_startup_script,
+    filter_by_task_defined_instance,
+    find_selected_instance_type_for_task,
     node_host_name_from_ec2_private_dns,
 )
 from ..utils.rabbitmq import post_autoscaling_status_message
@@ -106,7 +111,10 @@ def _node_not_ready(node: Node) -> bool:
 
 
 async def _cleanup_disconnected_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
-    await utils_docker.remove_nodes(get_docker_client(app), cluster.disconnected_nodes)
+    if cluster.disconnected_nodes:
+        await utils_docker.remove_nodes(
+            get_docker_client(app), cluster.disconnected_nodes
+        )
     return dataclasses.replace(cluster, disconnected_nodes=[])
 
 
@@ -128,13 +136,13 @@ async def _try_attach_pending_ec2s(
             new_node = await utils_docker.tag_node(
                 get_docker_client(app),
                 new_node,
-                tags=auto_scaling_mode.get_new_node_docker_tags(app),
+                tags=auto_scaling_mode.get_new_node_docker_tags(app, instance_data),
                 available=False,
             )
             new_found_instances.append(AssociatedInstance(new_node, instance_data))
         else:
             still_pending_ec2s.append(instance_data)
-        except Ec2InvalidDnsNameError:
except Ec2InvalidDnsNameError: + except Ec2InvalidDnsNameError: # noqa: PERF203 _logger.exception("Unexpected EC2 private dns") # NOTE: first provision the reserve drained nodes if possible all_drained_nodes = ( @@ -177,6 +185,27 @@ def _sort_according_to_allowed_types(instance_type: EC2InstanceType) -> int: return allowed_instance_types +async def _activate_and_notify( + app: FastAPI, + auto_scaling_mode: BaseAutoscaling, + drained_node: AssociatedInstance, + tasks: list, +) -> list: + await asyncio.gather( + utils_docker.set_node_availability( + get_docker_client(app), drained_node.node, available=True + ), + auto_scaling_mode.log_message_from_tasks( + app, + tasks, + "cluster adjusted, service should start shortly...", + level=logging.INFO, + ), + auto_scaling_mode.progress_message_from_tasks(app, tasks, progress=1.0), + ) + return tasks + + async def _activate_drained_nodes( app: FastAPI, cluster: Cluster, @@ -210,28 +239,12 @@ async def _activate_drained_nodes( if assigned_tasks ] - async def _activate_and_notify( - drained_node: AssociatedInstance, tasks: list - ) -> list: - await asyncio.gather( - *( - utils_docker.set_node_availability( - get_docker_client(app), drained_node.node, available=True - ), - auto_scaling_mode.log_message_from_tasks( - app, - tasks, - "cluster adjusted, service should start shortly...", - level=logging.INFO, - ), - auto_scaling_mode.progress_message_from_tasks(app, tasks, progress=1.0), - ) - ) - return tasks - # activate these nodes now await asyncio.gather( - *(_activate_and_notify(node, tasks) for node, tasks in nodes_to_activate) + *( + _activate_and_notify(app, auto_scaling_mode, node, tasks) + for node, tasks in nodes_to_activate + ) ) new_active_nodes = [node for node, _ in nodes_to_activate] new_active_node_ids = {node.ec2_instance.id for node in new_active_nodes} @@ -263,50 +276,74 @@ async def _find_needed_instances( type_to_instance_map = {t.name: t for t in available_ec2_types} # 1. 
check first the pending task needs
-    active_instance_to_tasks: list[tuple[EC2InstanceData, list]] = [
+    active_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
         (i.ec2_instance, []) for i in cluster.active_nodes
     ]
-    pending_instance_to_tasks: list[tuple[EC2InstanceData, list]] = [
+    pending_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
         (i, []) for i in cluster.pending_ec2s
     ]
     needed_new_instance_types_for_tasks: list[tuple[EC2InstanceType, list]] = []
     for task in pending_tasks:
-        if await auto_scaling_mode.try_assigning_task_to_pending_instances(
-            app,
-            task,
-            active_instance_to_tasks,
-            type_to_instance_map,
-            notify_progress=False,
-        ):
-            continue
-        if await auto_scaling_mode.try_assigning_task_to_pending_instances(
-            app,
-            task,
-            pending_instance_to_tasks,
-            type_to_instance_map,
-            notify_progress=True,
-        ):
-            continue
+        task_defined_ec2_type = await auto_scaling_mode.get_task_defined_instance(
+            app, task
+        )
+        (
+            filtered_active_instance_to_task,
+            filtered_pending_instance_to_task,
+            filtered_needed_new_instance_types_to_task,
+        ) = filter_by_task_defined_instance(
+            task_defined_ec2_type,
+            active_instances_to_tasks,
+            pending_instances_to_tasks,
+            needed_new_instance_types_for_tasks,
+        )
 
-        if auto_scaling_mode.try_assigning_task_to_instance_types(
-            task, needed_new_instance_types_for_tasks
+        # try to assign the task to one of the active, pending, or newly created instances
+        if (
+            await auto_scaling_mode.try_assigning_task_to_instances(
+                app,
+                task,
+                filtered_active_instance_to_task,
+                type_to_instance_map,
+                notify_progress=False,
+            )
+            or await auto_scaling_mode.try_assigning_task_to_instances(
+                app,
+                task,
+                filtered_pending_instance_to_task,
+                type_to_instance_map,
+                notify_progress=True,
+            )
+            or auto_scaling_mode.try_assigning_task_to_instance_types(
+                task, filtered_needed_new_instance_types_to_task
+            )
         ):
             continue
 
+        # so we need to find out what we can create now
         try:
-            # we need a new instance, let's find one
-            best_ec2_instance = ec2.find_best_fitting_ec2_instance(
-                available_ec2_types,
-                auto_scaling_mode.get_max_resources_from_task(task),
-                score_type=ec2.closest_instance_policy,
-            )
-            needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
+            # check if an exact instance type is needed first
+            if task_defined_ec2_type:
+                defined_ec2 = find_selected_instance_type_for_task(
+                    task_defined_ec2_type, available_ec2_types, auto_scaling_mode, task
+                )
+                needed_new_instance_types_for_tasks.append((defined_ec2, [task]))
+            else:
+                # we go for the best fitting type
+                best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance(
+                    available_ec2_types,
+                    auto_scaling_mode.get_max_resources_from_task(task),
+                    score_type=utils_ec2.closest_instance_policy,
+                )
+                needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
         except Ec2InstanceNotFoundError:
             _logger.exception(
                 "Task %s needs more resources than any EC2 instance "
-                "can provide with the current configuration. Please check.",
+                "can provide with the current configuration. Please check!",
Please check!", f"{task}", ) + except Ec2InstanceInvalidError: + _logger.exception("Unexpected error:") num_instances_per_type = collections.defaultdict( int, collections.Counter(t for t, _ in needed_new_instance_types_for_tasks) @@ -324,7 +361,7 @@ async def _find_needed_instances( # check if some are already pending remaining_pending_instances = [ instance - for instance, assigned_tasks in pending_instance_to_tasks + for instance, assigned_tasks in pending_instances_to_tasks if not assigned_tasks ] if len(remaining_pending_instances) < ( @@ -438,16 +475,19 @@ async def _deactivate_empty_nodes( active_empty_nodes: list[AssociatedInstance] = [] active_non_empty_nodes: list[AssociatedInstance] = [] for instance in cluster.active_nodes: - if ( - await auto_scaling_mode.compute_node_used_resources( + try: + node_used_resources = await auto_scaling_mode.compute_node_used_resources( app, instance, ) - == Resources.create_as_empty() - ): - active_empty_nodes.append(instance) - else: - active_non_empty_nodes.append(instance) + if node_used_resources == Resources.create_as_empty(): + active_empty_nodes.append(instance) + else: + active_non_empty_nodes.append(instance) + except DaskWorkerNotFoundError: # noqa: PERF203 + _logger.exception( + "EC2 node instance is not registered to dask-scheduler! TIP: Needs investigation" + ) # drain this empty nodes await asyncio.gather( @@ -462,7 +502,7 @@ async def _deactivate_empty_nodes( ) if active_empty_nodes: _logger.info( - "The following nodes set to drain: '%s'", + "following nodes set to drain: '%s'", f"{[node.node.Description.Hostname for node in active_empty_nodes if node.node.Description]}", ) return dataclasses.replace( @@ -486,17 +526,14 @@ async def _find_terminateable_instances( terminateable_nodes: list[AssociatedInstance] = [] for instance in cluster.drained_nodes: - # NOTE: AWS price is hourly based (e.g. 
same price for a machine used 2 minutes or 1 hour, so we wait until 55 minutes) - elapsed_time_since_launched = ( - datetime.datetime.now(datetime.timezone.utc) - - instance.ec2_instance.launch_time - ) - elapsed_time_since_full_hour = elapsed_time_since_launched % datetime.timedelta( - hours=1 + assert instance.node.UpdatedAt # nosec + node_last_updated = arrow.get(instance.node.UpdatedAt).datetime + elapsed_time_since_drained = ( + datetime.datetime.now(datetime.timezone.utc) - node_last_updated ) if ( - elapsed_time_since_full_hour - >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION + elapsed_time_since_drained + > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION ): # let's terminate that one terminateable_nodes.append(instance) @@ -522,6 +559,7 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster: f"{[i.node.Description.Hostname for i in terminateable_instances if i.node.Description]}", ) # since these nodes are being terminated, remove them from the swarm + await utils_docker.remove_nodes( get_docker_client(app), [i.node for i in terminateable_instances], diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py index 7faf910e010..2adf6268e8c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py @@ -1,10 +1,12 @@ from abc import ABC, abstractmethod +from collections.abc import Iterable from dataclasses import dataclass from fastapi import FastAPI from models_library.docker import DockerLabelKey from models_library.generated_models.docker_rest_api import Node as DockerNode from servicelib.logging_utils import LogLevelInt +from types_aiobotocore_ec2.literals import InstanceTypeType from ..models import AssociatedInstance, EC2InstanceData, EC2InstanceType, Resources @@ -23,7 +25,9 @@ def get_ec2_tags(app: FastAPI) -> dict[str, str]: @staticmethod @abstractmethod - def get_new_node_docker_tags(app: FastAPI) -> dict[DockerLabelKey, str]: + def get_new_node_docker_tags( + app: FastAPI, ec2_instance_data: EC2InstanceData + ) -> dict[DockerLabelKey, str]: ... @staticmethod @@ -34,16 +38,16 @@ async def list_unrunnable_tasks(app: FastAPI) -> list: @staticmethod @abstractmethod def try_assigning_task_to_node( - task, instance_to_tasks: list[tuple[AssociatedInstance, list]] + task, instances_to_tasks: list[tuple[AssociatedInstance, list]] ) -> bool: ... @staticmethod @abstractmethod - async def try_assigning_task_to_pending_instances( + async def try_assigning_task_to_instances( app: FastAPI, pending_task, - list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool @@ -54,7 +58,7 @@ async def try_assigning_task_to_pending_instances( @abstractmethod def try_assigning_task_to_instance_types( pending_task, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]], ) -> bool: ... @@ -77,6 +81,11 @@ async def progress_message_from_tasks( def get_max_resources_from_task(task) -> Resources: ... + @staticmethod + @abstractmethod + async def get_task_defined_instance(app: FastAPI, task) -> InstanceTypeType | None: + ... 
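+        # NOTE: implementations return the EC2 instance type a task explicitly
+        # pins (computational mode: via a dask resource entry, dynamic mode:
+        # via a docker placement constraint), or None when unrestricted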
+ @staticmethod @abstractmethod async def compute_node_used_resources( diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 68c17ff5fff..159d9ccd9ab 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -1,12 +1,17 @@ import collections import logging +from collections.abc import Iterable from fastapi import FastAPI -from models_library.docker import DockerLabelKey +from models_library.docker import ( + DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, + DockerLabelKey, +) from models_library.generated_models.docker_rest_api import Node from pydantic import AnyUrl, ByteSize from servicelib.logging_utils import LogLevelInt from servicelib.utils import logged_gather +from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.settings import get_application_settings from ..models import ( @@ -17,7 +22,7 @@ Resources, ) from ..utils import computational_scaling as utils -from ..utils import ec2, utils_docker +from ..utils import utils_docker, utils_ec2 from . import dask from .auto_scaling_mode_base import BaseAutoscaling from .docker import get_docker_client @@ -39,12 +44,16 @@ async def get_monitored_nodes(app: FastAPI) -> list[Node]: @staticmethod def get_ec2_tags(app: FastAPI) -> dict[str, str]: app_settings = get_application_settings(app) - return ec2.get_ec2_tags_computational(app_settings) + return utils_ec2.get_ec2_tags_computational(app_settings) @staticmethod - def get_new_node_docker_tags(app: FastAPI) -> dict[DockerLabelKey, str]: + def get_new_node_docker_tags( + app: FastAPI, ec2_instance_data: EC2InstanceData + ) -> dict[DockerLabelKey, str]: assert app # nosec - return {} + return { + DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: ec2_instance_data.type + } @staticmethod async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]: @@ -53,23 +62,23 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]: @staticmethod def try_assigning_task_to_node( task: DaskTask, - instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]], + instances_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]], ) -> bool: - return utils.try_assigning_task_to_node(task, instance_to_tasks) + return utils.try_assigning_task_to_node(task, instances_to_tasks) @staticmethod - async def try_assigning_task_to_pending_instances( + async def try_assigning_task_to_instances( app: FastAPI, pending_task, - list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool ) -> bool: - return await utils.try_assigning_task_to_pending_instances( + return await utils.try_assigning_task_to_instances( app, pending_task, - list_of_pending_instance_to_tasks, + instances_to_tasks, type_to_instance_map, notify_progress=notify_progress, ) @@ -77,10 +86,10 @@ async def try_assigning_task_to_pending_instances( @staticmethod def try_assigning_task_to_instance_types( pending_task, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]], ) -> bool: return utils.try_assigning_task_to_instance_types( - pending_task, list_of_instance_to_tasks + 
pending_task, instance_types_to_tasks
        )
 
     @staticmethod
@@ -101,6 +110,11 @@ async def progress_message_from_tasks(app: FastAPI, tasks: list, progress: float
     def get_max_resources_from_task(task) -> Resources:
         return utils.get_max_resources_from_dask_task(task)
 
+    @staticmethod
+    async def get_task_defined_instance(app: FastAPI, task) -> InstanceTypeType | None:
+        assert app  # nosec
+        return utils.get_task_instance_restriction(task)
+
     @staticmethod
     async def compute_node_used_resources(
         app: FastAPI, instance: AssociatedInstance
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py
index b959b4364f4..6a2d5814c85 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py
@@ -1,12 +1,15 @@
+from collections.abc import Iterable
+
 from fastapi import FastAPI
 from models_library.docker import DockerLabelKey
 from models_library.generated_models.docker_rest_api import Node, Task
 from servicelib.logging_utils import LogLevelInt
+from types_aiobotocore_ec2.literals import InstanceTypeType
 
 from ..core.settings import get_application_settings
 from ..models import AssociatedInstance, EC2InstanceData, EC2InstanceType, Resources
 from ..utils import dynamic_scaling as utils
-from ..utils import ec2, utils_docker
+from ..utils import utils_docker, utils_ec2
 from ..utils.rabbitmq import log_tasks_message, progress_tasks_message
 from .auto_scaling_mode_base import BaseAutoscaling
 from .docker import get_docker_client
@@ -25,12 +28,14 @@ async def get_monitored_nodes(app: FastAPI) -> list[Node]:
     @staticmethod
     def get_ec2_tags(app: FastAPI) -> dict[str, str]:
         app_settings = get_application_settings(app)
-        return ec2.get_ec2_tags_dynamic(app_settings)
+        return utils_ec2.get_ec2_tags_dynamic(app_settings)
 
     @staticmethod
-    def get_new_node_docker_tags(app: FastAPI) -> dict[DockerLabelKey, str]:
+    def get_new_node_docker_tags(
+        app: FastAPI, ec2_instance_data: EC2InstanceData
+    ) -> dict[DockerLabelKey, str]:
         app_settings = get_application_settings(app)
-        return utils_docker.get_docker_tags(app_settings)
+        return utils_docker.get_new_node_docker_tags(app_settings, ec2_instance_data)
 
     @staticmethod
     async def list_unrunnable_tasks(app: FastAPI) -> list[Task]:
@@ -43,15 +48,15 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[Task]:
 
     @staticmethod
     def try_assigning_task_to_node(
-        task, instance_to_tasks: list[tuple[AssociatedInstance, list]]
+        task, instances_to_tasks: Iterable[tuple[AssociatedInstance, list]]
     ) -> bool:
-        return utils.try_assigning_task_to_node(task, instance_to_tasks)
+        return utils.try_assigning_task_to_node(task, instances_to_tasks)
 
     @staticmethod
-    async def try_assigning_task_to_pending_instances(
+    async def try_assigning_task_to_instances(
         app: FastAPI,
         pending_task,
-        list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]],
+        instances_to_tasks: Iterable[tuple[EC2InstanceData, list]],
         type_to_instance_map: dict[str, EC2InstanceType],
         *,
         notify_progress: bool
     ) -> bool:
         return await utils.try_assigning_task_to_pending_instances(
             app,
             pending_task,
-            list_of_pending_instance_to_tasks,
+            instances_to_tasks,
             type_to_instance_map,
             notify_progress=notify_progress,
         )
 
@@ -67,10 +72,10 @@
@staticmethod def try_assigning_task_to_instance_types( pending_task, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]], ) -> bool: return utils.try_assigning_task_to_instances( - pending_task, list_of_instance_to_tasks + pending_task, instance_types_to_tasks ) @staticmethod @@ -80,13 +85,21 @@ async def log_message_from_tasks( await log_tasks_message(app, tasks, message, level=level) @staticmethod - async def progress_message_from_tasks(app: FastAPI, tasks: list, progress: float): - await progress_tasks_message(app, tasks, progress=1.0) + async def progress_message_from_tasks( + app: FastAPI, tasks: list, progress: float + ) -> None: + await progress_tasks_message(app, tasks, progress=progress) @staticmethod def get_max_resources_from_task(task) -> Resources: return utils_docker.get_max_resources_from_docker_task(task) + @staticmethod + async def get_task_defined_instance(app: FastAPI, task) -> InstanceTypeType | None: + return await utils_docker.get_task_instance_restriction( + get_docker_client(app), task + ) + @staticmethod async def compute_node_used_resources( app: FastAPI, instance: AssociatedInstance diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py index d2ddd94b90f..f55948e9d81 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py @@ -25,7 +25,7 @@ ) from ..core.settings import EC2InstancesSettings, EC2Settings from ..models import EC2InstanceData, EC2InstanceType -from ..utils.ec2 import compose_user_data +from ..utils.utils_ec2 import compose_user_data logger = logging.getLogger(__name__) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py index a337f34041e..904501e7d8b 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py @@ -1,12 +1,15 @@ +import functools import logging import re from typing import Final from models_library.generated_models.docker_rest_api import Node +from types_aiobotocore_ec2.literals import InstanceTypeType -from ..core.errors import Ec2InvalidDnsNameError +from ..core.errors import Ec2InstanceInvalidError, Ec2InvalidDnsNameError from ..core.settings import ApplicationSettings -from ..models import AssociatedInstance, EC2InstanceData +from ..models import AssociatedInstance, EC2InstanceData, EC2InstanceType, Resources +from ..modules.auto_scaling_mode_base import BaseAutoscaling from . 
import utils_docker
 
_EC2_INTERNAL_DNS_RE: Final[re.Pattern] = re.compile(r"^(?P<host_name>ip-[^.]+).*$")
@@ -92,3 +95,92 @@ async def ec2_startup_script(app_settings: ApplicationSettings) -> str:
     )
     return " && ".join(startup_commands)
+
+
+def _instance_type_by_type_name(
+    ec2_type: EC2InstanceType, *, type_name: InstanceTypeType | None
+) -> bool:
+    if type_name is None:
+        return True
+    return bool(ec2_type.name == type_name)
+
+
+def _instance_type_map_by_type_name(
+    mapping: tuple[EC2InstanceType, list], *, type_name: InstanceTypeType | None
+) -> bool:
+    ec2_type, _ = mapping
+    return _instance_type_by_type_name(ec2_type, type_name=type_name)
+
+
+def _instance_data_map_by_type_name(
+    mapping: tuple[EC2InstanceData, list], *, type_name: InstanceTypeType | None
+) -> bool:
+    if type_name is None:
+        return True
+    ec2_data, _ = mapping
+    return bool(ec2_data.type == type_name)
+
+
+def filter_by_task_defined_instance(
+    instance_type_name: InstanceTypeType | None,
+    active_instances_to_tasks,
+    pending_instances_to_tasks,
+    needed_new_instance_types_for_tasks,
+) -> tuple:
+    return (
+        filter(
+            functools.partial(
+                _instance_data_map_by_type_name, type_name=instance_type_name
+            ),
+            active_instances_to_tasks,
+        ),
+        filter(
+            functools.partial(
+                _instance_data_map_by_type_name, type_name=instance_type_name
+            ),
+            pending_instances_to_tasks,
+        ),
+        filter(
+            functools.partial(
+                _instance_type_map_by_type_name, type_name=instance_type_name
+            ),
+            needed_new_instance_types_for_tasks,
+        ),
+    )
+
+
+def find_selected_instance_type_for_task(
+    instance_type_name: InstanceTypeType,
+    available_ec2_types: list[EC2InstanceType],
+    auto_scaling_mode: BaseAutoscaling,
+    task,
+) -> EC2InstanceType:
+    filtered_instances = list(
+        filter(
+            functools.partial(
+                _instance_type_by_type_name, type_name=instance_type_name
+            ),
+            available_ec2_types,
+        )
+    )
+    if not filtered_instances:
+        msg = (
+            f"Task {task} requires an unauthorized EC2 instance type. "
+            f"Asked for {instance_type_name}, authorized are {available_ec2_types}. Please check!"
+        )
+        raise Ec2InstanceInvalidError(msg=msg)
+
+    assert len(filtered_instances) == 1  # nosec
+    selected_instance = filtered_instances[0]
+
+    # check that the task resources fit the selected machine's resources
+    if auto_scaling_mode.get_max_resources_from_task(task) > Resources(
+        cpus=selected_instance.cpus, ram=selected_instance.ram
+    ):
+        msg = (
+            f"Task {task} requires more resources than the selected instance provides."
+            f" Asked for {selected_instance}, but task needs {auto_scaling_mode.get_max_resources_from_task(task)}. Please check!"
+ ) + raise Ec2InstanceInvalidError(msg=msg) + + return selected_instance diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py index 22c1f430371..cb9a164aeaf 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py @@ -1,9 +1,11 @@ import datetime import logging -from typing import Final +from typing import Final, Iterable +from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY from fastapi import FastAPI from servicelib.utils_formatting import timedelta_as_minute_second +from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.settings import get_application_settings from ..models import ( @@ -28,6 +30,10 @@ def get_max_resources_from_dask_task(task: DaskTask) -> Resources: ) +def get_task_instance_restriction(task: DaskTask) -> InstanceTypeType | None: + return task.required_resources.get(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY) + + def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources: total = Resources.create_as_empty() for t in tasks: @@ -37,7 +43,7 @@ def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources: def try_assigning_task_to_node( pending_task: DaskTask, - instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]], + instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]], ) -> bool: for instance, node_assigned_tasks in instance_to_tasks: instance_total_resource = utils_docker.get_node_total_resources(instance.node) @@ -50,10 +56,10 @@ def try_assigning_task_to_node( return False -async def try_assigning_task_to_pending_instances( +async def try_assigning_task_to_instances( app: FastAPI, pending_task: DaskTask, - list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list[DaskTask]]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list[DaskTask]]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool, @@ -63,7 +69,7 @@ async def try_assigning_task_to_pending_instances( instance_max_time_to_start = ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) - for instance, instance_assigned_tasks in list_of_pending_instance_to_tasks: + for instance, instance_assigned_tasks in instances_to_tasks: instance_type = type_to_instance_map[instance.type] instance_total_resources = Resources( cpus=instance_type.cpus, ram=instance_type.ram @@ -97,9 +103,9 @@ async def try_assigning_task_to_pending_instances( def try_assigning_task_to_instance_types( pending_task: DaskTask, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list[DaskTask]]], ) -> bool: - for instance, instance_assigned_tasks in list_of_instance_to_tasks: + for instance, instance_assigned_tasks in instance_types_to_tasks: instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram) tasks_needed_resources = _compute_tasks_needed_resources( instance_assigned_tasks diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py index 5c166e28323..549b59bb38c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py +++ 
b/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py @@ -1,5 +1,6 @@ import datetime import logging +from collections.abc import Iterable from fastapi import FastAPI from models_library.generated_models.docker_rest_api import Task @@ -14,9 +15,10 @@ def try_assigning_task_to_node( - pending_task: Task, instance_to_tasks: list[tuple[AssociatedInstance, list[Task]]] + pending_task: Task, + instances_to_tasks: Iterable[tuple[AssociatedInstance, list[Task]]], ) -> bool: - for instance, node_assigned_tasks in instance_to_tasks: + for instance, node_assigned_tasks in instances_to_tasks: instance_total_resource = utils_docker.get_node_total_resources(instance.node) tasks_needed_resources = utils_docker.compute_tasks_needed_resources( node_assigned_tasks @@ -31,9 +33,9 @@ def try_assigning_task_to_node( def try_assigning_task_to_instances( pending_task: Task, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list[Task]]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list[Task]]], ) -> bool: - for instance, instance_assigned_tasks in list_of_instance_to_tasks: + for instance, instance_assigned_tasks in instance_types_to_tasks: instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram) tasks_needed_resources = utils_docker.compute_tasks_needed_resources( instance_assigned_tasks @@ -49,7 +51,7 @@ def try_assigning_task_to_instances( async def try_assigning_task_to_pending_instances( app: FastAPI, pending_task: Task, - list_of_instances_to_tasks: list[tuple[EC2InstanceData, list[Task]]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list[Task]]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool, @@ -59,7 +61,7 @@ async def try_assigning_task_to_pending_instances( instance_max_time_to_start = ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) - for instance, instance_assigned_tasks in list_of_instances_to_tasks: + for instance, instance_assigned_tasks in instances_to_tasks: instance_type = type_to_instance_map[instance.type] instance_total_resources = Resources( cpus=instance_type.cpus, ram=instance_type.ram diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index 92c5fb5d116..0e5e88585c2 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -4,6 +4,7 @@ import asyncio import collections +import contextlib import datetime import logging import re @@ -12,7 +13,11 @@ from typing import Final, cast import yaml -from models_library.docker import DockerGenericTag, DockerLabelKey +from models_library.docker import ( + DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, + DockerGenericTag, + DockerLabelKey, +) from models_library.generated_models.docker_rest_api import ( Node, NodeState, @@ -20,14 +25,15 @@ Task, TaskState, ) -from pydantic import ByteSize, parse_obj_as +from pydantic import ByteSize, ValidationError, parse_obj_as from servicelib.docker_utils import to_datetime from servicelib.logging_utils import log_context from servicelib.utils import logged_gather from settings_library.docker_registry import RegistrySettings +from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.settings import ApplicationSettings -from ..models import Resources +from ..models import EC2InstanceData, Resources from ..modules.docker import AutoscalingDocker 
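+# NOTE: a service pins an EC2 instance type via a swarm placement constraint
+# of the form "node.labels.ec2-instance-type==t2.xlarge" (the label key being
+# DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY; the value shown is
+# illustrative); get_task_instance_restriction() below parses it back into
+# the instance type name
+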
logger = logging.getLogger(__name__)
@@ -49,6 +55,7 @@
 _PENDING_DOCKER_TASK_MESSAGE: Final[str] = "pending task scheduling"
 _INSUFFICIENT_RESOURCES_DOCKER_TASK_ERR: Final[str] = "insufficient resources on"
+_NOT_SATISFIED_SCHEDULING_CONSTRAINTS_TASK_ERR: Final[str] = "no suitable node"
 
 
 async def get_monitored_nodes(
@@ -110,7 +117,10 @@ def _is_task_waiting_for_resources(task: Task) -> bool:
     return (
         task.Status.State == TaskState.pending
         and task.Status.Message == _PENDING_DOCKER_TASK_MESSAGE
-        and _INSUFFICIENT_RESOURCES_DOCKER_TASK_ERR in task.Status.Err
+        and (
+            _INSUFFICIENT_RESOURCES_DOCKER_TASK_ERR in task.Status.Err
+            or _NOT_SATISFIED_SCHEDULING_CONSTRAINTS_TASK_ERR in task.Status.Err
+        )
     )
 
 
@@ -253,6 +263,40 @@ def get_max_resources_from_docker_task(task: Task) -> Resources:
     return Resources(cpus=0, ram=ByteSize(0))
 
 
+async def get_task_instance_restriction(
+    docker_client: AutoscalingDocker, task: Task
+) -> InstanceTypeType | None:
+    with contextlib.suppress(ValidationError):
+        assert task.ServiceID  # nosec
+        service_inspect = parse_obj_as(
+            Service, await docker_client.services.inspect(task.ServiceID)
+        )
+        assert service_inspect.Spec  # nosec
+        assert service_inspect.Spec.TaskTemplate  # nosec
+
+        if (
+            not service_inspect.Spec.TaskTemplate.Placement
+            or not service_inspect.Spec.TaskTemplate.Placement.Constraints
+        ):
+            return None
+        # parse the placement constraints
+        service_placement_constraints = (
+            service_inspect.Spec.TaskTemplate.Placement.Constraints
+        )
+        # should be node.labels.{}
+        node_label_to_find = (
+            f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=="
+        )
+        for constraint in service_placement_constraints:
+            if constraint.startswith(node_label_to_find):
+                return parse_obj_as(
+                    InstanceTypeType, constraint.removeprefix(node_label_to_find)
+                )
+
+        return None
+    return None
+
+
 def compute_tasks_needed_resources(tasks: list[Task]) -> Resources:
     total = Resources.create_as_empty()
     for t in tasks:
@@ -471,12 +515,18 @@ async def set_node_availability(
     )
 
 
-def get_docker_tags(app_settings: ApplicationSettings) -> dict[DockerLabelKey, str]:
+def get_new_node_docker_tags(
+    app_settings: ApplicationSettings, ec2_instance: EC2InstanceData
+) -> dict[DockerLabelKey, str]:
     assert app_settings.AUTOSCALING_NODES_MONITORING  # nosec
-    return {
-        tag_key: "true"
-        for tag_key in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
-    } | {
-        tag_key: "true"
-        for tag_key in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS
-    }
+    return (
+        {
+            tag_key: "true"
+            for tag_key in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
+        }
+        | {
+            tag_key: "true"
+            for tag_key in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS
+        }
+        | {DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: ec2_instance.type}
+    )
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py
similarity index 100%
rename from services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py
rename to services/autoscaling/src/simcore_service_autoscaling/utils/utils_ec2.py
diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py
index 02dbe18a6bd..7688ac3b21b 100644
--- a/services/autoscaling/tests/unit/conftest.py
+++ b/services/autoscaling/tests/unit/conftest.py
@@ -4,6 +4,7 @@
 
 import asyncio
 import dataclasses
+import datetime
 import json
 import random
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator @@ -239,8 +240,8 @@ def _creator(**node_overrides) -> Node: default_config = { "ID": faker.uuid4(), "Version": ObjectVersion(Index=faker.pyint()), - "CreatedAt": faker.date_time(tzinfo=timezone.utc).isoformat(), - "UpdatedAt": faker.date_time(tzinfo=timezone.utc).isoformat(), + "CreatedAt": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), + "UpdatedAt": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), "Description": NodeDescription( Hostname=faker.pystr(), Resources=ResourceObject( @@ -322,17 +323,24 @@ async def _creator( task_template: dict[str, Any], labels: dict[DockerLabelKey, str] | None = None, wait_for_service_state="running", + placement_constraints: list[str] | None = None, ) -> Service: service_name = f"pytest_{faker.pystr()}" - if labels: - task_labels = task_template.setdefault("ContainerSpec", {}).setdefault( - "Labels", {} + base_labels = {} + task_labels = task_template.setdefault("ContainerSpec", {}).setdefault( + "Labels", base_labels + ) + if placement_constraints: + task_template.setdefault("Placement", {}).setdefault( + "Constraints", placement_constraints ) + if labels: task_labels |= labels + base_labels |= labels service = await async_docker_client.services.create( task_template=task_template, name=service_name, - labels=labels or {}, # type: ignore + labels=base_labels, ) assert service service = parse_obj_as( @@ -340,7 +348,7 @@ async def _creator( ) assert service.Spec print(f"--> created docker service {service.ID} with {service.Spec.Name}") - assert service.Spec.Labels == (labels or {}) + assert service.Spec.Labels == base_labels created_services.append(service) # get more info on that service @@ -351,6 +359,8 @@ async def _creator( "Runtime", "root['ContainerSpec']['Isolation']", } + if not base_labels: + excluded_paths.add("root['ContainerSpec']['Labels']") for reservation in ["MemoryBytes", "NanoCPUs"]: if ( task_template.get("Resources", {}) @@ -369,7 +379,7 @@ async def _creator( exclude_paths=excluded_paths, ) assert not diff, f"{diff}" - assert service.Spec.Labels == (labels or {}) + assert service.Spec.Labels == base_labels await assert_for_service_state( async_docker_client, service, [wait_for_service_state] ) @@ -378,7 +388,8 @@ async def _creator( yield _creator await asyncio.gather( - *(async_docker_client.services.delete(s.ID) for s in created_services) + *(async_docker_client.services.delete(s.ID) for s in created_services), + return_exceptions=True, ) # wait until all tasks are gone @@ -494,21 +505,25 @@ def mocked_aws_server_envs( return app_environment | setenvs_from_dict(monkeypatch, changed_envs) +@pytest.fixture(scope="session") +def aws_allowed_ec2_instance_type_names() -> list[InstanceTypeType]: + return [ + "t2.xlarge", + "t2.2xlarge", + "g3.4xlarge", + "r5n.4xlarge", + "r5n.8xlarge", + ] + + @pytest.fixture -def aws_allowed_ec2_instance_type_names( +def aws_allowed_ec2_instance_type_names_env( app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, + aws_allowed_ec2_instance_type_names: list[InstanceTypeType], ) -> EnvVarsDict: changed_envs = { - "EC2_INSTANCES_ALLOWED_TYPES": json.dumps( - [ - "t2.xlarge", - "t2.2xlarge", - "g3.4xlarge", - "r5n.4xlarge", - "r5n.8xlarge", - ] - ), + "EC2_INSTANCES_ALLOWED_TYPES": json.dumps(aws_allowed_ec2_instance_type_names), } return app_environment | setenvs_from_dict(monkeypatch, changed_envs) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py 
b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py
index 4adc33735cd..b187de71aee 100644
--- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py
+++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py
@@ -3,10 +3,13 @@
 # pylint: disable=unused-argument
 # pylint: disable=unused-variable
 # pylint: disable=too-many-arguments
+# pylint: disable=too-many-statements
 
 
 import asyncio
 import base64
+import datetime
+import logging
 from collections.abc import Callable, Iterator
 from copy import deepcopy
 from dataclasses import dataclass
@@ -15,7 +18,10 @@
 import distributed
 import pytest
+from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
+from faker import Faker
 from fastapi import FastAPI
+from models_library.docker import DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY
 from models_library.generated_models.docker_rest_api import Availability
 from models_library.generated_models.docker_rest_api import Node as DockerNode
 from models_library.generated_models.docker_rest_api import NodeState, NodeStatus
@@ -40,6 +46,7 @@
 from simcore_service_autoscaling.modules.dask import DaskTaskResources
 from simcore_service_autoscaling.modules.docker import get_docker_client
 from types_aiobotocore_ec2.client import EC2Client
+from types_aiobotocore_ec2.literals import InstanceTypeType
 
 
 @pytest.fixture
@@ -66,7 +73,7 @@ def minimal_configuration(
     aws_subnet_id: str,
     aws_security_group_id: str,
     aws_ami_id: str,
-    aws_allowed_ec2_instance_type_names: list[str],
+    aws_allowed_ec2_instance_type_names_env: list[str],
     mocked_redis_server: None,
 ) -> None:
     ...
@@ -288,7 +295,48 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing(
     )
 
 
-async def test_cluster_scaling_up(
+@pytest.fixture
+def create_dask_task_resources() -> Callable[..., DaskTaskResources]:
+    def _do(
+        ec2_instance_type: InstanceTypeType | None, ram: ByteSize
+    ) -> DaskTaskResources:
+        resources = DaskTaskResources(
+            {
+                "RAM": int(ram),
+            }
+        )
+        if ec2_instance_type is not None:
+            resources[DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY] = ec2_instance_type
+        return resources
+
+    return _do
+
+
+@pytest.mark.acceptance_test()
+@pytest.mark.parametrize(
+    "dask_task_imposed_ec2_type, dask_ram, expected_ec2_type",
+    [
+        pytest.param(
+            None,
+            parse_obj_as(ByteSize, "128Gib"),
+            "r5n.4xlarge",
+            id="No explicit instance defined",
+        ),
+        pytest.param(
+            "t2.xlarge",
+            parse_obj_as(ByteSize, "4Gib"),
+            "t2.xlarge",
+            id="Explicitly ask for t2.xlarge",
+        ),
+        pytest.param(
+            "r5n.8xlarge",
+            parse_obj_as(ByteSize, "128Gib"),
+            "r5n.8xlarge",
+            id="Explicitly ask for r5n.8xlarge",
+        ),
+    ],
+)
+async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     minimal_configuration: None,
     app_settings: ApplicationSettings,
     initialized_app: FastAPI,
@@ -302,13 +350,20 @@ async def test_cluster_scaling_up(
     create_dask_task: Callable[[DaskTaskResources], distributed.Future],
     ec2_client: EC2Client,
     mock_tag_node: mock.Mock,
     fake_node: DockerNode,
     mock_rabbitmq_post_message: mock.Mock,
     mock_find_node_with_name: mock.Mock,
     mock_set_node_availability: mock.Mock,
     mock_compute_node_used_resources: mock.Mock,
     mocker: MockerFixture,
     dask_spec_local_cluster: distributed.SpecCluster,
+    create_dask_task_resources: Callable[..., DaskTaskResources],
+    dask_task_imposed_ec2_type: InstanceTypeType | None,
+    dask_ram: ByteSize,
+    expected_ec2_type: InstanceTypeType,
 ):
     # we have nothing running now
     all_instances = await ec2_client.describe_instances()
     assert not all_instances["Reservations"]
 
     # create a task that needs more power
-    dask_future = create_dask_task({"RAM": int(parse_obj_as(ByteSize, "128GiB"))})
+    dask_task_resources = create_dask_task_resources(
+        dask_task_imposed_ec2_type, dask_ram
+    )
+    dask_future = create_dask_task(dask_task_resources)
     assert dask_future
 
     # this should trigger a scaling up as we have no nodes
@@ -321,7 +376,7 @@
         ec2_client,
         num_reservations=1,
         num_instances=1,
-        instance_type="r5n.4xlarge",
+        instance_type=expected_ec2_type,
         instance_state="running",
     )
 
@@ -350,7 +405,7 @@
         ec2_client,
         num_reservations=1,
         num_instances=1,
-        instance_type="r5n.4xlarge",
+        instance_type=expected_ec2_type,
         instance_state="running",
     )
     assert len(internal_dns_names) == 1
@@ -359,7 +414,9 @@
     # the node is tagged and made active right away since we still have the pending task
     mock_find_node_with_name.assert_called_once()
     mock_find_node_with_name.reset_mock()
-    expected_docker_node_tags = {}
+    expected_docker_node_tags = {
+        DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type
+    }
     mock_tag_node.assert_called_once_with(
         get_docker_client(initialized_app),
         fake_node,
@@ -371,6 +428,7 @@
         get_docker_client(initialized_app), fake_node, available=True
     )
     mock_set_node_availability.reset_mock()
+    # in this case there is no message sent since the worker was not started yet
     mock_rabbitmq_post_message.assert_not_called()
 
@@ -402,14 +460,171 @@
         ec2_client,
         num_reservations=1,
         num_instances=1,
-        instance_type="r5n.4xlarge",
+        instance_type=expected_ec2_type,
         instance_state="running",
     )
-    # check rabbit messages were sent
+    # check no rabbit messages were sent
     # NOTE: we currently have no real dask-worker here
     mock_rabbitmq_post_message.assert_not_called()
 
+    #
+    # 4. now scaling down, as we deleted all the tasks
+    #
+    del dask_future
+    # the worker will not be found since there is none in this testing environment, but it should not raise
+    await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+    # check the number of instances did not change and is still running
+    await _assert_ec2_instances(
+        ec2_client,
+        num_reservations=1,
+        num_instances=1,
+        instance_type=expected_ec2_type,
+        instance_state="running",
+    )
+    # now mock the call so that it triggers deactivation
+    mocked_dask_get_worker_still_has_results_in_memory = mocker.patch(
+        "simcore_service_autoscaling.modules.dask.get_worker_still_has_results_in_memory",
+        return_value=0,
+        autospec=True,
+    )
+    mocked_dask_get_worker_used_resources = mocker.patch(
+        "simcore_service_autoscaling.modules.dask.get_worker_used_resources",
+        return_value=Resources.create_as_empty(),
+        autospec=True,
+    )
+    await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+    mocked_dask_get_worker_still_has_results_in_memory.assert_called()
+    assert mocked_dask_get_worker_still_has_results_in_memory.call_count == 2
+    mocked_dask_get_worker_still_has_results_in_memory.reset_mock()
+    mocked_dask_get_worker_used_resources.assert_called()
+    assert mocked_dask_get_worker_used_resources.call_count == 2
+    mocked_dask_get_worker_used_resources.reset_mock()
+    # the node shall be set to drain, but not yet terminated
+    mock_set_node_availability.assert_called_once_with(
+        get_docker_client(initialized_app), fake_node, available=False
+    )
+    mock_set_node_availability.reset_mock()
+    await _assert_ec2_instances(
+        ec2_client,
+        num_reservations=1,
+        num_instances=1,
+        instance_type=expected_ec2_type,
+        instance_state="running",
+    )
+
+    # we artificially set the node to drain
+    fake_node.Spec.Availability = Availability.drain
+    fake_node.UpdatedAt = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
+    # the node will not be terminated before the timeout triggers
+    assert app_settings.AUTOSCALING_EC2_INSTANCES
+    assert (
+        datetime.timedelta(seconds=5)
+        < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
+    )
+    mocked_docker_remove_node = mocker.patch(
+        "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes",
+        return_value=None,
+        autospec=True,
+    )
+    await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+    mocked_docker_remove_node.assert_not_called()
+    await _assert_ec2_instances(
+        ec2_client,
+        num_reservations=1,
+        num_instances=1,
+        instance_type=expected_ec2_type,
+        instance_state="running",
+    )
+
+    # now changing the last-update timestamp will trigger the node removal and shut down the ec2 instance
+    fake_node.UpdatedAt = (
+        datetime.datetime.now(tz=datetime.timezone.utc)
+        - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
+        - datetime.timedelta(seconds=1)
+    ).isoformat()
+    await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+    mocked_docker_remove_node.assert_called_once_with(mock.ANY, [fake_node], force=True)
+    await _assert_ec2_instances(
+        ec2_client,
+        num_reservations=1,
+        num_instances=1,
+        instance_type=expected_ec2_type,
+        instance_state="terminated",
+    )
+
+
+async def test_cluster_does_not_scale_up_if_defined_instance_is_not_allowed(
+    minimal_configuration: None,
+    app_settings: ApplicationSettings,
+    initialized_app: FastAPI,
+    create_dask_task: Callable[[DaskTaskResources], distributed.Future],
+    create_dask_task_resources: Callable[..., DaskTaskResources],
+    ec2_client: EC2Client,
+    faker: Faker,
+    caplog: pytest.LogCaptureFixture,
+):
+    # we have nothing running now
+    all_instances = await ec2_client.describe_instances()
+    assert not all_instances["Reservations"]
+
+    # create a task that needs more power
+    dask_task_resources = create_dask_task_resources(
+        faker.pystr(), parse_obj_as(ByteSize, "128GiB")
+    )
+    dask_future = create_dask_task(dask_task_resources)
+    assert dask_future
+
+    # this should trigger a scaling up as we have no nodes
+    await auto_scale_cluster(
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+    )
+
+    # nothing runs
+    all_instances = await ec2_client.describe_instances()
+    assert not all_instances["Reservations"]
+    # check there is an error in the logs
+    error_messages = [
+        x.message for x in caplog.get_records("call") if x.levelno == logging.ERROR
+    ]
+    assert len(error_messages) == 1
+    assert "Unexpected error:" in error_messages[0]
+
+
+async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_resources(
+    minimal_configuration: None,
+    app_settings: ApplicationSettings,
+    initialized_app: FastAPI,
+    create_dask_task: Callable[[DaskTaskResources], distributed.Future],
+    create_dask_task_resources: Callable[..., DaskTaskResources],
+    ec2_client: EC2Client,
+    faker: Faker,
+    caplog: pytest.LogCaptureFixture,
+):
+    # we have nothing running now
+    all_instances = await ec2_client.describe_instances()
+    assert not all_instances["Reservations"]
+
+    # create a task that needs more power
+    dask_task_resources = create_dask_task_resources(
+        "t2.xlarge", parse_obj_as(ByteSize, "128GiB")
+    )
+    dask_future = create_dask_task(dask_task_resources)
+    assert dask_future
+
+    # this should trigger a scaling up as we have no nodes
+    await auto_scale_cluster(
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+    )
+
+    # nothing runs
+    all_instances = await ec2_client.describe_instances()
+ assert not all_instances["Reservations"]
+ # check there is an error in the logs
+ error_messages = [
+ x.message for x in caplog.get_records("call") if x.levelno == logging.ERROR
+ ]
+ assert len(error_messages) == 1
+ assert "Unexpected error:" in error_messages[0]
+

 @dataclass(frozen=True)
 class _ScaleUpParams:
diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
index 091f6b4a869..9b1cfc77824 100644
--- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
+++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
@@ -3,6 +3,7 @@
 # pylint: disable=unused-argument
 # pylint: disable=unused-variable
 # pylint: disable=too-many-arguments
+# pylint: disable=too-many-statements


 import asyncio
@@ -17,7 +18,10 @@
 import pytest
 from faker import Faker
 from fastapi import FastAPI
-from models_library.docker import DockerLabelKey
+from models_library.docker import (
+ DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY,
+ DockerLabelKey,
+)
 from models_library.generated_models.docker_rest_api import (
 Availability,
 Node,
@@ -48,6 +52,7 @@
 )
 from simcore_service_autoscaling.modules.ec2 import EC2InstanceData
 from types_aiobotocore_ec2.client import EC2Client
+from types_aiobotocore_ec2.literals import InstanceTypeType


 @pytest.fixture
@@ -206,7 +211,7 @@ def minimal_configuration(
 aws_subnet_id: str,
 aws_security_group_id: str,
 aws_ami_id: str,
- aws_allowed_ec2_instance_type_names: list[str],
+ aws_allowed_ec2_instance_type_names_env: list[str],
 mocked_redis_server: None,
) -> None:
 ...
@@ -230,10 +235,7 @@ def _assert_rabbit_autoscaling_message_sent(
 instances_running=0,
 )
 expected_message = default_message.copy(update=message_update_kwargs)
- mock_rabbitmq_post_message.assert_called_once_with(
- app,
- expected_message,
- )
+ assert mock_rabbitmq_post_message.call_args == mock.call(app, expected_message)


async def test_cluster_scaling_from_labelled_services_with_no_services_does_nothing(
@@ -259,7 +261,7 @@ async def test_cluster_scaling_from_labelled_services_with_no_services_and_machi
 mock_machines_buffer: int,
 app_settings: ApplicationSettings,
 initialized_app: FastAPI,
- aws_allowed_ec2_instance_type_names: list[str],
+ aws_allowed_ec2_instance_type_names_env: list[str],
 mock_rabbitmq_post_message: mock.Mock,
 mock_compute_node_used_resources: mock.Mock,
 mock_find_node_with_name: mock.Mock,
@@ -422,13 +424,39 @@ async def _assert_ec2_instances(
 return internal_dns_names


-async def test_cluster_scaling_up(
+@pytest.mark.acceptance_test()
+@pytest.mark.parametrize(
+ "docker_service_imposed_ec2_type, docker_service_ram, expected_ec2_type",
+ [
+ pytest.param(
+ None,
+ parse_obj_as(ByteSize, "128GiB"),
+ "r5n.4xlarge",
+ id="No explicit instance defined",
+ ),
+ pytest.param(
+ "t2.xlarge",
+ parse_obj_as(ByteSize, "4GiB"),
+ "t2.xlarge",
+ id="Explicitly ask for t2.xlarge",
+ ),
+ pytest.param(
+ "r5n.8xlarge",
+ parse_obj_as(ByteSize, "128GiB"),
+ "r5n.8xlarge",
+ id="Explicitly ask for r5n.8xlarge",
+ ),
+ ],
+)
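+# NOTE: each parametrization either imposes the EC2 type via a node.labels
+# placement constraint or lets the autoscaler derive it from the resources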
+async def test_cluster_scaling_up_and_down(
 minimal_configuration: None,
 service_monitored_labels: dict[DockerLabelKey, str],
 app_settings: ApplicationSettings,
 initialized_app: FastAPI,
 create_service: Callable[
- [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service]
+ [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service]
 ],
 task_template: dict[str, Any],
 create_task_reservations: Callable[[int, int], dict[str, Any]],
@@ -440,16 +467,25 @@ async def test_cluster_scaling_up(
 mock_set_node_availability: mock.Mock,
 mock_compute_node_used_resources: mock.Mock,
 mocker: MockerFixture,
+ docker_service_imposed_ec2_type: InstanceTypeType | None,
+ docker_service_ram: ByteSize,
+ expected_ec2_type: InstanceTypeType,
+ async_docker_client: aiodocker.Docker,
):
 # we have nothing running now
 all_instances = await ec2_client.describe_instances()
 assert not all_instances["Reservations"]

 # create a task that needs more power
- await create_service(
- task_template | create_task_reservations(4, parse_obj_as(ByteSize, "128GiB")),
+ docker_service = await create_service(
+ task_template | create_task_reservations(4, docker_service_ram),
 service_monitored_labels,
 "pending",
+ [
+ f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={docker_service_imposed_ec2_type}"
+ ]
+ if docker_service_imposed_ec2_type
+ else [],
 )

 # this should trigger a scaling up as we have no nodes
@@ -462,7 +498,7 @@ async def test_cluster_scaling_up(
 ec2_client,
 num_reservations=1,
 num_instances=1,
- instance_type="r5n.4xlarge",
+ instance_type=expected_ec2_type,
 instance_state="running",
 )
@@ -495,11 +531,12 @@
 ec2_client,
 num_reservations=1,
 num_instances=1,
- instance_type="r5n.4xlarge",
+ instance_type=expected_ec2_type,
 instance_state="running",
 )
 assert len(internal_dns_names) == 1
 internal_dns_name = internal_dns_names[0].removesuffix(".ec2.internal")
+
 # the node is tagged and made active right away since we still have the pending task
 mock_find_node_with_name.assert_called_once()
 mock_find_node_with_name.reset_mock()
@@ -510,7 +547,7 @@
 app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
 + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS
 )
- }
+ } | {DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type}
 mock_tag_node.assert_called_once_with(
 get_docker_client(initialized_app),
 fake_node,
@@ -523,7 +560,7 @@
 )
 mock_set_node_availability.reset_mock()

- # check rabbit messages were sent
+ # check rabbit messages were sent, we do have a worker now
 assert fake_node.Description
 assert fake_node.Description.Resources
 assert fake_node.Description.Resources.NanoCPUs
@@ -573,7 +610,7 @@
 ec2_client,
 num_reservations=1,
 num_instances=1,
- instance_type="r5n.4xlarge",
+ instance_type=expected_ec2_type,
 instance_state="running",
 )

@@ -581,6 +618,82 @@
 mock_rabbitmq_post_message.assert_called()
 mock_rabbitmq_post_message.reset_mock()

+ #
+ # 4. now scaling down by removing the docker service
+ #
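+ # NOTE: the autoscaler first drains the now unneeded node; the EC2 instance
+ # itself is only terminated once EC2_INSTANCES_TIME_BEFORE_TERMINATION elapsed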
+ assert docker_service.ID
+ await async_docker_client.services.delete(docker_service.ID)
+ # with the service gone, the node is expected to be set to drain
+ await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+ # check the number of instances did not change and the instance is still running
+ await _assert_ec2_instances(
+ ec2_client,
+ num_reservations=1,
+ num_instances=1,
+ instance_type=expected_ec2_type,
+ instance_state="running",
+ )
+ mock_set_node_availability.assert_called_once_with(
+ get_docker_client(initialized_app), fake_node, available=False
+ )
+ mock_set_node_availability.reset_mock()
+
+ # calling again does exactly the same
+ await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+ mock_set_node_availability.assert_called_once_with(
+ get_docker_client(initialized_app), fake_node, available=False
+ )
+ mock_set_node_availability.reset_mock()
+ await _assert_ec2_instances(
+ ec2_client,
+ num_reservations=1,
+ num_instances=1,
+ instance_type=expected_ec2_type,
+ instance_state="running",
+ )
+
+ # we artificially set the node to drain
+ fake_node.Spec.Availability = Availability.drain
+ fake_node.UpdatedAt = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
+ # the node will not be terminated before the timeout triggers
+ assert app_settings.AUTOSCALING_EC2_INSTANCES
+ assert (
+ datetime.timedelta(seconds=5)
+ < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
+ )
+ mocked_docker_remove_node = mocker.patch(
+ "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes",
+ return_value=None,
+ autospec=True,
+ )
+ await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+ mocked_docker_remove_node.assert_not_called()
+ await _assert_ec2_instances(
+ ec2_client,
+ num_reservations=1,
+ num_instances=1,
+ instance_type=expected_ec2_type,
+ instance_state="running",
+ )
+
+ # now changing the last update time point triggers the node removal and the shutdown of the EC2 instance
+ fake_node.UpdatedAt = (
+ datetime.datetime.now(tz=datetime.timezone.utc)
+ - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
+ - datetime.timedelta(seconds=1)
+ ).isoformat()
+ await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
+ mocked_docker_remove_node.assert_called_once_with(mock.ANY, [fake_node], force=True)
+ await _assert_ec2_instances(
+ ec2_client,
+ num_reservations=1,
+ num_instances=1,
+ instance_type=expected_ec2_type,
+ instance_state="terminated",
+ )
+

 @dataclass(frozen=True)
 class _ScaleUpParams:
@@ -784,114 +895,6 @@ async def test__find_terminateable_nodes_with_no_hosts(
 assert await _find_terminateable_instances(initialized_app, active_cluster) == []


-async def test__find_terminateable_nodes_with_drained_host(
- minimal_configuration: None,
- with_valid_time_before_termination: datetime.timedelta,
- initialized_app: FastAPI,
- cluster: Callable[..., Cluster],
- drained_host_node: Node,
- app_settings: ApplicationSettings,
- fake_ec2_instance_data: Callable[..., EC2InstanceData],
-):
- assert app_settings.AUTOSCALING_EC2_INSTANCES
- assert (
- datetime.timedelta(seconds=10)
- < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
- ), "this tests relies on the fact that the time before termination is above 10 seconds"
-
- # if the instance started just about now, then it should not be terminateable
- active_cluster_with_drained_nodes_started_now = cluster(
- drained_nodes=[
- 
AssociatedInstance( - drained_host_node, - fake_ec2_instance_data( - launch_time=datetime.datetime.now(datetime.timezone.utc) - ), - ) - ], - reserve_drained_nodes=[ - AssociatedInstance( - drained_host_node, - fake_ec2_instance_data( - launch_time=datetime.datetime.now(datetime.timezone.utc) - ), - ) - ], - ) - assert ( - await _find_terminateable_instances( - initialized_app, active_cluster_with_drained_nodes_started_now - ) - == [] - ) - - # if the instance started just after the termination time, even on several days, it is not terminateable - active_cluster_with_drained_nodes_not_inthe_window = cluster( - drained_nodes=[ - AssociatedInstance( - drained_host_node, - fake_ec2_instance_data( - launch_time=datetime.datetime.now(datetime.timezone.utc) - - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - - datetime.timedelta(days=21) - + datetime.timedelta(seconds=10) - ), - ) - ], - reserve_drained_nodes=[ - AssociatedInstance( - drained_host_node, - fake_ec2_instance_data( - launch_time=datetime.datetime.now(datetime.timezone.utc) - - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - - datetime.timedelta(days=21) - + datetime.timedelta(seconds=10) - ), - ) - ], - ) - assert ( - await _find_terminateable_instances( - initialized_app, active_cluster_with_drained_nodes_not_inthe_window - ) - == [] - ) - - # if the instance started just before the termination time, even on several days, it is terminateable - active_cluster_with_drained_nodes_long_time_ago_terminateable = cluster( - drained_nodes=[ - AssociatedInstance( - drained_host_node, - fake_ec2_instance_data( - launch_time=datetime.datetime.now(datetime.timezone.utc) - - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - - datetime.timedelta(days=21) - - datetime.timedelta(seconds=10), - ), - ) - ], - reserve_drained_nodes=[ - AssociatedInstance( - drained_host_node, - fake_ec2_instance_data( - launch_time=datetime.datetime.now(datetime.timezone.utc) - - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - - datetime.timedelta(days=21) - - datetime.timedelta(seconds=10), - ), - ) - ], - ) - - assert ( - await _find_terminateable_instances( - initialized_app, - active_cluster_with_drained_nodes_long_time_ago_terminateable, - ) - == active_cluster_with_drained_nodes_long_time_ago_terminateable.drained_nodes - ) - - @pytest.fixture def create_associated_instance( fake_ec2_instance_data: Callable[..., EC2InstanceData], @@ -950,44 +953,6 @@ async def test__try_scale_down_cluster_with_no_nodes( mock_remove_nodes.assert_not_called() -async def test__try_scale_down_cluster( - minimal_configuration: None, - with_valid_time_before_termination: datetime.timedelta, - initialized_app: FastAPI, - cluster: Callable[..., Cluster], - host_node: Node, - drained_host_node: Node, - mock_terminate_instances: mock.Mock, - mock_remove_nodes: mock.Mock, - app_settings: ApplicationSettings, - create_associated_instance: Callable[[Node, bool], AssociatedInstance], -): - assert app_settings.AUTOSCALING_EC2_INSTANCES - assert ( - datetime.timedelta(seconds=10) - < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - ), "this tests relies on the fact that the time before termination is above 10 seconds" - - active_cluster = cluster( - active_nodes=[create_associated_instance(host_node, True)], # noqa: FBT003 - drained_nodes=[ - create_associated_instance(drained_host_node, True) # noqa: FBT003 - ], - reserve_drained_nodes=[ - 
create_associated_instance(drained_host_node, True) # noqa: FBT003 - ], - ) - - updated_cluster = await _try_scale_down_cluster(initialized_app, active_cluster) - assert not updated_cluster.drained_nodes - assert updated_cluster.reserve_drained_nodes - assert updated_cluster.reserve_drained_nodes == active_cluster.reserve_drained_nodes - assert updated_cluster.active_nodes - assert updated_cluster.active_nodes == active_cluster.active_nodes - mock_terminate_instances.assert_called_once() - mock_remove_nodes.assert_called_once() - - async def test__activate_drained_nodes_with_no_tasks( minimal_configuration: None, with_valid_time_before_termination: datetime.timedelta, diff --git a/services/autoscaling/tests/unit/test_modules_ec2.py b/services/autoscaling/tests/unit/test_modules_ec2.py index f412e40f36d..e186b14b4ac 100644 --- a/services/autoscaling/tests/unit/test_modules_ec2.py +++ b/services/autoscaling/tests/unit/test_modules_ec2.py @@ -96,7 +96,7 @@ async def test_ec2_client_when_ec2_server_goes_up_and_down( async def test_ping( mocked_aws_server: ThreadedMotoServer, mocked_aws_server_envs: None, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], app_settings: ApplicationSettings, autoscaling_ec2: AutoscalingEC2, ): @@ -109,7 +109,7 @@ async def test_ping( async def test_get_ec2_instance_capabilities( mocked_aws_server_envs: None, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], app_settings: ApplicationSettings, autoscaling_ec2: AutoscalingEC2, ): diff --git a/services/autoscaling/tests/unit/test_utils_computational_scaling.py b/services/autoscaling/tests/unit/test_utils_computational_scaling.py index 76bf27a4857..4cd3ae46208 100644 --- a/services/autoscaling/tests/unit/test_utils_computational_scaling.py +++ b/services/autoscaling/tests/unit/test_utils_computational_scaling.py @@ -25,8 +25,8 @@ _DEFAULT_MAX_RAM, get_max_resources_from_dask_task, try_assigning_task_to_instance_types, + try_assigning_task_to_instances, try_assigning_task_to_node, - try_assigning_task_to_pending_instances, ) @@ -140,7 +140,7 @@ async def test_try_assigning_task_to_pending_instances_with_no_instances( ): task = fake_task() assert ( - await try_assigning_task_to_pending_instances( + await try_assigning_task_to_instances( fake_app, task, [], {}, notify_progress=True ) is False @@ -164,7 +164,7 @@ async def test_try_assigning_task_to_pending_instances( } # calling once should allow to add that task to the instance assert ( - await try_assigning_task_to_pending_instances( + await try_assigning_task_to_instances( fake_app, task, pending_instance_to_tasks, @@ -176,7 +176,7 @@ async def test_try_assigning_task_to_pending_instances( assert pending_instance_to_tasks[0][1] == [task] # calling a second time as well should allow to add that task to the instance assert ( - await try_assigning_task_to_pending_instances( + await try_assigning_task_to_instances( fake_app, task, pending_instance_to_tasks, @@ -188,7 +188,7 @@ async def test_try_assigning_task_to_pending_instances( assert pending_instance_to_tasks[0][1] == [task, task] # calling a third time should fail assert ( - await try_assigning_task_to_pending_instances( + await try_assigning_task_to_instances( fake_app, task, pending_instance_to_tasks, diff --git a/services/autoscaling/tests/unit/test_utils_ec2.py b/services/autoscaling/tests/unit/test_utils_ec2.py index 3ca06a3da63..dc9ea43d8be 100644 --- a/services/autoscaling/tests/unit/test_utils_ec2.py 
+++ b/services/autoscaling/tests/unit/test_utils_ec2.py @@ -13,7 +13,7 @@ Ec2InstanceNotFoundError, ) from simcore_service_autoscaling.models import Resources -from simcore_service_autoscaling.utils.ec2 import ( +from simcore_service_autoscaling.utils.utils_ec2 import ( EC2InstanceType, closest_instance_policy, compose_user_data, diff --git a/services/clusters-keeper/tests/unit/conftest.py b/services/clusters-keeper/tests/unit/conftest.py index eb240ee7848..c3b06077764 100644 --- a/services/clusters-keeper/tests/unit/conftest.py +++ b/services/clusters-keeper/tests/unit/conftest.py @@ -220,7 +220,7 @@ def mocked_aws_server_envs( @pytest.fixture -def aws_allowed_ec2_instance_type_names( +def aws_allowed_ec2_instance_type_names_env( app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, ) -> EnvVarsDict: diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters.py b/services/clusters-keeper/tests/unit/test_modules_clusters.py index a38d17d6629..a0d020cb5e3 100644 --- a/services/clusters-keeper/tests/unit/test_modules_clusters.py +++ b/services/clusters-keeper/tests/unit/test_modules_clusters.py @@ -50,7 +50,7 @@ def _base_configuration( aws_subnet_id: str, aws_security_group_id: str, aws_ami_id: str, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], mocked_redis_server: None, ) -> None: ... diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py b/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py index d7f11b0982d..6461955bdfc 100644 --- a/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py +++ b/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py @@ -60,7 +60,7 @@ def _base_configuration( aws_subnet_id: str, aws_security_group_id: str, aws_ami_id: str, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], disabled_rabbitmq: None, mocked_redis_server: None, ) -> None: diff --git a/services/clusters-keeper/tests/unit/test_modules_ec2.py b/services/clusters-keeper/tests/unit/test_modules_ec2.py index 9962de4a5f6..ed7ee7ec4fc 100644 --- a/services/clusters-keeper/tests/unit/test_modules_ec2.py +++ b/services/clusters-keeper/tests/unit/test_modules_ec2.py @@ -101,7 +101,7 @@ async def test_ec2_client_when_ec2_server_goes_up_and_down( async def test_ping( mocked_aws_server: ThreadedMotoServer, mocked_aws_server_envs: None, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], app_settings: ApplicationSettings, clusters_keeper_ec2: ClustersKeeperEC2, ): @@ -114,7 +114,7 @@ async def test_ping( async def test_get_ec2_instance_capabilities( mocked_aws_server_envs: None, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], app_settings: ApplicationSettings, clusters_keeper_ec2: ClustersKeeperEC2, ): diff --git a/services/clusters-keeper/tests/unit/test_rpc_clusters.py b/services/clusters-keeper/tests/unit/test_rpc_clusters.py index ccb64f434db..34d4dad6fec 100644 --- a/services/clusters-keeper/tests/unit/test_rpc_clusters.py +++ b/services/clusters-keeper/tests/unit/test_rpc_clusters.py @@ -46,7 +46,7 @@ def _base_configuration( aws_subnet_id: str, aws_security_group_id: str, aws_ami_id: str, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], mocked_redis_server: None, initialized_app: FastAPI, ) -> None: diff --git 
a/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py index 538edc4d166..fef4c438487 100644 --- a/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py +++ b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py @@ -24,7 +24,7 @@ def _base_configuration( aws_subnet_id: str, aws_security_group_id: str, aws_ami_id: str, - aws_allowed_ec2_instance_type_names: list[str], + aws_allowed_ec2_instance_type_names_env: list[str], mocked_redis_server: None, initialized_app: FastAPI, ) -> None: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index d392eb247b1..d2dd0c1926a 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -19,6 +19,7 @@ from typing import Any import distributed +from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY from dask_task_models_library.container_tasks.docker import DockerBasicAuth from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.io import ( @@ -264,7 +265,7 @@ def _comp_sidecar_fct( # pylint: disable=too-many-arguments # noqa: PLR0913 ) if hardware_info.aws_ec2_instances: dask_resources[ - f"EC2-INSTANCE-TYPE:{hardware_info.aws_ec2_instances[0]}" + f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{hardware_info.aws_ec2_instances[0]}" ] = 1 check_scheduler_is_still_the_same(