From 64c13af6a6175e5e4c1ce5a23b26887ebec4343b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Thu, 12 Sep 2024 06:25:46 +0000 Subject: [PATCH] Add CluClient and spectrograph tools --- pyproject.toml | 4 +- src/lvmopstools/clu.py | 179 +++++++++++++++++++++++++++++++ src/lvmopstools/config.yaml | 4 + src/lvmopstools/devices/specs.py | 85 +++++++++++++++ 4 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 src/lvmopstools/clu.py create mode 100644 src/lvmopstools/devices/specs.py diff --git a/pyproject.toml b/pyproject.toml index 934aa1e..7d41b50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,8 +84,10 @@ branch = true include = ["src/lvmopstools/*"] omit = [ "src/lvmopstools/__main__.py", + "src/lvmopstools/clu.py", "src/lvmopstools/ds9.py", - "src/lvmopstools/utils.py" + "src/lvmopstools/utils.py", + "src/lvmopstools/devices/specs.py" ] [tool.coverage.report] diff --git a/src/lvmopstools/clu.py b/src/lvmopstools/clu.py new file mode 100644 index 0000000..cacab94 --- /dev/null +++ b/src/lvmopstools/clu.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-09-12 +# @Filename: clu.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +import asyncio +import os + +from typing import TYPE_CHECKING, Any, Literal, overload + +from clu.client import AMQPClient + +from lvmopstools import config + + +if TYPE_CHECKING: + from clu.command import Command + + +__all__ = ["CluClient", "send_clu_command"] + + +class CluClient: + """AMQP client asynchronous generator. + + Returns an object with an ``AMQPClient`` instance. The normal way to + use it is to do :: + + async with CluClient() as client: + await client.send_command(...) + + Alternatively one can do :: + + client = await anext(CluClient()) + await client.send_command(...) + + The asynchronous generator differs from the one in ``AMQPClient`` in that + it does not close the connection on exit. + + This class is a singleton, which effectively means the AMQP client is reused + during the life of the worker. The singleton can be cleared by calling + `.clear`. + + """ + + __initialised: bool = False + __instance: CluClient | None = None + + def __new__(cls): + if cls.__instance is None: + cls.__instance = super(CluClient, cls).__new__(cls) + cls.__instance.__initialised = False + + return cls.__instance + + def __init__(self): + if self.__initialised is True: + return + + host: str = os.environ.get("RABBITMQ_HOST", config["rabbitmq.host"]) + port: int = int(os.environ.get("RABBITMQ_PORT", config["rabbitmq.port"])) + + self.client = AMQPClient(host=host, port=port) + self.__initialised = True + + self._lock = asyncio.Lock() + + async def __aenter__(self): + # Small delay to allow the event loop to update the + # connection status if needed. + await asyncio.sleep(0.05) + + async with self._lock: + connection = self.client.connection + connected = connection.connection and not connection.connection.is_closed + closed = hasattr(connection, "channel") and connection.channel.is_closed + + if not connected or closed: + print("reconnecting") + await self.client.start() + + return self.client + + async def __aexit__(self, exc_type, exc, tb): + pass + + async def __anext__(self): + if not self.client.is_connected(): + await self.client.start() + + return self.client + + @classmethod + def clear(cls): + """Clears the current instance.""" + + cls.__instance = None + cls.__initialised = False + + +@overload +async def send_clu_command(command_string: str) -> list[dict[str, Any]]: ... + + +@overload +async def send_clu_command( + command_string: str, + *, + raw: Literal[False], +) -> list[dict[str, Any]]: ... + + +@overload +async def send_clu_command( + command_string: str, + *, + raw: Literal[True], +) -> Command: ... + + +@overload +async def send_clu_command( + command_string: str, + *, + raw: bool, +) -> list[dict[str, Any]] | Command: ... + + +async def send_clu_command( + command_string: str, + *, + raw=False, +) -> list[dict[str, Any]] | Command: + """Sends a command to the actor system and returns a list of replies. + + Parameters + ---------- + command_string + The command to send to the actor. Must include the name of the actor. + raw + If `True`, returns the command. Otherwise returns a list of replies. + + Returns + ------- + replies + A list of replies, each one a dictionary of keyword to value. Empty + replies (e.g., those only changing the status) are not returned. If + ``raw=True``, the CLU command is returned after awaiting for it to + complete or fail. + + Raises + ------ + RuntimeError + If the command fails, times out, or is otherwise not successful. + + """ + + consumer, *rest = command_string.split(" ") + + async with CluClient() as client: + cmd = await client.send_command(consumer, " ".join(rest)) + + if cmd.status.did_succeed: + if raw: + return cmd + + replies: list[dict[str, Any]] = [] + for reply in cmd.replies: + if len(reply.message) == 0: + continue + replies.append(reply.message) + return replies + + raise RuntimeError(f"Command {command_string!r} failed.") diff --git a/src/lvmopstools/config.yaml b/src/lvmopstools/config.yaml index 627a11c..d45102e 100644 --- a/src/lvmopstools/config.yaml +++ b/src/lvmopstools/config.yaml @@ -1,3 +1,7 @@ +rabbitmq: + host: 10.8.38.21 + port: 5672 + devices: thermistors: host: 10.8.38.180 diff --git a/src/lvmopstools/devices/specs.py b/src/lvmopstools/devices/specs.py new file mode 100644 index 0000000..66482a2 --- /dev/null +++ b/src/lvmopstools/devices/specs.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# @Author: José Sánchez-Gallego (gallegoj@uw.edu) +# @Date: 2024-09-12 +# @Filename: specs.py +# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause) + +from __future__ import annotations + +import asyncio + +from typing import Literal, get_args + +from lvmopstools.clu import CluClient + + +__all__ = ["get_spectrograph_temperature_label", "get_spectrograph_temperatures"] + + +Spectrographs = Literal["sp1", "sp2", "sp3"] +Cameras = Literal["r", "b", "z"] +Sensors = Literal["ccd", "ln2"] + + +def get_spectrograph_temperature_label(camera: str, sensor: str = "ccd"): + """Returns the archon label associated with a temperature sensor.""" + + if sensor == "ccd": + if camera == "r": + return "mod2/tempa" + elif camera == "b": + return "mod12/tempc" + elif camera == "z": + return "mod12/tempa" + + else: + if camera == "r": + return "mod2/tempb" + elif camera == "b": + return "mod2/tempc" + elif camera == "z": + return "mod12/tempb" + + +async def get_spectrograph_temperatures(spec: Spectrographs | None = None): + """Returns a dictionary of spectrograph temperatures.""" + + if spec is None: + tasks: list[asyncio.Task] = [] + for spec in get_args(Spectrographs): + tasks.append(asyncio.create_task(get_spectrograph_temperatures(spec))) + + task_results = await asyncio.gather(*tasks) + return { + key: value + for task_result in task_results + for key, value in task_result.items() + } + + async with CluClient() as client: + scp_command = await client.send_command( + f"lvmscp.{spec}", + "status", + internal=True, + ) + + if scp_command.status.did_fail: + raise ValueError("Failed retrieving status from SCP.") + + status = scp_command.replies.get("status") + + response: dict[str, float] = {} + + cameras: list[Cameras] = ["r", "b", "z"] + sensors: list[Sensors] = ["ccd", "ln2"] + + for camera in cameras: + for sensor in sensors: + label = get_spectrograph_temperature_label(camera, sensor) + if label not in status: + raise ValueError(f"Cannot find status label {label!r}.") + response[f"{camera}{spec[-1]}_{sensor}"] = status[label] + + return response