Skip to content

Commit

Permalink
šŸ›Autoscaling: Warm buffers do not replace hot buffers (ITISFoundationā€¦
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Dec 16, 2024
1 parent 4eeaa5c commit a7d1e3a
Show file tree
Hide file tree
Showing 11 changed files with 664 additions and 226 deletions.
10 changes: 5 additions & 5 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ flag_management:
statuses:
- type: project
target: auto
threshold: 1%
threshold: 2%
- type: patch
target: auto
threshold: 1%
threshold: 2%


component_management:
Expand All @@ -22,7 +22,7 @@ component_management:
statuses:
- type: project
target: auto
threshold: 1%
threshold: 2%
branches:
- "!master"
individual_components:
Expand Down Expand Up @@ -116,12 +116,12 @@ coverage:
project:
default:
informational: true
threshold: 1%
threshold: 2%

patch:
default:
informational: true
threshold: 1%
threshold: 2%

comment:
layout: "header,diff,flags,components,footer"
Expand Down
24 changes: 12 additions & 12 deletions .github/workflows/ci-testing-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/catalog.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -879,7 +879,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/datcore-adapter.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -930,7 +930,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/director.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -981,7 +981,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/director-v2.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -1910,7 +1910,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/webserver.bash test 01
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -1974,7 +1974,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/webserver.bash test 02
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -2038,7 +2038,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/director-v2.bash test 01
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -2111,7 +2111,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/director-v2.bash test 02
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -2177,7 +2177,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/dynamic-sidecar.bash test 01
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -2241,7 +2241,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/simcore-sdk.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -2330,7 +2330,7 @@ jobs:
- name: test
run: ./ci/github/system-testing/public-api.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down Expand Up @@ -2395,7 +2395,7 @@ jobs:
name: ${{ github.job }}_services_settings_schemas
path: ./services/**/settings-schema.json
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ async def assert_autoscaled_dynamic_ec2_instances(
expected_instance_state: InstanceStateNameType,
expected_additional_tag_keys: list[str],
instance_filters: Sequence[FilterTypeDef] | None,
expected_user_data: list[str] | None = None,
) -> list[InstanceTypeDef]:
if expected_user_data is None:
expected_user_data = ["docker swarm join"]
return await assert_ec2_instances(
ec2_client,
expected_num_reservations=expected_num_reservations,
Expand All @@ -54,7 +57,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
"io.simcore.autoscaling.monitored_services_labels",
*expected_additional_tag_keys,
],
expected_user_data=["docker swarm join"],
expected_user_data=expected_user_data,
instance_filters=instance_filters,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,43 @@ async def _activate_drained_nodes(
)


async def _start_buffer_instances(
async def _start_warm_buffer_instances(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
) -> Cluster:
"""starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""

app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

instances_to_start = [
i.ec2_instance for i in cluster.buffer_ec2s if i.assigned_tasks
]

if (
len(cluster.buffer_drained_nodes)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
):
# check if we can migrate warm buffers to hot buffers
hot_buffer_instance_type = cast(
InstanceTypeType,
next(
iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)
),
)
free_startable_warm_buffers_to_replace_hot_buffers = [
warm_buffer.ec2_instance
for warm_buffer in cluster.buffer_ec2s
if (warm_buffer.ec2_instance.type == hot_buffer_instance_type)
and not warm_buffer.assigned_tasks
]
instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers[
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
- len(cluster.buffer_drained_nodes)
]

if not instances_to_start:
return cluster
# change the buffer machine to an active one

with log_context(
_logger, logging.INFO, f"start {len(instances_to_start)} buffer machines"
):
Expand Down Expand Up @@ -1187,8 +1215,8 @@ async def _autoscale_cluster(
# 2. activate available drained nodes to cover some of the tasks
cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)

# 3. start buffer instances to cover the remaining tasks
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)
# 3. start warm buffer instances to cover the remaining tasks
cluster = await _start_warm_buffer_instances(app, cluster, auto_scaling_mode)

# 4. scale down unused instances
cluster = await _scale_down_unused_cluster_instances(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _list_processing_tasks_on_worker(
async with _scheduler_client(scheduler_url, authentication) as client:
worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance)

_logger.debug("looking for processing tasksfor %s", f"{worker_url=}")
_logger.debug("looking for processing tasks for %s", f"{worker_url=}")

# now get the used resources
worker_processing_tasks: list[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,14 @@ async def tag_node(
tags: dict[DockerLabelKey, str],
available: bool,
) -> Node:
assert node.spec # nosec
if (node.spec.labels == tags) and (
(node.spec.availability is Availability.active) == available
):
# nothing to do
return node
with log_context(
logger, logging.DEBUG, msg=f"tagging {node.id=} with {tags=} and {available=}"
logger, logging.DEBUG, msg=f"tag {node.id=} with {tags=} and {available=}"
):
assert node.id # nosec

Expand Down
Loading

0 comments on commit a7d1e3a

Please sign in to comment.