Skip to content

Commit

Permalink
[FEATURE] Added Prometheus to export receiver up statues, executed an…
Browse files Browse the repository at this point in the history
…d failed messages count
  • Loading branch information
jdavid-oqc committed Jul 23, 2024
1 parent 2d43f85 commit 571660e
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 6 deletions.
18 changes: 16 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ packages = [
python = ">=3.8.1,<3.11"
qat-compiler = "^1.1.0"
pyzmq = "^25.1.0"
prometheus-client="^0.20.0"

[tool.poetry.group.dev.dependencies]
coverage = "^6.3.2"
Expand Down
Empty file.
8 changes: 8 additions & 0 deletions src/QAT_RPC/qat_rpc/utils/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# constants.py

"""This module defines project-level constants."""
STARTING = "starting"
RUNNING = "running"
STOPPED = "stopped"

PROMETHEUS_PORT = 9150
210 changes: 210 additions & 0 deletions src/QAT_RPC/qat_rpc/utils/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import abc
import inspect
import re
from inspect import getmembers, ismethod
from typing import Callable, Union

from prometheus_client import Counter, Gauge, start_http_server
from qat.purr.utils.logger import get_default_logger

from qat_rpc.utils.constants import PROMETHEUS_PORT

log = get_default_logger()


class IncrementMutableOutcome:
def __init__(self):
self._count: float = 0.0

def increment(self, amount: float = 1.0):
self._count += amount

def __float__(self):
return self._count

def __int__(self):
return int(self._count)


class BinaryMutableOutcome:
def __init__(self):
self._success: bool = None

def succeed(self):
# Failure is sticky
if self._success is None or self._success:
self._success = True

def fail(self):
self._success = False

def __int__(self):
return int(self._success)

def __float__(self):
return 1.0 if self._success else 0.0

def __eq__(self, other):
if isinstance(other, BinaryMutableOutcome):
return other is not None and other._success == self._success
if isinstance(other, bool):
return (self is not None and not other) or (other == self._success)


class ReceiverBackend(abc.ABC):
@abc.abstractmethod
def receiver_status(self, outcome: BinaryMutableOutcome): ...

@abc.abstractmethod
def failed_messages(self, outcome: IncrementMutableOutcome): ...

@abc.abstractmethod
def executed_messages(self, outcome: IncrementMutableOutcome): ...

@abc.abstractmethod
def hardware_connected(self, outcome: BinaryMutableOutcome): ...

@abc.abstractmethod
def hardware_reloaded(self, outcome: BinaryMutableOutcome): ...


class NullReceiverBackend(ReceiverBackend):
def __init__(self):
super(NullReceiverBackend, self).__init__()

def receiver_status(self, outcome: BinaryMutableOutcome): ...

def failed_messages(self, outcome: IncrementMutableOutcome): ...

def executed_messages(self, outcome: IncrementMutableOutcome): ...

def hardware_connected(self, outcome: BinaryMutableOutcome): ...

def hardware_reloaded(self, outcome: BinaryMutableOutcome): ...


class PrometheusReceiver(ReceiverBackend):
def __init__(self, port: int):
super(PrometheusReceiver, self).__init__()
start_http_server(port=PROMETHEUS_PORT)
log.info(f"Using Prometheus on port {port}")

self._receiver_status = Gauge(
"receiver_status", "Measure the Receiver backend up state"
)
self._failed_messages = Counter("failed_messages", "messages failure counter")
self._executed_messages = Counter("executed_messages", "messages executed counter")
self._hardware_connected_status = Gauge(
"hardware_connected_status", "Indicate connected status of live hardware"
)
self._hardware_reloaded_status = Gauge(
"hardware_reloaded_status",
"Indicate if hardware reload from calibration succeeded or failed",
)

def receiver_status(self, outcome: BinaryMutableOutcome):
self._receiver_status.set(outcome)

def failed_messages(self, outcome: IncrementMutableOutcome):
self._failed_messages.inc(float(outcome))

def executed_messages(self, outcome: IncrementMutableOutcome):
self._executed_messages.inc(float(outcome))

def hardware_connected(self, outcome: BinaryMutableOutcome):
self._hardware_connected_status.set(outcome)

def hardware_reloaded(self, outcome: BinaryMutableOutcome):
self._hardware_reloaded_status.set(outcome)


class ReceiverAdapter(ReceiverBackend):
"""
Presents a normal derived type to the MetricsExporter which is required for
introspection as part of it's dynamic build. When using Mocks, which replace
functions with variables, it is required to first wrap those in Adapters.
"""

def __init__(self, decorated):
self.decorated = decorated

def receiver_status(self, outcome: BinaryMutableOutcome):
self.decorated.receiver_status(outcome)

def failed_messages(self, outcome: IncrementMutableOutcome):
self.decorated.failed_messages(outcome)

def executed_messages(self, outcome: IncrementMutableOutcome):
self.decorated.executed_messages(outcome)

def hardware_connected(self, outcome: BinaryMutableOutcome):
self.decorated.hardware_connected(outcome)

def hardware_reloaded(self, outcome: BinaryMutableOutcome):
self.decorated.hardware_reloaded(outcome)


class MetricFieldWrapper:
def __init__(
self,
func: Callable,
outcome: Union[IncrementMutableOutcome, BinaryMutableOutcome],
):
self.callable = func
self.outcome = outcome

def __enter__(self):
# Loan out the mutable outcome for change
return self.outcome

def __exit__(self, exc_type, exc_val, exc_tb):
# Apply the outcome to target callable
try:
self.callable(self.outcome)
except Exception as ex:
log.warning(f"Metric setting errored {str(ex)}")


class MetricExporter:
"""
Factory for context managers for metrics reporting over different
backend reporting frameworks
Dynamically builds member functions that mirror the backend it is given.
The Backend is the API specification.
"""

def __init__(self, backend):
if isinstance(backend, type):
raise ValueError("Argument must be an instance not a type")
methods = [
member
for member in getmembers(backend, predicate=ismethod)
if not re.match("^_", member[0])
]

def decorate(f):
# factory function avoids closure issues
try:
sign = inspect.signature(f)
parameters = sign.parameters
outcome_param = parameters.get("outcome")
outcome_type = outcome_param.annotation
return lambda: MetricFieldWrapper(f, outcome_type())
except KeyError as ex:
log.error(
f" An error occurred while processing "
f"the function's outcome parameter {ex}"
)
raise ex

for func_name, func in methods:
nargs = func.__code__.co_argcount
if not nargs == 2:
raise ValueError(
f"Receiver must have one non-self argument on each "
f"function. Found {nargs} while evaluating {func_name}"
f" on {str(type(backend))}"
)
# build no-arg setting functions on this matching backend public name
setattr(self, func_name, decorate(func))
6 changes: 5 additions & 1 deletion src/QAT_RPC/qat_rpc/zmq/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from qat.purr.compiler.devices import Calibratable
from qat.purr.utils.logger import get_default_logger

from qat_rpc.utils.constants import PROMETHEUS_PORT
from qat_rpc.utils.metrics import MetricExporter, PrometheusReceiver
from qat_rpc.zmq.wrappers import ZMQServer

log = get_default_logger()
Expand All @@ -25,6 +27,7 @@ def _sigterm(self, *args):

if __name__ == "__main__":
hw = None
metric_exporter = MetricExporter(backend=PrometheusReceiver(PROMETHEUS_PORT))
if (calibration_file := os.getenv("TOSHIKO_CAL")) is not None:
calibration_file = Path(calibration_file)
if not calibration_file.is_absolute() and not calibration_file.is_file():
Expand All @@ -34,7 +37,8 @@ def _sigterm(self, *args):
log.info(f"Loading: {calibration_file} ")
hw = Calibratable.load_calibration_from_file(str(calibration_file))
log.debug("Loaded")
receiver = ZMQServer(hardware=hw)

receiver = ZMQServer(hardware=hw, metric_exporter=metric_exporter)
gk = GracefulKill(receiver)

log.info(f"Starting receiver with {type(receiver._hardware)} hardware.")
Expand Down
19 changes: 17 additions & 2 deletions src/QAT_RPC/qat_rpc/zmq/wrappers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from time import time
from typing import Union
from typing import Optional, Union

import zmq
from qat.purr.backends.echo import get_default_echo_hardware
Expand All @@ -8,6 +8,8 @@
from qat.purr.compiler.runtime import get_runtime
from qat.qat import execute_with_metrics

from qat_rpc.utils.metrics import MetricExporter


class ZMQBase:
def __init__(self, socket_type: zmq.SocketType):
Expand Down Expand Up @@ -55,8 +57,13 @@ def __del__(self):


class ZMQServer(ZMQBase):
def __init__(self, hardware: QuantumHardwareModel = None):
def __init__(
self,
hardware: Optional[QuantumHardwareModel] = None,
metric_exporter: Optional[MetricExporter] = None,
):
super().__init__(zmq.REP)
self._metric = metric_exporter
self._socket.bind(self.address)
self._hardware = hardware or get_default_echo_hardware(qubit_count=32)
self._engine = get_runtime(self._hardware).engine
Expand All @@ -68,6 +75,8 @@ def address(self):

def run(self):
self._running = True
with self._metric.receiver_status() as metric:
metric.succeed()
while self._running and not self._socket.closed:
msg = self._check_recieved()
if msg is not None:
Expand All @@ -76,12 +85,18 @@ def run(self):
config = CompilerConfig.create_from_json(msg[1])
result, metrics = execute_with_metrics(program, self._engine, config)
reply = {"results": result, "execution_metrics": metrics}
with self._metric.executed_messages() as executed:
executed.increment()
except Exception as e:
reply = {"Exception": repr(e)}
with self._metric.failed_messages() as failed:
failed.increment()
self._send(reply)

def stop(self):
self._running = False
with self._metric.receiver_status() as metric:
metric.fail()


class ZMQClient(ZMQBase):
Expand Down
Loading

0 comments on commit 571660e

Please sign in to comment.