improve logging
sanderegg committed Dec 11, 2024
1 parent 0de1bed commit 3a40819
Showing 1 changed file with 113 additions and 97 deletions.
@@ -386,13 +386,18 @@ async def _activate_drained_nodes(
if node.assigned_tasks
]

# activate these nodes now
await asyncio.gather(
*(
_activate_and_notify(app, auto_scaling_mode, node)
for node in nodes_to_activate
if not nodes_to_activate:
return cluster

with log_context(
_logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes"
):
await asyncio.gather(
*(
_activate_and_notify(app, auto_scaling_mode, node)
for node in nodes_to_activate
)
)
)
new_active_node_ids = {node.ec2_instance.id for node in nodes_to_activate}
remaining_drained_nodes = [
node
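
The new code guards against an empty list and then wraps the activation batch in a log_context block, so the _activate_and_notify calls share a single start/end marker in the logs. The helper itself is not part of this diff; a minimal sketch, assuming it is a plain context manager taking a logger, a level and a message (the real implementation likely adds timing or error handling):

import contextlib
import logging
from collections.abc import Iterator

@contextlib.contextmanager
def log_context(logger: logging.Logger, level: int, msg: str) -> Iterator[None]:
    # one line on entry, one on exit, so everything logged in between is grouped
    logger.log(level, "Starting %s ...", msg)
    try:
        yield
    finally:
        logger.log(level, "Done %s", msg)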
@@ -421,12 +426,17 @@ async def _start_buffer_instances(
if not instances_to_start:
return cluster
# change the buffer machine to an active one
await get_ec2_client(app).set_instances_tags(
instances_to_start,
tags=get_activated_buffer_ec2_tags(app, auto_scaling_mode),
)
with log_context(
_logger, logging.INFO, f"start {len(instances_to_start)} buffer machines"
):
await get_ec2_client(app).set_instances_tags(
instances_to_start,
tags=get_activated_buffer_ec2_tags(app, auto_scaling_mode),
)

started_instances = await get_ec2_client(app).start_instances(instances_to_start)
started_instances = await get_ec2_client(app).start_instances(
instances_to_start
)
started_instance_ids = [i.id for i in started_instances]

return dataclasses.replace(
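
The buffer-machine path gets the same treatment: tagging and starting the instances now happen inside one "start N buffer machines" logging scope. A condensed, self-contained illustration of grouping several awaits under one context, reusing the log_context sketch above and replacing the EC2 client calls with a stub:

import asyncio
import logging

_logger = logging.getLogger(__name__)

async def _start_one(instance_id: str) -> None:
    # stand-in for the real set_instances_tags / start_instances calls
    await asyncio.sleep(0)
    _logger.info("started %s", instance_id)

async def start_buffer_instances(instance_ids: list[str]) -> None:
    with log_context(_logger, logging.INFO, f"start {len(instance_ids)} buffer machines"):
        await asyncio.gather(*(_start_one(i) for i in instance_ids))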
@@ -615,10 +625,10 @@ async def _find_needed_instances(
_logger.exception("Unexpected error:")

_logger.info(
"found following needed %s instances: %s",
"found following %s needed instances: %s",
len(needed_new_instance_types_for_tasks),
[
f"{i.instance_type.name=}:{i.instance_type.resources} with {len(i.assigned_tasks)} tasks"
f"{i.instance_type.name}:{i.instance_type.resources} takes {len(i.assigned_tasks)} task{'s' if len(i.assigned_tasks)>1 else ''}"
for i in needed_new_instance_types_for_tasks
],
)
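
The reworded log line also drops the f-string debug specifier: f"{x=}" renders as x='value', which is handy while debugging but noisy for operators, whereas f"{x}" prints only the value. For example:

instance_name = "t2.micro"
print(f"{instance_name=}")  # -> instance_name='t2.micro'
print(f"{instance_name}")   # -> t2.micro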
@@ -810,39 +820,6 @@ async def _launch_instances(
return new_pending_instances


async def _scale_up_cluster(
app: FastAPI,
cluster: Cluster,
unassigned_tasks: list,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
) -> Cluster:
app_settings: ApplicationSettings = app.state.settings
assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

# let's start these
if needed_ec2_instances := await _find_needed_instances(
app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode
):
await auto_scaling_mode.log_message_from_tasks(
app,
unassigned_tasks,
"service is pending due to missing resources, scaling up cluster now...",
level=logging.INFO,
)
new_pending_instances = await _launch_instances(
app, needed_ec2_instances, unassigned_tasks, auto_scaling_mode
)
cluster.pending_ec2s.extend(
[NonAssociatedInstance(ec2_instance=i) for i in new_pending_instances]
)
# NOTE: to check the logs of UserData in EC2 instance
# run: tail -f -n 1000 /var/log/cloud-init-output.log in the instance

return cluster


async def _find_drainable_nodes(
app: FastAPI, cluster: Cluster
) -> list[AssociatedInstance]:
@@ -898,23 +875,25 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
if not active_empty_instances:
return cluster

# drain this empty nodes
updated_nodes: list[Node] = await asyncio.gather(
*(
utils_docker.set_node_osparc_ready(
app_settings,
docker_client,
node.node,
ready=False,
with log_context(
_logger, logging.INFO, f"drain {len(active_empty_instances)} empty nodes"
):
updated_nodes: list[Node] = await asyncio.gather(
*(
utils_docker.set_node_osparc_ready(
app_settings,
docker_client,
node.node,
ready=False,
)
for node in active_empty_instances
)
for node in active_empty_instances
)
)
if updated_nodes:
_logger.info(
"following nodes were set to drain: '%s'",
f"{[node.description.hostname for node in updated_nodes if node.description]}",
)
if updated_nodes:
_logger.info(
"following nodes were set to drain: '%s'",
f"{[node.description.hostname for node in updated_nodes if node.description]}",
)
newly_drained_instances = [
AssociatedInstance(node=node, ec2_instance=instance.ec2_instance)
for instance, node in zip(active_empty_instances, updated_nodes, strict=True)
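
After the gather, each updated node is paired back with the instance it came from. asyncio.gather returns results in the same order as its arguments, so zip(..., strict=True) is a safe way to re-associate them and raises if the lengths ever diverge (Python 3.10+). A small standalone example of that pairing, with a stub in place of set_node_osparc_ready:

import asyncio

async def _drain(node_name: str) -> str:
    # stand-in for utils_docker.set_node_osparc_ready(..., ready=False)
    await asyncio.sleep(0)
    return f"{node_name}-drained"

async def main() -> None:
    nodes = ["node-a", "node-b"]
    updated = await asyncio.gather(*(_drain(n) for n in nodes))
    print(list(zip(nodes, updated, strict=True)))
    # [('node-a', 'node-a-drained'), ('node-b', 'node-b-drained')]

asyncio.run(main())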
@@ -1021,10 +1000,15 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster:
if i.ec2_instance.id
not in (new_terminating_instance_ids + terminated_instance_ids)
]
still_terminating_nodes = [
i
for i in cluster.terminating_nodes
if i.ec2_instance.id not in terminated_instance_ids
]
return dataclasses.replace(
cluster,
drained_nodes=still_drained_nodes,
terminating_nodes=cluster.terminating_nodes + new_terminating_instances,
terminating_nodes=still_terminating_nodes + new_terminating_instances,
terminated_instances=cluster.terminated_instances
+ [
NonAssociatedInstance(ec2_instance=i.ec2_instance)
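
Besides logging, this hunk keeps terminating_nodes from accumulating instances that have already been terminated: still_terminating_nodes filters on the terminated ids, and the cluster is rebuilt with dataclasses.replace instead of being mutated. A toy version of that pattern, with the Cluster fields reduced to two illustrative lists:

import dataclasses

@dataclasses.dataclass(frozen=True)
class ClusterState:
    terminating: list[str]
    terminated: list[str]

state = ClusterState(terminating=["i-1", "i-2"], terminated=["i-2"])
still_terminating = [i for i in state.terminating if i not in state.terminated]
new_state = dataclasses.replace(state, terminating=still_terminating)
print(new_state)  # ClusterState(terminating=['i-1'], terminated=['i-2'])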
@@ -1119,7 +1103,7 @@ async def _drain_retired_nodes(
)


async def _scale_down_unused_cluster_machines(
async def _scale_down_unused_cluster_instances(
app: FastAPI,
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
@@ -1129,57 +1113,89 @@ async def _scale_down_unused_cluster_machines(
return await _try_scale_down_cluster(app, cluster)


async def _autoscale_cluster(
async def _scale_up_cluster(
app: FastAPI,
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
unassigned_tasks: list,
) -> Cluster:
# 1. check if we have pending tasks
unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
_logger.info("found %s unrunnable tasks", len(unrunnable_tasks))
# NOTE: this function predicts how the backend will assign tasks
queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster(
app, unrunnable_tasks, cluster, auto_scaling_mode
)
# 2. try to activate drained nodes to cover some of the tasks
cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)

# 3. start buffer instances to cover the remaining tasks
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)

# 4. scale down unused machines
cluster = await _scale_down_unused_cluster_machines(app, cluster, auto_scaling_mode)

# 4. let's check if there are still pending tasks or if the reserve was used
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
if not unassigned_tasks and (
len(cluster.buffer_drained_nodes)
>= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
):
return cluster

if (
queued_or_missing_instance_tasks
or (
len(cluster.buffer_drained_nodes)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
)
and (
cluster.total_number_of_machines()
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
)
cluster.total_number_of_machines()
>= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
):
_logger.info(
"%s unrunnable tasks could not be assigned, slowly trying to scale up...",
len(queued_or_missing_instance_tasks),
"cluster already hit the maximum allowed amount of instances (%s), not scaling up. "
"%s tasks will wait until instances are free.",
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
len(unassigned_tasks),
)
cluster = await _scale_up_cluster(
return cluster

# now we scale up
assert app_settings.AUTOSCALING_EC2_ACCESS # nosec

# let's start these
if needed_ec2_instances := await _find_needed_instances(
app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode
):
await auto_scaling_mode.log_message_from_tasks(
app,
cluster,
queued_or_missing_instance_tasks,
auto_scaling_mode,
allowed_instance_types,
unassigned_tasks,
"service is pending due to missing resources, scaling up cluster now...",
level=logging.INFO,
)
new_pending_instances = await _launch_instances(
app, needed_ec2_instances, unassigned_tasks, auto_scaling_mode
)
cluster.pending_ec2s.extend(
[NonAssociatedInstance(ec2_instance=i) for i in new_pending_instances]
)
# NOTE: to check the logs of UserData in EC2 instance
# run: tail -f -n 1000 /var/log/cloud-init-output.log in the instance

return cluster


async def _autoscale_cluster(
app: FastAPI,
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
) -> Cluster:
# 1. check if we have pending tasks
unnasigned_pending_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
_logger.info("found %s pending tasks", len(unnasigned_pending_tasks))
# NOTE: this function predicts how the backend will assign tasks
still_pending_tasks, cluster = await _assign_tasks_to_current_cluster(
app, unnasigned_pending_tasks, cluster, auto_scaling_mode
)

# 2. activate available drained nodes to cover some of the tasks
cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)

# 3. start buffer instances to cover the remaining tasks
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)

# 4. scale down unused instances
cluster = await _scale_down_unused_cluster_instances(
app, cluster, auto_scaling_mode
)

# 5. scale up if necessary
return await _scale_up_cluster(
app, cluster, auto_scaling_mode, allowed_instance_types, still_pending_tasks
)


async def _notify_autoscaling_status(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
) -> None:
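
The relocated _scale_up_cluster now has two early exits before any EC2 call is made: it returns the cluster unchanged when there are no pending tasks and the warm buffer of drained nodes is already full, and it only logs that tasks will have to wait when the instance cap is reached. Reduced to its decision logic, with the counts passed in as plain integers for illustration:

def should_scale_up(
    n_pending_tasks: int,
    n_buffer_drained_nodes: int,
    n_total_machines: int,
    machines_buffer: int,
    max_instances: int,
) -> bool:
    # nothing left to place and the reserve of drained buffer nodes is full
    if n_pending_tasks == 0 and n_buffer_drained_nodes >= machines_buffer:
        return False
    # hard cap reached: pending tasks must wait until capacity frees up
    if n_total_machines >= max_instances:
        return False
    return True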