Skip to content

Commit

Permalink
ongoing test
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 3, 2024
1 parent 4404ac9 commit 7004721
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1116,16 +1116,26 @@ async def _drain_retired_nodes(
)


async def _scale_down_unused_cluster_machines(
    app: FastAPI,
    cluster: Cluster,
    auto_scaling_mode: BaseAutoscaling,
) -> Cluster:
    """Run the scale-down sequence on ``cluster`` and return the resulting cluster.

    The steps are executed strictly in this order:

    1. ask ``auto_scaling_mode`` to try retiring nodes,
    2. deactivate the empty nodes of ``cluster``,
    3. try to scale down the cluster obtained from step 2.

    Returns:
        The cluster returned by the final scale-down attempt.
    """
    await auto_scaling_mode.try_retire_nodes(app)
    cluster_after_deactivation = await _deactivate_empty_nodes(app, cluster)
    scaled_down_cluster = await _try_scale_down_cluster(
        app, cluster_after_deactivation
    )
    return scaled_down_cluster


async def _autoscale_cluster(
app: FastAPI,
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
) -> Cluster:
# 1. check if we have pending tasks and resolve them by activating some drained nodes
# 1. check if we have pending tasks
unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
_logger.info("found %s unrunnable tasks", len(unrunnable_tasks))
# NOTE: this function predicts how dask will assign a task to a machine
# NOTE: this function predicts how the backend will assign tasks
queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster(
app, unrunnable_tasks, cluster, auto_scaling_mode
)
Expand All @@ -1135,41 +1145,34 @@ async def _autoscale_cluster(
# 3. start buffer instances to cover the remaining tasks
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)

# 4. scale down unused machines
cluster = await _scale_down_unused_cluster_machines(app, cluster, auto_scaling_mode)

# 5. let's check if there are still pending tasks or if the reserve was used
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
if queued_or_missing_instance_tasks or (
len(cluster.buffer_drained_nodes)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
):
if (
if (
queued_or_missing_instance_tasks
or (
len(cluster.buffer_drained_nodes)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
)
and (
cluster.total_number_of_machines()
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
):
_logger.info(
"%s unrunnable tasks could not be assigned, slowly trying to scale up...",
len(queued_or_missing_instance_tasks),
)
cluster = await _scale_up_cluster(
app,
cluster,
queued_or_missing_instance_tasks,
auto_scaling_mode,
allowed_instance_types,
)

elif (
len(queued_or_missing_instance_tasks) == len(unrunnable_tasks) == 0
and cluster.can_scale_down()
)
):
_logger.info(
"there is %s waiting task, slowly and gracefully scaling down...",
"%s unrunnable tasks could not be assigned, slowly trying to scale up...",
len(queued_or_missing_instance_tasks),
)
# NOTE: we only scale down in case we did not just scale up. The swarm needs some time to adjust
await auto_scaling_mode.try_retire_nodes(app)
cluster = await _deactivate_empty_nodes(app, cluster)
cluster = await _try_scale_down_cluster(app, cluster)
cluster = await _scale_up_cluster(
app,
cluster,
queued_or_missing_instance_tasks,
auto_scaling_mode,
allowed_instance_types,
)

return cluster

Expand Down
108 changes: 94 additions & 14 deletions services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,20 @@ async def test_cluster_scaling_up_starts_multiple_instances(
mock_rabbitmq_post_message.reset_mock()


@pytest.fixture
async def mocked_associate_ec2_instances_with_nodes(mocker: MockerFixture) -> mock.Mock:
    """Patch ``associate_ec2_instances_with_nodes`` in the auto-scaling core module.

    The default replacement associates nothing: it returns an empty list of
    associated instances together with the EC2 instances it was given, unchanged.

    Returns:
        The mock installed by ``mocker.patch`` (autospec'ed on the patched target).
    """

    async def _associate_nothing(
        nodes: list[Node], ec2_instances: list[EC2InstanceData]
    ) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]:
        # ``nodes`` is intentionally ignored: every instance is handed back
        # in the second tuple item and no association is produced
        return [], ec2_instances

    patch_target = "simcore_service_autoscaling.modules.auto_scaling_core.associate_ec2_instances_with_nodes"
    return mocker.patch(
        patch_target,
        autospec=True,
        side_effect=_associate_nothing,
    )


@pytest.mark.parametrize(
"with_docker_join_drained", ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], indirect=True
)
Expand Down Expand Up @@ -1240,6 +1254,11 @@ async def test_cluster_adapts_machines_on_the_fly(
async_docker_client: aiodocker.Docker,
scale_up_params1: _ScaleUpParams,
scale_up_params2: _ScaleUpParams,
mocked_associate_ec2_instances_with_nodes: mock.Mock,
create_fake_node: Callable[..., Node],
mock_docker_tag_node: mock.Mock,
mock_compute_node_used_resources: mock.Mock,
spied_cluster_analysis: MockType,
):
# pre-requisites
assert app_settings.AUTOSCALING_EC2_INSTANCES
Expand Down Expand Up @@ -1276,20 +1295,73 @@ async def test_cluster_adapts_machines_on_the_fly(
for _ in range(scale_up_params1.num_services)
)
)
for _ in range(3):
# it will only scale once and do nothing else
await auto_scale_cluster(
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
)
await assert_autoscaled_dynamic_ec2_instances(
ec2_client,
expected_num_reservations=1,
expected_num_instances=scale_up_params1.expected_num_instances,
expected_instance_type=scale_up_params1.expected_instance_type,
expected_instance_state="running",
expected_additional_tag_keys=list(ec2_instance_custom_tags),
instance_filters=instance_type_filters,
)

# it will only scale once and do nothing else
await auto_scale_cluster(
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
)
await assert_autoscaled_dynamic_ec2_instances(
ec2_client,
expected_num_reservations=1,
expected_num_instances=scale_up_params1.expected_num_instances,
expected_instance_type=scale_up_params1.expected_instance_type,
expected_instance_state="running",
expected_additional_tag_keys=list(ec2_instance_custom_tags),
instance_filters=instance_type_filters,
)
_assert_cluster_state(
spied_cluster_analysis,
expected_calls=1,
expected_num_machines=0,
)
mocked_associate_ec2_instances_with_nodes.assert_called_once_with([], [])
mocked_associate_ec2_instances_with_nodes.reset_mock()

fake_node_to_instance_map = {}

async def _fake_node_creator(
nodes: list[Node], ec2_instances: list[EC2InstanceData]
) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]:
def _create_fake_node_with_labels(instance: EC2InstanceData) -> Node:
if instance not in fake_node_to_instance_map:
fake_node = create_fake_node()
assert fake_node.spec
fake_node.spec.availability = Availability.active
assert fake_node.status
fake_node.status.state = NodeState.ready
assert fake_node.spec.labels
fake_node.spec.labels |= {
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: arrow.utcnow().isoformat(),
_OSPARC_SERVICE_READY_LABEL_KEY: "true",
}
fake_node_to_instance_map[instance] = fake_node
return fake_node_to_instance_map[instance]

associated_instances = [
AssociatedInstance(node=_create_fake_node_with_labels(i), ec2_instance=i)
for i in ec2_instances
]

return associated_instances, []

mocked_associate_ec2_instances_with_nodes.side_effect = _fake_node_creator

#
# 2. now the machines are associated
await auto_scale_cluster(
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
)
_assert_cluster_state(
spied_cluster_analysis,
expected_calls=1,
expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
)
mocked_associate_ec2_instances_with_nodes.assert_called_once()
mock_docker_tag_node.assert_called()
assert (
mock_docker_tag_node.call_count
== app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
)

#
# 2. now we start the second batch of services requiring a different type of machines
Expand Down Expand Up @@ -1330,6 +1402,10 @@ async def test_cluster_adapts_machines_on_the_fly(
instance_filters=instance_type_filters,
)

assert isinstance(spied_cluster_analysis.spy_return, Cluster)
assert spied_cluster_analysis.spy_return.active_nodes
assert not spied_cluster_analysis.spy_return.drained_nodes

# now we simulate that some of the services in the 1st batch have completed and that we are 1 below the max
# a machine should switch off and another type should be started
completed_services_to_stop = random.sample(
Expand All @@ -1350,6 +1426,10 @@ async def test_cluster_adapts_machines_on_the_fly(
await auto_scale_cluster(
app=initialized_app, auto_scaling_mode=DynamicAutoscaling()
)

assert spied_cluster_analysis.spy_return.active_nodes
assert not spied_cluster_analysis.spy_return.drained_nodes

all_instances = await ec2_client.describe_instances()
assert len(all_instances["Reservations"]) == 2, "there should be 2 Reservations"
reservation1 = all_instances["Reservations"][0]
Expand Down

0 comments on commit 7004721

Please sign in to comment.