Skip to content

Commit

Permalink
removed clusters endpoint from dv-2
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 2, 2024
1 parent cbfbbf5 commit 159422d
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 1,191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from .._meta import API_VTAG
from .routes import (
clusters,
computations,
computations_tasks,
dynamic_scheduler,
Expand All @@ -27,7 +26,6 @@
v2_router.include_router(
dynamic_services.router, tags=["dynamic services"], prefix="/dynamic_services"
)
v2_router.include_router(clusters.router, tags=["clusters"], prefix="/clusters")

v2_router.include_router(
dynamic_scheduler.router, tags=["dynamic scheduler"], prefix="/dynamic_scheduler"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
from ..utils.dask_client_utils import (
DaskSubSystem,
TaskHandlers,
create_internal_client_based_on_auth,
connect_to_dask_scheduler,
)

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -133,7 +133,7 @@ async def create(
) -> "DaskClient":
_logger.info(
"Initiating connection to %s with auth: %s, type: %s",
f"dask-scheduler/gateway at {endpoint}",
f"dask-scheduler at {endpoint}",
authentication,
cluster_type,
)
Expand All @@ -149,9 +149,7 @@ async def create(
endpoint,
attempt.retry_state.attempt_number,
)
backend = await create_internal_client_based_on_auth(
endpoint, authentication
)
backend = await connect_to_dask_scheduler(endpoint, authentication)
dask_utils.check_scheduler_status(backend.client)
instance = cls(
app=app,
Expand All @@ -162,7 +160,7 @@ async def create(
)
_logger.info(
"Connection to %s succeeded [%s]",
f"dask-scheduler/gateway at {endpoint}",
f"dask-scheduler at {endpoint}",
json.dumps(attempt.retry_state.retry_object.statistics),
)
_logger.info(
Expand Down Expand Up @@ -331,14 +329,12 @@ async def send_computation_tasks(
)
dask_utils.check_communication_with_scheduler_is_open(self.backend.client)
dask_utils.check_scheduler_status(self.backend.client)
# NOTE: in case it's a gateway or it is an on-demand cluster
# NOTE: in case it is an on-demand cluster
# we do not check a priori if the task
# is runnable because we CAN'T. A cluster might auto-scale, the worker(s)
# might also auto-scale and the gateway does not know that a priori.
# might also auto-scale, and we do not know that a priori.
# So, we'll just send the tasks over and see what happens after a while.
if (self.cluster_type != ClusterTypeInModel.ON_DEMAND) and (
self.backend.gateway is None
):
if self.cluster_type != ClusterTypeInModel.ON_DEMAND:
dask_utils.check_if_cluster_is_able_to_run_pipeline(
project_id=project_id,
node_id=node_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,16 @@
import socket
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Final

import distributed
import httpx
from aiohttp import ClientConnectionError, ClientResponseError
from dask_task_models_library.container_tasks.events import (
TaskLogEvent,
TaskProgressEvent,
)
from models_library.clusters import (
ClusterAuthentication,
InternalClusterAuthentication,
TLSAuthentication,
)
from models_library.clusters import ClusterAuthentication, TLSAuthentication
from pydantic import AnyUrl

from ..core.errors import ComputationalSchedulerError, ConfigurationError
from ..core.errors import ConfigurationError
from .dask import wrap_client_async_routine


Expand Down Expand Up @@ -53,8 +46,8 @@ async def close(self) -> None:
await wrap_client_async_routine(self.client.close())


async def _connect_to_dask_scheduler(
endpoint: AnyUrl, authentication: InternalClusterAuthentication
async def connect_to_dask_scheduler(
endpoint: AnyUrl, authentication: ClusterAuthentication
) -> DaskSubSystem:
try:
security = distributed.Security()
Expand All @@ -75,37 +68,3 @@ async def _connect_to_dask_scheduler(
except TypeError as exc:
msg = f"Scheduler has invalid configuration: {endpoint=}"
raise ConfigurationError(msg=msg) from exc


async def create_internal_client_based_on_auth(
endpoint: AnyUrl, authentication: ClusterAuthentication
) -> DaskSubSystem:
return await _connect_to_dask_scheduler(endpoint, authentication) # type: ignore[arg-type] # _is_dask_scheduler checks already that it is a valid type


_PING_TIMEOUT_S: Final[int] = 5
_DASK_SCHEDULER_RUNNING_STATE: Final[str] = "running"


async def test_scheduler_endpoint(endpoint: AnyUrl) -> None:
"""This method will try to connect to a scheduler endpoint and raise a ConfigurationError in case of problem
:raises ConfigurationError: contains some information as to why the connection failed
"""
try:
async with distributed.Client(
address=f"{endpoint}", timeout=f"{_PING_TIMEOUT_S}", asynchronous=True
) as dask_client:
if dask_client.status != _DASK_SCHEDULER_RUNNING_STATE:
msg = "internal scheduler is not running!"
raise ComputationalSchedulerError(msg=msg) # noqa: TRY301

except (
ClientConnectionError,
ClientResponseError,
httpx.HTTPError,
ComputationalSchedulerError,
) as exc:
logger.debug("Pinging %s, failed: %s", f"{endpoint=}", f"{exc=!r}")
msg = f"Could not connect to cluster in {endpoint}: error: {exc}"
raise ConfigurationError(msg=msg) from exc
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,6 @@ async def test_changed_scheduler_raises_exception(
mocked_user_completed_cb.assert_not_called()


@pytest.mark.flaky(max_runs=3)
@pytest.mark.parametrize("fail_remote_fct", [False, True])
async def test_get_tasks_status(
dask_client: DaskClient,
Expand Down
Loading

0 comments on commit 159422d

Please sign in to comment.