From 1327f2ebf71b83c55cc00452cd142fcecfe51033 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Thu, 19 Sep 2024 23:11:36 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=E2=9A=97=EF=B8=8FPrometheus=20inst?= =?UTF-8?q?rumentation=20incorrectly=20setup=20(#6398)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fastapi/prometheus_instrumentation.py | 26 ++++---- .../src/servicelib/fastapi/tracing.py | 25 +++---- .../src/servicelib/instrumentation.py | 11 ++++ services/agent/tests/unit/test_core_routes.py | 8 +-- .../modules/instrumentation/_core.py | 6 +- .../modules/instrumentation/_models.py | 42 ++++++++---- .../modules/instrumentation/_utils.py | 5 +- services/autoscaling/tests/unit/conftest.py | 8 ++- .../test_modules_instrumentation_utils.py | 5 +- .../modules/instrumentation/_models.py | 66 +++++++++++-------- 10 files changed, 126 insertions(+), 76 deletions(-) diff --git a/packages/service-library/src/servicelib/fastapi/prometheus_instrumentation.py b/packages/service-library/src/servicelib/fastapi/prometheus_instrumentation.py index 626d5559df7..847585c52fc 100644 --- a/packages/service-library/src/servicelib/fastapi/prometheus_instrumentation.py +++ b/packages/service-library/src/servicelib/fastapi/prometheus_instrumentation.py @@ -2,23 +2,27 @@ from fastapi import FastAPI +from prometheus_client import CollectorRegistry from prometheus_fastapi_instrumentator import Instrumentator def setup_prometheus_instrumentation(app: FastAPI) -> Instrumentator: + # NOTE: use that registry to prevent having a global one + app.state.prometheus_registry = registry = CollectorRegistry(auto_describe=True) + instrumentator = Instrumentator( + should_instrument_requests_inprogress=False, # bug in https://github.com/trallnag/prometheus-fastapi-instrumentator/issues/317 + inprogress_labels=False, + registry=registry, + ).instrument(app) - instrumentator = ( - Instrumentator( - should_instrument_requests_inprogress=True, inprogress_labels=False - ) - .instrument(app) - .expose(app, include_in_schema=False) - ) + async def _on_startup() -> None: + instrumentator.expose(app, include_in_schema=False) - def _unregister(): - for collector in list(instrumentator.registry._collector_to_names.keys()): - instrumentator.registry.unregister(collector) + def _unregister() -> None: + # NOTE: avoid registering collectors multiple times when running unittests consecutively (https://stackoverflow.com/a/62489287) + for collector in list(registry._collector_to_names.keys()): # noqa: SLF001 + registry.unregister(collector) - # avoid registering collectors multiple times when running unittests consecutively (https://stackoverflow.com/a/62489287) + app.add_event_handler("startup", _on_startup) app.add_event_handler("shutdown", _unregister) return instrumentator diff --git a/packages/service-library/src/servicelib/fastapi/tracing.py b/packages/service-library/src/servicelib/fastapi/tracing.py index ba1c7d9e565..e0f670686f5 100644 --- a/packages/service-library/src/servicelib/fastapi/tracing.py +++ b/packages/service-library/src/servicelib/fastapi/tracing.py @@ -1,6 +1,7 @@ """ Adds fastapi middleware for tracing using opentelemetry instrumentation. """ + import logging from fastapi import FastAPI @@ -8,9 +9,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( OTLPSpanExporter as OTLPSpanExporterHTTP, ) -from opentelemetry.instrumentation.fastapi import ( - FastAPIInstrumentor, # pylint: disable=no-name-in-module -) +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor @@ -21,24 +20,19 @@ def setup_tracing( app: FastAPI, tracing_settings: TracingSettings, service_name: str -) -> FastAPIInstrumentor | None: +) -> None: if ( not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT and not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT ): log.warning("Skipping opentelemetry tracing setup") - return None - if ( - not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT - or not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT - ): - raise RuntimeError( - f"Variable opentelemetry_collector_endpoint [{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}] or opentelemetry_collector_port [{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT}] unset. Tracing options incomplete." - ) + return + # Set up the tracer provider resource = Resource(attributes={"service.name": service_name}) trace.set_tracer_provider(TracerProvider(resource=resource)) - tracer_provider = trace.get_tracer_provider() + global_tracer_provider = trace.get_tracer_provider() + assert isinstance(global_tracer_provider, TracerProvider) # nosec tracing_destination: str = f"{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}:{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT}/v1/traces" log.info( "Trying to connect service %s to tracing collector at %s.", @@ -48,7 +42,6 @@ def setup_tracing( # Configure OTLP exporter to send spans to the collector otlp_exporter = OTLPSpanExporterHTTP(endpoint=tracing_destination) span_processor = BatchSpanProcessor(otlp_exporter) - # Mypy bug --> https://github.com/open-telemetry/opentelemetry-python/issues/3713 - tracer_provider.add_span_processor(span_processor) # type: ignore[attr-defined] + global_tracer_provider.add_span_processor(span_processor) # Instrument FastAPI - return FastAPIInstrumentor().instrument_app(app) # type: ignore[no-any-return] + FastAPIInstrumentor().instrument_app(app) diff --git a/packages/service-library/src/servicelib/instrumentation.py b/packages/service-library/src/servicelib/instrumentation.py index d1fa57f66e4..002e1942853 100644 --- a/packages/service-library/src/servicelib/instrumentation.py +++ b/packages/service-library/src/servicelib/instrumentation.py @@ -1,2 +1,13 @@ +from dataclasses import dataclass + +from prometheus_client import CollectorRegistry + + +@dataclass(slots=True, kw_only=True) +class MetricsBase: + subsystem: str + registry: CollectorRegistry + + def get_metrics_namespace(application_name: str) -> str: return application_name.replace("-", "_") diff --git a/services/agent/tests/unit/test_core_routes.py b/services/agent/tests/unit/test_core_routes.py index 1fd0252d1aa..c20b8714757 100644 --- a/services/agent/tests/unit/test_core_routes.py +++ b/services/agent/tests/unit/test_core_routes.py @@ -29,7 +29,7 @@ def test_client(initialized_app: FastAPI) -> TestClient: def test_health_ok(env: None, test_client: TestClient): response = test_client.get("/health") assert response.status_code == status.HTTP_200_OK - assert response.json() == None + assert response.json() is None def test_health_fails_not_started( @@ -37,7 +37,7 @@ def test_health_fails_not_started( ): task_monitor: TaskMonitor = initialized_app.state.task_monitor # emulate monitor not being started - task_monitor._was_started = False + task_monitor._was_started = False # noqa: SLF001 response = test_client.get("/health") assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE @@ -50,8 +50,8 @@ def test_health_fails_hanging_tasks( task_monitor: TaskMonitor = initialized_app.state.task_monitor # emulate tasks hanging - for task_data in task_monitor._to_start.values(): - task_data._start_time = time() - 1e6 + for task_data in task_monitor._to_start.values(): # noqa: SLF001 + task_data._start_time = time() - 1e6 # noqa: SLF001 response = test_client.get("/health") assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py index f7aaadbdc2a..e3bc20ef518 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py @@ -22,8 +22,10 @@ async def on_startup() -> None: metrics_subsystem = ( "dynamic" if app_settings.AUTOSCALING_NODES_MONITORING else "computational" ) - app.state.instrumentation = AutoscalingInstrumentation( - registry=instrumentator.registry, subsystem=metrics_subsystem + app.state.instrumentation = ( + AutoscalingInstrumentation( # pylint: disable=unexpected-keyword-arg + registry=instrumentator.registry, subsystem=metrics_subsystem + ) ) async def on_shutdown() -> None: diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py index 056a77ea2a5..3831b33b826 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py @@ -2,6 +2,7 @@ from typing import Final from prometheus_client import CollectorRegistry, Counter, Histogram +from servicelib.instrumentation import MetricsBase from ...models import BufferPoolManager, Cluster from ._constants import ( @@ -13,11 +14,6 @@ from ._utils import TrackedGauge, create_gauge -@dataclass(slots=True, kw_only=True) -class MetricsBase: - subsystem: str - - @dataclass(slots=True, kw_only=True) class ClusterMetrics(MetricsBase): # pylint: disable=too-many-instance-attributes active_nodes: TrackedGauge = field(init=False) @@ -36,7 +32,12 @@ def __post_init__(self) -> None: cluster_subsystem = f"{self.subsystem}_cluster" # Creating and assigning gauges using the field names and the metric definitions for field_name, definition in CLUSTER_METRICS_DEFINITIONS.items(): - gauge = create_gauge(field_name, definition, cluster_subsystem) + gauge = create_gauge( + field_name=field_name, + definition=definition, + subsystem=cluster_subsystem, + registry=self.registry, + ) setattr(self, field_name, gauge) def update_from_cluster(self, cluster: Cluster) -> None: @@ -65,6 +66,7 @@ def __post_init__(self) -> None: labelnames=EC2_INSTANCE_LABELS, namespace=METRICS_NAMESPACE, subsystem=self.subsystem, + registry=self.registry, ) self.started_instances = Counter( "started_instances_total", @@ -72,6 +74,7 @@ def __post_init__(self) -> None: labelnames=EC2_INSTANCE_LABELS, namespace=METRICS_NAMESPACE, subsystem=self.subsystem, + registry=self.registry, ) self.stopped_instances = Counter( "stopped_instances_total", @@ -79,6 +82,7 @@ def __post_init__(self) -> None: labelnames=EC2_INSTANCE_LABELS, namespace=METRICS_NAMESPACE, subsystem=self.subsystem, + registry=self.registry, ) self.terminated_instances = Counter( "terminated_instances_total", @@ -86,6 +90,7 @@ def __post_init__(self) -> None: labelnames=EC2_INSTANCE_LABELS, namespace=METRICS_NAMESPACE, subsystem=self.subsystem, + registry=self.registry, ) def instance_started(self, instance_type: str) -> None: @@ -123,7 +128,12 @@ def __post_init__(self) -> None: setattr( self, field_name, - create_gauge(field_name, definition, buffer_pools_subsystem), + create_gauge( + field_name=field_name, + definition=definition, + subsystem=buffer_pools_subsystem, + registry=self.registry, + ), ) self.instances_ready_to_pull_seconds = Histogram( "instances_ready_to_pull_duration_seconds", @@ -132,6 +142,7 @@ def __post_init__(self) -> None: namespace=METRICS_NAMESPACE, subsystem=buffer_pools_subsystem, buckets=(10, 20, 30, 40, 50, 60, 120), + registry=self.registry, ) self.instances_completed_pulling_seconds = Histogram( "instances_completed_pulling_duration_seconds", @@ -150,6 +161,7 @@ def __post_init__(self) -> None: 30 * _MINUTE, 40 * _MINUTE, ), + registry=self.registry, ) def update_from_buffer_pool_manager( @@ -174,8 +186,16 @@ class AutoscalingInstrumentation(MetricsBase): buffer_machines_pools_metrics: BufferPoolsMetrics = field(init=False) def __post_init__(self) -> None: - self.cluster_metrics = ClusterMetrics(subsystem=self.subsystem) - self.ec2_client_metrics = EC2ClientMetrics(subsystem=self.subsystem) - self.buffer_machines_pools_metrics = BufferPoolsMetrics( - subsystem=self.subsystem + self.cluster_metrics = ClusterMetrics( # pylint: disable=unexpected-keyword-arg + subsystem=self.subsystem, registry=self.registry + ) + self.ec2_client_metrics = ( + EC2ClientMetrics( # pylint: disable=unexpected-keyword-arg + subsystem=self.subsystem, registry=self.registry + ) + ) + self.buffer_machines_pools_metrics = ( + BufferPoolsMetrics( # pylint: disable=unexpected-keyword-arg + subsystem=self.subsystem, registry=self.registry + ) ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_utils.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_utils.py index 2d991b71cc7..8f80b1f05e8 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_utils.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_utils.py @@ -3,7 +3,7 @@ from dataclasses import dataclass, field from aws_library.ec2._models import EC2InstanceData -from prometheus_client import Gauge +from prometheus_client import CollectorRegistry, Gauge from ._constants import METRICS_NAMESPACE @@ -27,9 +27,11 @@ def update_from_instances(self, instances: Iterable[EC2InstanceData]) -> None: def create_gauge( + *, field_name: str, definition: tuple[str, tuple[str, ...]], subsystem: str, + registry: CollectorRegistry, ) -> TrackedGauge: description, labelnames = definition return TrackedGauge( @@ -39,5 +41,6 @@ def create_gauge( labelnames=labelnames, namespace=METRICS_NAMESPACE, subsystem=subsystem, + registry=registry, ) ) diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 97e709c2dba..b705ea85b78 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -378,11 +378,17 @@ def enabled_rabbitmq( return rabbit_service +_LIFESPAN_TIMEOUT: Final[int] = 10 + + @pytest.fixture async def initialized_app(app_environment: EnvVarsDict) -> AsyncIterator[FastAPI]: settings = ApplicationSettings.create_from_envs() app = create_app(settings) - async with LifespanManager(app): + # NOTE: the timeout is sometime too small for CI machines, and even larger machines + async with LifespanManager( + app, startup_timeout=_LIFESPAN_TIMEOUT, shutdown_timeout=_LIFESPAN_TIMEOUT + ): yield app diff --git a/services/autoscaling/tests/unit/test_modules_instrumentation_utils.py b/services/autoscaling/tests/unit/test_modules_instrumentation_utils.py index f72fa262a97..31a19701f8e 100644 --- a/services/autoscaling/tests/unit/test_modules_instrumentation_utils.py +++ b/services/autoscaling/tests/unit/test_modules_instrumentation_utils.py @@ -2,6 +2,7 @@ from typing import TypedDict from aws_library.ec2._models import EC2InstanceData +from prometheus_client import CollectorRegistry from prometheus_client.metrics import MetricWrapperBase from simcore_service_autoscaling.modules.instrumentation._constants import ( EC2_INSTANCE_LABELS, @@ -40,10 +41,12 @@ def test_update_gauge_sets_old_entries_to_0( fake_ec2_instance_data: Callable[..., EC2InstanceData] ): # Create a Gauge with example labels + registry = CollectorRegistry() tracked_gauge = create_gauge( - "example_gauge", + field_name="example_gauge", definition=("An example gauge", EC2_INSTANCE_LABELS), subsystem="whatever", + registry=registry, ) ec2_instance_type_1 = fake_ec2_instance_data() diff --git a/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_models.py b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_models.py index 5a8f692a124..7407885af31 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_models.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/instrumentation/_models.py @@ -3,12 +3,11 @@ from prometheus_client import CollectorRegistry, Histogram from pydantic import ByteSize, parse_obj_as -from servicelib.instrumentation import get_metrics_namespace +from servicelib.instrumentation import MetricsBase, get_metrics_namespace from ..._meta import PROJECT_NAME -_NAMESPACE_METRICS: Final[str] = get_metrics_namespace(PROJECT_NAME) -_SUBSYSTEM_NAME: Final[str] = "dynamic_services" +_METRICS_NAMESPACE: Final[str] = get_metrics_namespace(PROJECT_NAME) _INSTRUMENTATION_LABELS: Final[tuple[str, ...]] = ( "user_id", "wallet_id", @@ -31,7 +30,7 @@ ) -_BUCKETS_RATE_BPS: Final[tuple[float, ...]] = tuple( +_RATE_BPS_BUCKETS: Final[tuple[float, ...]] = tuple( parse_obj_as(ByteSize, f"{m}MiB") for m in ( 1, @@ -50,8 +49,7 @@ @dataclass(slots=True, kw_only=True) -class DynamiSidecarMetrics: - +class DynamiSidecarMetrics(MetricsBase): start_time_duration: Histogram = field(init=False) stop_time_duration: Histogram = field(init=False) pull_user_services_images_duration: Histogram = field(init=False) @@ -69,69 +67,79 @@ class DynamiSidecarMetrics: def __post_init__(self) -> None: self.start_time_duration = Histogram( "start_time_duration_seconds", - "time to start dynamic-sidecar", + "time to start dynamic service (from start request in dv-2 till service containers are in running state (healthy))", labelnames=_INSTRUMENTATION_LABELS, - namespace=_NAMESPACE_METRICS, + namespace=_METRICS_NAMESPACE, buckets=_BUCKETS_TIME_S, - subsystem=_SUBSYSTEM_NAME, + subsystem=self.subsystem, + registry=self.registry, ) self.stop_time_duration = Histogram( "stop_time_duration_seconds", - "time to stop dynamic-sidecar", + "time to stop dynamic service (from stop request in dv-2 till all allocated resources (services + dynamic-sidecar) are removed)", labelnames=_INSTRUMENTATION_LABELS, - namespace=_NAMESPACE_METRICS, + namespace=_METRICS_NAMESPACE, buckets=_BUCKETS_TIME_S, - subsystem=_SUBSYSTEM_NAME, + subsystem=self.subsystem, + registry=self.registry, ) self.pull_user_services_images_duration = Histogram( "pull_user_services_images_duration_seconds", "time to pull docker images", labelnames=_INSTRUMENTATION_LABELS, - namespace=_NAMESPACE_METRICS, - buckets=_BUCKETS_RATE_BPS, - subsystem=_SUBSYSTEM_NAME, + namespace=_METRICS_NAMESPACE, + buckets=_RATE_BPS_BUCKETS, + subsystem=self.subsystem, + registry=self.registry, ) self.output_ports_pull_rate = Histogram( "output_ports_pull_rate_bps", "rate at which output ports were pulled", labelnames=_INSTRUMENTATION_LABELS, - namespace=_NAMESPACE_METRICS, - buckets=_BUCKETS_RATE_BPS, - subsystem=_SUBSYSTEM_NAME, + namespace=_METRICS_NAMESPACE, + buckets=_RATE_BPS_BUCKETS, + subsystem=self.subsystem, + registry=self.registry, ) self.input_ports_pull_rate = Histogram( "input_ports_pull_rate_bps", "rate at which input ports were pulled", labelnames=_INSTRUMENTATION_LABELS, - namespace=_NAMESPACE_METRICS, - buckets=_BUCKETS_RATE_BPS, - subsystem=_SUBSYSTEM_NAME, + namespace=_METRICS_NAMESPACE, + buckets=_RATE_BPS_BUCKETS, + subsystem=self.subsystem, + registry=self.registry, ) self.pull_service_state_rate = Histogram( "pull_service_state_rate_bps", "rate at which service states were recovered", labelnames=_INSTRUMENTATION_LABELS, - namespace=_NAMESPACE_METRICS, - buckets=_BUCKETS_RATE_BPS, - subsystem=_SUBSYSTEM_NAME, + namespace=_METRICS_NAMESPACE, + buckets=_RATE_BPS_BUCKETS, + subsystem=self.subsystem, + registry=self.registry, ) self.push_service_state_rate = Histogram( "push_service_state_rate_bps", "rate at which service states were saved", labelnames=_INSTRUMENTATION_LABELS, - namespace=_NAMESPACE_METRICS, - buckets=_BUCKETS_RATE_BPS, - subsystem=_SUBSYSTEM_NAME, + namespace=_METRICS_NAMESPACE, + buckets=_RATE_BPS_BUCKETS, + subsystem=self.subsystem, + registry=self.registry, ) @dataclass(slots=True, kw_only=True) class DirectorV2Instrumentation: registry: CollectorRegistry - dynamic_sidecar_metrics: DynamiSidecarMetrics = field(init=False) def __post_init__(self) -> None: - self.dynamic_sidecar_metrics = DynamiSidecarMetrics() + self.dynamic_sidecar_metrics = ( + DynamiSidecarMetrics( # pylint: disable=unexpected-keyword-arg + subsystem="dynamic_services", registry=self.registry + ) + )