Fix skelet #127

Closed · wants to merge 4 commits
267 changes: 267 additions & 0 deletions core/opl/StatusData.py
@@ -0,0 +1,267 @@
import copy
import datetime
import json
import logging
import os
import os.path
import tempfile
import requests
import yaml
from . import date


class StatusData:
    def __init__(self, filename, data=None):
        self.filename = filename
        if filename.startswith("http://") or filename.startswith("https://"):
            fd, tmp = tempfile.mkstemp()
            os.close(fd)  # mkstemp avoids the deprecated and racy tempfile.mktemp()
            logging.info(
                f"Downloading {filename} to {tmp} and will work with that file from now on"
            )
            r = requests.get(filename, verify=False)
            with open(tmp, "wb") as fp:
                fp.write(r.content)
            filename = tmp

        self._filename = filename
        self._filename_mtime = None
        if data is None:
            self.load()
        else:
            self._data = data
            assert "name" in data
            assert "started" in data
            assert "ended" in data
            assert "result" in data

    def load(self):
        try:
            self._filename_mtime = os.path.getmtime(self._filename)
            with open(self._filename, "r") as fp:
                self._data = json.load(fp)
            logging.debug(f"Loaded status data from {self._filename}")
        except FileNotFoundError:
            self.clear()
            logging.info(f"Opening empty status data file {self._filename}")

    def __getitem__(self, key):
        logging.debug(f"Getting item {key} from {self._filename}")
        return self._data.get(key, None)

    def __setitem__(self, key, value):
        logging.debug(f"Setting item {key} in {self._filename}")
        self._data[key] = value

    def __repr__(self):
        return f"<StatusData instance version={self.get('version')} id={self.get('id')} started={self.get_date('started')}>"

    def __eq__(self, other):
        return self._data == other._data

    def __gt__(self, other):
        logging.info(f"Comparing {self} to {other}")
        return self.get_date("started") > other.get_date("started")

    def _split_multikey(self, multikey):
        """
        Dots delimit the path in the nested dict.
        """
        if multikey == "":
            return []
        else:
            return multikey.split(".")

    def _get(self, data, split_key):
        if split_key == []:
            return data

        if not isinstance(data, dict):
            logging.warning(
                "Attempted to dive into non-dict. Falling back to return None"
            )
            return None

        try:
            new_data = data[split_key[0]]
        except KeyError:
            return None

        if len(split_key) == 1:
            return new_data
        else:
            return self._get(new_data, split_key[1:])

    def get(self, multikey):
        """
        Recursively go through the status_data structure according to
        the multikey and return its value, or None. For example:

            get('a.b.c')

        returns:

            self._data['a']['b']['c']

        and if say `self._data['a']['b']` does not exist (or any other
        key along the way), return None.
        """
        split_key = self._split_multikey(multikey)
        logging.debug(f"Getting {split_key} from {self._filename}")
        return self._get(self._data, split_key)

    def get_date(self, multikey):
        i = self.get(multikey)
        if i is None:
            logging.warning(f"Field {multikey} is None, so it can not be converted to datetime")
            return None
        return date.my_fromisoformat(i)

    def _set(self, data, split_key, value):
        # Descend into the dict one key at a time, creating intermediate
        # dicts as needed
        try:
            new_data = data[split_key[0]]
        except KeyError:
            if len(split_key) == 1:
                data[split_key[0]] = value
                return
            else:
                data[split_key[0]] = {}
                new_data = data[split_key[0]]

        if len(split_key) == 1:
            data[split_key[0]] = value
            return
        else:
            return self._set(new_data, split_key[1:], value)

    def set(self, multikey, value):
        """
        Recursively go through the status_data structure and set the value
        for the multikey. For example:

            set('a.b.c', 123)

        sets:

            self._data['a']['b']['c'] = 123

        even if `self._data['a']['b']` does not exist; any missing
        intermediate dicts are created along the way.
        """
        split_key = self._split_multikey(multikey)
        logging.debug(f"Setting {'.'.join(split_key)} in {self._filename} to {value}")
        if isinstance(value, datetime.datetime):
            value = value.isoformat()  # make it a string with the proper format
        self._set(self._data, split_key, copy.deepcopy(value))

    def set_now(self, multikey):
        """
        Set the given multikey to the current UTC datetime
        """
        now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
        return self.set(multikey, now.isoformat())

    def set_subtree_json(self, multikey, file_path):
        """
        Set the given multikey to the contents of a JSON or YAML formatted
        file provided by its path
        """
        with open(file_path, "r") as fp:
            if file_path.endswith(".json"):
                data = json.load(fp)
            elif file_path.endswith(".yaml"):
                data = yaml.load(fp, Loader=yaml.SafeLoader)
            else:
                raise Exception(
                    f"Unrecognized extension of file to import: {file_path}"
                )
        return self.set(multikey, data)

    def _remove(self, data, split_key):
        try:
            new_data = data[split_key[0]]
        except KeyError:
            return

        if len(split_key) == 1:
            del data[split_key[0]]
            return
        else:
            return self._remove(new_data, split_key[1:])

    def remove(self, multikey):
        """
        Remove the given multikey (and its content) from the status data file
        """
        split_key = self._split_multikey(multikey)
        logging.debug(f"Removing {split_key} from {self._filename}")
        self._remove(self._data, split_key)

    def list(self, multikey):
        """
        For the given path, return a list of all existing leaf paths below it
        """
        out = []
        split_key = self._split_multikey(multikey)
        logging.debug(f"Listing {split_key}")
        for k, v in self._get(self._data, split_key).items():
            key = ".".join(list(split_key) + [k])
            if isinstance(v, dict):
                out += self.list(key)
            else:
                out.append(key)
        return out

    def clear(self):
        """
        Reset the document to the default structure
        """
        self._data = {
            "name": None,
            "started": date.get_now_str(),
            "ended": None,
            "owner": None,
            "result": None,
            "results": {},
            "parameters": {},
            "measurements": {},
        }

    def info(self):
        out = ""
        out += f"Filename: {self._filename}\n"
        for k, v in self._data.items():
            if not isinstance(v, dict):
                out += f"{k}: {v}\n"
        return out

    def dump(self):
        return self._data

    def save(self, filename=None):
        """Save this status data document.

        It makes sure the on-disk file was not modified since we loaded it;
        if you provide a filename, this check is skipped.
        """
        if filename is None:
            if self._filename_mtime is not None:
                current_mtime = os.path.getmtime(self._filename)
                if self._filename_mtime != current_mtime:
                    fd, tmp = tempfile.mkstemp()
                    os.close(fd)  # mkstemp avoids the deprecated and racy tempfile.mktemp()
                    self._save(tmp)
                    raise Exception(
                        f"Status data file {self._filename} was modified since we loaded it, so I do not want to overwrite it. Instead, saved to {tmp}"
                    )
        else:
            self._filename = filename

        self._save(self._filename)

    def _save(self, filename):
        """Just save the status data document to a JSON file on disk"""
        with open(filename, "w+") as fp:
            json.dump(self.dump(), fp, sort_keys=True, indent=4)
        if filename == self._filename:
            self._filename_mtime = os.path.getmtime(filename)
        logging.debug(f"Saved status data to {filename}")
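
For reviewers, a minimal usage sketch of the StatusData API added above. The import path, file path, and keys are hypothetical; the behavior follows the methods in this diff.

import os
from opl.StatusData import StatusData  # hypothetical import path for this new module

# A missing file is initialized via clear() with the default structure.
sd = StatusData("/tmp/status-data.json")

# Dotted multikeys address nested dicts; set() creates intermediate dicts.
sd.set("parameters.cluster.pods", 123)
sd.set_now("started")

assert sd.get("parameters.cluster.pods") == 123
assert sd.get("parameters.no.such.key") is None  # missing paths return None
assert "parameters.cluster.pods" in sd.list("parameters")

sd.save()  # refuses to overwrite if the on-disk file changed since load()
os.unlink("/tmp/status-data.json")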
8 changes: 5 additions & 3 deletions core/opl/cluster_read.py
@@ -16,7 +16,7 @@
 from . import data
 from . import date
 from . import status_data
-from . import skelet
+from . import retry
 
 
 def execute(command):
@@ -181,7 +181,7 @@ def _sanitize_target(self, target):
         target = target.replace("$Cloud", self.args.grafana_prefix)
         return target
 
-    @skelet.retry_on_traceback(max_attempts=10, wait_seconds=1)
+    @retry.retry_on_traceback(max_attempts=10, wait_seconds=1)
     def measure(self, ri, name, grafana_target):
         assert (
             ri.start is not None and ri.end is not None
@@ -456,7 +456,9 @@ def get_source(self, environment, path):
 
 
 class RequestedInfo:
-    def __init__(self, config, start=None, end=None, args=argparse.Namespace(), sd=None):
+    def __init__(
+        self, config, start=None, end=None, args=argparse.Namespace(), sd=None
+    ):
         """
         "config" is input for config_stuff function
         "start" and "end" are datetimes needed if config file contains some
12 changes: 6 additions & 6 deletions core/opl/data.py
@@ -217,13 +217,13 @@ def data_stats(data):
         "max": max(data),
         "sum": sum(data),
         "mean": statistics.mean(data),
-        "non_zero_mean": statistics.mean(non_zero_data)
-        if len(non_zero_data) > 0
-        else 0.0,
+        "non_zero_mean": (
+            statistics.mean(non_zero_data) if len(non_zero_data) > 0 else 0.0
+        ),
         "median": statistics.median(data),
-        "non_zero_median": statistics.median(non_zero_data)
-        if len(non_zero_data) > 0
-        else 0.0,
+        "non_zero_median": (
+            statistics.median(non_zero_data) if len(non_zero_data) > 0 else 0.0
+        ),
         "stdev": statistics.stdev(data) if len(data) > 1 else 0.0,
         "range": max(data) - min(data),
         "percentile25": q25,
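
The reformatting above is behavior-preserving (black style). A standalone sketch of the guarded-statistics pattern it touches, with made-up data:

import statistics

data = [0, 0, 4, 8]
non_zero_data = [d for d in data if d != 0]

# The guard avoids statistics.mean([]) raising StatisticsError on all-zero input
non_zero_mean = statistics.mean(non_zero_data) if len(non_zero_data) > 0 else 0.0
print(non_zero_mean)  # 6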
6 changes: 6 additions & 0 deletions core/opl/date.py
@@ -30,3 +30,9 @@ def my_fromisoformat(string):
     out = datetime.datetime.strptime(string, "%Y-%m-%dT%H:%M:%S")
     out = out.replace(tzinfo=string_tz)
     return out
+
+
+def get_now_str():
+    now = datetime.datetime.utcnow()
+    now = now.replace(tzinfo=datetime.timezone.utc)
+    return now.isoformat()
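
The new get_now_str() helper returns a timezone-aware ISO 8601 string. A quick check of the equivalent expression, with an illustrative timestamp:

import datetime

now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
print(now.isoformat())  # e.g. 2024-05-01T12:34:56.789012+00:00

On newer Pythons, datetime.datetime.now(datetime.timezone.utc) is the equivalent non-deprecated spelling.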
43 changes: 43 additions & 0 deletions core/opl/retry.py
@@ -0,0 +1,43 @@
import time
import logging
from functools import wraps


def retry_on_traceback(max_attempts=10, wait_seconds=1):
    """
    Retries a function until it succeeds or the maximum number of attempts
    or wait time is reached.

    This is to mimic the `@retry` decorator from Tenacity so we do not
    depend on it.

    Args:
        max_attempts: The maximum number of attempts to retry the function.
        wait_seconds: The number of seconds to wait between retries.

    Returns:
        A decorator that retries the wrapped function.
    """
    assert max_attempts >= 0, "It does not make sense to have less than 0 retries"
    assert wait_seconds >= 0, "It does not make sense to wait less than 0 seconds"

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= max_attempts:
                        raise  # Reraise the exception after all retries are exhausted

                    attempt += 1
                    logging.debug(
                        f"Retrying in {wait_seconds} seconds. Attempt {attempt}/{max_attempts} failed with: {e}"
                    )
                    time.sleep(wait_seconds)

        return wrapper

    return decorator
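
A usage sketch of the new decorator; the flaky function below is hypothetical, and retry_on_traceback is assumed to be in scope (e.g. via from opl.retry import retry_on_traceback):

import random

@retry_on_traceback(max_attempts=3, wait_seconds=1)
def flaky():
    # Hypothetical function that fails transiently to demonstrate retries
    if random.random() < 0.5:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky())  # up to 3 retries after the first failure, then the exception propagates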