✨Adds metrics regarding new style dynamic services timings (ITISFoundation#6347)

Co-authored-by: Andrei Neagu <[email protected]>
GitHK and Andrei Neagu authored Sep 18, 2024
1 parent f22215c commit 9057d01
Showing 23 changed files with 435 additions and 39 deletions.
2 changes: 2 additions & 0 deletions packages/service-library/src/servicelib/instrumentation.py
@@ -0,0 +1,2 @@
def get_metrics_namespace(application_name: str) -> str:
return application_name.replace("-", "_")
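For illustration, the transformation this helper applies (the application name below is only an example, not taken from the commit):

>>> get_metrics_namespace("simcore-service-director-v2")
'simcore_service_director_v2'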
8 changes: 4 additions & 4 deletions packages/service-library/src/servicelib/rabbitmq/_utils.py
@@ -53,10 +53,10 @@ async def wait_till_rabbitmq_responsive(url: str) -> bool:


def get_rabbitmq_client_unique_name(base_name: str) -> str:
# NOTE: below prefix is guaranteed to change each time the preocess restarts
# Why is this desiarable?
# 1. the code base makes the above assumption, otherwise subcscribers and consumers do not work
# 2. enables restartability of webserver during [re]deploys
# NOTE: The prefix below will change every time the process restarts.
# Why is this necessary?
# 1. The codebase relies on this behavior; without it, subscribers and consumers will fail.
# 2. It allows the web server to be restarted seamlessly during [re]deployments.
prefix_create_time = f"{psutil.Process(os.getpid()).create_time()}".strip(".")[-6:]

return f"{base_name}_{socket.gethostname()}_{prefix_create_time}"
@@ -1,8 +1,10 @@
from typing import Final

from servicelib.instrumentation import get_metrics_namespace

from ..._meta import APP_NAME

METRICS_NAMESPACE: Final[str] = APP_NAME.replace("-", "_")
METRICS_NAMESPACE: Final[str] = get_metrics_namespace(APP_NAME)
EC2_INSTANCE_LABELS: Final[tuple[str, ...]] = ("instance_type",)

CLUSTER_METRICS_DEFINITIONS: Final[dict[str, tuple[str, tuple[str, ...]]]] = {
@@ -136,7 +136,6 @@ async def create_dynamic_service(
logger.debug("Redirecting %s", redirect_url_with_query)
return RedirectResponse(str(redirect_url_with_query))

#
if not await is_sidecar_running(
service.node_uuid, dynamic_services_settings.DYNAMIC_SCHEDULER.SWARM_STACK_NAME
):
@@ -7,9 +7,6 @@
override_fastapi_openapi_method,
)
from servicelib.fastapi.profiler_middleware import ProfilerMiddleware
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation,
)
from servicelib.fastapi.tracing import setup_tracing
from servicelib.logging_utils import config_all_loggers

@@ -28,6 +25,7 @@
director_v0,
dynamic_services,
dynamic_sidecar,
instrumentation,
notifier,
rabbitmq,
redis,
@@ -192,7 +190,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
resource_usage_tracker_client.setup(app)

if settings.DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED:
setup_prometheus_instrumentation(app)
instrumentation.setup(app)
if settings.DIRECTOR_V2_TRACING:
setup_tracing(app, app.state.settings.DIRECTOR_V2_TRACING, APP_NAME)

@@ -2,12 +2,14 @@
import logging
import re
from collections.abc import Mapping
from datetime import datetime
from enum import Enum
from functools import cached_property
from pathlib import Path
from typing import Any, TypeAlias
from uuid import UUID

import arrow
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceCreate
from models_library.api_schemas_directorv2.dynamic_services_service import (
CommonServiceDetails,
@@ -170,6 +172,28 @@ def mark_removed(self) -> None:
self.was_removed = True


class ServicesInstrumentation(BaseModel):
start_requested_at: datetime | None = Field(
None,
description="moment in which the process of starting the service was requested",
)
close_requested_at: datetime | None = Field(
None,
description="moment in which the process of stopping the service was requested",
)

def elapsed_since_start_request(self) -> float | None:
if self.start_requested_at is None:
return None

return (arrow.utcnow().datetime - self.start_requested_at).total_seconds()

def elapsed_since_close_request(self) -> float | None:
if self.close_requested_at is None:
return None
return (arrow.utcnow().datetime - self.close_requested_at).total_seconds()


class DynamicSidecar(BaseModel):
status: Status = Field(
Status.create_as_initially_ok(),
@@ -254,6 +278,11 @@ def compose_spec_submitted(self) -> bool:
description="set True if the dy-sidecar saves the state and uploads the outputs",
)

instrumentation: ServicesInstrumentation = Field(
default_factory=lambda: ServicesInstrumentation.parse_obj({}),
description="keeps track times for various operations",
)

# below had already been validated and
# used only to start the proxy
dynamic_sidecar_id: ServiceId | None = Field(
@@ -349,18 +349,20 @@ async def stop_service(
progress_callback,
)

async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None:
async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> int:
response = await self._thin_client.post_containers_tasks_state_restore(
dynamic_sidecar_endpoint
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
_debug_progress_callback,
)
assert isinstance(result, int) # nosec
return result

async def pull_user_services_images(
self, dynamic_sidecar_endpoint: AnyHttpUrl
@@ -381,18 +383,20 @@ async def save_service_state(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
progress_callback: ProgressCallback | None = None,
) -> None:
) -> int:
response = await self._thin_client.post_containers_tasks_state_save(
dynamic_sidecar_endpoint
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
progress_callback,
)
assert isinstance(result, int) # nosec
return result

async def pull_service_input_ports(
self,
Expand All @@ -416,18 +420,20 @@ async def pull_service_output_ports(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
port_keys: list[str] | None = None,
) -> None:
) -> int:
response = await self._thin_client.post_containers_tasks_ports_outputs_pull(
dynamic_sidecar_endpoint, port_keys
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
_debug_progress_callback,
)
assert isinstance(result, int) # nosec
return result

async def push_service_output_ports(
self,
@@ -19,6 +19,7 @@
DynamicServicesSchedulerSettings,
)
from .....models.dynamic_services_scheduler import SchedulerData
from .....modules.instrumentation import get_instrumentation, get_metrics_labels
from .....utils.db import get_repository
from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository
from ....db.repositories.projects import ProjectsRepository
@@ -222,4 +223,12 @@ async def progress_create_containers(

scheduler_data.dynamic_sidecar.were_containers_created = True

start_duration = (
scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_start_request()
)
assert start_duration is not None # nosec
get_instrumentation(app).dynamic_sidecar_metrics.start_time_duration.labels(
**get_metrics_labels(scheduler_data)
).observe(start_duration)

_logger.info("Internal state after creating user services %s", scheduler_data)
@@ -44,6 +44,12 @@
DockerStatus,
SchedulerData,
)
from .....modules.instrumentation import (
get_instrumentation,
get_metrics_labels,
get_rate,
track_duration,
)
from .....utils.db import get_repository
from ....db.repositories.projects import ProjectsRepository
from ....db.repositories.projects_networks import ProjectsNetworksRepository
@@ -157,9 +163,15 @@ async def service_save_state(
progress_callback: ProgressCallback | None = None,
) -> None:
scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)
await sidecars_client.save_service_state(
scheduler_data.endpoint, progress_callback=progress_callback
)

with track_duration() as duration:
size = await sidecars_client.save_service_state(
scheduler_data.endpoint, progress_callback=progress_callback
)
get_instrumentation(app).dynamic_sidecar_metrics.push_service_state_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(get_rate(size, duration.to_float()))

await sidecars_client.update_volume_state(
scheduler_data.endpoint,
volume_category=VolumeCategory.STATES,
@@ -375,6 +387,16 @@ async def attempt_pod_removal_and_data_saving(
rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client
await rabbitmq_client.publish(message.channel_name, message)

# metrics

stop_duration = (
scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_close_request()
)
assert stop_duration is not None # nosec
get_instrumentation(app).dynamic_sidecar_metrics.stop_time_duration.labels(
**get_metrics_labels(scheduler_data)
).observe(stop_duration)


async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) -> None:
_logger.debug("Attaching project networks for %s", scheduler_data.service_name)
@@ -460,14 +482,44 @@ async def prepare_services_environment(
)
)

async def _pull_output_ports_with_metrics() -> None:
with track_duration() as duration:
size: int = await sidecars_client.pull_service_output_ports(
dynamic_sidecar_endpoint
)

get_instrumentation(app).dynamic_sidecar_metrics.output_ports_pull_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(get_rate(size, duration.to_float()))

async def _pull_user_services_images_with_metrics() -> None:
with track_duration() as duration:
await sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint)

get_instrumentation(
app
).dynamic_sidecar_metrics.pull_user_services_images_duration.labels(
**get_metrics_labels(scheduler_data)
).observe(
duration.to_float()
)

async def _restore_service_state_with_metrics() -> None:
with track_duration() as duration:
size = await sidecars_client.restore_service_state(dynamic_sidecar_endpoint)

get_instrumentation(app).dynamic_sidecar_metrics.pull_service_state_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(get_rate(size, duration.to_float()))

tasks = [
sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint),
sidecars_client.pull_service_output_ports(dynamic_sidecar_endpoint),
_pull_user_services_images_with_metrics(),
_pull_output_ports_with_metrics(),
]
# When enabled no longer downloads state via nodeports
# S3 is used to store state paths
if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED:
tasks.append(sidecars_client.restore_service_state(dynamic_sidecar_endpoint))
tasks.append(_restore_service_state_with_metrics())

await limited_gather(*tasks, limit=3)

@@ -17,10 +17,12 @@
import contextlib
import functools
import logging
import time
from asyncio import Lock, Queue, Task
from dataclasses import dataclass, field
from typing import Final

import arrow
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceCreate,
@@ -54,6 +56,11 @@
DynamicServicesSchedulerSettings,
)
from .....models.dynamic_services_scheduler import SchedulerData, ServiceName
from .....modules.instrumentation import (
get_instrumentation,
get_metrics_labels,
get_rate,
)
from ...api_client import SidecarsClient, get_sidecars_client
from ...docker_api import update_scheduler_data_label
from ...errors import DynamicSidecarError, DynamicSidecarNotFoundError
@@ -255,6 +262,9 @@ async def add_service(
request_simcore_user_agent=request_simcore_user_agent,
can_save=can_save,
)
scheduler_data.dynamic_sidecar.instrumentation.start_requested_at = (
arrow.utcnow().datetime
)
await self.add_service_from_scheduler_data(scheduler_data)

async def add_service_from_scheduler_data(
@@ -353,6 +363,10 @@ async def mark_service_for_removal(
)
return

current.dynamic_sidecar.instrumentation.close_requested_at = (
arrow.utcnow().datetime
)

# PC-> ANE: could you please review what to do when can_save=None
assert can_save is not None # nosec
current.dynamic_sidecar.service_removal_state.mark_to_remove(
@@ -455,9 +469,19 @@ async def retrieve_service_inputs(
dynamic_sidecar_endpoint: AnyHttpUrl = scheduler_data.endpoint
sidecars_client: SidecarsClient = await get_sidecars_client(self.app, node_uuid)

started = time.time()
transferred_bytes = await sidecars_client.pull_service_input_ports(
dynamic_sidecar_endpoint, port_keys
)
duration = time.time() - started

get_instrumentation(
self.app
).dynamic_sidecar_metrics.input_ports_pull_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(
get_rate(transferred_bytes, duration)
)

if scheduler_data.restart_policy == RestartPolicy.ON_INPUTS_DOWNLOADED:
logger.info("Will restart containers")
@@ -0,0 +1,10 @@
from ._setup import get_instrumentation, setup
from ._utils import get_metrics_labels, get_rate, track_duration

__all__: tuple[str, ...] = (
"get_instrumentation",
"get_metrics_labels",
"get_rate",
"setup",
"track_duration",
)
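The implementations behind get_rate and track_duration live in the instrumentation module's helper files, which are not expanded in this view. A minimal sketch of how they could be written, inferred only from how they are used in the hunks above (names, signatures and behaviour are assumptions, not the committed code):

import time
from collections.abc import Iterator
from contextlib import contextmanager


class _Duration:
    # holds the wall-clock time measured by a track_duration block
    def __init__(self) -> None:
        self._value: float | None = None

    def to_float(self) -> float:
        assert self._value is not None  # nosec
        return self._value


@contextmanager
def track_duration() -> Iterator[_Duration]:
    # measure the wall-clock time spent inside the `with` block
    duration = _Duration()
    started = time.time()
    try:
        yield duration
    finally:
        duration._value = time.time() - started


def get_rate(size_bytes: int | None, duration_seconds: float) -> float:
    # bytes per second; guard against a zero or missing measurement
    if size_bytes is None or duration_seconds <= 0:
        return 0.0
    return size_bytes / duration_seconds

Observing a rate (bytes per second) rather than a raw duration, as the hunks above do for state save/restore and port pulls, keeps the observed values comparable across services whose state and payload sizes differ widely.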