diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index f86c555a253..e2212195aed 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -356,10 +356,10 @@ async def _activate_and_notify( app: FastAPI, auto_scaling_mode: BaseAutoscaling, drained_node: AssociatedInstance, -) -> None: +) -> AssociatedInstance: app_settings = get_application_settings(app) docker_client = get_docker_client(app) - await asyncio.gather( + updated_node, *_ = await asyncio.gather( utils_docker.set_node_osparc_ready( app_settings, docker_client, drained_node.node, ready=True ), @@ -373,6 +373,7 @@ async def _activate_and_notify( app, drained_node.assigned_tasks, progress=1.0 ), ) + return dataclasses.replace(drained_node, node=updated_node) async def _activate_drained_nodes( @@ -392,13 +393,13 @@ async def _activate_drained_nodes( with log_context( _logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes" ): - await asyncio.gather( + activated_nodes = await asyncio.gather( *( _activate_and_notify(app, auto_scaling_mode, node) for node in nodes_to_activate ) ) - new_active_node_ids = {node.ec2_instance.id for node in nodes_to_activate} + new_active_node_ids = {node.ec2_instance.id for node in activated_nodes} remaining_drained_nodes = [ node for node in cluster.drained_nodes @@ -411,7 +412,7 @@ async def _activate_drained_nodes( ] return dataclasses.replace( cluster, - active_nodes=cluster.active_nodes + nodes_to_activate, + active_nodes=cluster.active_nodes + activated_nodes, drained_nodes=remaining_drained_nodes, buffer_drained_nodes=remaining_reserved_drained_nodes, ) @@ -878,7 +879,7 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: with log_context( _logger, logging.INFO, f"drain {len(active_empty_instances)} empty nodes" ): - updated_nodes: list[Node] = await asyncio.gather( + updated_nodes = await asyncio.gather( *( utils_docker.set_node_osparc_ready( app_settings, @@ -1076,7 +1077,7 @@ async def _drain_retired_nodes( app_settings = get_application_settings(app) docker_client = get_docker_client(app) # drain this empty nodes - updated_nodes: list[Node] = await asyncio.gather( + updated_nodes = await asyncio.gather( *( utils_docker.set_node_osparc_ready( app_settings, @@ -1173,7 +1174,11 @@ async def _autoscale_cluster( ) -> Cluster: # 1. check if we have pending tasks unnasigned_pending_tasks = await auto_scaling_mode.list_unrunnable_tasks(app) - _logger.info("found %s pending tasks", len(unnasigned_pending_tasks)) + _logger.info( + "found %s pending task%s", + len(unnasigned_pending_tasks), + "s" if len(unnasigned_pending_tasks) > 1 else "", + ) # NOTE: this function predicts how the backend will assign tasks still_pending_tasks, cluster = await _assign_tasks_to_current_cluster( app, unnasigned_pending_tasks, cluster, auto_scaling_mode diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index a632afe956e..6a133e565cb 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -69,6 +69,7 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]: _scheduler_url(app), _scheduler_auth(app) ) # NOTE: any worker "processing" more than 1 task means that the other tasks are queued! + # NOTE: that is not necessarily true, in cases where 1 worker takes multiple tasks?? (osparc.io) processing_tasks_by_worker = await dask.list_processing_tasks_per_worker( _scheduler_url(app), _scheduler_auth(app) ) @@ -76,7 +77,7 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]: for tasks in processing_tasks_by_worker.values(): queued_tasks += tasks[1:] _logger.debug( - "found %s unrunnable tasks and %s potentially queued tasks", + "found %s pending tasks and %s potentially queued tasks", len(unrunnable_tasks), len(queued_tasks), ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index 4aa6c302fca..ccdb2461c04 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -575,9 +575,10 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]: available=with_drain_nodes_labelled, ) # update our fake node + fake_attached_node.spec.labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" fake_attached_node.spec.labels[ _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY - ] = mock_docker_tag_node.call_args_list[0][1]["tags"][ + ] = mock_docker_tag_node.call_args_list[2][1]["tags"][ _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY ] # check the activate time is later than attach time @@ -590,13 +591,15 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]: _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY ] ) + fake_attached_node.spec.availability = Availability.active mock_compute_node_used_resources.assert_called_once_with( get_docker_client(initialized_app), fake_attached_node, ) mock_compute_node_used_resources.reset_mock() # check activate call - assert mock_docker_tag_node.call_args_list[1] == mock.call( + + assert mock_docker_tag_node.call_args_list[2] == mock.call( get_docker_client(initialized_app), fake_attached_node, tags=fake_node.spec.labels @@ -1766,7 +1769,17 @@ async def test__activate_drained_nodes_with_drained_node( updated_cluster = await _activate_drained_nodes( initialized_app, cluster_with_drained_nodes, DynamicAutoscaling() ) - assert updated_cluster.active_nodes == cluster_with_drained_nodes.drained_nodes + # they are the same nodes, but the availability might have changed here + assert updated_cluster.active_nodes != cluster_with_drained_nodes.drained_nodes + assert ( + updated_cluster.active_nodes[0].assigned_tasks + == cluster_with_drained_nodes.drained_nodes[0].assigned_tasks + ) + assert ( + updated_cluster.active_nodes[0].ec2_instance + == cluster_with_drained_nodes.drained_nodes[0].ec2_instance + ) + assert drained_host_node.spec mock_docker_tag_node.assert_called_once_with( mock.ANY,