From c8fa58d6f70c26b343e85cecc2a4265d5af8bad7 Mon Sep 17 00:00:00 2001 From: libretto Date: Thu, 13 Jun 2024 14:39:57 +0300 Subject: [PATCH] fixup Metrics issues --- karapace/base_stats.py | 1 + karapace/metrics.py | 38 +++++++++++++++++++++++++++++++++----- karapace/prometheus.py | 36 +++++++++++++++++------------------- karapace/rapu.py | 2 +- karapace/schema_reader.py | 2 +- 5 files changed, 53 insertions(+), 26 deletions(-) diff --git a/karapace/base_stats.py b/karapace/base_stats.py index b6fe87a0c..ba16a913f 100644 --- a/karapace/base_stats.py +++ b/karapace/base_stats.py @@ -55,3 +55,4 @@ def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = No def close(self) -> None: self.sentry_client.close() + diff --git a/karapace/metrics.py b/karapace/metrics.py index c1ae2554c..497b24f3c 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -10,13 +10,14 @@ """ from __future__ import annotations +import threading +from typing import Optional + from karapace.base_stats import StatsClient from karapace.config import Config from karapace.prometheus import PrometheusClient from karapace.statsd import StatsdClient -import threading - class MetricsException(Exception): pass @@ -36,7 +37,7 @@ class Metrics(metaclass=Singleton): def __init__( self, ) -> None: - self.stats_client = None + self.stats_client = Optional[StatsClient] self.is_ready = False self.lock = threading.Lock() self.request_size_total = 0 @@ -46,7 +47,6 @@ def setup(self, config: Config) -> None: with self.lock: if self.is_ready: return - if not config.get("metrics_extended"): return stats_service = config.get("stats_service") @@ -88,6 +88,34 @@ def latency(self, latency_ms: float) -> None: raise RuntimeError("no StatsClient available") self.stats_client.timing("latency_ms", latency_ms) + def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: + if not self.is_ready or self.stats_client is None: + return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.gauge(metric, value, tags) + + def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: + if not self.is_ready or self.stats_client is None: + return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.increase(metric, inc_value, tags) + + def timing(self, metric: str, value: float, tags: dict | None = None) -> None: + if not self.is_ready or self.stats_client is None: + return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.timing(metric, value, tags) + + def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: + if not self.is_ready or self.stats_client is None: + return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.unexpected_exception(ex, where, tags) + def error(self) -> None: if not self.is_ready or self.stats_client is None: return @@ -96,7 +124,7 @@ def error(self) -> None: self.stats_client.increase("error-total", 1) def cleanup(self) -> None: - if self.stats_client: + if self.stats_client and isinstance(self.stats_client, StatsClient): self.stats_client.close() if not self.is_ready: diff --git a/karapace/prometheus.py b/karapace/prometheus.py index eb4c530f6..0ae44ece9 100644 --- a/karapace/prometheus.py +++ b/karapace/prometheus.py @@ -15,7 +15,7 @@ from prometheus_client import Counter, Gauge, REGISTRY, Summary from prometheus_client.exposition import make_wsgi_app from socketserver import ThreadingMixIn -from typing import Final +from typing import Final, Any, Union, Tuple from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer import logging @@ -44,11 +44,12 @@ class _SilentHandler(WSGIRequestHandler): """WSGI handler that does not log requests.""" # pylint: disable=W0622 - def log_message(self, format, *args): + + def log_message(self, format:str, *args: Any) -> None: """Log nothing.""" -def get_family(address, port): +def get_family(address: Union[bytes, str, None], port: Union[int, str, None]) -> Tuple[socket.AddressFamily, str]: infos = socket.getaddrinfo(address, port) family, _, _, _, sockaddr = next(iter(infos)) return family, sockaddr[0] @@ -60,7 +61,6 @@ class PrometheusClient(StatsClient): def __init__(self, config: Config) -> None: super().__init__(config) self.lock = threading.Lock() - self.httpd = None self.thread = None with self.lock: _host = config.get("prometheus_host", None) @@ -71,9 +71,17 @@ def __init__(self, config: Config) -> None: raise PrometheusException("prometheus_host port is undefined") if not PrometheusClient.server_is_active: # We wrapped httpd server creation from prometheus client to allow stop this server""" - self.start_server(_host, _port) - + class TmpServer(ThreadingWSGIServer): + pass + + TmpServer.address_family, addr = get_family(_host, _port) + app = make_wsgi_app(REGISTRY) + self.httpd = make_server(addr, _port, app, TmpServer, handler_class=_SilentHandler) + self.thread = threading.Thread(target=self.httpd.serve_forever) + self.thread.daemon = True + self.thread.start() PrometheusClient.server_is_active = True + else: raise PrometheusException("Double instance of Prometheus interface") self._gauge: dict[str, Gauge] = dict() @@ -101,23 +109,13 @@ def timing(self, metric: str, value: float, tags: dict | None = None) -> None: self._summary[metric] = m m.observe(value) - def start_server(self, addr: str, port: int) -> None: - class TmpServer(ThreadingWSGIServer): - pass - - TmpServer.address_family, addr = get_family(addr, port) - app = make_wsgi_app(REGISTRY) - self.httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler) - self.thread = threading.Thread(target=self.httpd.serve_forever) - self.thread.daemon = True - self.thread.start() - def stop_server(self) -> None: self.httpd.shutdown() self.httpd.server_close() - self.thread.join() + if isinstance( self.thread, threading.Thread): + self.thread.join() - def close(self): + def close(self) -> None: with self.lock: if self.server_is_active: self.stop_server() diff --git a/karapace/rapu.py b/karapace/rapu.py index 74e37258b..46d53754d 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -169,7 +169,7 @@ def __init__( self.app = self._create_aiohttp_application(config=config) self.log = logging.getLogger(self.app_name) Metrics().setup(config) - self.stats = Metrics().stats_client + self.stats = Metrics() self.app.on_shutdown.append(self.close_by_app) self.not_ready_handler = not_ready_handler diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 5c3f9ea31..f2e6511cc 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -140,7 +140,7 @@ def __init__( self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher Metrics().setup(config=config) - self.stats = Metrics().stats_client + self.stats = Metrics() # Thread synchronization objects # - offset is used by the REST API to wait until this thread has