From c3efb00d17b8db9dff3387e0cb7af6cf92965c81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20S=C3=A1nchez-Gallego?= Date: Sun, 26 Nov 2023 20:12:02 +0000 Subject: [PATCH] Minimally functional version --- src/ln2fill/__main__.py | 116 ++++++++--- src/ln2fill/config.yaml | 6 +- src/ln2fill/core.py | 424 ++++++++++++++++++++++++++++++++++------ src/ln2fill/tools.py | 29 +-- src/ln2fill/types.py | 4 +- 5 files changed, 479 insertions(+), 100 deletions(-) diff --git a/src/ln2fill/__main__.py b/src/ln2fill/__main__.py index 47e0d20..49291c4 100644 --- a/src/ln2fill/__main__.py +++ b/src/ln2fill/__main__.py @@ -11,12 +11,14 @@ import datetime import logging import pathlib +import warnings from typing import Unpack import click from sdsstools import Configuration, read_yaml_file +from sdsstools.daemonizer import cli_coro from ln2fill.tools import is_container @@ -48,7 +50,7 @@ def update_options( # Select the source of defaults, if any. if use_defaults: - defaults = internal_config["defaults"] + defaults = Configuration(internal_config["defaults"]) elif configuration_file is not None: new_config = read_yaml_file(configuration_file) @@ -134,17 +136,72 @@ def update_options( options["qa_path"] = "./" # Determine if we should run interactively. - if is_container(): - options["interactive"] = "no" - elif options["interactive"] == "auto": - options["interactive"] = "yes" - - if options["interactive"] == "no": - options["no_prompt"] = True + if options["interactive"] == "auto": + if is_container(): + options["interactive"] = "no" + else: + options["interactive"] = "yes" + elif options["interactive"] == "yes": + if is_container(): + warnings.warn("Interactive mode may not work in containers.", UserWarning) + + if options["no_prompt"] is None: + if options["interactive"] == "yes": + options["no_prompt"] = True + else: + options["no_prompt"] = False + + if ( + options["no_prompt"] + and options["use_thermistors"] is False + and (options["purge_time"] is None or options["fill_time"] is None) + ): + raise ValueError( + "Cannot run without thermistors and without purge and fill times." + ) return options.copy() +async def handle_fill(**options: Unpack[OptionsType]): + """Handles the purge/fill process.""" + + from ln2fill.core import LN2Handler + + if options["quiet"]: + log.sh.setLevel(logging.ERROR) + elif options["verbose"]: + log.sh.setLevel(logging.DEBUG) + + if options["write_log"]: + assert options["log_path"] + log_path = pathlib.Path(options["log_path"]) + log_path.parent.mkdir(parents=True, exist_ok=True) + log.start_file_logger(str(log_path), mode="w", rotating=False) + + if options["cameras"] is None: + raise RuntimeError("No cameras specified.") + + if isinstance(options["cameras"], str): + cameras = list(map(lambda s: s.strip(), options["cameras"].split(","))) + else: + cameras = options["cameras"] + + interactive = True if options["interactive"] == "yes" else False + + handler = LN2Handler(cameras=cameras, interactive=interactive) + + await handler.check() + + max_purge_time = options["purge_time"] or options["max_purge_time"] + await handler.purge( + use_thermistor=options["use_thermistors"], + min_purge_time=options["min_purge_time"], + max_purge_time=max_purge_time, + prompt=not options["no_prompt"], + ) + + @click.command(name="ln2fill") @click.argument( "ACTION", @@ -181,8 +238,9 @@ def update_options( @click.option( "--no-prompt", is_flag=True, - help="Does not prompt for confirmation. If --interactive yes or auto, prompting " - "is determined automatically.", + help="Does not prompt the user to finish or abort a purge/fill. Prompting is " + "required if --fill-time or --purge-time are not provided and thermistors " + "are not used.", ) @click.option( "--check-pressure/--no-check-pressure", @@ -210,32 +268,40 @@ def update_options( @click.option( "--purge-time", type=float, - help="Purge time in seconds. If not provided, the thermistors will be used.", + help="Purge time in seconds. If --use-thermistors, this is effectively the " + "maximum purge time.", ) @click.option( "--min-purge-time", type=float, - help="Minimum purge time in seconds. Defaults to internal value. Disable with -1.", + help="Minimum purge time in seconds. Defaults to internal value.", ) @click.option( "--max-purge-time", type=float, - help="Maximum purge time in seconds. Defaults to internal value. Disable with -1.", + help="Maximum purge time in seconds. Defaults to internal value.", ) @click.option( "--fill-time", type=float, - help="Fill time in seconds. If not provided, the thermistors will be used.", + help="Fill time in seconds. If --use-thermistors, this is effectively the " + "maximum fill time.", ) @click.option( "--min-fill-time", type=float, - help="Minimum fill time in seconds. Defaults to internal value. Disable with -1.", + help="Minimum fill time in seconds. Defaults to internal value.", ) @click.option( "--max-fill-time", type=float, - help="Maximum fill time in seconds. Defaults to internal value. Disable with -1.", + help="Maximum fill time in seconds. Defaults to internal value.", +) +@click.option( + "--use-thermistors/--no-use-thermistors", + default=True, + show_default=True, + help="Use thermistor values to determine purge/fill time.", ) @click.option( "--quiet", @@ -243,6 +309,12 @@ def update_options( is_flag=True, help="Disable logging to stdout.", ) +@click.option( + "--verbose", + "-v", + is_flag=True, + help="Outputs additional information to stdout.", +) @click.option( "--write-json", is_flag=True, @@ -314,7 +386,8 @@ def update_options( help="Comma-separated list of email recipients. Required if --email is set.", ) @click.pass_context -def ln2fill_cli( +@cli_coro() +async def ln2fill_cli( ctx, action: str, use_defaults: bool = False, @@ -336,14 +409,7 @@ def ln2fill_cli( **options, ) - if options["quiet"]: - log.sh.setLevel(logging.ERROR) - - if options["write_log"]: - assert options["log_path"] - log_path = pathlib.Path(options["log_path"]) - log_path.parent.mkdir(parents=True, exist_ok=True) - log.start_file_logger(str(log_path), mode="w", rotating=False) + await handle_fill(**options) def main(): diff --git a/src/ln2fill/config.yaml b/src/ln2fill/config.yaml index 35f60fe..d563909 100644 --- a/src/ln2fill/config.yaml +++ b/src/ln2fill/config.yaml @@ -1,8 +1,7 @@ --- defaults: - cameras: ['r1', 'b1', 'z1', 'r2', 'b2', 'z2', 'r3', 'b3', 'z3'] + cameras: ['r1', 'r2', 'r3'] interactive: auto - no_prompt: true check_pressure: true check_temperature: true max_temperature: '{{limits.temperature.max}}' @@ -11,6 +10,7 @@ defaults: max_purge_time: '{{limits.purge.max}}' min_fill_time: '{{limits.fill.min}}' max_fill_time: '{{limits.fill.max}}' + use_thermistors: true quiet: false write_json: true write_log: true @@ -61,7 +61,7 @@ valves: limits: pressure: min: null - max: 1e-3 + max: 0.001 temperature: min: null max: -140 diff --git a/src/ln2fill/core.py b/src/ln2fill/core.py index bb38feb..3396b43 100644 --- a/src/ln2fill/core.py +++ b/src/ln2fill/core.py @@ -9,17 +9,24 @@ from __future__ import annotations import asyncio +import dataclasses from time import time +from typing import Coroutine, Optional + +import sshkeyboard from pydantic import Field, model_validator from pydantic.dataclasses import dataclass from rich.progress import TaskID +from sdsstools.configuration import RecursiveDict + from ln2fill import config, log from ln2fill.tools import ( TimerProgressBar, cancel_nps_threads, - is_container, + cancel_task, + get_spectrograph_status, read_thermistors, valve_on_off, ) @@ -53,7 +60,7 @@ class ThermistorHandler: interval: float = 1 min_open_time: float = Field(default=0.0, ge=0.0) close_valve: bool = True - min_active_time: float = Field(default=0.0, ge=0.0) + min_active_time: float = Field(default=10.0, ge=0.0) def __init__(self, *args, **kwargs): self.valve_handler = self.valve_handler @@ -64,7 +71,10 @@ def __init__(self, *args, **kwargs): # Unix time at which the thermistor became active. self._active_time: float = -1 - self.thermistor_channel: str = self.valve_handler.thermistor_channel + thermistor_channel = self.valve_handler.thermistor_channel + assert thermistor_channel + + self.thermistor_channel: str = thermistor_channel async def read_thermistor(self): """Reads the thermistor and returns its state.""" @@ -72,25 +82,8 @@ async def read_thermistor(self): thermistors = await read_thermistors() return thermistors[self.thermistor_channel] - async def start_monitoring( - self, - close_valve: bool | None = None, - min_active_time: float | None = None, - ): - """Monitors the thermistor and potentially closes the valve. - - Sets the `.Future` as complete once the active state has been reached. - ``close_valve`` and ``min_active_time`` can be set to override the - instance values. - - """ - - close_valve = close_valve if close_valve is not None else self.close_valve - - if min_active_time is not None: - assert min_active_time >= 0 - else: - min_active_time = self.min_active_time + async def start_monitoring(self): + """Monitors the thermistor and potentially closes the valve.""" await asyncio.sleep(self.min_open_time) @@ -99,12 +92,12 @@ async def start_monitoring( if thermistor_status is True: if self._active_time < 0: self._active_time = time.time() - if time.time() - self._active_time > min_active_time: + if time.time() - self._active_time > self.min_active_time: break await asyncio.sleep(self.interval) - if close_valve: + if self.close_valve: await self.valve_handler.finish_fill() @@ -128,10 +121,9 @@ class ValveHandler: """ valve: str - thermistor_channel: str = Field(None, repr=False) - nps_actor: str = Field(None, repr=False) - max_open_time: float = Field(None, ge=5.0, repr=False) - show_progress_bar: bool = Field(True, repr=False) + thermistor_channel: Optional[str] = dataclasses.field(default=None, repr=False) + nps_actor: Optional[str] = dataclasses.field(default=None, repr=False) + show_progress_bar: Optional[bool] = dataclasses.field(default=True, repr=False) @model_validator(mode="after") def validate_fields(self): @@ -145,14 +137,7 @@ def validate_fields(self): if self.nps_actor is None: self.nps_actor = config.get(f"valves.{self.valve}.actor") if self.nps_actor is None: - raise ValueError("Cannot find NPS actor for valve {self.valve!r}.") - - # Some reasonable, last resource values. - if self.max_open_time is None: - if self.valve == "purge": - self.max_open_time = 2000 - else: - self.max_open_time = 600 + raise ValueError(f"Cannot find NPS actor for valve {self.valve!r}.") return self @@ -162,17 +147,26 @@ def __post_init__(self): self._thread_id: int | None = None self._progress_bar_id: TaskID | None = None + self._monitor_task: asyncio.Task | None = None + self._timeout_task: asyncio.Task | None = None + + self.event = asyncio.Event() + self.active: bool = False + async def start_fill( self, - fill_time: float | None = None, + min_open_time: float = 0.0, + timeout: float | None = None, use_thermistor: bool = True, ): """Starts a fill. Parameters ---------- - fill_time - The time to keep the valve open. If ``None``, defaults to + min_open_time + Minimum time to keep the valve open. + timeout + The maximumtime to keep the valve open. If ``None``, defaults to the ``max_open_time`` value. use_thermistor Whether to use the thermistor to close the valve. If ``True`` and @@ -181,20 +175,18 @@ async def start_fill( """ - if use_thermistor: - if fill_time is not None: - timeout = fill_time + if timeout is None: + # Some hardcoded hard limits. + if self.valve == "purge": + timeout = 2000 else: - timeout = self.max_open_time - else: - if fill_time is None: - raise ValueError("fill_time is required with use_thermistor=False") - timeout = fill_time + timeout = 600 await self._set_state(True, timeout=timeout, use_script=True) if use_thermistor: - await self.thermistor.start_monitoring() + self.thermistor.min_open_time = min_open_time + self._monitor_task = asyncio.create_task(self.thermistor.start_monitoring()) if self.show_progress_bar: if self.valve.lower() == "purge": @@ -205,21 +197,46 @@ async def start_fill( complete_description = "Fill complete" self._progress_bar_id = await PROGRESS_BAR.add_timer( - self.max_open_time, + timeout, label=self.valve, initial_description=initial_description, complete_description=complete_description, ) + await asyncio.sleep(2) + self._timeout_task = asyncio.create_task(self._schedule_timeout_task(timeout)) + + self.active = True + + self.event.clear() + await self.event.wait() + + async def _schedule_timeout_task(self, timeout: float): + """Schedules a task to cancel the fill after a timeout.""" + + await asyncio.sleep(timeout) + await self.finish_fill() + async def finish_fill(self): """Finishes the fill, closing the valve.""" await self._set_state(False) + await cancel_task(self._monitor_task) + self._monitor_task = None + if self._progress_bar_id is not None: await PROGRESS_BAR.stop_timer(self._progress_bar_id) self._progress_bar_id = None + await cancel_task(self._timeout_task) + self._timeout_task = None + + if not self.event.is_set(): + self.event.set() + + self.active = False + async def _set_state( self, on: bool, @@ -236,13 +253,11 @@ async def _set_state( Whether to use the ``cycle_with_timeout`` script to set a timeout after which the valve will be closed. timeout - Maximum open valve time. If ``None``, defaults to the ``max_open_time`` - value. + Maximum open valve time. """ - if timeout is None or timeout < 0: - timeout = self.max_open_time + assert self.nps_actor is not None # If there's already a thread running for this valve, we cancel it. if self._thread_id is not None: @@ -258,19 +273,308 @@ async def _set_state( if thread_id is not None: self._thread_id = thread_id + if on: + if thread_id is not None: + log.debug( + f"Valve {self.valve!r} was opened with timeout={timeout} " + f"(thread_id={thread_id})." + ) + else: + log.debug(f"Valve {self.valve!r} was closed.") + +@dataclass class LN2Handler: """The main LN2 purge/fill handlerclass. Parameters ---------- + cameras + List of cameras to fill. Defaults to all the cameras. + purge_valve + The name of the purge valve. interactive - Whether to show interactive features. If ``None``, interactivity will - be determined depending on the console type. + Whether to show interactive features. + """ - def __init__(self, interactive: bool | None = None): - if interactive is None: - self.interactive = False if is_container() else True - else: - self.interactive = interactive + cameras: list[str] | None = None + purge_valve: str = "purge" + interactive: bool = False + + def __post_init__(self): + if self.cameras is None: + self.cameras = list(config["defaults.cameras"]) + + self._valve_handlers: dict[str, ValveHandler] = {} + for camera in self.cameras + [self.purge_valve]: + self._valve_handlers[camera] = ValveHandler(camera) + + async def check( + self, + max_pressure: float | None = None, + max_temperature: float | None = None, + check_thermistors: bool = True, + ): + """Checks if the pressure and temperature are in the allowed range. + + If ``max_temperature`` or ``max_pressure`` are not provided, the values + from the configuration file are used. + + Parameters + ---------- + max_pressure + The maximum pressure to allow. + max_temperature + The maximum temperature to allow. + check_thermistors + Checks that the thermistors are reading correctly. + + Raises + ------ + RuntimeError + If any of the checks fails. + + """ + + log.info("Checking pressure and temperature ...") + + assert self.cameras is not None + + if max_pressure is None: + max_pressure = config["limits.pressure.max"] + if max_temperature is None: + max_temperature = config["limits.temperature.max"] + + assert max_pressure is not None and max_temperature is not None + + specs: set[str] = set([]) + for camera in self.cameras: + cam_id = camera[-1] + specs.add(f"sp{cam_id}") + + try: + spec_status = await get_spectrograph_status(list(specs)) + if not isinstance(spec_status, dict): + raise RuntimeError("Invalid spectrograph response.") + except Exception as err: + raise RuntimeError(f"Failed reading spectrograph status: {err}") + + spec_status = RecursiveDict(spec_status) + + for camera in self.cameras: + ln2_temp = spec_status[f"{camera}.ln2"] + if ln2_temp is None: + raise RuntimeError(f"Invalid {camera!r} temperature.") + if ln2_temp > max_temperature: + raise RuntimeError( + f"LN2 temperature for camera {camera} is {ln2_temp:.1f} K " + f"which is above the maximum allowed temperature " + f"({max_temperature:.1f} K)." + ) + + pressure = spec_status[f"{camera}.pressure"] + if pressure is None: + raise RuntimeError(f"Invalid {camera!r} pressure.") + if pressure > max_pressure: + raise RuntimeError( + f"Pressure for camera {camera} is {pressure} K " + f"which is above the maximum allowed pressure " + f"({max_pressure})." + ) + + if check_thermistors: + log.info("Checking thermistors ...") + + try: + thermistors = await read_thermistors() + assert isinstance(thermistors, dict), "invalid return type." + except Exception as err: + raise RuntimeError(f"Failed reading thermistors: {err}") + + for valve in self._valve_handlers: + channel = self._valve_handlers[valve].thermistor_channel + assert channel is not None, "invalid thermistor channel." + + thermistor = thermistors[channel] + if thermistor is None: + raise RuntimeError(f"Invalid {valve!r} thermistor.") + if thermistor is True: + raise RuntimeError(f"Thermistor for valve {valve} is active.") + + log.info("All checks passed.") + + return True + + async def purge( + self, + purge_valve: str | None = None, + use_thermistor: bool = True, + min_purge_time: float | None = None, + max_purge_time: float | None = None, + prompt: bool | None = None, + ): + """Purges the system. + + Parameters + ---------- + purge_valve + The name of the purge valve. If ``None``, uses the instance default. + use_thermistor + Whether to use the thermistor to close the valve. + min_purge_time + The minimum time to keep the purge valve open. Only relevant if + using the thermistor. + max_purge_time + The maximum time to keep the purge valve open. If + ``use_thermistor=None`` this is effectively the purge time unless + ``prompt=True`` and the purge is cancelled before reaching the + timeout. + prompt + Whether to show a prompt to stop or cancel the purge. If ``None``, + determined from the instance ``interactive`` attribute. + + + """ + + if purge_valve is None: + purge_valve = self.purge_valve + + valve_handler = self._valve_handlers[purge_valve] + min_open_time = min_purge_time or 0.0 + + log.info( + f"Beginning purge using valve {valve_handler.valve!r} with " + f"use_thermistor={use_thermistor}, min_open_time={min_open_time}, " + f"timeout={max_purge_time}." + ) + + prompt = prompt if prompt is not None else self.interactive + if prompt: + log.warning('Press "x" to abort or "enter" to finish the purge.') + self._kb_monitor() + + try: + await valve_handler.start_fill( + min_open_time=min_open_time, + timeout=max_purge_time, + use_thermistor=use_thermistor, + ) + log.info("Purge complete.") + except Exception: + raise + finally: + if prompt: + sshkeyboard.stop_listening() + + async def fill( + self, + cameras: list[str] | None = None, + use_thermistors: bool = True, + min_fill_time: float | None = None, + max_fill_time: float | None = None, + prompt: bool | None = None, + ): + """Fills the selected cameras. + + Parameters + ---------- + cameras + The list of cameras to fill. If not provided, defaults to the values + used to instantiate the `.LN2Handler` instance. + use_thermistors + Whether to use the thermistors to close the valves. + min_fill_time + The minimum time to keep the fill valves open. Only relevant if + using the thermistors. + max_fill_time + The maximum time to keep the fill valves open. If + ``use_thermistor=None`` this is effectively the fill time unless + ``prompt=True`` and the fills are cancelled before reaching the + timeout. + prompt + Whether to show a prompt to stop or cancel the fill. If ``None``, + determined from the instance ``interactive`` attribute. + + + + """ + + cameras = cameras or self.cameras + if cameras is None or len(cameras) == 0: + raise RuntimeError("No cameras selected for filling.") + + min_open_time = min_fill_time or 0.0 + + fill_tasks: list[Coroutine] = [] + + for camera in cameras: + try: + valve_handler = self._valve_handlers[camera] + except KeyError: + raise RuntimeError(f"Unable to find valve for camera {camera!r}.") + + fill_tasks.append( + valve_handler.start_fill( + min_open_time=min_open_time, + timeout=max_fill_time, + use_thermistor=use_thermistors, + ) + ) + + log.info( + f"Beginning fill on cameras {cameras!r} with " + f"use_thermistors={use_thermistors}, min_open_time={min_open_time}, " + f"timeout={max_fill_time}." + ) + + prompt = prompt if prompt is not None else self.interactive + if prompt: + log.warning('Press "x" to abort or "enter" to finish the purge.') + self._kb_monitor() + + try: + await asyncio.gather(*fill_tasks) + log.info("Fill complete.") + except Exception: + raise + finally: + if prompt: + sshkeyboard.stop_listening() + + def _kb_monitor(self): + """Monitors the keyboard and cancels/aborts the fill..""" + + async def monitor_keys(key: str): + """Parses a pressed key and cancels/aborts the fills.""" + + if key not in ["x", "X", "enter"]: + return + + if key == "x" or key == "X": + log.warning("Aborting purge/fill.") + await self.abort(only_active=False) + elif key == "enter": + await self.abort(only_active=True) + + sshkeyboard.stop_listening() + + # No need to store this task. It will be automatically done when + # sshkeyboard.stop_listening() is called. + asyncio.create_task(sshkeyboard.listen_keyboard_manual(on_press=monitor_keys)) + + async def abort(self, only_active: bool = True): + """Cancels ongoing fills and closes the valves. + + If ``only_active=True`` only active valves will be closed. Otherwise + closes all valves. + + """ + + tasks: list[Coroutine] = [] + + for valve_handler in self._valve_handlers.values(): + if valve_handler.active or not only_active: + tasks.append(valve_handler.finish_fill()) + + await asyncio.gather(*tasks) diff --git a/src/ln2fill/tools.py b/src/ln2fill/tools.py index 9b9de93..40816a8 100644 --- a/src/ln2fill/tools.py +++ b/src/ln2fill/tools.py @@ -163,12 +163,13 @@ async def valve_on_off( else: command_string = f"on --off-after {timeout} {outlet}" - async with CluClient() as client: - command = await client.send_command(actor, command_string) - if command.status.did_fail: - raise RuntimeError(f"Command '{actor} {command_string}' failed") + # async with CluClient() as client: + # command = await client.send_command(actor, command_string) + # if command.status.did_fail: + # raise RuntimeError(f"Command '{actor} {command_string}' failed") if is_script: + return 1 script_data = command.replies.get("script") return script_data["thread_id"] @@ -190,9 +191,7 @@ async def cancel_nps_threads(actor: str, thread_id: int | None = None): command_string = f"scripts stop {thread_id if thread_id else ''}" async with CluClient() as client: - command = await client.send_command(actor, command_string) - if command.status.did_fail: - raise RuntimeError(f"Command '{actor} {command_string}' failed") + await client.send_command(actor, command_string) async def close_all_valves(): @@ -405,7 +404,6 @@ def _done_timer( self.progress.update( task_id, - completed=completed, description=f"[green] {complete_description} ", ) self.progress.refresh() @@ -417,9 +415,7 @@ async def stop_timer(self, task_id: TaskID, clear: bool = False): raise ValueError("Task ID not found.") _task = self._tasks[task_id] - _task.cancel() - with suppress(asyncio.CancelledError): - await _task + await cancel_task(_task) del self._tasks[task_id] @@ -428,3 +424,14 @@ async def stop_timer(self, task_id: TaskID, clear: bool = False): self.progress.update(task_id, visible=False) await asyncio.sleep(1) # Extra time for renders. + + +async def cancel_task(task: asyncio.Future | None): + """Safely cancels a task.""" + + if task is None or task.done(): + return + + task.cancel() + with suppress(asyncio.CancelledError): + await task diff --git a/src/ln2fill/types.py b/src/ln2fill/types.py index 06cb58b..7e4ab5e 100644 --- a/src/ln2fill/types.py +++ b/src/ln2fill/types.py @@ -16,7 +16,7 @@ class OptionsType(TypedDict): cameras: Optional[str] interactive: str - no_prompt: bool + no_prompt: Optional[bool] check_pressure: bool check_temperature: bool max_pressure: float @@ -29,6 +29,8 @@ class OptionsType(TypedDict): fill_time: Optional[float] min_fill_time: float max_fill_time: float + use_thermistors: bool + verbose: bool quiet: bool write_json: bool write_log: bool