From 571660e6c5bb1ffbd6256b9765326343eca155ec Mon Sep 17 00:00:00 2001 From: James_David Date: Tue, 9 Jul 2024 21:03:42 +0100 Subject: [PATCH] [FEATURE] Added Prometheus to export receiver up statues, executed and failed messages count --- poetry.lock | 18 +- pyproject.toml | 1 + src/QAT_RPC/qat_rpc/utils/__init__.py | 0 src/QAT_RPC/qat_rpc/utils/constants.py | 8 + src/QAT_RPC/qat_rpc/utils/metrics.py | 210 ++++++++++++++++++++++++ src/QAT_RPC/qat_rpc/zmq/receiver.py | 6 +- src/QAT_RPC/qat_rpc/zmq/wrappers.py | 19 ++- src/tests/utils/test_metric_exporter.py | 131 +++++++++++++++ src/tests/zmq/test_wrappers.py | 6 +- 9 files changed, 393 insertions(+), 6 deletions(-) create mode 100644 src/QAT_RPC/qat_rpc/utils/__init__.py create mode 100644 src/QAT_RPC/qat_rpc/utils/constants.py create mode 100644 src/QAT_RPC/qat_rpc/utils/metrics.py create mode 100644 src/tests/utils/test_metric_exporter.py diff --git a/poetry.lock b/poetry.lock index 6b50690..51ce255 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "autoflake" @@ -1137,6 +1137,20 @@ nodeenv = ">=0.11.1" pyyaml = ">=5.1" virtualenv = ">=20.10.0" +[[package]] +name = "prometheus-client" +version = "0.20.0" +description = "Python client for the Prometheus monitoring system." +optional = false +python-versions = ">=3.8" +files = [ + {file = "prometheus_client-0.20.0-py3-none-any.whl", hash = "sha256:cde524a85bce83ca359cc837f28b8c0db5cac7aa653a588fd7e84ba061c329e7"}, + {file = "prometheus_client-0.20.0.tar.gz", hash = "sha256:287629d00b147a32dcb2be0b9df905da599b2d82f80377083ec8463309a4bb89"}, +] + +[package.extras] +twisted = ["twisted"] + [[package]] name = "psutil" version = "6.0.0" @@ -2079,4 +2093,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.11" -content-hash = "71e1ed260184a5a20f1d1ae07035e59106f7866bf94f16bf9b4f468f15094bea" +content-hash = "cd681efac42ae329868d8c8dcf164f16e0b8570135b2e8dc03041c0da6f291a9" diff --git a/pyproject.toml b/pyproject.toml index 7a220de..c83e385 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ packages = [ python = ">=3.8.1,<3.11" qat-compiler = "^1.1.0" pyzmq = "^25.1.0" +prometheus-client="^0.20.0" [tool.poetry.group.dev.dependencies] coverage = "^6.3.2" diff --git a/src/QAT_RPC/qat_rpc/utils/__init__.py b/src/QAT_RPC/qat_rpc/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/QAT_RPC/qat_rpc/utils/constants.py b/src/QAT_RPC/qat_rpc/utils/constants.py new file mode 100644 index 0000000..af1248c --- /dev/null +++ b/src/QAT_RPC/qat_rpc/utils/constants.py @@ -0,0 +1,8 @@ +# constants.py + +"""This module defines project-level constants.""" +STARTING = "starting" +RUNNING = "running" +STOPPED = "stopped" + +PROMETHEUS_PORT = 9150 diff --git a/src/QAT_RPC/qat_rpc/utils/metrics.py b/src/QAT_RPC/qat_rpc/utils/metrics.py new file mode 100644 index 0000000..c7d4e14 --- /dev/null +++ b/src/QAT_RPC/qat_rpc/utils/metrics.py @@ -0,0 +1,210 @@ +import abc +import inspect +import re +from inspect import getmembers, ismethod +from typing import Callable, Union + +from prometheus_client import Counter, Gauge, start_http_server +from qat.purr.utils.logger import get_default_logger + +from qat_rpc.utils.constants import PROMETHEUS_PORT + +log = get_default_logger() + + +class IncrementMutableOutcome: + def __init__(self): + self._count: float = 0.0 + + def increment(self, amount: float = 1.0): + self._count += amount + + def __float__(self): + return self._count + + def __int__(self): + return int(self._count) + + +class BinaryMutableOutcome: + def __init__(self): + self._success: bool = None + + def succeed(self): + # Failure is sticky + if self._success is None or self._success: + self._success = True + + def fail(self): + self._success = False + + def __int__(self): + return int(self._success) + + def __float__(self): + return 1.0 if self._success else 0.0 + + def __eq__(self, other): + if isinstance(other, BinaryMutableOutcome): + return other is not None and other._success == self._success + if isinstance(other, bool): + return (self is not None and not other) or (other == self._success) + + +class ReceiverBackend(abc.ABC): + @abc.abstractmethod + def receiver_status(self, outcome: BinaryMutableOutcome): ... + + @abc.abstractmethod + def failed_messages(self, outcome: IncrementMutableOutcome): ... + + @abc.abstractmethod + def executed_messages(self, outcome: IncrementMutableOutcome): ... + + @abc.abstractmethod + def hardware_connected(self, outcome: BinaryMutableOutcome): ... + + @abc.abstractmethod + def hardware_reloaded(self, outcome: BinaryMutableOutcome): ... + + +class NullReceiverBackend(ReceiverBackend): + def __init__(self): + super(NullReceiverBackend, self).__init__() + + def receiver_status(self, outcome: BinaryMutableOutcome): ... + + def failed_messages(self, outcome: IncrementMutableOutcome): ... + + def executed_messages(self, outcome: IncrementMutableOutcome): ... + + def hardware_connected(self, outcome: BinaryMutableOutcome): ... + + def hardware_reloaded(self, outcome: BinaryMutableOutcome): ... + + +class PrometheusReceiver(ReceiverBackend): + def __init__(self, port: int): + super(PrometheusReceiver, self).__init__() + start_http_server(port=PROMETHEUS_PORT) + log.info(f"Using Prometheus on port {port}") + + self._receiver_status = Gauge( + "receiver_status", "Measure the Receiver backend up state" + ) + self._failed_messages = Counter("failed_messages", "messages failure counter") + self._executed_messages = Counter("executed_messages", "messages executed counter") + self._hardware_connected_status = Gauge( + "hardware_connected_status", "Indicate connected status of live hardware" + ) + self._hardware_reloaded_status = Gauge( + "hardware_reloaded_status", + "Indicate if hardware reload from calibration succeeded or failed", + ) + + def receiver_status(self, outcome: BinaryMutableOutcome): + self._receiver_status.set(outcome) + + def failed_messages(self, outcome: IncrementMutableOutcome): + self._failed_messages.inc(float(outcome)) + + def executed_messages(self, outcome: IncrementMutableOutcome): + self._executed_messages.inc(float(outcome)) + + def hardware_connected(self, outcome: BinaryMutableOutcome): + self._hardware_connected_status.set(outcome) + + def hardware_reloaded(self, outcome: BinaryMutableOutcome): + self._hardware_reloaded_status.set(outcome) + + +class ReceiverAdapter(ReceiverBackend): + """ + Presents a normal derived type to the MetricsExporter which is required for + introspection as part of it's dynamic build. When using Mocks, which replace + functions with variables, it is required to first wrap those in Adapters. + """ + + def __init__(self, decorated): + self.decorated = decorated + + def receiver_status(self, outcome: BinaryMutableOutcome): + self.decorated.receiver_status(outcome) + + def failed_messages(self, outcome: IncrementMutableOutcome): + self.decorated.failed_messages(outcome) + + def executed_messages(self, outcome: IncrementMutableOutcome): + self.decorated.executed_messages(outcome) + + def hardware_connected(self, outcome: BinaryMutableOutcome): + self.decorated.hardware_connected(outcome) + + def hardware_reloaded(self, outcome: BinaryMutableOutcome): + self.decorated.hardware_reloaded(outcome) + + +class MetricFieldWrapper: + def __init__( + self, + func: Callable, + outcome: Union[IncrementMutableOutcome, BinaryMutableOutcome], + ): + self.callable = func + self.outcome = outcome + + def __enter__(self): + # Loan out the mutable outcome for change + return self.outcome + + def __exit__(self, exc_type, exc_val, exc_tb): + # Apply the outcome to target callable + try: + self.callable(self.outcome) + except Exception as ex: + log.warning(f"Metric setting errored {str(ex)}") + + +class MetricExporter: + """ + Factory for context managers for metrics reporting over different + backend reporting frameworks + + Dynamically builds member functions that mirror the backend it is given. + The Backend is the API specification. + """ + + def __init__(self, backend): + if isinstance(backend, type): + raise ValueError("Argument must be an instance not a type") + methods = [ + member + for member in getmembers(backend, predicate=ismethod) + if not re.match("^_", member[0]) + ] + + def decorate(f): + # factory function avoids closure issues + try: + sign = inspect.signature(f) + parameters = sign.parameters + outcome_param = parameters.get("outcome") + outcome_type = outcome_param.annotation + return lambda: MetricFieldWrapper(f, outcome_type()) + except KeyError as ex: + log.error( + f" An error occurred while processing " + f"the function's outcome parameter {ex}" + ) + raise ex + + for func_name, func in methods: + nargs = func.__code__.co_argcount + if not nargs == 2: + raise ValueError( + f"Receiver must have one non-self argument on each " + f"function. Found {nargs} while evaluating {func_name}" + f" on {str(type(backend))}" + ) + # build no-arg setting functions on this matching backend public name + setattr(self, func_name, decorate(func)) diff --git a/src/QAT_RPC/qat_rpc/zmq/receiver.py b/src/QAT_RPC/qat_rpc/zmq/receiver.py index cc353eb..e3d2b81 100644 --- a/src/QAT_RPC/qat_rpc/zmq/receiver.py +++ b/src/QAT_RPC/qat_rpc/zmq/receiver.py @@ -5,6 +5,8 @@ from qat.purr.compiler.devices import Calibratable from qat.purr.utils.logger import get_default_logger +from qat_rpc.utils.constants import PROMETHEUS_PORT +from qat_rpc.utils.metrics import MetricExporter, PrometheusReceiver from qat_rpc.zmq.wrappers import ZMQServer log = get_default_logger() @@ -25,6 +27,7 @@ def _sigterm(self, *args): if __name__ == "__main__": hw = None + metric_exporter = MetricExporter(backend=PrometheusReceiver(PROMETHEUS_PORT)) if (calibration_file := os.getenv("TOSHIKO_CAL")) is not None: calibration_file = Path(calibration_file) if not calibration_file.is_absolute() and not calibration_file.is_file(): @@ -34,7 +37,8 @@ def _sigterm(self, *args): log.info(f"Loading: {calibration_file} ") hw = Calibratable.load_calibration_from_file(str(calibration_file)) log.debug("Loaded") - receiver = ZMQServer(hardware=hw) + + receiver = ZMQServer(hardware=hw, metric_exporter=metric_exporter) gk = GracefulKill(receiver) log.info(f"Starting receiver with {type(receiver._hardware)} hardware.") diff --git a/src/QAT_RPC/qat_rpc/zmq/wrappers.py b/src/QAT_RPC/qat_rpc/zmq/wrappers.py index 0c7497a..6eda944 100644 --- a/src/QAT_RPC/qat_rpc/zmq/wrappers.py +++ b/src/QAT_RPC/qat_rpc/zmq/wrappers.py @@ -1,5 +1,5 @@ from time import time -from typing import Union +from typing import Optional, Union import zmq from qat.purr.backends.echo import get_default_echo_hardware @@ -8,6 +8,8 @@ from qat.purr.compiler.runtime import get_runtime from qat.qat import execute_with_metrics +from qat_rpc.utils.metrics import MetricExporter + class ZMQBase: def __init__(self, socket_type: zmq.SocketType): @@ -55,8 +57,13 @@ def __del__(self): class ZMQServer(ZMQBase): - def __init__(self, hardware: QuantumHardwareModel = None): + def __init__( + self, + hardware: Optional[QuantumHardwareModel] = None, + metric_exporter: Optional[MetricExporter] = None, + ): super().__init__(zmq.REP) + self._metric = metric_exporter self._socket.bind(self.address) self._hardware = hardware or get_default_echo_hardware(qubit_count=32) self._engine = get_runtime(self._hardware).engine @@ -68,6 +75,8 @@ def address(self): def run(self): self._running = True + with self._metric.receiver_status() as metric: + metric.succeed() while self._running and not self._socket.closed: msg = self._check_recieved() if msg is not None: @@ -76,12 +85,18 @@ def run(self): config = CompilerConfig.create_from_json(msg[1]) result, metrics = execute_with_metrics(program, self._engine, config) reply = {"results": result, "execution_metrics": metrics} + with self._metric.executed_messages() as executed: + executed.increment() except Exception as e: reply = {"Exception": repr(e)} + with self._metric.failed_messages() as failed: + failed.increment() self._send(reply) def stop(self): self._running = False + with self._metric.receiver_status() as metric: + metric.fail() class ZMQClient(ZMQBase): diff --git a/src/tests/utils/test_metric_exporter.py b/src/tests/utils/test_metric_exporter.py new file mode 100644 index 0000000..2f60b2e --- /dev/null +++ b/src/tests/utils/test_metric_exporter.py @@ -0,0 +1,131 @@ +import abc +from unittest.mock import Mock + +import pytest + +from qat_rpc.utils.metrics import ( + BinaryMutableOutcome, + IncrementMutableOutcome, + MetricExporter, +) + + +class Backend(abc.ABC): + @abc.abstractmethod + def report(self, outcome: BinaryMutableOutcome): ... + + +class BackendAdapter(Backend): + def __init__(self, decorated): + super(BackendAdapter, self).__init__() + self.decorated = decorated + + def report(self, outcome: BinaryMutableOutcome): + self.decorated.report(outcome) + + +@pytest.fixture +def mock_backend(): + return BackendAdapter(Mock(Backend)) + + +class IncrementBackend(abc.ABC): + @abc.abstractmethod + def report(self, outcome: IncrementMutableOutcome): ... + + +class IncrementBackendAdapter(Backend): + def __init__(self, decorated): + super(IncrementBackendAdapter, self).__init__() + self.decorated = decorated + + def report(self, outcome: IncrementMutableOutcome): + self.decorated.report(outcome) + + +@pytest.fixture +def mock_increment_backend(): + return IncrementBackendAdapter(Mock(IncrementBackend)) + + +def test_metric_exporter_aborts_if_given_type(): + with pytest.raises(ValueError): + MetricExporter(BackendAdapter) # Type not instance + + +def test_metric_exporter_aborts_if_interface_doesnt_match_one_arg_function(): + class BrokenBackendTwoArgFunc: + def report(self, one_arg, extra_arg): ... + + with pytest.raises(ValueError): + MetricExporter(BrokenBackendTwoArgFunc()) + + class BrokenBackendOneArgFunc: + def report(self): ... + + with pytest.raises(ValueError): + MetricExporter(BrokenBackendOneArgFunc()) + + +def test_metric_exporter_context_manager_with_success(mock_backend): + with MetricExporter(backend=mock_backend).report() as metric: + metric.succeed() + + mock_backend.decorated.report.assert_called_once_with(True) + + +def test_metric_exporter_context_manager_with_failure(mock_backend): + with MetricExporter(backend=mock_backend).report() as metric: + metric.fail() + + mock_backend.decorated.report.assert_called_once_with(False) + + +def test_metric_exporter_context_manager_with_default_failure(mock_backend): + with MetricExporter(backend=mock_backend).report(): + ... + + mock_backend.decorated.report.assert_called_once_with(False) + + +def test_metric_exporter_context_manager_with_sticky_false(mock_backend): + with MetricExporter(backend=mock_backend).report() as metric: + metric.fail() + metric.succeed() + + mock_backend.decorated.report.assert_called_once_with(False) + + +def test_increment_metric_exporter_single_increment(mock_increment_backend): + with MetricExporter(backend=mock_increment_backend).report() as metric: + metric.increment() + expected_increment_outcome = mock_increment_backend.decorated.report.call_args[0][0] + assert float(expected_increment_outcome) == 1.0 + mock_increment_backend.decorated.report.assert_called_once_with( + expected_increment_outcome + ) + + +def test_increment_metric_exporter_multiple_increment(mock_increment_backend): + with MetricExporter(backend=mock_increment_backend).report() as metric: + for _ in range(5): + metric.increment() + assert float(mock_increment_backend.decorated.report.call_args[0][0]) == 5.0 + + +def test_implicit_conversion_of_proxy_object(): + outcome = BinaryMutableOutcome() + outcome.succeed() + assert float(outcome) == 1.0 + assert int(outcome) == 1 + + outcome.fail() + assert float(outcome) == 0.0 + assert int(outcome) == 0 + + +def test_increment_outcome(): + outcome = IncrementMutableOutcome() + for i in range(5): + outcome.increment() + assert float(outcome) == 5 diff --git a/src/tests/zmq/test_wrappers.py b/src/tests/zmq/test_wrappers.py index 0b520c0..cc15eb8 100644 --- a/src/tests/zmq/test_wrappers.py +++ b/src/tests/zmq/test_wrappers.py @@ -3,12 +3,16 @@ import pytest from qat.purr.compiler.config import CompilerConfig +from qat_rpc.utils.constants import PROMETHEUS_PORT +from qat_rpc.utils.metrics import MetricExporter, PrometheusReceiver from qat_rpc.zmq.wrappers import ZMQClient, ZMQServer @pytest.fixture(scope="module", autouse=True) def server(): - server = ZMQServer() + server = ZMQServer( + metric_exporter=MetricExporter(backend=PrometheusReceiver(PROMETHEUS_PORT)) + ) server_thread = threading.Thread(target=server.run, daemon=True) server_thread.start()