diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
index f430ba508bc..f86c555a253 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
@@ -386,13 +386,18 @@ async def _activate_drained_nodes(
         if node.assigned_tasks
     ]
 
-    # activate these nodes now
-    await asyncio.gather(
-        *(
-            _activate_and_notify(app, auto_scaling_mode, node)
-            for node in nodes_to_activate
+    if not nodes_to_activate:
+        return cluster
+
+    with log_context(
+        _logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes"
+    ):
+        await asyncio.gather(
+            *(
+                _activate_and_notify(app, auto_scaling_mode, node)
+                for node in nodes_to_activate
+            )
         )
-    )
     new_active_node_ids = {node.ec2_instance.id for node in nodes_to_activate}
     remaining_drained_nodes = [
         node
@@ -421,12 +426,17 @@ async def _start_buffer_instances(
     if not instances_to_start:
         return cluster
     # change the buffer machine to an active one
-    await get_ec2_client(app).set_instances_tags(
-        instances_to_start,
-        tags=get_activated_buffer_ec2_tags(app, auto_scaling_mode),
-    )
+    with log_context(
+        _logger, logging.INFO, f"start {len(instances_to_start)} buffer machines"
+    ):
+        await get_ec2_client(app).set_instances_tags(
+            instances_to_start,
+            tags=get_activated_buffer_ec2_tags(app, auto_scaling_mode),
+        )
 
-    started_instances = await get_ec2_client(app).start_instances(instances_to_start)
+        started_instances = await get_ec2_client(app).start_instances(
+            instances_to_start
+        )
     started_instance_ids = [i.id for i in started_instances]
 
     return dataclasses.replace(
@@ -615,10 +625,10 @@ async def _find_needed_instances(
             _logger.exception("Unexpected error:")
 
     _logger.info(
-        "found following needed %s instances: %s",
+        "found following %s needed instances: %s",
         len(needed_new_instance_types_for_tasks),
         [
-            f"{i.instance_type.name=}:{i.instance_type.resources} with {len(i.assigned_tasks)} tasks"
+            f"{i.instance_type.name}:{i.instance_type.resources} takes {len(i.assigned_tasks)} task{'s' if len(i.assigned_tasks)>1 else ''}"
            for i in needed_new_instance_types_for_tasks
        ],
    )
@@ -810,39 +820,6 @@ async def _launch_instances(
     return new_pending_instances
 
 
-async def _scale_up_cluster(
-    app: FastAPI,
-    cluster: Cluster,
-    unassigned_tasks: list,
-    auto_scaling_mode: BaseAutoscaling,
-    allowed_instance_types: list[EC2InstanceType],
-) -> Cluster:
-    app_settings: ApplicationSettings = app.state.settings
-    assert app_settings.AUTOSCALING_EC2_ACCESS  # nosec
-    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
-
-    # let's start these
-    if needed_ec2_instances := await _find_needed_instances(
-        app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode
-    ):
-        await auto_scaling_mode.log_message_from_tasks(
-            app,
-            unassigned_tasks,
-            "service is pending due to missing resources, scaling up cluster now...",
-            level=logging.INFO,
-        )
-        new_pending_instances = await _launch_instances(
-            app, needed_ec2_instances, unassigned_tasks, auto_scaling_mode
-        )
-        cluster.pending_ec2s.extend(
-            [NonAssociatedInstance(ec2_instance=i) for i in new_pending_instances]
-        )
-        # NOTE: to check the logs of UserData in EC2 instance
-        # run: tail -f -n 1000 /var/log/cloud-init-output.log in the instance
-
-    return cluster
-
-
 async def _find_drainable_nodes(
     app: FastAPI, cluster: Cluster
 ) -> list[AssociatedInstance]:
@@ -898,23 +875,25 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
     if not active_empty_instances:
         return cluster
 
-    # drain this empty nodes
-    updated_nodes: list[Node] = await asyncio.gather(
-        *(
-            utils_docker.set_node_osparc_ready(
-                app_settings,
-                docker_client,
-                node.node,
-                ready=False,
+    with log_context(
+        _logger, logging.INFO, f"drain {len(active_empty_instances)} empty nodes"
+    ):
+        updated_nodes: list[Node] = await asyncio.gather(
+            *(
+                utils_docker.set_node_osparc_ready(
+                    app_settings,
+                    docker_client,
+                    node.node,
+                    ready=False,
+                )
+                for node in active_empty_instances
             )
-            for node in active_empty_instances
-        )
-    )
-    if updated_nodes:
-        _logger.info(
-            "following nodes were set to drain: '%s'",
-            f"{[node.description.hostname for node in updated_nodes if node.description]}",
         )
+        if updated_nodes:
+            _logger.info(
+                "following nodes were set to drain: '%s'",
+                f"{[node.description.hostname for node in updated_nodes if node.description]}",
+            )
     newly_drained_instances = [
         AssociatedInstance(node=node, ec2_instance=instance.ec2_instance)
         for instance, node in zip(active_empty_instances, updated_nodes, strict=True)
@@ -1021,10 +1000,15 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster:
         if i.ec2_instance.id
         not in (new_terminating_instance_ids + terminated_instance_ids)
     ]
+    still_terminating_nodes = [
+        i
+        for i in cluster.terminating_nodes
+        if i.ec2_instance.id not in terminated_instance_ids
+    ]
     return dataclasses.replace(
         cluster,
         drained_nodes=still_drained_nodes,
-        terminating_nodes=cluster.terminating_nodes + new_terminating_instances,
+        terminating_nodes=still_terminating_nodes + new_terminating_instances,
         terminated_instances=cluster.terminated_instances
         + [
             NonAssociatedInstance(ec2_instance=i.ec2_instance)
@@ -1119,7 +1103,7 @@ async def _drain_retired_nodes(
     )
 
 
-async def _scale_down_unused_cluster_machines(
+async def _scale_down_unused_cluster_instances(
     app: FastAPI,
     cluster: Cluster,
     auto_scaling_mode: BaseAutoscaling,
@@ -1129,57 +1113,89 @@
     return await _try_scale_down_cluster(app, cluster)
 
 
-async def _autoscale_cluster(
+async def _scale_up_cluster(
     app: FastAPI,
     cluster: Cluster,
     auto_scaling_mode: BaseAutoscaling,
     allowed_instance_types: list[EC2InstanceType],
+    unassigned_tasks: list,
 ) -> Cluster:
-    # 1. check if we have pending tasks
-    unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
-    _logger.info("found %s unrunnable tasks", len(unrunnable_tasks))
-    # NOTE: this function predicts how the backend will assign tasks
-    queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster(
-        app, unrunnable_tasks, cluster, auto_scaling_mode
-    )
-    # 2. try to activate drained nodes to cover some of the tasks
-    cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)
-
-    # 3. start buffer instances to cover the remaining tasks
-    cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)
-
-    # 4. scale down unused machines
-    cluster = await _scale_down_unused_cluster_machines(app, cluster, auto_scaling_mode)
-
-    # 4. let's check if there are still pending tasks or if the reserve was used
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
+    if not unassigned_tasks and (
+        len(cluster.buffer_drained_nodes)
+        >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
+    ):
+        return cluster
+
     if (
-        queued_or_missing_instance_tasks
-        or (
-            len(cluster.buffer_drained_nodes)
-            < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
-        )
-        and (
-            cluster.total_number_of_machines()
-            < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
-        )
+        cluster.total_number_of_machines()
+        >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
     ):
         _logger.info(
-            "%s unrunnable tasks could not be assigned, slowly trying to scale up...",
-            len(queued_or_missing_instance_tasks),
+            "cluster already hit the maximum allowed amount of instances (%s), not scaling up. "
+            "%s tasks will wait until instances are free.",
+            app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
+            len(unassigned_tasks),
         )
-        cluster = await _scale_up_cluster(
+        return cluster
+
+    # now we scale up
+    assert app_settings.AUTOSCALING_EC2_ACCESS  # nosec
+
+    # let's start these
+    if needed_ec2_instances := await _find_needed_instances(
+        app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode
+    ):
+        await auto_scaling_mode.log_message_from_tasks(
             app,
-            cluster,
-            queued_or_missing_instance_tasks,
-            auto_scaling_mode,
-            allowed_instance_types,
+            unassigned_tasks,
+            "service is pending due to missing resources, scaling up cluster now...",
+            level=logging.INFO,
+        )
+        new_pending_instances = await _launch_instances(
+            app, needed_ec2_instances, unassigned_tasks, auto_scaling_mode
+        )
+        cluster.pending_ec2s.extend(
+            [NonAssociatedInstance(ec2_instance=i) for i in new_pending_instances]
        )
+        # NOTE: to check the logs of UserData in EC2 instance
+        # run: tail -f -n 1000 /var/log/cloud-init-output.log in the instance
 
     return cluster
 
 
+async def _autoscale_cluster(
+    app: FastAPI,
+    cluster: Cluster,
+    auto_scaling_mode: BaseAutoscaling,
+    allowed_instance_types: list[EC2InstanceType],
+) -> Cluster:
+    # 1. check if we have pending tasks
+    unassigned_pending_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
+    _logger.info("found %s pending tasks", len(unassigned_pending_tasks))
+    # NOTE: this function predicts how the backend will assign tasks
+    still_pending_tasks, cluster = await _assign_tasks_to_current_cluster(
+        app, unassigned_pending_tasks, cluster, auto_scaling_mode
+    )
+
+    # 2. activate available drained nodes to cover some of the tasks
+    cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)
+
+    # 3. start buffer instances to cover the remaining tasks
+    cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)
+
+    # 4. scale down unused instances
+    cluster = await _scale_down_unused_cluster_instances(
+        app, cluster, auto_scaling_mode
+    )
+
+    # 5. scale up if necessary
+    return await _scale_up_cluster(
+        app, cluster, auto_scaling_mode, allowed_instance_types, still_pending_tasks
+    )
+
+
 async def _notify_autoscaling_status(
     app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
 ) -> None:
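
Note on the `with log_context(...)` pattern this diff introduces around node activation, buffer-instance startup, and node draining: in this codebase the helper comes from servicelib's logging utilities and brackets a block of work with start/finish log lines. Below is a minimal standalone sketch of that behavior, assuming only that the helper logs once on entry and once on exit at the given level; the exact messages and signature of the real helper may differ.

    import logging
    from collections.abc import Iterator
    from contextlib import contextmanager


    @contextmanager
    def log_context(logger: logging.Logger, level: int, msg: str) -> Iterator[None]:
        # bracket the wrapped block with start/finish messages so that
        # long-running steps (e.g. "activate 3 drained nodes") show both
        # their beginning and their completion in the service logs
        logger.log(level, "Starting %s ...", msg)
        try:
            yield
        finally:
            logger.log(level, "Finished %s", msg)


    logging.basicConfig(level=logging.INFO)
    _logger = logging.getLogger(__name__)

    with log_context(_logger, logging.INFO, "activate 3 drained nodes"):
        pass  # the awaited asyncio.gather(...) runs here in the real code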