🐛🎨 External Clusters: improve work stealing (#5474)
sanderegg authored Mar 13, 2024
1 parent 1588bd2 commit 1cea431
Showing 16 changed files with 134 additions and 50 deletions.
1 change: 1 addition & 0 deletions .env-devel
@@ -36,6 +36,7 @@ CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}'

CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest
CLUSTERS_KEEPER_DASK_NTHREADS=0
CLUSTERS_KEEPER_DASK_WORKER_SATURATION=inf
CLUSTERS_KEEPER_EC2_ACCESS=null
CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=5
CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES=null
7 changes: 3 additions & 4 deletions Makefile
@@ -523,14 +523,13 @@ nodenv: node_modules ## builds node_modules local environ (TODO)

pylint: ## python linting
# pylint version info
@/bin/bash -c "pylint --version"
@pylint --version
# Running linter in packages and services (except director)
@folders=$$(find $(CURDIR)/services $(CURDIR)/packages -type d -not -path "*/director/*" -name 'src' -exec dirname {} \; | sort -u); \
exit_status=0; \
for folder in $$folders; do \
pushd "$$folder"; \
make pylint || exit_status=1; \
popd; \
echo "Linting $$folder"; \
$(MAKE_C) "$$folder" pylint || exit_status=1; \
done;\
exit $$exit_status
# Running linter elsewhere
21 changes: 16 additions & 5 deletions packages/settings-library/src/settings_library/base.py
@@ -13,8 +13,9 @@
)
from pydantic.error_wrappers import ErrorList, ErrorWrapper
from pydantic.fields import ModelField, Undefined
from pydantic.typing import is_literal_type

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

_DEFAULTS_TO_NONE_MSG: Final[
str
@@ -39,7 +40,7 @@ def _default_factory():
except ValidationError as err:
if field.allow_none:
# e.g. Optional[PostgresSettings] would warn if defaults to None
logger.warning(
_logger.warning(
_DEFAULTS_TO_NONE_MSG,
field.name,
)
@@ -101,8 +102,14 @@ def prepare_field(cls, field: ModelField) -> None:
is_not_composed = (
get_origin(field_type) is None
) # is not composed as dict[str, Any] or Generic[Base]

if is_not_composed and issubclass(field_type, BaseCustomSettings):
# avoid literals raising TypeError
is_not_literal = is_literal_type(field.type_) is False

if (
is_not_literal
and is_not_composed
and issubclass(field_type, BaseCustomSettings)
):
if auto_default_from_env:
assert field.field_info.default is Undefined
assert field.field_info.default_factory is None
@@ -112,7 +119,11 @@ def prepare_field(cls, field: ModelField) -> None:
field.default = None
field.required = False # has a default now

elif is_not_composed and issubclass(field_type, BaseSettings):
elif (
is_not_literal
and is_not_composed
and issubclass(field_type, BaseSettings)
):
msg = f"{cls}.{field.name} of type {field_type} must inherit from BaseCustomSettings"
raise ConfigError(msg)

3 changes: 2 additions & 1 deletion packages/settings-library/tests/test_base.py
@@ -2,6 +2,7 @@
# pylint: disable=unused-argument
# pylint: disable=unused-variable
# pylint: disable=too-many-arguments
# pylint: disable=protected-access

import inspect
import json
@@ -196,7 +197,7 @@ def test_auto_default_to_none_logs_a_warning(
create_settings_class: Callable[[str], type[BaseCustomSettings]],
mocker: MockerFixture,
):
logger_warn = mocker.spy(settings_library.base.logger, "warning")
logger_warn = mocker.spy(settings_library.base._logger, "warning") # noqa: SLF001

S = create_settings_class("S")

@@ -115,16 +115,30 @@ async def compute_node_used_resources(
app: FastAPI, instance: AssociatedInstance
) -> Resources:
try:
num_results_in_memory = await dask.get_worker_still_has_results_in_memory(
resource = await dask.get_worker_used_resources(
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
)
if num_results_in_memory > 0:
# NOTE: this is a trick to consider the node still useful
return Resources(cpus=0, ram=ByteSize(1024 * 1024 * 1024))
return await dask.get_worker_used_resources(
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
if resource == Resources.create_as_empty():
num_results_in_memory = (
await dask.get_worker_still_has_results_in_memory(
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
)
)
if num_results_in_memory > 0:
_logger.debug(
"found %s for %s",
f"{num_results_in_memory=}",
f"{instance.ec2_instance.id}",
)
# NOTE: this is a trick to consider the node still useful
return Resources(cpus=0, ram=ByteSize(1024 * 1024 * 1024))

_logger.debug(
"found %s for %s", f"{resource=}", f"{instance.ec2_instance.id}"
)
return resource
except (DaskWorkerNotFoundError, DaskNoWorkersError):
_logger.debug("no resource found for %s", f"{instance.ec2_instance.id}")
return Resources.create_as_empty()

@staticmethod
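Reading the interleaved old/new lines above as a whole: the node's used resources now come from the scheduler's task list first; only when that comes back empty does the code fall back to checking for results still held in worker memory, and in that case it reports a token 1 GiB of RAM so the autoscaler keeps the node alive. A hedged distillation (with a simplified stand-in for the project's `Resources` model):

```python
from pydantic import BaseModel, ByteSize


class Resources(BaseModel):  # simplified stand-in for the project's model
    cpus: float
    ram: ByteSize

    @classmethod
    def create_as_empty(cls) -> "Resources":
        return cls(cpus=0, ram=ByteSize(0))


def effective_node_resources(used: Resources, results_in_memory: int) -> Resources:
    if used == Resources.create_as_empty() and results_in_memory > 0:
        # token footprint so the autoscaler still considers the node useful
        return Resources(cpus=0, ram=ByteSize(1024 * 1024 * 1024))
    return used
```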
@@ -1,3 +1,4 @@
import collections
import contextlib
import logging
import re
@@ -217,30 +218,39 @@ async def get_worker_used_resources(
DaskNoWorkersError
"""

def _get_worker_used_resources(
def _list_processing_tasks_on_worker(
dask_scheduler: distributed.Scheduler, *, worker_url: str
) -> dict[str, float] | None:
for worker_name, worker_state in dask_scheduler.workers.items():
if worker_url != worker_name:
continue
if worker_state.status is distributed.Status.closing_gracefully:
# NOTE: when a worker was retired it is in this state
return {}
return dict(worker_state.used_resources)
return None
) -> list[tuple[DaskTaskId, DaskTaskResources]]:
processing_tasks = []
for task_key, task_state in dask_scheduler.tasks.items():
if task_state.processing_on and (
task_state.processing_on.address == worker_url
):
processing_tasks.append((task_key, task_state.resource_restrictions))
return processing_tasks

async with _scheduler_client(scheduler_url, authentication) as client:
worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance)

_logger.debug("looking for processing tasksfor %s", f"{worker_url=}")

# now get the used resources
worker_used_resources: dict[str, Any] | None = await _wrap_client_async_routine(
client.run_on_scheduler(_get_worker_used_resources, worker_url=worker_url),
worker_processing_tasks: list[
tuple[DaskTaskId, DaskTaskResources]
] = await _wrap_client_async_routine(
client.run_on_scheduler(
_list_processing_tasks_on_worker, worker_url=worker_url
),
)
if worker_used_resources is None:
raise DaskWorkerNotFoundError(worker_host=worker_url, url=scheduler_url)

total_resources_used: collections.Counter[str] = collections.Counter()
for _, task_resources in worker_processing_tasks:
total_resources_used.update(task_resources)

_logger.debug("found %s for %s", f"{total_resources_used=}", f"{worker_url=}")
return Resources(
cpus=worker_used_resources.get("CPU", 0),
ram=parse_obj_as(ByteSize, worker_used_resources.get("RAM", 0)),
cpus=total_resources_used.get("CPU", 0),
ram=parse_obj_as(ByteSize, total_resources_used.get("RAM", 0)),
)
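Consolidated from the interleaved diff above: the rewritten helper derives a worker's used resources by summing the `resource_restrictions` of every task the scheduler reports as processing on it, rather than reading `worker_state.used_resources`. A self-contained sketch of that aggregation pattern (simplified names, assuming a live `distributed.Scheduler`):

```python
import collections

import distributed


def sum_used_resources(
    dask_scheduler: distributed.Scheduler, worker_url: str
) -> dict[str, float]:
    """Total the resource restrictions of all tasks processing on one worker."""
    totals: collections.Counter[str] = collections.Counter()
    for task_state in dask_scheduler.tasks.values():
        worker = task_state.processing_on  # None unless the task is running
        if worker and worker.address == worker_url:
            totals.update(task_state.resource_restrictions)  # e.g. {"CPU": 4, "RAM": 2e9}
    return dict(totals)  # {} for an idle worker, matching Resources.create_as_empty()
```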


@@ -59,23 +59,41 @@
_PENDING_DOCKER_TASK_MESSAGE: Final[str] = "pending task scheduling"
_INSUFFICIENT_RESOURCES_DOCKER_TASK_ERR: Final[str] = "insufficient resources on"
_NOT_SATISFIED_SCHEDULING_CONSTRAINTS_TASK_ERR: Final[str] = "no suitable node"
_OSPARC_SERVICE_READY_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
DockerLabelKey, "io.simcore.osparc-services-ready"
)
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
DockerLabelKey, f"{_OSPARC_SERVICE_READY_LABEL_KEY}-last-changed"
)
_OSPARC_SERVICE_READY_LABEL_KEYS: Final[list[DockerLabelKey]] = [
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
]


async def get_monitored_nodes(
docker_client: AutoscalingDocker, node_labels: list[DockerLabelKey]
) -> list[Node]:
node_label_filters = [f"{label}=true" for label in node_labels] + [
f"{label}" for label in _OSPARC_SERVICE_READY_LABEL_KEYS
]
return parse_obj_as(
list[Node],
await docker_client.nodes.list(
filters={"node.label": [f"{label}=true" for label in node_labels]}
),
await docker_client.nodes.list(filters={"node.label": node_label_filters}),
)


async def get_worker_nodes(docker_client: AutoscalingDocker) -> list[Node]:
return parse_obj_as(
list[Node],
await docker_client.nodes.list(filters={"role": ["worker"]}),
await docker_client.nodes.list(
filters={
"role": ["worker"],
"node.label": [
f"{label}" for label in _OSPARC_SERVICE_READY_LABEL_KEYS
],
}
),
)


@@ -550,14 +568,6 @@ def is_node_ready_and_available(node: Node, *, availability: Availability) -> bool:
)


_OSPARC_SERVICE_READY_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
DockerLabelKey, "io.simcore.osparc-services-ready"
)
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
DockerLabelKey, f"{_OSPARC_SERVICE_READY_LABEL_KEY}-last-changed"
)


def is_node_osparc_ready(node: Node) -> bool:
if not is_node_ready_and_available(node, availability=Availability.active):
return False
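For context on the filter change: Docker's `node.label` filter accepts both `key=value` entries (match on value) and bare `key` entries (match on mere presence), so the updated helpers return only nodes that carry the osparc readiness labels. A hedged illustration of the resulting filter payloads (not taken verbatim from the diff; the monitored label name is hypothetical):

```python
# nodes whose monitored labels equal "true" AND that carry the readiness labels
monitored_filters = {
    "node.label": [
        "io.simcore.some-monitored-label=true",           # value match (hypothetical)
        "io.simcore.osparc-services-ready",               # presence match
        "io.simcore.osparc-services-ready-last-changed",  # presence match
    ]
}

# worker nodes are additionally restricted by role
worker_filters = {
    "role": ["worker"],
    "node.label": [
        "io.simcore.osparc-services-ready",
        "io.simcore.osparc-services-ready-last-changed",
    ],
}
```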
9 changes: 8 additions & 1 deletion services/autoscaling/tests/unit/test_utils_docker.py
@@ -37,6 +37,7 @@
from simcore_service_autoscaling.modules.docker import AutoscalingDocker
from simcore_service_autoscaling.utils.utils_docker import (
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
Node,
_by_created_dt,
attach_node,
@@ -133,7 +134,13 @@ async def test_get_monitored_nodes_with_valid_label(
create_node_labels: Callable[[list[str]], Awaitable[None]],
):
labels = faker.pylist(allowed_types=(str,))
await create_node_labels(labels)
await create_node_labels(
[
*labels,
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
]
)
monitored_nodes = await get_monitored_nodes(autoscaling_docker, node_labels=labels)
assert len(monitored_nodes) == 1

@@ -1,6 +1,6 @@
import datetime
from functools import cached_property
from typing import Any, ClassVar, Final, cast
from typing import Any, ClassVar, Final, Literal, cast

from aws_library.ec2.models import EC2InstanceBootSpecific, EC2Tags
from fastapi import FastAPI
@@ -11,7 +11,14 @@
VersionTag,
)
from models_library.clusters import InternalClusterAuthentication
from pydantic import Field, NonNegativeInt, PositiveInt, parse_obj_as, validator
from pydantic import (
Field,
NonNegativeFloat,
NonNegativeInt,
PositiveInt,
parse_obj_as,
validator,
)
from settings_library.base import BaseCustomSettings
from settings_library.docker_registry import RegistrySettings
from settings_library.ec2 import EC2Settings
@@ -270,7 +277,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
description="defines the image tag to use for the computational backend sidecar image (NOTE: it currently defaults to use itisfoundation organisation in Dockerhub)",
)

CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: InternalClusterAuthentication = Field(
CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: (
InternalClusterAuthentication
) = Field(
...,
description="defines the authentication of the clusters created via clusters-keeper (can be None or TLS)",
)
@@ -280,6 +289,12 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
description="overrides the default number of threads in the dask-sidecars, setting it to 0 will use the default (see description in dask-sidecar)",
)

CLUSTERS_KEEPER_DASK_WORKER_SATURATION: NonNegativeFloat | Literal["inf"] = Field(
default="inf",
description="override the dask scheduler 'worker-saturation' field"
", see https://selectfrom.dev/deep-dive-into-dask-distributed-scheduler-9fdb3b36b7c7",
)

SWARM_STACK_NAME: str = Field(
..., description="Stack name defined upon deploy (see main Makefile)"
)
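Assuming standard pydantic v1 behavior, the new `NonNegativeFloat | Literal["inf"]` field accepts either a number or the string `"inf"` (defaults are not re-validated, so the default stays the literal string). A hedged sketch with a stand-in settings class:

```python
from typing import Literal

from pydantic import BaseSettings, Field, NonNegativeFloat


class _DemoSettings(BaseSettings):
    # illustrative stand-in for the real ApplicationSettings field
    DASK_WORKER_SATURATION: NonNegativeFloat | Literal["inf"] = Field(default="inf")


print(_DemoSettings().DASK_WORKER_SATURATION)  # "inf" (default used as-is)
print(_DemoSettings(DASK_WORKER_SATURATION="1.5").DASK_WORKER_SATURATION)  # 1.5
```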
@@ -11,6 +11,7 @@ services:
DASK_TLS_KEY: ${DASK_TLS_KEY}
DASK_SCHEDULER_URL: tls://dask-scheduler:8786
DASK_START_AS_SCHEDULER: 1
DASK_WORKER_SATURATION: ${DASK_WORKER_SATURATION}
LOG_LEVEL: ${LOG_LEVEL}
ports:
- 8786:8786 # dask-scheduler access
@@ -49,6 +50,7 @@ services:
DASK_TLS_CA_FILE: ${DASK_TLS_CA_FILE}
DASK_TLS_CERT: ${DASK_TLS_CERT}
DASK_TLS_KEY: ${DASK_TLS_KEY}
DASK_WORKER_SATURATION: ${DASK_WORKER_SATURATION}
LOG_LEVEL: ${LOG_LEVEL}
SIDECAR_COMP_SERVICES_SHARED_FOLDER: /home/scu/computational_shared_data
SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME: computational_shared_data
@@ -67,6 +67,7 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str:
f"DASK_TLS_CA_FILE={_HOST_TLS_CA_FILE_PATH}",
f"DASK_TLS_CERT={_HOST_TLS_CERT_FILE_PATH}",
f"DASK_TLS_KEY={_HOST_TLS_KEY_FILE_PATH}",
f"DASK_WORKER_SATURATION={app_settings.CLUSTERS_KEEPER_DASK_WORKER_SATURATION}",
f"DOCKER_IMAGE_TAG={app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG}",
f"EC2_INSTANCES_NAME_PREFIX={cluster_machines_name_prefix}",
f"LOG_LEVEL={app_settings.LOG_LEVEL}",
1 change: 1 addition & 0 deletions services/clusters-keeper/tests/unit/conftest.py
@@ -102,6 +102,7 @@ def app_environment(
"CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES": "{}",
"CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX": faker.pystr(),
"CLUSTERS_KEEPER_DASK_NTHREADS": f"{faker.pyint(min_value=0)}",
"CLUSTERS_KEEPER_DASK_WORKER_SATURATION": f"{faker.pyfloat(min_value=0.1)}",
"CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH": "{}",
"PRIMARY_EC2_INSTANCES_KEY_NAME": faker.pystr(),
"PRIMARY_EC2_INSTANCES_SECURITY_GROUP_IDS": json.dumps(
12 changes: 10 additions & 2 deletions services/dask-sidecar/docker/boot.sh
@@ -43,11 +43,19 @@ logging:
distributed.scheduler: ${LOG_LEVEL:-warning}
EOF

# Check if DASK_TLS_CA_FILE is present
# Define the base configuration for distributed
# the worker-saturation defines how the scheduler loads
# the workers, see https://github.com/dask/distributed/blob/91350ab15c79de973597e319bd36cc8d56e9f999/distributed/scheduler.py
cat >/home/scu/.config/dask/distributed.yaml <<EOF
distributed:
scheduler:
worker-saturation: ${DASK_WORKER_SATURATION:-inf}
EOF

# Check if DASK_TLS_CA_FILE is present and add the necessary config
if [ -n "${DASK_TLS_CA_FILE:-}" ]; then
print_info "TLS authentication enabled"
cat >>/home/scu/.config/dask/distributed.yaml <<EOF
distributed:
comm:
default-scheme: tls
require-encryption: true
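Once `boot.sh` has written `~/.config/dask/distributed.yaml`, dask reads it at import time, and the scheduler preload in the next hunk prints the merged config. A quick way to check the value programmatically (assuming the file above is in place):

```python
import dask.config

# returns "inf" by default, or the float passed via DASK_WORKER_SATURATION
print(dask.config.get("distributed.scheduler.worker-saturation"))
```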
@@ -1,5 +1,6 @@
import logging

import dask.config
import distributed

from ._meta import print_dask_scheduler_banner
@@ -11,6 +12,7 @@ async def dask_setup(scheduler: distributed.Scheduler) -> None:
"""This is a special function recognized by the dask worker when starting with flag --preload"""
_logger.info("Setting up scheduler...")
assert scheduler # nosec
print(f"dask config: {dask.config.config}", flush=True) # noqa: T201
print_dask_scheduler_banner()


@@ -258,6 +258,7 @@ def _comp_sidecar_fct(
key=job_id,
resources=dask_resources,
retries=0,
pure=False,
)
# NOTE: the callback is running in a secondary thread, and takes a future as arg
task_future.add_done_callback(lambda _: callback())
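For background on the one-line change above: `Client.submit(..., pure=False)` marks the call impure, so dask assigns a fresh key per submission instead of deduplicating calls with identical function and arguments; resubmitted jobs are therefore re-executed rather than served from a cached result. A standalone illustration:

```python
import distributed


def inc(x: int) -> int:
    return x + 1


client = distributed.Client(processes=False)  # throwaway in-process cluster for the demo

a = client.submit(inc, 1)  # pure=True is the default
b = client.submit(inc, 1)
assert a.key == b.key  # deduplicated: same function + args -> same key

c = client.submit(inc, 1, pure=False)
d = client.submit(inc, 1, pure=False)
assert c.key != d.key  # impure: every submission gets a unique key
```

In the hunk above an explicit `key=job_id` is passed as well; `pure=False` makes the non-cacheable intent explicit.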