Skip to content

Commit

Permalink
removed InternalClusterAuthentication
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 2, 2024
1 parent d7d6c82 commit 7214338
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import distributed
import pytest
from distributed import Client
from models_library.clusters import InternalClusterAuthentication, TLSAuthentication
from models_library.clusters import ClusterAuthentication, TLSAuthentication
from pydantic import AnyUrl

from .helpers.docker import get_service_published_port
Expand Down Expand Up @@ -72,7 +72,7 @@ def dask_backend_tls_certificates(
@pytest.fixture
def dask_scheduler_auth(
dask_backend_tls_certificates: _TLSCertificates,
) -> InternalClusterAuthentication:
) -> ClusterAuthentication:
return TLSAuthentication(
tls_ca_file=dask_backend_tls_certificates.tls_ca_file,
tls_client_cert=dask_backend_tls_certificates.tls_cert_file,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
PortInt,
VersionTag,
)
from models_library.clusters import InternalClusterAuthentication
from models_library.clusters import ClusterAuthentication
from models_library.docker import DockerLabelKey
from pydantic import (
AliasChoices,
Expand Down Expand Up @@ -193,9 +193,9 @@ class NodesMonitoringSettings(BaseCustomSettings):

class DaskMonitoringSettings(BaseCustomSettings):
DASK_MONITORING_URL: AnyUrl = Field(
..., description="the url to the osparc-dask-scheduler"
..., description="the url to the dask-scheduler"
)
DASK_SCHEDULER_AUTH: InternalClusterAuthentication = Field(
DASK_SCHEDULER_AUTH: ClusterAuthentication = Field(
...,
description="defines the authentication of the clusters created via clusters-keeper (can be None or TLS)",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources
from fastapi import FastAPI
from models_library.clusters import InternalClusterAuthentication
from models_library.clusters import ClusterAuthentication
from models_library.docker import (
DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY,
DockerLabelKey,
Expand Down Expand Up @@ -37,7 +37,7 @@ def _scheduler_url(app: FastAPI) -> AnyUrl:
return app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL


def _scheduler_auth(app: FastAPI) -> InternalClusterAuthentication:
def _scheduler_auth(app: FastAPI) -> ClusterAuthentication:
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_DASK # nosec
return app_settings.AUTOSCALING_DASK.DASK_SCHEDULER_AUTH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from aws_library.ec2 import EC2InstanceData, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from distributed.core import Status
from models_library.clusters import InternalClusterAuthentication, TLSAuthentication
from models_library.clusters import ClusterAuthentication, TLSAuthentication
from pydantic import AnyUrl, ByteSize, TypeAdapter

from ..core.errors import (
Expand Down Expand Up @@ -43,7 +43,7 @@ async def _wrap_client_async_routine(

@contextlib.asynccontextmanager
async def _scheduler_client(
url: AnyUrl, authentication: InternalClusterAuthentication
url: AnyUrl, authentication: ClusterAuthentication
) -> AsyncIterator[distributed.Client]:
"""
Raises:
Expand Down Expand Up @@ -116,7 +116,7 @@ def _find_by_worker_host(

async def is_worker_connected(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
authentication: ClusterAuthentication,
worker_ec2_instance: EC2InstanceData,
) -> bool:
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
Expand All @@ -130,7 +130,7 @@ async def is_worker_connected(

async def is_worker_retired(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
authentication: ClusterAuthentication,
worker_ec2_instance: EC2InstanceData,
) -> bool:
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
Expand All @@ -156,7 +156,7 @@ def _dask_key_to_dask_task_id(key: dask.typing.Key) -> DaskTaskId:

async def list_unrunnable_tasks(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
authentication: ClusterAuthentication,
) -> list[DaskTask]:
"""
Raises:
Expand Down Expand Up @@ -188,7 +188,7 @@ def _list_tasks(

async def list_processing_tasks_per_worker(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
authentication: ClusterAuthentication,
) -> dict[DaskWorkerUrl, list[DaskTask]]:
"""
Raises:
Expand Down Expand Up @@ -227,7 +227,7 @@ def _list_processing_tasks(

async def get_worker_still_has_results_in_memory(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
authentication: ClusterAuthentication,
ec2_instance: EC2InstanceData,
) -> int:
"""
Expand All @@ -246,7 +246,7 @@ async def get_worker_still_has_results_in_memory(

async def get_worker_used_resources(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
authentication: ClusterAuthentication,
ec2_instance: EC2InstanceData,
) -> Resources:
"""
Expand Down Expand Up @@ -299,7 +299,7 @@ def _list_processing_tasks_on_worker(

async def compute_cluster_total_resources(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
authentication: ClusterAuthentication,
instances: list[AssociatedInstance],
) -> Resources:
if not instances:
Expand All @@ -320,7 +320,7 @@ async def compute_cluster_total_resources(


async def try_retire_nodes(
scheduler_url: AnyUrl, authentication: InternalClusterAuthentication
scheduler_url: AnyUrl, authentication: ClusterAuthentication
) -> None:
async with _scheduler_client(scheduler_url, authentication) as client:
await _wrap_client_async_routine(
Expand Down
24 changes: 12 additions & 12 deletions services/autoscaling/tests/unit/test_modules_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from aws_library.ec2 import Resources
from faker import Faker
from models_library.clusters import (
InternalClusterAuthentication,
ClusterAuthentication,
NoAuthentication,
TLSAuthentication,
)
Expand Down Expand Up @@ -52,7 +52,7 @@
"authentication", _authentication_types, ids=lambda p: f"authentication-{p.type}"
)
async def test__scheduler_client_with_wrong_url(
faker: Faker, authentication: InternalClusterAuthentication
faker: Faker, authentication: ClusterAuthentication
):
with pytest.raises(DaskSchedulerNotFoundError):
async with _scheduler_client(
Expand All @@ -72,7 +72,7 @@ def scheduler_url(dask_spec_local_cluster: distributed.SpecCluster) -> AnyUrl:


@pytest.fixture
def scheduler_authentication() -> InternalClusterAuthentication:
def scheduler_authentication() -> ClusterAuthentication:
return NoAuthentication()


Expand All @@ -92,7 +92,7 @@ def dask_workers_config() -> dict[str, Any]:


async def test__scheduler_client(
scheduler_url: AnyUrl, scheduler_authentication: InternalClusterAuthentication
scheduler_url: AnyUrl, scheduler_authentication: ClusterAuthentication
):
async with _scheduler_client(scheduler_url, scheduler_authentication):
...
Expand All @@ -109,7 +109,7 @@ async def test_list_unrunnable_tasks_with_no_workers(

async def test_list_unrunnable_tasks(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
create_dask_task: Callable[[DaskTaskResources], distributed.Future],
):
# we have nothing running now
Expand All @@ -131,7 +131,7 @@ async def test_list_unrunnable_tasks(

async def test_list_processing_tasks(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
dask_spec_cluster_client: distributed.Client,
):
def _add_fct(x: int, y: int) -> int:
Expand Down Expand Up @@ -190,7 +190,7 @@ def fake_ec2_instance_data_with_invalid_ec2_name(

async def test_get_worker_still_has_results_in_memory_with_invalid_ec2_name_raises(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
fake_ec2_instance_data_with_invalid_ec2_name: EC2InstanceData,
):
with pytest.raises(Ec2InvalidDnsNameError):
Expand All @@ -216,7 +216,7 @@ async def test_get_worker_still_has_results_in_memory_with_no_workers_raises(

async def test_get_worker_still_has_results_in_memory_with_invalid_worker_host_raises(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
fake_ec2_instance_data: Callable[..., EC2InstanceData],
):
ec2_instance_data = fake_ec2_instance_data()
Expand All @@ -229,7 +229,7 @@ async def test_get_worker_still_has_results_in_memory_with_invalid_worker_host_r
@pytest.mark.parametrize("fct_shall_err", [True, False], ids=str)
async def test_get_worker_still_has_results_in_memory(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
dask_spec_cluster_client: distributed.Client,
fake_localhost_ec2_instance_data: EC2InstanceData,
fct_shall_err: bool,
Expand Down Expand Up @@ -291,7 +291,7 @@ def _add_fct(x: int, y: int) -> int:

async def test_worker_used_resources_with_invalid_ec2_name_raises(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
fake_ec2_instance_data_with_invalid_ec2_name: EC2InstanceData,
):
with pytest.raises(Ec2InvalidDnsNameError):
Expand All @@ -317,7 +317,7 @@ async def test_worker_used_resources_with_no_workers_raises(

async def test_worker_used_resources_with_invalid_worker_host_raises(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
fake_ec2_instance_data: Callable[..., EC2InstanceData],
):
ec2_instance_data = fake_ec2_instance_data()
Expand All @@ -329,7 +329,7 @@ async def test_worker_used_resources_with_invalid_worker_host_raises(

async def test_worker_used_resources(
scheduler_url: AnyUrl,
scheduler_authentication: InternalClusterAuthentication,
scheduler_authentication: ClusterAuthentication,
dask_spec_cluster_client: distributed.Client,
fake_localhost_ec2_instance_data: EC2InstanceData,
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
assert_computation_task_out_obj,
)
from models_library.api_schemas_directorv2.comp_tasks import ComputationGet
from models_library.clusters import DEFAULT_CLUSTER_ID, InternalClusterAuthentication
from models_library.clusters import DEFAULT_CLUSTER_ID, ClusterAuthentication
from models_library.projects import (
Node,
NodesDict,
Expand Down Expand Up @@ -360,7 +360,7 @@ def mock_env(
network_name: str,
dev_feature_r_clone_enabled: str,
dask_scheduler_service: str,
dask_scheduler_auth: InternalClusterAuthentication,
dask_scheduler_auth: ClusterAuthentication,
minimal_configuration: None,
patch_storage_setup: None,
) -> None:
Expand Down

0 comments on commit 7214338

Please sign in to comment.