Skip to content

Commit

Permalink
Add CluClient and spectrograph tools
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Sep 12, 2024
1 parent 8ce824e commit 64c13af
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ branch = true
include = ["src/lvmopstools/*"]
omit = [
"src/lvmopstools/__main__.py",
"src/lvmopstools/clu.py",
"src/lvmopstools/ds9.py",
"src/lvmopstools/utils.py"
"src/lvmopstools/utils.py",
"src/lvmopstools/devices/specs.py"
]

[tool.coverage.report]
Expand Down
179 changes: 179 additions & 0 deletions src/lvmopstools/clu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-09-12
# @Filename: clu.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import os

from typing import TYPE_CHECKING, Any, Literal, overload

from clu.client import AMQPClient

from lvmopstools import config


if TYPE_CHECKING:
from clu.command import Command


__all__ = ["CluClient", "send_clu_command"]


class CluClient:
"""AMQP client asynchronous generator.
Returns an object with an ``AMQPClient`` instance. The normal way to
use it is to do ::
async with CluClient() as client:
await client.send_command(...)
Alternatively one can do ::
client = await anext(CluClient())
await client.send_command(...)
The asynchronous generator differs from the one in ``AMQPClient`` in that
it does not close the connection on exit.
This class is a singleton, which effectively means the AMQP client is reused
during the life of the worker. The singleton can be cleared by calling
`.clear`.
"""

__initialised: bool = False
__instance: CluClient | None = None

def __new__(cls):
if cls.__instance is None:
cls.__instance = super(CluClient, cls).__new__(cls)
cls.__instance.__initialised = False

return cls.__instance

def __init__(self):
if self.__initialised is True:
return

host: str = os.environ.get("RABBITMQ_HOST", config["rabbitmq.host"])
port: int = int(os.environ.get("RABBITMQ_PORT", config["rabbitmq.port"]))

self.client = AMQPClient(host=host, port=port)
self.__initialised = True

self._lock = asyncio.Lock()

async def __aenter__(self):
# Small delay to allow the event loop to update the
# connection status if needed.
await asyncio.sleep(0.05)

async with self._lock:
connection = self.client.connection
connected = connection.connection and not connection.connection.is_closed
closed = hasattr(connection, "channel") and connection.channel.is_closed

if not connected or closed:
print("reconnecting")
await self.client.start()

return self.client

async def __aexit__(self, exc_type, exc, tb):
pass

async def __anext__(self):
if not self.client.is_connected():
await self.client.start()

return self.client

@classmethod
def clear(cls):
"""Clears the current instance."""

cls.__instance = None
cls.__initialised = False


@overload
async def send_clu_command(command_string: str) -> list[dict[str, Any]]: ...


@overload
async def send_clu_command(
command_string: str,
*,
raw: Literal[False],
) -> list[dict[str, Any]]: ...


@overload
async def send_clu_command(
command_string: str,
*,
raw: Literal[True],
) -> Command: ...


@overload
async def send_clu_command(
command_string: str,
*,
raw: bool,
) -> list[dict[str, Any]] | Command: ...


async def send_clu_command(
command_string: str,
*,
raw=False,
) -> list[dict[str, Any]] | Command:
"""Sends a command to the actor system and returns a list of replies.
Parameters
----------
command_string
The command to send to the actor. Must include the name of the actor.
raw
If `True`, returns the command. Otherwise returns a list of replies.
Returns
-------
replies
A list of replies, each one a dictionary of keyword to value. Empty
replies (e.g., those only changing the status) are not returned. If
``raw=True``, the CLU command is returned after awaiting for it to
complete or fail.
Raises
------
RuntimeError
If the command fails, times out, or is otherwise not successful.
"""

consumer, *rest = command_string.split(" ")

async with CluClient() as client:
cmd = await client.send_command(consumer, " ".join(rest))

if cmd.status.did_succeed:
if raw:
return cmd

replies: list[dict[str, Any]] = []
for reply in cmd.replies:
if len(reply.message) == 0:
continue
replies.append(reply.message)
return replies

raise RuntimeError(f"Command {command_string!r} failed.")
4 changes: 4 additions & 0 deletions src/lvmopstools/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
rabbitmq:
host: 10.8.38.21
port: 5672

devices:
thermistors:
host: 10.8.38.180
Expand Down
85 changes: 85 additions & 0 deletions src/lvmopstools/devices/specs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-09-12
# @Filename: specs.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio

from typing import Literal, get_args

from lvmopstools.clu import CluClient


__all__ = ["get_spectrograph_temperature_label", "get_spectrograph_temperatures"]


Spectrographs = Literal["sp1", "sp2", "sp3"]
Cameras = Literal["r", "b", "z"]
Sensors = Literal["ccd", "ln2"]


def get_spectrograph_temperature_label(camera: str, sensor: str = "ccd"):
"""Returns the archon label associated with a temperature sensor."""

if sensor == "ccd":
if camera == "r":
return "mod2/tempa"
elif camera == "b":
return "mod12/tempc"
elif camera == "z":
return "mod12/tempa"

else:
if camera == "r":
return "mod2/tempb"
elif camera == "b":
return "mod2/tempc"
elif camera == "z":
return "mod12/tempb"


async def get_spectrograph_temperatures(spec: Spectrographs | None = None):
"""Returns a dictionary of spectrograph temperatures."""

if spec is None:
tasks: list[asyncio.Task] = []
for spec in get_args(Spectrographs):
tasks.append(asyncio.create_task(get_spectrograph_temperatures(spec)))

task_results = await asyncio.gather(*tasks)
return {
key: value
for task_result in task_results
for key, value in task_result.items()
}

async with CluClient() as client:
scp_command = await client.send_command(
f"lvmscp.{spec}",
"status",
internal=True,
)

if scp_command.status.did_fail:
raise ValueError("Failed retrieving status from SCP.")

status = scp_command.replies.get("status")

response: dict[str, float] = {}

cameras: list[Cameras] = ["r", "b", "z"]
sensors: list[Sensors] = ["ccd", "ln2"]

for camera in cameras:
for sensor in sensors:
label = get_spectrograph_temperature_label(camera, sensor)
if label not in status:
raise ValueError(f"Cannot find status label {label!r}.")
response[f"{camera}{spec[-1]}_{sensor}"] = status[label]

return response

0 comments on commit 64c13af

Please sign in to comment.