Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy features from lvmapi #9

Merged
merged 5 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

* Removed the option `raise_on_max_attempts` from `Retrier`. If the number of attempts is reached, the retrier will always raise an exception.

### 🚀 New

* Add `get_weather` function to retrieve weather data from the LCO API (via `lvmapi`).

### ✨ Improved

* Better typing for `Retrier.__call__()`.
Expand All @@ -16,11 +20,6 @@
* Fix some unittests.


### 🚀 New

* Add `get_weather` function to retrieve weather data from the LCO API (via `lvmapi`).


## 0.3.9 - September 17, 2024

### 🔧 Fixed
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "lvmopstools"
version = "0.3.10a0"
version = "0.4.0"
description = "LVM tools and utilities for operations"
authors = [
{ name = "José Sánchez-Gallego", email = "[email protected]" }
Expand All @@ -21,6 +21,8 @@ dependencies = [

[project.optional-dependencies]
ds9 = [ "pyds9>=1.8.1" ]
kubernetes = [ "kubernetes>=31.0.0" ]
influxdb = [ "influxdb-client>=1.47.0" ]

[project.urls]
Homepage = "https://github.com/sdss/lvmopstools"
Expand Down Expand Up @@ -87,6 +89,8 @@ omit = [
"src/lvmopstools/__main__.py",
"src/lvmopstools/clu.py",
"src/lvmopstools/ds9.py",
"src/lvmopstools/kubernetes.py",
"src/lvmopstools/influxdb.py",
"src/lvmopstools/utils.py",
"src/lvmopstools/devices/specs.py",
"src/lvmopstools/devices/nps.py"
Expand Down
4 changes: 2 additions & 2 deletions src/lvmopstools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ def set_config(config_file: str | pathlib.Path | None = None) -> None:
CONFIG_FILE = config_path


from .retrier import *
from .socket import *
from .retrier import Retrier
from .socket import AsyncSocketHandler
65 changes: 65 additions & 0 deletions src/lvmopstools/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2023-11-19
# @Filename: influxdb.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

#

from __future__ import annotations

import json
import os
import warnings

import polars


try:
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.warnings import MissingPivotFunction
except ImportError:
InfluxDBClientAsync = None
MissingPivotFunction = None


__all__ = ["query_influxdb"]


async def query_influxdb(
url: str,
query: str,
token: str | None = None,
org: str | None = None,
) -> polars.DataFrame:
"""Runs a query in InfluxDB and returns a Polars dataframe."""

if not InfluxDBClientAsync or not MissingPivotFunction:
raise ImportError("influxdb-client is not installed. Use the influxdb extra.")

warnings.simplefilter("ignore", MissingPivotFunction) # noqa: F821

token = token or os.environ.get("INFLUXDB_V2_TOKEN")
if token is None:
raise ValueError("$INFLUXDB_V2_TOKEN not defined.")

org = org or os.environ.get("INFLUXDB_V2_ORG")
if org is None:
raise ValueError("$INFLUXDB_V2_ORG not defined.")

async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
if not (await client.ping()):
raise RuntimeError("InfluxDB client failed to connect.")

api = client.query_api()

query_results = await api.query(query)

df = polars.DataFrame(json.loads(query_results.to_json()))

if len(df) > 0:
df = df.with_columns(polars.col._time.cast(polars.Datetime("ms")))

return df
198 changes: 198 additions & 0 deletions src/lvmopstools/kubernetes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2023-06-29
# @Filename: kubernetes.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import datetime
import os
import warnings
from pathlib import Path

from lvmopstools.utils import is_notebook


try:
import kubernetes
from kubernetes.utils import create_from_yaml
except ImportError:
kubernetes = None
create_from_yaml = None


class Kubernetes:
"""Interface with the Kubernetes cluster."""

def __init__(self, deployments_path: str | Path | None = None):
if kubernetes is None:
raise ImportError("kubernetes is not installed. Use the kubernetes extra.")

self.is_notebook = is_notebook()

if os.getenv("KUBERNETES_SERVICE_HOST"):
self.is_pod = True
else:
self.is_pod = False

# If we are in a notebook, we assume it's the one running in the Jupyter
# Lab deployment, which is configured to have access to the cluster.
if self.is_notebook or self.is_pod:
kubernetes.config.load_incluster_config()
else:
kubernetes.config.load_config()

self.v1 = kubernetes.client.CoreV1Api()
self.apps_v1 = kubernetes.client.AppsV1Api()

self.deployments_path = Path(deployments_path) if deployments_path else None

def list_namespaces(self):
"""Returns a list of namespaces."""

namespace_info = self.v1.list_namespace()
namespaces = [item.metadata.name for item in namespace_info.items]

return namespaces

def list_deployments(self):
"""Returns a list of deployments."""

deployment_info = self.apps_v1.list_deployment_for_all_namespaces()
deployments = [item.metadata.name for item in deployment_info.items]

return deployments

def get_deployment_info(self, deployment: str):
"""Returns the deployment info for a deployment."""

deployment_info = self.apps_v1.list_deployment_for_all_namespaces()

for item in deployment_info.items:
meta = item.metadata
if meta.name == deployment:
return item.to_dict()

raise ValueError(f"Deployment {deployment!r} not found.")

def get_deployment_namespace(self, deployment: str):
"""Returns the namespace of a deployment."""

deployment_info = self.apps_v1.list_deployment_for_all_namespaces()

for item in deployment_info.items:
meta = item.metadata
if meta.name == deployment:
return meta.namespace

return None

def get_yaml_file(self, name: str):
"""Finds and returns the contents of a Kubernetes YAML file."""

if not self.deployments_path:
raise ValueError("No deployments path defined.")

files = list(self.deployments_path.glob(f"**/{name}.y*ml"))

if files is None or len(files) == 0:
raise ValueError(f"No YAML file found for {name!r}.")
elif len(files) > 1:
raise ValueError(f"Multiple YAML files found for {name!r}.")

return files[0]

def apply_from_file(self, name: str | Path):
"""Applies a YAML file.

Parameters
----------
name
The full path to the file to apply. If the path is relative,
the file will be searched in the directory for YAML files
defined in the configuration.

"""

if create_from_yaml is None or kubernetes is None:
raise ImportError("kubernetes is not installed. Use the kubernetes extra.")

if isinstance(name, Path) or os.path.isabs(name):
path = Path(name)
else:
path = self.get_yaml_file(name)

deployments = create_from_yaml(
kubernetes.client.ApiClient(),
yaml_file=str(path),
)

return [dep[0].metadata.name for dep in deployments]

def delete_deployment(self, deployment: str):
"""Deletes resources from a YAML file.

Parameters
----------
deployment
The deployment to delete.

"""

namespace = self.get_deployment_namespace(deployment)
if namespace is None:
raise ValueError(f"Deployment {deployment!r} not found.")

self.apps_v1.delete_namespaced_deployment(deployment, namespace)

async def restart_deployment(self, deployment: str, from_file: bool = True):
"""Restarts a deployment.

If the deployment is running, does a rollout restart. Otherwise looks
for the deployment file and applies it.

"""

if deployment in self.list_deployments() and not from_file:
namespace = self.get_deployment_namespace(deployment)
if namespace is None:
raise ValueError(f"Namespace not found for deployment {deployment}.")

# Create a patch for the current deployment saying that
# it was restarted now, and it will.
now = datetime.datetime.now(datetime.timezone.utc)
now = str(now.isoformat("T") + "Z")
body = {
"spec": {
"template": {
"metadata": {
"annotations": {"kubectl.kubernetes.io/restartedAt": now}
}
}
}
}

self.apps_v1.patch_namespaced_deployment(
deployment,
namespace,
body,
pretty="true",
)

else:
try:
file_ = self.get_yaml_file(deployment)
except ValueError as err:
raise RuntimeError(f"Failed restarting from file: {err} ")

if deployment in self.list_deployments():
self.delete_deployment(deployment)
await asyncio.sleep(3) # Give some time for the pods to exit.
else:
warnings.warn(f"{deployment!r} is not running.")

self.apply_from_file(file_)
19 changes: 19 additions & 0 deletions src/lvmopstools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,22 @@ async def stop_event_loop(timeout: float | None = 5):
pass
finally:
asyncio.get_running_loop().stop()


def is_notebook() -> bool:
"""Returns :obj:`True` if the code is run inside a Jupyter Notebook.

https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook

"""

try:
shell = get_ipython().__class__.__name__ # type: ignore
if shell == "ZMQInteractiveShell":
return True # Jupyter notebook or qtconsole
elif shell == "TerminalInteractiveShell":
return False # Terminal running IPython
else:
return False # Other type (?)
except NameError:
return False # Probably standard Python interpreter
Loading