Skip to content

Commit

Permalink
šŸ›Autoscaling: Fixes return value of Docker node activation (ITISFoundā€¦
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Dec 11, 2024
1 parent 08981e0 commit a612a27
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,10 @@ async def _activate_and_notify(
app: FastAPI,
auto_scaling_mode: BaseAutoscaling,
drained_node: AssociatedInstance,
) -> None:
) -> AssociatedInstance:
app_settings = get_application_settings(app)
docker_client = get_docker_client(app)
await asyncio.gather(
updated_node, *_ = await asyncio.gather(
utils_docker.set_node_osparc_ready(
app_settings, docker_client, drained_node.node, ready=True
),
Expand All @@ -373,6 +373,7 @@ async def _activate_and_notify(
app, drained_node.assigned_tasks, progress=1.0
),
)
return dataclasses.replace(drained_node, node=updated_node)


async def _activate_drained_nodes(
Expand All @@ -392,13 +393,13 @@ async def _activate_drained_nodes(
with log_context(
_logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes"
):
await asyncio.gather(
activated_nodes = await asyncio.gather(
*(
_activate_and_notify(app, auto_scaling_mode, node)
for node in nodes_to_activate
)
)
new_active_node_ids = {node.ec2_instance.id for node in nodes_to_activate}
new_active_node_ids = {node.ec2_instance.id for node in activated_nodes}
remaining_drained_nodes = [
node
for node in cluster.drained_nodes
Expand All @@ -411,7 +412,7 @@ async def _activate_drained_nodes(
]
return dataclasses.replace(
cluster,
active_nodes=cluster.active_nodes + nodes_to_activate,
active_nodes=cluster.active_nodes + activated_nodes,
drained_nodes=remaining_drained_nodes,
buffer_drained_nodes=remaining_reserved_drained_nodes,
)
Expand Down Expand Up @@ -878,7 +879,7 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
with log_context(
_logger, logging.INFO, f"drain {len(active_empty_instances)} empty nodes"
):
updated_nodes: list[Node] = await asyncio.gather(
updated_nodes = await asyncio.gather(
*(
utils_docker.set_node_osparc_ready(
app_settings,
Expand Down Expand Up @@ -1076,7 +1077,7 @@ async def _drain_retired_nodes(
app_settings = get_application_settings(app)
docker_client = get_docker_client(app)
# drain this empty nodes
updated_nodes: list[Node] = await asyncio.gather(
updated_nodes = await asyncio.gather(
*(
utils_docker.set_node_osparc_ready(
app_settings,
Expand Down Expand Up @@ -1173,7 +1174,11 @@ async def _autoscale_cluster(
) -> Cluster:
# 1. check if we have pending tasks
unnasigned_pending_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
_logger.info("found %s pending tasks", len(unnasigned_pending_tasks))
_logger.info(
"found %s pending task%s",
len(unnasigned_pending_tasks),
"s" if len(unnasigned_pending_tasks) > 1 else "",
)
# NOTE: this function predicts how the backend will assign tasks
still_pending_tasks, cluster = await _assign_tasks_to_current_cluster(
app, unnasigned_pending_tasks, cluster, auto_scaling_mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]:
_scheduler_url(app), _scheduler_auth(app)
)
# NOTE: any worker "processing" more than 1 task means that the other tasks are queued!
# NOTE: that is not necessarily true, in cases where 1 worker takes multiple tasks?? (osparc.io)
processing_tasks_by_worker = await dask.list_processing_tasks_per_worker(
_scheduler_url(app), _scheduler_auth(app)
)
queued_tasks = []
for tasks in processing_tasks_by_worker.values():
queued_tasks += tasks[1:]
_logger.debug(
"found %s unrunnable tasks and %s potentially queued tasks",
"found %s pending tasks and %s potentially queued tasks",
len(unrunnable_tasks),
len(queued_tasks),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,10 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
available=with_drain_nodes_labelled,
)
# update our fake node
fake_attached_node.spec.labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true"
fake_attached_node.spec.labels[
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
] = mock_docker_tag_node.call_args_list[0][1]["tags"][
] = mock_docker_tag_node.call_args_list[2][1]["tags"][
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
]
# check the activate time is later than attach time
Expand All @@ -590,13 +591,15 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
]
)
fake_attached_node.spec.availability = Availability.active
mock_compute_node_used_resources.assert_called_once_with(
get_docker_client(initialized_app),
fake_attached_node,
)
mock_compute_node_used_resources.reset_mock()
# check activate call
assert mock_docker_tag_node.call_args_list[1] == mock.call(

assert mock_docker_tag_node.call_args_list[2] == mock.call(
get_docker_client(initialized_app),
fake_attached_node,
tags=fake_node.spec.labels
Expand Down Expand Up @@ -1766,7 +1769,17 @@ async def test__activate_drained_nodes_with_drained_node(
updated_cluster = await _activate_drained_nodes(
initialized_app, cluster_with_drained_nodes, DynamicAutoscaling()
)
assert updated_cluster.active_nodes == cluster_with_drained_nodes.drained_nodes
# they are the same nodes, but the availability might have changed here
assert updated_cluster.active_nodes != cluster_with_drained_nodes.drained_nodes
assert (
updated_cluster.active_nodes[0].assigned_tasks
== cluster_with_drained_nodes.drained_nodes[0].assigned_tasks
)
assert (
updated_cluster.active_nodes[0].ec2_instance
== cluster_with_drained_nodes.drained_nodes[0].ec2_instance
)

assert drained_host_node.spec
mock_docker_tag_node.assert_called_once_with(
mock.ANY,
Expand Down

0 comments on commit a612a27

Please sign in to comment.