Skip to content

Commit

Permalink
prometheus support
Browse files Browse the repository at this point in the history
  • Loading branch information
libretto committed Sep 6, 2023
1 parent c495c50 commit 6fb96d0
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 75 deletions.
6 changes: 5 additions & 1 deletion karapace.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000,
"stats_service": "statsd",
"metrics_extended": true,
"statsd_host": "127.0.0.1",
"statsd_port": 8125
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005

}
57 changes: 57 additions & 0 deletions karapace/base_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - basestats
Abstract base class shared by the statsd and prometheus stats clients.
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import contextmanager
from karapace.config import Config
from karapace.sentry import get_sentry_client
from typing import Final, Iterator

import time


class StatsClient(ABC):
    """Abstract interface for metric back-ends, with shared Sentry error reporting.

    Concrete clients implement ``gauge``, ``increase`` and ``timing``. This base
    class supplies the timing context manager, exception reporting and shutdown
    of the Sentry client.
    """

    @abstractmethod
    def __init__(
        self,
        config: Config,
    ) -> None:
        """Create the Sentry client from the optional ``"sentry"`` config entry."""
        sentry_config = config.get("sentry", None)
        self.sentry_client: Final = get_sentry_client(sentry_config=sentry_config)

    @contextmanager
    def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]:
        """Time the body of a ``with`` block and report it through ``timing``.

        The duration is measured with the monotonic clock. It is reported only
        when the body finishes without raising.
        """
        started_at = time.monotonic()
        yield
        elapsed = time.monotonic() - started_at
        self.timing(metric, elapsed, tags)

    @abstractmethod
    def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Report the current value of ``metric``."""

    @abstractmethod
    def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
        """Increment the counter ``metric`` by ``inc_value``."""

    @abstractmethod
    def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Report a duration ``value`` for ``metric``."""

    def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
        """Count an unexpected exception and forward it to Sentry.

        The ``"exception"`` counter is tagged with the exception class name and
        ``where``; caller-supplied ``tags`` are merged on top of those. For the
        Sentry call, ``where`` is applied last and therefore wins over a
        caller-supplied ``"where"`` tag.
        """
        extra_tags = tags or {}
        counter_tags = {
            "exception": ex.__class__.__name__,
            "where": where,
            **extra_tags,
        }
        self.increase("exception", tags=counter_tags)
        sentry_tags = {**extra_tags, "where": where}
        self.sentry_client.unexpected_exception(error=ex, where=where, tags=sentry_tags)

    def close(self) -> None:
        """Close the underlying Sentry client."""
        self.sentry_client.close()
6 changes: 6 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ class Config(TypedDict):
karapace_registry: bool
master_election_strategy: str
protobuf_runtime_directory: str
stats_service: str
metrics_extended: bool
statsd_host: str
statsd_port: int
prometheus_host: str | None
prometheus_port: int | None

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -147,9 +150,12 @@ class ConfigDefaults(Config, total=False):
"karapace_registry": False,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"stats_service": "statsd",
"metrics_extended": True,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
57 changes: 34 additions & 23 deletions karapace/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from karapace.statsd import StatsClient
from karapace.prometheus import PrometheusClient
from karapace.statsd import StatsdClient

import os
import psutil
Expand All @@ -20,6 +22,10 @@
import time


class MetricsException(Exception):
    """Raised when the metrics subsystem cannot be set up from the configuration."""


class Singleton(type):
_instance: Singleton | None = None

Expand All @@ -31,68 +37,71 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton:


class Metrics(metaclass=Singleton):
def __init__(self) -> None:
self.active = False
self.stats_client: StatsClient | None = None
stats_client: StatsClient

def __init__(
self,
) -> None:
self.is_ready = False
self.stop_event = threading.Event()
self.worker_thread = threading.Thread(target=self.worker)
self.lock = threading.Lock()

def setup(self, stats_client: StatsClient, config: Config) -> None:
self.active = config.get("metrics_extended") or False
if not self.active:
return
def setup(self, config: Config) -> None:
with self.lock:
if self.is_ready:
return
self.is_ready = True
if not self.stats_client:
self.stats_client = stats_client
else:
self.active = False
return

schedule.every(10).seconds.do(self.connections)
self.worker_thread.start()
stats_service = config.get("stats_service")
if not config.get("metrics_extended"):
return
if stats_service == "statsd":
self.stats_client = StatsdClient(config=config)
elif stats_service == "prometheus":
self.stats_client = PrometheusClient(config=config)
else:
raise MetricsException('Config variable "stats_service" is not defined')
self.is_ready = True
schedule.every(10).seconds.do(self.connections)
self.worker_thread.start()

def request(self, size: int) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("request-size", size)

def response(self, size: int) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("response-size", size)

def are_we_master(self, is_master: bool) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("master-slave-role", int(is_master))

def latency(self, latency_ms: float) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.timing("latency_ms", latency_ms)

def error(self) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("error_total", 1)

def connections(self) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
Expand All @@ -112,7 +121,9 @@ def worker(self) -> None:
time.sleep(1)

def cleanup(self) -> None:
if not self.active:
if self.stats_client:
self.stats_client.close()
if not self.is_ready:
return
self.stop_event.set()
if self.worker_thread.is_alive():
Expand Down
69 changes: 69 additions & 0 deletions karapace/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
karapace - prometheus
Exposes metrics over HTTP for Prometheus using the prometheus_client library:
https://github.com/prometheus/client_python
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from prometheus_client import Counter, Gauge, start_http_server, Summary
from typing import Final

import logging

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
# Fallback bind address and port for the metrics HTTP server; PrometheusClient
# uses them when the config has no "prometheus_host" / "prometheus_port" key.
HOST: Final = "127.0.0.1"
PORT: Final = 8005


class PrometheusException(Exception):
    """Raised when the Prometheus stats client is misconfigured or created twice."""


class PrometheusClient(StatsClient):
    """Stats client that exposes metrics through a prometheus_client HTTP server.

    Collectors (Gauge / Counter / Summary) are created lazily on first use and
    cached per metric name. The ``tags`` argument of the reporting methods is
    accepted for interface compatibility with ``StatsClient`` but is currently
    not exported as Prometheus labels.
    """

    # Process-wide flag, deliberately stored on the class: only one metrics
    # HTTP server may be started, regardless of how many clients are created.
    server_is_active = False

    def __init__(self, config: Config, host: str = HOST, port: int = PORT) -> None:
        """Start the metrics HTTP server and prepare the collector caches.

        Args:
            config: Karapace configuration; ``prometheus_host`` and
                ``prometheus_port`` take precedence over ``host`` / ``port``.
            host: Bind address used when the config has no ``prometheus_host``.
            port: Port used when the config has no ``prometheus_port``.

        Raises:
            PrometheusException: If the host or port resolves to ``None``, or
                if a Prometheus client has already started the server.
        """
        super().__init__(config)

        _host = config.get("prometheus_host", host)
        _port = config.get("prometheus_port", port)
        if _host is None:
            raise PrometheusException("prometheus_host is undefined")
        if _port is None:
            raise PrometheusException("prometheus_port is undefined")
        if PrometheusClient.server_is_active:
            raise PrometheusException("Double instance of Prometheus interface")
        start_http_server(_port, _host)
        # Set on the class, not on ``self``: an instance attribute would shadow
        # the class flag and the double-instance guard above would never fire.
        PrometheusClient.server_is_active = True
        self._gauge: dict[str, Gauge] = {}
        self._summary: dict[str, Summary] = {}
        self._counter: dict[str, Counter] = {}

    @staticmethod
    def _metric_name(metric: str) -> str:
        """Map ``metric`` onto the character set ``[a-zA-Z_:][a-zA-Z0-9_:]*``.

        Names such as ``request-size`` contain characters outside Prometheus'
        classic metric-name rules; every such character becomes ``_``.
        """
        cleaned = "".join(ch if (ch.isascii() and ch.isalnum()) or ch in "_:" else "_" for ch in metric)
        if not cleaned or cleaned[0].isdigit():
            cleaned = f"_{cleaned}"
        return cleaned

    def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Set the gauge ``metric`` to ``value`` (``tags`` are ignored)."""
        name = self._metric_name(metric)
        collector = self._gauge.get(name)
        if collector is None:
            collector = self._gauge[name] = Gauge(name, metric)
        collector.set(value)

    def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
        """Increment the counter ``metric`` by ``inc_value`` (``tags`` are ignored)."""
        name = self._metric_name(metric)
        collector = self._counter.get(name)
        if collector is None:
            collector = self._counter[name] = Counter(name, metric)
        collector.inc(inc_value)

    def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Record ``value`` as an observation of the summary ``metric`` (``tags`` are ignored)."""
        name = self._metric_name(metric)
        collector = self._summary.get(name)
        if collector is None:
            collector = self._summary[name] = Summary(name, metric)
        collector.observe(value)
6 changes: 2 additions & 4 deletions karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from http import HTTPStatus
from karapace.config import Config, create_server_ssl_context
from karapace.metrics import Metrics
from karapace.statsd import StatsClient
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
from typing import Callable, Dict, NoReturn, Optional, overload, Union
Expand Down Expand Up @@ -169,10 +168,10 @@ def __init__(
self.app_request_metric = f"{app_name}_request"
self.app = self._create_aiohttp_application(config=config)
self.log = logging.getLogger(self.app_name)
self.stats = StatsClient(config=config)
Metrics().setup(config)
self.stats = Metrics().stats_client
self.app.on_cleanup.append(self.close_by_app)
self.not_ready_handler = not_ready_handler
Metrics().setup(self.stats, config)

def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application:
return aiohttp.web.Application(client_max_size=config["http_request_max_size"])
Expand All @@ -188,7 +187,6 @@ async def close(self) -> None:
created by the aiohttp library.
"""
Metrics().cleanup()
self.stats.close()

@staticmethod
def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument
Expand Down
5 changes: 3 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
from karapace.in_memory_database import InMemoryDatabase
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
from karapace.master_coordinator import MasterCoordinator
from karapace.metrics import Metrics
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Event, Thread
Expand Down Expand Up @@ -127,7 +127,8 @@ def __init__(
self.topic_replication_factor = self.config["replication_factor"]
self.consumer: KafkaConsumer | None = None
self._offset_watcher = offset_watcher
self.stats = StatsClient(config=config)
Metrics().setup(config=config)
self.stats = Metrics().stats_client

# Thread synchronization objects
# - offset is used by the REST API to wait until this thread has
Expand Down
Loading

0 comments on commit 6fb96d0

Please sign in to comment.