Skip to content

Commit

Permalink
Add post-fill tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Sep 16, 2024
1 parent 0c06824 commit 6741505
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 29 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"sshkeyboard>=2.3.1",
"lvmopstools>=0.3.7",
"typer>=0.12.5",
"polars>=1.7.1",
]

[project.urls]
Expand Down
88 changes: 66 additions & 22 deletions src/lvmcryo/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import asyncio
import json
import pathlib
import signal
import warnings
Expand All @@ -23,6 +24,7 @@
from typer.core import TyperGroup

from lvmcryo.config import Actions, Config, InteractiveMode, NotificationLevel
from lvmcryo.runner import post_fill_tasks


LOCKFILE = pathlib.Path("/data/lvmcryo.lock")
Expand Down Expand Up @@ -349,7 +351,7 @@ async def ln2(
"data collection has finished.",
rich_help_panel="Logging",
),
] = 0,
] = 30,
):
"""Handles LN2 purges and fills.
Expand Down Expand Up @@ -378,6 +380,13 @@ async def ln2(
stdout_console = log.rich_console
assert stdout_console is not None

error: Exception | str | None = None
skip_finally: bool = False

json_path: pathlib.Path | None = None
json_handler: FileHandler | None = None
log_data: list[dict] | None = None

if action == Actions.abort:
return await _close_valves_helper()
elif action == Actions.clear_lock:
Expand Down Expand Up @@ -428,10 +437,11 @@ async def ln2(
log.start_file_logger(str(config.log_path))

if write_json:
jsonHandler = FileHandler(config.log_path.with_suffix(".json"), mode="w")
jsonHandler.setLevel(5)
jsonHandler.setFormatter(CustomJsonFormatter())
log.addHandler(jsonHandler)
json_path = config.log_path.with_suffix(".json")
json_handler = FileHandler(str(json_path), mode="w")
json_handler.setLevel(5)
json_handler.setFormatter(CustomJsonFormatter())
log.addHandler(json_handler)
elif config.notify:
# If notifying, start a file logger, but to a temporary location. This is
# just to be able to send the log body in a notification email.
Expand Down Expand Up @@ -471,7 +481,7 @@ async def ln2(
log=log,
valve_info=config.valves,
dry_run=config.dry_run,
alerts_route=config.internal_config["alerts_route"],
alerts_route=config.internal_config["api_routes"]["alerts"],
)

try:
Expand All @@ -486,8 +496,6 @@ async def ln2(
"handler reports a failure."
)

log.info(f"Event times:\n{handler.event_times.model_dump_json(indent=2)}")

except LockExistsError:
err_console.print(
"[red]Lock file already exists.[/] Another instance may be "
Expand All @@ -501,6 +509,9 @@ async def ln2(
f"LN2 {action.value} failed because a lockfile was already present."
)

# Do not do anything special for this error, just exit.
skip_finally = True

return typer.Exit(1)

except Exception as err:
Expand All @@ -515,31 +526,64 @@ async def ln2(
log.sh.setLevel(1000) # Log the traceback to file but do not print.
log.exception(f"Error during {action.value}.", exc_info=err)

if notify:
log.warning("Sending failure notifications.")
await notifier.notify_failure(err, handler)

error = err
if with_traceback:
raise

return typer.Exit(1)

else:
log.info(f"Event times:\n{handler.event_times.model_dump_json(indent=2)}")
log.info(f"LN2 {action.value} completed successfully.")

if notify and config.email_level == NotificationLevel.info:
# The handler has already emitted a notification to Slack so just
# send an email.
# TODO: include log and more data here. For now it's just plain text.
log.debug("Sending notification email.")
notifier.send_email(
message="The LN2 fill completed successfully.",
subject="SUCCESS: LVM LN2 fill",
)

finally:
await handler.clear()

if not skip_finally:
if json_handler and json_path:
json_handler.flush()
with json_path.open("r") as ff:
log_data = [json.loads(line) for line in ff.readlines()]

record_pk = await post_fill_tasks(
handler,
write_data=config.write_data,
data_path=config.data_path,
data_extra_time=config.data_extra_time,
api_data_route=config.internal_config["api_routes"]["fill_data"],
write_to_db=True,
api_db_route=config.internal_config["api_routes"]["register_fill"],
db_extra_payload={
"error": error,
"action": action.value,
"log_file": str(config.log_path) if config.log_path else None,
"json_file": str(json_path) if json_path else None,
"log_data": log_data,
"configuration": config.model_dump(),
},
)

if record_pk:
log.debug(f"Record {record_pk} created in the database.")

if notify:
if error:
log.warning("Sending failure notifications.")
await notifier.notify_failure(error, handler)

elif config.email_level == NotificationLevel.info:
# The handler has already emitted a notification to
# Slack so just send an email.

# TODO: include log and more data here.
# For now it's just plain text.

log.debug("Sending notification email.")
notifier.send_email(
message="The LN2 fill completed successfully.",
subject="SUCCESS: LVM LN2 fill",
)

return typer.Exit(0)


Expand Down
7 changes: 5 additions & 2 deletions src/lvmcryo/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ defaults:
data_path: '/data/logs/lvmcryo/{timestamp}.parquet'

notifications:
slack_route: http://lvm-hub.lco.cl:8080/api/slack/message
slack_channels:
info: lvm-notifications
error: lvm-alerts
Expand All @@ -28,7 +27,11 @@ notifications:
email_from: 'LVM LN2 system <[email protected]>'
email_reply_to: '[email protected]'

alerts_route: http://lvm-hub.lco.cl:8080/api/alerts
api_routes:
slack: http://lvm-hub.lco.cl:8080/api/slack/message
alerts: http://lvm-hub.lco.cl:8080/api/alerts
fill_data: http://lvm-hub.lco.cl:8080/api/spectrographs/fills/measurements
register_fill: http://lvm-hub.lco.cl:8080/api/spectrographs/fills/register

valves:
r1:
Expand Down
5 changes: 4 additions & 1 deletion src/lvmcryo/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ def __init__(self, config_data: dict | str | pathlib.Path | None = None):
if not config_data["notifications"]:
raise ValidationError("Configuration does not have notifications section.")

self.config = NotifierConfig(**config_data["notifications"])
self.config = NotifierConfig(
slack_route=config_data["api_routes"]["slack"],
**config_data["notifications"],
)

self.disabled: bool = False
self.slack_disabled: bool = False
Expand Down
152 changes: 148 additions & 4 deletions src/lvmcryo/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@

import asyncio
import logging
import pathlib
import signal
from datetime import datetime

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import httpx
import polars

from lvmcryo.config import Actions
from lvmcryo.handlers import LN2Handler, close_all_valves
Expand Down Expand Up @@ -72,10 +77,10 @@ async def ln2_runner(
action = config.action.value

if config.use_thermistors or config.purge_time is None or config.fill_time is None:
await notifier.post_to_slack(f"Starting LN₂ {action} at {now_str}.")
await notifier.post_to_slack(f"Starting LN₂ `{action}` at {now_str}.")
else:
await notifier.post_to_slack(
f"Starting LN₂ {action} at {now_str} with "
f"Starting LN₂ `{action}` at {now_str} with "
f"purge_time={config.purge_time} and "
f"fill_time={config.fill_time}."
)
Expand Down Expand Up @@ -130,5 +135,144 @@ async def ln2_runner(
await handler.clear()
raise RuntimeError("Fill failed or was aborted.")

await notifier.post_to_slack(f"LN₂ {action} completed successfully.")
await notifier.post_to_slack(f"LN₂ `{action}` completed successfully.")
await handler.clear()


async def post_fill_tasks(
    handler: LN2Handler,
    write_data: bool = False,
    data_path: str | pathlib.Path | None = None,
    data_extra_time: float | None = None,
    api_data_route: str = "http://lvm-hub.lco.cl:8080/api/spectrographs/fills/measurements",
    write_to_db: bool = False,
    api_db_route: str = "http://lvm-hub.lco.cl:8080/api/spectrographs/fills/register",
    db_extra_payload: dict[str, Any] | None = None,
) -> int | None:
    """Runs the post-fill tasks.

    Parameters
    ----------
    handler
        The `.LN2Handler` instance.
    write_data
        Whether to collect fill metrology data and write it to disk.
    data_path
        The path where to write the data. If `None` writes it to the current
        directory.
    data_extra_time
        Extra time to wait after the fill before collecting data.
    api_data_route
        The API route to retrieve the fill data.
    write_to_db
        Whether to write the data to the database.
    api_db_route
        The API route to write the data to the database.
    db_extra_payload
        Extra payload to send to the database registration endpoint.

    Returns
    -------
    record_id
        The primary key of the associated record if the fill data was written to
        the database. `None` otherwise.

    """

    log = handler.log
    log.info("Running post-fill tasks.")

    # Avoid the shared mutable default pitfall: create a fresh dict per call
    # instead of defaulting to {} in the signature.
    if db_extra_payload is None:
        db_extra_payload = {}

    record_id: int | None = None

    start_time: datetime | None = None
    end_time: datetime | None = None

    event_times = handler.event_times

    # If neither a purge nor a fill ever started the fill never happened
    # (probably failed or was aborted) and there is no time span to report.
    if event_times.purge_start or event_times.fill_start:
        start_time = event_times.purge_start or event_times.fill_start

        if event_times.failed or event_times.aborted:
            end_time = event_times.failed or event_times.aborted
        else:
            end_time = event_times.fill_complete or event_times.purge_complete

    if not write_data or not start_time or not end_time or not api_data_route:
        write_data = False
        log.debug("Skipping data collection.")
    else:
        if data_extra_time:
            log.info(f"Waiting {data_extra_time} seconds before collecting data.")
            await asyncio.sleep(data_extra_time)

        if data_path is None:
            data_path = pathlib.Path.cwd() / "fill_data.parquet"
        else:
            data_path = pathlib.Path(data_path)

        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                response = await client.get(
                    api_data_route,
                    params={
                        "start_time": int(start_time.timestamp()),
                        "end_time": int(end_time.timestamp()),
                    },
                )
                response.raise_for_status()

                # The API returns rows with epoch times; cast to a proper
                # datetime column, sort, and drop incomplete rows.
                data = (
                    polars.DataFrame(response.json())
                    .with_columns(polars.col.time.cast(polars.Datetime("ms")))
                    .sort("time")
                    .drop_nulls()
                )

                data_path.parent.mkdir(parents=True, exist_ok=True)
                data.write_parquet(data_path)

        except Exception as ee:
            # Best effort: a failed data retrieval should not block the
            # database registration below.
            log.error(f"Failed to retrieve fill data from API: {ee!r}")

        else:
            log.debug(f"Fill data written to {data_path}.")

    if write_to_db and api_db_route:
        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                response = await client.post(
                    api_db_route,
                    json={
                        "start_time": date_json(start_time),
                        "end_time": date_json(end_time),
                        "purge_start": date_json(event_times.purge_start),
                        "purge_complete": date_json(event_times.purge_complete),
                        "fill_start": date_json(event_times.fill_start),
                        "fill_complete": date_json(event_times.fill_complete),
                        "fail_time": date_json(event_times.failed),
                        "abort_time": date_json(event_times.aborted),
                        "failed": handler.failed,
                        "aborted": handler.aborted,
                        **db_extra_payload,
                    },
                )
                response.raise_for_status()

                # The endpoint returns the primary key of the new record.
                record_id = response.json()

        except Exception as ee:
            log.error(f"Failed to write fill data to database: {ee!r}")

    return record_id


def date_json(date: datetime | None) -> str | None:
    """Serialises a datetime object to a JSON string.

    Returns the ISO-8601 representation of ``date``, or `None` when no
    date is provided.

    """

    if date is None:
        return None

    return date.isoformat()
Loading

0 comments on commit 6741505

Please sign in to comment.