diff --git a/packages/aws-library/src/aws_library/ec2/_client.py b/packages/aws-library/src/aws_library/ec2/_client.py index 14094939dde..a40cf794304 100644 --- a/packages/aws-library/src/aws_library/ec2/_client.py +++ b/packages/aws-library/src/aws_library/ec2/_client.py @@ -2,7 +2,7 @@ import logging from collections.abc import Iterable, Sequence from dataclasses import dataclass -from typing import cast +from typing import Literal, cast import aioboto3 import botocore.exceptions @@ -66,20 +66,28 @@ async def ping(self) -> bool: @ec2_exception_handler(_logger) async def get_ec2_instance_capabilities( self, - instance_type_names: set[InstanceTypeType], + instance_type_names: set[InstanceTypeType] | Literal["ALL"], ) -> list[EC2InstanceType]: - """returns the ec2 instance types from a list of instance type names - NOTE: the order might differ! + """Returns the ec2 instance types from a list of instance type names (sorted by name) + Arguments: - instance_type_names -- the types to filter with + instance_type_names -- the types to filter with or "ALL", to return all EC2 possible instances Raises: Ec2InstanceTypeInvalidError: some invalid types were used as filter ClustersKeeperRuntimeError: unexpected error communicating with EC2 """ + if instance_type_names == "ALL": + selection_or_all_if_empty = [] + else: + selection_or_all_if_empty = list(instance_type_names) + if len(selection_or_all_if_empty) == 0: + msg = "`instance_type_names` cannot be an empty set. 
Use either a selection or 'ALL'" + raise ValueError(msg) + instance_types = await self.client.describe_instance_types( - InstanceTypes=list(instance_type_names) + InstanceTypes=selection_or_all_if_empty ) list_instances: list[EC2InstanceType] = [] for instance in instance_types.get("InstanceTypes", []): @@ -95,7 +103,7 @@ async def get_ec2_instance_capabilities( ), ) ) - return list_instances + return sorted(list_instances, key=lambda i: i.name) @ec2_exception_handler(_logger) async def launch_instances( diff --git a/packages/aws-library/tests/test_ec2_client.py b/packages/aws-library/tests/test_ec2_client.py index 625555e9f5d..b940383fdd3 100644 --- a/packages/aws-library/tests/test_ec2_client.py +++ b/packages/aws-library/tests/test_ec2_client.py @@ -116,23 +116,25 @@ async def test_get_ec2_instance_capabilities( ) ) assert instance_types - assert len(instance_types) == len(ec2_allowed_instances) + assert [_.name for _ in instance_types] == sorted(ec2_allowed_instances) - # all the instance names are found and valid - assert all(i.name in ec2_allowed_instances for i in instance_types) - for instance_type_name in ec2_allowed_instances: - assert any(i.name == instance_type_name for i in instance_types) - -async def test_get_ec2_instance_capabilities_empty_list_returns_all_options( +async def test_get_ec2_instance_capabilities_returns_all_options( simcore_ec2_api: SimcoreEC2API, ): - instance_types = await simcore_ec2_api.get_ec2_instance_capabilities(set()) + instance_types = await simcore_ec2_api.get_ec2_instance_capabilities("ALL") assert instance_types # NOTE: this might need adaptation when moto is updated assert 700 < len(instance_types) < 828 +async def test_get_ec2_instance_capabilities_raise_with_empty_set( + simcore_ec2_api: SimcoreEC2API, +): + with pytest.raises(ValueError, match="instance_type_names"): + await simcore_ec2_api.get_ec2_instance_capabilities(set()) + + async def test_get_ec2_instance_capabilities_with_invalid_type_raises( simcore_ec2_api: 
SimcoreEC2API, faker: Faker, diff --git a/packages/service-integration/src/service_integration/pytest_plugin/folder_structure.py b/packages/service-integration/src/service_integration/pytest_plugin/folder_structure.py index 47969490661..ef87dbb5ceb 100644 --- a/packages/service-integration/src/service_integration/pytest_plugin/folder_structure.py +++ b/packages/service-integration/src/service_integration/pytest_plugin/folder_structure.py @@ -46,21 +46,6 @@ def metadata_file(project_slug_dir: Path, request: pytest.FixtureRequest) -> Pat return metadata_file -def get_expected_files(docker_name: str) -> tuple[str, ...]: - return ( - ".cookiecutterrc", - ".dockerignore", - "metadata:metadata.yml", - f"docker/{docker_name}:entrypoint.sh", - f"docker/{docker_name}:Dockerfile", - "service.cli:execute.sh", - "docker-compose-build.yml", - "docker-compose-meta.yml", - "docker-compose.devel.yml", - "docker-compose.yml", - ) - - def assert_path_in_repo(expected_path: str, project_slug_dir: Path): if ":" in expected_path: diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/clusters_keeper/ec2_instances.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/clusters_keeper/ec2_instances.py index 433eee07fa3..9358c60f7ca 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/clusters_keeper/ec2_instances.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/clusters_keeper/ec2_instances.py @@ -1,3 +1,5 @@ +from typing import Literal + from models_library.api_schemas_clusters_keeper import CLUSTERS_KEEPER_RPC_NAMESPACE from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceTypeGet from models_library.rabbitmq_basic_types import RPCMethodName @@ -7,7 +9,7 @@ async def get_instance_type_details( - client: RabbitMQRPCClient, *, instance_type_names: set[str] + client: RabbitMQRPCClient, *, instance_type_names: set[str] | Literal["ALL"] ) -> list[EC2InstanceTypeGet]: """**Remote 
method** diff --git a/services/autoscaling/.cookiecutterrc b/services/autoscaling/.cookiecutterrc deleted file mode 100644 index 5ce0be7364d..00000000000 --- a/services/autoscaling/.cookiecutterrc +++ /dev/null @@ -1,20 +0,0 @@ -# This file exists so you can easily regenerate your project. -# -# cookiecutter --overwrite-if-exists --config-file=.cookiecutterrc /home/crespo/devp/cookiecutter-simcore-py-fastapi -# - -default_context: - - _extensions: ['jinja2_time.TimeExtension'] - _output_dir: '/home/crespo/devp/cookiecutter-simcore-py-fastapi/.output/osparc-simcore/services' - _template: '/home/crespo/devp/cookiecutter-simcore-py-fastapi' - detailed_doc: 'n' - distribution_name: 'simcore-service-autoscaling' - full_name: 'Pedro Crespo-Valero' - github_username: 'pcrespov' - package_name: 'simcore_service_autoscaling' - project_name: 'Auto scaling service' - project_short_description: 'Service to auto-scale swarm' - project_slug: 'autoscaling' - version: '0.1.0-alpha' - year: '2022' diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index 23af4b958bf..78cde19b50e 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -60,7 +60,8 @@ class Config(EC2Settings.Config): class EC2InstancesSettings(BaseCustomSettings): EC2_INSTANCES_ALLOWED_TYPES: dict[str, EC2InstanceBootSpecific] = Field( ..., - description="Defines which EC2 instances are considered as candidates for new EC2 instance and their respective boot specific parameters", + description="Defines which EC2 instances are considered as candidates for new EC2 instance and their respective boot specific parameters" + " NOTE: minimum length >0", ) EC2_INSTANCES_KEY_NAME: str = Field( @@ -133,7 +134,7 @@ class EC2InstancesSettings(BaseCustomSettings): @validator("EC2_INSTANCES_TIME_BEFORE_DRAINING") @classmethod - 
def ensure_draining_delay_time_is_in_range( + def _ensure_draining_delay_time_is_in_range( cls, value: datetime.timedelta ) -> datetime.timedelta: if value < datetime.timedelta(seconds=10): @@ -144,7 +145,7 @@ def ensure_draining_delay_time_is_in_range( @validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION") @classmethod - def ensure_termination_delay_time_is_in_range( + def _ensure_termination_delay_time_is_in_range( cls, value: datetime.timedelta ) -> datetime.timedelta: if value < datetime.timedelta(minutes=0): @@ -155,12 +156,18 @@ def ensure_termination_delay_time_is_in_range( @validator("EC2_INSTANCES_ALLOWED_TYPES") @classmethod - def check_valid_instance_names( + def _check_valid_instance_names_and_not_empty( cls, value: dict[str, EC2InstanceBootSpecific] ) -> dict[str, EC2InstanceBootSpecific]: # NOTE: needed because of a flaw in BaseCustomSettings # issubclass raises TypeError if used on Aliases parse_obj_as(list[InstanceTypeType], list(value)) + + if not value: + # NOTE: Field( ... , min_items=...) 
cannot be used to constrain the number of items in a dict + msg = "At least one item expected in EC2_INSTANCES_ALLOWED_TYPES, got none" + raise ValueError(msg) + return value @@ -293,12 +300,12 @@ def LOG_LEVEL(self): # noqa: N802 @validator("AUTOSCALING_LOGLEVEL") @classmethod - def valid_log_level(cls, value: str) -> str: + def _valid_log_level(cls, value: str) -> str: return cls.validate_log_level(value) @root_validator() @classmethod - def exclude_both_dynamic_computational_mode(cls, values): + def _exclude_both_dynamic_computational_mode(cls, values): if ( values.get("AUTOSCALING_DASK") is not None and values.get("AUTOSCALING_NODES_MONITORING") is not None diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index b1e629f4e7a..e7fc947cba5 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -2,6 +2,7 @@ import collections import dataclasses import datetime +import functools import itertools import logging from typing import Final, cast @@ -327,30 +328,30 @@ async def _try_attach_pending_ec2s( ) -async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]: +async def _sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]: app_settings: ApplicationSettings = app.state.settings assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec ec2_client = get_ec2_client(app) - # some instances might be able to run several tasks + allowed_instance_type_names = list( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES + ) + + assert ( # nosec + allowed_instance_type_names + ), "EC2_INSTANCES_ALLOWED_TYPES cannot be empty!" 
+ allowed_instance_types: list[ EC2InstanceType ] = await ec2_client.get_ec2_instance_capabilities( - cast( - set[InstanceTypeType], - set( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES, - ), - ) + cast(set[InstanceTypeType], set(allowed_instance_type_names)) ) - def _sort_according_to_allowed_types(instance_type: EC2InstanceType) -> int: - assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - return list( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES - ).index(f"{instance_type.name}") + def _as_selection(instance_type: EC2InstanceType) -> int: + # NOTE: will raise ValueError if allowed_instance_types not in allowed_instance_type_names + return allowed_instance_type_names.index(f"{instance_type.name}") - allowed_instance_types.sort(key=_sort_according_to_allowed_types) + allowed_instance_types.sort(key=_as_selection) return allowed_instance_types @@ -497,51 +498,44 @@ async def _assign_tasks_to_current_cluster( cluster: Cluster, auto_scaling_mode: BaseAutoscaling, ) -> tuple[list, Cluster]: + """ + Evaluates whether a task can be executed on any instance within the cluster. If the task's resource requirements are met, the task is *denoted* as assigned to the cluster. + Note: This is an estimation only since actual scheduling is handled by Dask/Docker (depending on the mode). + + Returns: + A tuple containing: + - A list of unassigned tasks (tasks whose resource requirements cannot be fulfilled by the available machines in the cluster). + - The same cluster instance passed as input. 
+ """ unassigned_tasks = [] + assignment_predicates = [ + functools.partial(_try_assign_task_to_ec2_instance, instances=instances) + for instances in ( + cluster.active_nodes, + cluster.drained_nodes + cluster.buffer_drained_nodes, + cluster.pending_nodes, + cluster.pending_ec2s, + cluster.buffer_ec2s, + ) + ] + for task in tasks: task_required_resources = auto_scaling_mode.get_task_required_resources(task) task_required_ec2_instance = await auto_scaling_mode.get_task_defined_instance( app, task ) - assignment_functions = [ - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( - task, - instances=cluster.active_nodes, - task_required_ec2_instance=required_ec2, - task_required_resources=required_resources, - ), - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( - task, - instances=cluster.drained_nodes + cluster.buffer_drained_nodes, - task_required_ec2_instance=required_ec2, - task_required_resources=required_resources, - ), - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( - task, - instances=cluster.pending_nodes, - task_required_ec2_instance=required_ec2, - task_required_resources=required_resources, - ), - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( - task, - instances=cluster.pending_ec2s, - task_required_ec2_instance=required_ec2, - task_required_resources=required_resources, - ), - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( - task, - instances=cluster.buffer_ec2s, - task_required_ec2_instance=required_ec2, - task_required_resources=required_resources, - ), - ] - if any( - assignment(task, task_required_ec2_instance, task_required_resources) - for assignment in assignment_functions + is_assigned( + task, + task_required_ec2_instance=task_required_ec2_instance, + task_required_resources=task_required_resources, + ) + for is_assigned in assignment_predicates ): - _logger.debug("assigned task to 
cluster") + _logger.debug( + "task %s is assigned to one instance available in cluster", task + ) else: unassigned_tasks.append(task) @@ -1131,7 +1125,7 @@ async def _autoscale_cluster( # 1. check if we have pending tasks and resolve them by activating some drained nodes unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app) _logger.info("found %s unrunnable tasks", len(unrunnable_tasks)) - + # NOTE: this function predicts how dask will assign a task to a machine queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster( app, unrunnable_tasks, cluster, auto_scaling_mode ) @@ -1217,11 +1211,13 @@ async def auto_scale_cluster( If there are such tasks, this method will allocate new machines in AWS to cope with the additional load. """ - - allowed_instance_types = await sorted_allowed_instance_types(app) + # current state + allowed_instance_types = await _sorted_allowed_instance_types(app) cluster = await _analyze_current_cluster( app, auto_scaling_mode, allowed_instance_types ) + + # cleanup cluster = await _cleanup_disconnected_nodes(app, cluster) cluster = await _terminate_broken_ec2s(app, cluster) cluster = await _make_pending_buffer_ec2s_join_cluster(app, cluster) @@ -1230,8 +1226,11 @@ async def auto_scale_cluster( ) cluster = await _drain_retired_nodes(app, cluster) + # desired state cluster = await _autoscale_cluster( app, cluster, auto_scaling_mode, allowed_instance_types ) + + # notify await _notify_machine_creation_progress(app, cluster, auto_scaling_mode) await _notify_autoscaling_status(app, cluster, auto_scaling_mode) diff --git a/services/autoscaling/tests/unit/test_core_settings.py b/services/autoscaling/tests/unit/test_core_settings.py index 7b99bb6bf5a..9315c8fcfd1 100644 --- a/services/autoscaling/tests/unit/test_core_settings.py +++ b/services/autoscaling/tests/unit/test_core_settings.py @@ -132,12 +132,16 @@ def test_invalid_EC2_INSTANCES_TIME_BEFORE_TERMINATION( # noqa: N802 ) -def 
test_EC2_INSTANCES_ALLOWED_TYPES( # noqa: N802 +def test_EC2_INSTANCES_ALLOWED_TYPES_valid( # noqa: N802 app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker ): settings = ApplicationSettings.create_from_envs() assert settings.AUTOSCALING_EC2_INSTANCES + +def test_EC2_INSTANCES_ALLOWED_TYPES_passing_invalid_image_tags( # noqa: N802 + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker +): # passing an invalid image tag name will fail setenvs_from_dict( monkeypatch, @@ -155,6 +159,10 @@ def test_EC2_INSTANCES_ALLOWED_TYPES( # noqa: N802 with pytest.raises(ValidationError): ApplicationSettings.create_from_envs() + +def test_EC2_INSTANCES_ALLOWED_TYPES_passing_valid_image_tags( # noqa: N802 + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker +): # passing a valid will pass setenvs_from_dict( monkeypatch, @@ -176,14 +184,23 @@ def test_EC2_INSTANCES_ALLOWED_TYPES( # noqa: N802 ) settings = ApplicationSettings.create_from_envs() assert settings.AUTOSCALING_EC2_INSTANCES - assert [ + assert next( + iter(settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.values()) + ).pre_pull_images == [ "nginx:latest", "itisfoundation/my-very-nice-service:latest", "simcore/services/dynamic/another-nice-one:2.4.5", "asd", - ] == next( - iter(settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.values()) - ).pre_pull_images + ] + + +def test_EC2_INSTANCES_ALLOWED_TYPES_empty_not_allowed( # noqa: N802 + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch +): + monkeypatch.setenv("EC2_INSTANCES_ALLOWED_TYPES", "{}") + + with pytest.raises(ValidationError): + ApplicationSettings.create_from_envs() def test_invalid_instance_names( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py index 6578f5169d6..0b1e6a4c5a5 100644 --- 
a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/ec2_instances.py @@ -1,3 +1,5 @@ +from typing import Literal + from aws_library.ec2 import EC2InstanceType from fastapi import FastAPI from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceTypeGet @@ -10,7 +12,7 @@ @router.expose() async def get_instance_type_details( - app: FastAPI, *, instance_type_names: set[str] + app: FastAPI, *, instance_type_names: set[str] | Literal["ALL"] ) -> list[EC2InstanceTypeGet]: instance_capabilities: list[EC2InstanceType] = await get_ec2_client( app diff --git a/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py index f4eea132cdf..a833d13743e 100644 --- a/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py +++ b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py @@ -37,7 +37,7 @@ async def test_get_instance_type_details_all_options( # an empty set returns all options rpc_response = await get_instance_type_details( - clusters_keeper_rabbitmq_rpc_client, instance_type_names=set() + clusters_keeper_rabbitmq_rpc_client, instance_type_names="ALL" ) assert rpc_response assert isinstance(rpc_response, list) diff --git a/services/datcore-adapter/.cookiecutterrc b/services/datcore-adapter/.cookiecutterrc deleted file mode 100644 index bee7aaa59cf..00000000000 --- a/services/datcore-adapter/.cookiecutterrc +++ /dev/null @@ -1,19 +0,0 @@ -# This file exists so you can easily regenerate your project. 
-# -# cookiecutter --overwrite-if-exists --config-file=.cookiecutterrc /home/anderegg/dev/github/cookiecutter-simcore-py-fastapi -# - -default_context: - - _extensions: ['jinja2_time.TimeExtension'] - _template: '/home/anderegg/dev/github/cookiecutter-simcore-py-fastapi' - detailed_doc: 'n' - distribution_name: 'simcore-service-datcore-adapter' - full_name: 'Sylvain Anderegg' - github_username: 'sanderegg' - package_name: 'simcore_service_datcore_adapter' - project_name: 'datcore-adapter' - project_short_description: 'Interfaces with datcore storage' - project_slug: 'datcore-adapter' - version: '0.1.0-alpha' - year: '2021'