Skip to content

Commit

Permalink
fixup Metrics issues
Browse files Browse the repository at this point in the history
  • Loading branch information
libretto committed Jun 13, 2024
1 parent 13abf60 commit c8fa58d
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 26 deletions.
1 change: 1 addition & 0 deletions karapace/base_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = No

def close(self) -> None:
    """Close ``self.sentry_client``."""
    sentry = self.sentry_client
    sentry.close()

38 changes: 33 additions & 5 deletions karapace/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
"""
from __future__ import annotations

import threading
from typing import Optional

from karapace.base_stats import StatsClient
from karapace.config import Config
from karapace.prometheus import PrometheusClient
from karapace.statsd import StatsdClient

import threading


class MetricsException(Exception):
    """Exception type specific to the metrics module."""
Expand All @@ -36,7 +37,7 @@ class Metrics(metaclass=Singleton):
def __init__(
self,
) -> None:
self.stats_client = None
self.stats_client = Optional[StatsClient]
self.is_ready = False
self.lock = threading.Lock()
self.request_size_total = 0
Expand All @@ -46,7 +47,6 @@ def setup(self, config: Config) -> None:
with self.lock:
if self.is_ready:
return

if not config.get("metrics_extended"):
return
stats_service = config.get("stats_service")
Expand Down Expand Up @@ -88,6 +88,34 @@ def latency(self, latency_ms: float) -> None:
raise RuntimeError("no StatsClient available")
self.stats_client.timing("latency_ms", latency_ms)

def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
    """Forward a gauge reading to the configured stats client.

    Returns silently when metrics are not ready or no stats client is set.

    Raises:
        RuntimeError: if ``stats_client`` is not a ``StatsClient`` instance.
    """
    if not (self.is_ready and self.stats_client is not None):
        return
    client = self.stats_client
    if isinstance(client, StatsClient):
        client.gauge(metric, value, tags)
        return
    raise RuntimeError("no StatsClient available")

def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
    """Forward a counter increment to the configured stats client.

    Returns silently when metrics are not ready or no stats client is set.

    Raises:
        RuntimeError: if ``stats_client`` is not a ``StatsClient`` instance.
    """
    if not (self.is_ready and self.stats_client is not None):
        return
    client = self.stats_client
    if isinstance(client, StatsClient):
        client.increase(metric, inc_value, tags)
        return
    raise RuntimeError("no StatsClient available")

def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
    """Forward a timing measurement to the configured stats client.

    Returns silently when metrics are not ready or no stats client is set.

    Raises:
        RuntimeError: if ``stats_client`` is not a ``StatsClient`` instance.
    """
    if not (self.is_ready and self.stats_client is not None):
        return
    client = self.stats_client
    if isinstance(client, StatsClient):
        client.timing(metric, value, tags)
        return
    raise RuntimeError("no StatsClient available")

def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
    """Forward an unexpected exception report to the configured stats client.

    Returns silently when metrics are not ready or no stats client is set.

    Raises:
        RuntimeError: if ``stats_client`` is not a ``StatsClient`` instance.
    """
    if not (self.is_ready and self.stats_client is not None):
        return
    client = self.stats_client
    if isinstance(client, StatsClient):
        client.unexpected_exception(ex, where, tags)
        return
    raise RuntimeError("no StatsClient available")

def error(self) -> None:
if not self.is_ready or self.stats_client is None:
return
Expand All @@ -96,7 +124,7 @@ def error(self) -> None:
self.stats_client.increase("error-total", 1)

def cleanup(self) -> None:
if self.stats_client:
if self.stats_client and isinstance(self.stats_client, StatsClient):
self.stats_client.close()

if not self.is_ready:
Expand Down
36 changes: 17 additions & 19 deletions karapace/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from prometheus_client import Counter, Gauge, REGISTRY, Summary
from prometheus_client.exposition import make_wsgi_app
from socketserver import ThreadingMixIn
from typing import Final
from typing import Final, Any, Union, Tuple
from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer

import logging
Expand Down Expand Up @@ -44,11 +44,12 @@ class _SilentHandler(WSGIRequestHandler):
"""WSGI handler that does not log requests."""

# pylint: disable=W0622
def log_message(self, format: str, *args: Any) -> None:
    """Log nothing.

    Both ``format`` and ``args`` are accepted and ignored, so requests
    handled through this handler are not logged.
    """


def get_family(address: Union[bytes, str, None], port: Union[int, str, None]) -> Tuple[socket.AddressFamily, str]:
    """Resolve ``address``/``port`` and describe the first result.

    Returns:
        A ``(family, host)`` pair taken from the first entry returned by
        ``socket.getaddrinfo``: the address family and the first element
        of that entry's socket address.

    Raises:
        socket.gaierror: propagated from ``socket.getaddrinfo`` when the
            address cannot be resolved.
    """
    infos = socket.getaddrinfo(address, port)
    # Only the first resolution result is used.
    family, _, _, _, sockaddr = next(iter(infos))
    return family, sockaddr[0]
Expand All @@ -60,7 +61,6 @@ class PrometheusClient(StatsClient):
def __init__(self, config: Config) -> None:
super().__init__(config)
self.lock = threading.Lock()
self.httpd = None
self.thread = None
with self.lock:
_host = config.get("prometheus_host", None)
Expand All @@ -71,9 +71,17 @@ def __init__(self, config: Config) -> None:
raise PrometheusException("prometheus_host port is undefined")
if not PrometheusClient.server_is_active:
# We wrap the httpd server creation from the prometheus client so that this server can be stopped.
self.start_server(_host, _port)

class TmpServer(ThreadingWSGIServer):
pass

TmpServer.address_family, addr = get_family(_host, _port)
app = make_wsgi_app(REGISTRY)
self.httpd = make_server(addr, _port, app, TmpServer, handler_class=_SilentHandler)
self.thread = threading.Thread(target=self.httpd.serve_forever)
self.thread.daemon = True
self.thread.start()
PrometheusClient.server_is_active = True

else:
raise PrometheusException("Double instance of Prometheus interface")
self._gauge: dict[str, Gauge] = dict()
Expand Down Expand Up @@ -101,23 +109,13 @@ def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
self._summary[metric] = m
m.observe(value)

def start_server(self, addr: str, port: int) -> None:
    """Start the WSGI server exposing ``REGISTRY`` in a daemon thread.

    The address family and bind address come from ``get_family(addr, port)``.
    The server is stored in ``self.httpd`` and its serving thread in
    ``self.thread``.
    """
    family, bind_addr = get_family(addr, port)

    class TmpServer(ThreadingWSGIServer):
        # Per-call subclass so the address family is set without touching the base class.
        address_family = family

    wsgi_app = make_wsgi_app(REGISTRY)
    self.httpd = make_server(bind_addr, port, wsgi_app, TmpServer, handler_class=_SilentHandler)
    self.thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
    self.thread.start()

def stop_server(self) -> None:
    """Shut down the HTTP server and wait for its serving thread to finish.

    Calls ``shutdown()`` and ``server_close()`` on ``self.httpd``, then
    joins ``self.thread`` when it is an actual ``threading.Thread``.
    """
    self.httpd.shutdown()
    self.httpd.server_close()
    # ``self.thread`` may still be None when no thread was started, so only
    # join a real thread instead of joining unconditionally.
    if isinstance(self.thread, threading.Thread):
        self.thread.join()

def close(self):
def close(self) -> None:
with self.lock:
if self.server_is_active:
self.stop_server()
Expand Down
2 changes: 1 addition & 1 deletion karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def __init__(
self.app = self._create_aiohttp_application(config=config)
self.log = logging.getLogger(self.app_name)
Metrics().setup(config)
self.stats = Metrics().stats_client
self.stats = Metrics()
self.app.on_shutdown.append(self.close_by_app)
self.not_ready_handler = not_ready_handler

Expand Down
2 changes: 1 addition & 1 deletion karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __init__(
self.consumer: KafkaConsumer | None = None
self._offset_watcher = offset_watcher
Metrics().setup(config=config)
self.stats = Metrics().stats_client
self.stats = Metrics()

# Thread synchronization objects
# - offset is used by the REST API to wait until this thread has
Expand Down

0 comments on commit c8fa58d

Please sign in to comment.