diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..b694934 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.venv \ No newline at end of file diff --git a/.gitignore b/.gitignore index 89eb912..f2ff438 100644 --- a/.gitignore +++ b/.gitignore @@ -129,7 +129,13 @@ dmypy.json # Pyre type checker .pyre/ exp__* -experiments/simple_raytune/benchmark__RaytuneBenchmark -experiments/optuna_minikube/benchmark__OptunaMinikubeBenchmark +**/benchmark__** -data/ \ No newline at end of file +data/ + +#idea +.idea/ + +# +.envs +test/test_ml_benchmark/hyperparameter_space.yml diff --git a/experiments/optuna_kubernetes/ops/manifests/db/db-deployment.yml b/experiments/optuna_kubernetes/ops/manifests/db/db-deployment.yml index 51e7bd0..9782521 100644 --- a/experiments/optuna_kubernetes/ops/manifests/db/db-deployment.yml +++ b/experiments/optuna_kubernetes/ops/manifests/db/db-deployment.yml @@ -9,38 +9,6 @@ data: POSTGRES_DB: postgresdb POSTGRES_USER: postgresadmin POSTGRES_PASSWORD: admin123 -# --- -# kind: PersistentVolume -# apiVersion: v1 -# metadata: -# name: postgres-pv-volume -# labels: -# type: local -# app: postgres -# spec: -# # storageClassName: manual -# capacity: -# storage: 1Gi -# accessModes: -# - ReadWriteMany -# hostPath: -# path: "/mnt/data" -# reclaimPolicy: Delete -# --- -# kind: PersistentVolumeClaim -# apiVersion: v1 -# metadata: -# name: postgres-pv-claim -# labels: -# app: postgres -# spec: -# storageClassName: manual -# accessModes: -# - ReadWriteMany -# resources: -# requests: -# storage: 1Gi - --- apiVersion: apps/v1 kind: Deployment @@ -56,6 +24,8 @@ spec: labels: app: postgres spec: + nodeSelector: + scaphandre : "true" containers: - name: postgres image: postgres:10.4 @@ -65,17 +35,10 @@ spec: envFrom: - configMapRef: name: postgres-config - # volumeMounts: - # - mountPath: /var/lib/postgresql/data - # name: postgredb resources: limits: cpu: 1.0 memory: 1G - # volumes: - # - name: postgredb - # persistentVolumeClaim: - # 
claimName: postgres-pv-claim --- apiVersion: v1 kind: Service diff --git a/experiments/optuna_kubernetes/ops/manifests/trial/job.yml b/experiments/optuna_kubernetes/ops/manifests/trial/job.yml index 49e8ff9..4fa93fd 100644 --- a/experiments/optuna_kubernetes/ops/manifests/trial/job.yml +++ b/experiments/optuna_kubernetes/ops/manifests/trial/job.yml @@ -7,6 +7,8 @@ spec: parallelism: $worker_num template: spec: + nodeSelector: + scaphandre : "true" containers: - name: optuna-trial image: $worker_image @@ -22,5 +24,14 @@ spec: value: "postgresql://postgresadmin:admin123@postgres:5432/postgresdb" - name: "METRICS_STORAGE_HOST" value: "$metrics_ip" + - name: "N_TRIALS" + value: "$trials" + - name: "EPOCHS" + value: "$epochs" + # injects the kuberntes node name into eacah pod + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName restartPolicy: OnFailure diff --git a/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py b/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py index e216578..734512c 100644 --- a/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py +++ b/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py @@ -1,6 +1,7 @@ import random from os import path from time import sleep +from math import ceil import optuna from kubernetes import client, config, watch @@ -36,12 +37,29 @@ def __init__(self, resources: dict) -> None: self.workerCount = resources.get("workerCount", 4) self.delete_after_run = resources.get("deleteAfterRun", True) self.metrics_ip = resources.get("metricsIP") + self.trials = resources.get("trials", 10) #self._calculate_trial_number(resources.get("trials", 6)) + self.epochs = resources.get("epochs", 5) + self.hyperparameter = resources.get("hyperparameter") + + def _calculate_trial_number(self, n_trials): + new_n_trials = None + if n_trials < self.workerCount: + new_n_trials = self.workerCount + else: + new_n_trials = ceil(n_trials/self.workerCount) + return new_n_trials def deploy(self) -> 
None: """ Deploy DB """ # TODO: deal with exsiting resources... + + if self.hyperparameter: + #TODO: XXX we got to fix this dependency thing. eitehr merge minikube/kubernetes or use the same baseclass or something... + f = path.join(path.dirname(__file__),"..","optuna_minikube","hyperparameter_space.yml") + YamlTemplateFiller.as_yaml(f, self.hyperparameter) + try: resp = client.CoreV1Api().create_namespace( client.V1Namespace(metadata=client.V1ObjectMeta(name=self.namespace))) @@ -95,6 +113,8 @@ def run(self): "worker_image": self.trial_tag, "study_name": self.study_name, "metrics_ip": self.metrics_ip, + "trials": self.trials, + "epochs": self.epochs, } job_yml_objects = YamlTemplateFiller.load_and_fill_yaml_template( path.join(path.dirname(__file__), "ops/manifests/trial/job.yml"), job_definition) @@ -105,11 +125,19 @@ def run(self): if self._is_create_conflict(e): # lets remove the old one and try again client.BatchV1Api().delete_namespaced_job(name="optuna-trial", namespace=self.namespace) + #wait for that to complete + sleep(5) + # try again create_from_yaml( client.ApiClient(), yaml_objects=job_yml_objects, namespace=self.namespace, verbose=True) else: raise e - self._watch_trials() + try: + for t in range(1,14): + self._watch_trials(timeout=120*t) + except Exception as e: + #TODO deal with mitigatable errors + raise e def _getDBURL(self): postgres_sepc = client.CoreV1Api().read_namespaced_service(namespace=self.namespace, name="postgres") @@ -127,23 +155,36 @@ def collect_run_results(self): study = optuna.load_study(study_name=self.study_name, storage=self._getDBURL()) self.best_trial = study.best_trial - def _watch_trials(self): + def _watch_trials(self,timeout=120): """ Checks if Trials (Kubernetes Jobs) are completed. If not the process waits on it. 
""" w = watch.Watch() c = client.BatchV1Api() - for e in w.stream(c.list_namespaced_job, namespace=self.namespace, timeout_seconds=10): + + for e in w.stream(c.list_namespaced_job, namespace=self.namespace, timeout_seconds=timeout): if "object" in e and e["object"].status.completion_time is not None: w.stop() - return - print("Trials completed! Collecting Results") + print("Trials completed! Collecting Results") + return True + print("Watch_Trials timed out") + try: + job = client.BatchV1Api().read_namespaced_job(name="optuna-trial", namespace=self.namespace) + if job.status.failed != None and job.status.failed > 0: + raise Exception("Trials failed") + except ApiException as e: + if e.status == 404: + raise Exception("Job not created...") + raise e + return False + + def test(self): def optuna_trial(trial): - objective = MnistTask(config_init={"epochs": 1}).create_objective() - lr = trial.suggest_float("learning_rate", 1e-3, 0.1, log=True) + objective = MnistTask(config_init={"epochs": self.epochs}).create_objective() + lr = trial.suggest_float("learning_rate", 1e-4, 0.1, log=True) decay = trial.suggest_float("weight_decay", 1e-6, 1e-4, log=True) objective.set_hyperparameters({"learning_rate": lr, "weight_decay": decay}) # these are the results, that can be used for the hyperparameter search @@ -165,10 +206,11 @@ def undeploy(self): if self.delete_after_run: client.CoreV1Api().delete_namespace(self.namespace) self._watch_namespace() - self.image_builder.cleanup(self.trial_tag) + # self.image_builder.cleanup(self.trial_tag) def _watch_namespace(self): try: + #TODO: XXX fix me! client.CoreV1Api().read_namespace_status(self.namespace).to_dict() sleep(2) except client.exceptions.ApiException: @@ -193,26 +235,23 @@ def _watch_db(self): if __name__ == "__main__": from ml_benchmark.benchmark_runner import BenchmarkRunner from urllib.request import urlopen - # The basic config for the workload. For testing purposes set epochs to one. 
- # For benchmarking take the default value of 100 - # your ressources the optimization should run on - resource_definition = { - "workerCpu": 2, - "workerMemory": 2, - "workerCount": 4, + from ml_benchmark.utils.yml_parser import YMLParser + resources = YMLParser.parse(path.join(path.dirname(__file__),"resource_definition.yml")) + + # TODO: XXX remove this hardcoded values + to_automate = { "metricsIP": urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip(), - "studyName": "optuna-study", "dockerImageTag": "tawalaya/optuna-trial:latest", "dockerImageBuilder": "docker", "kubernetesNamespace": "optuna-study", "kubernetesContext": "admin@smile", "kubernetesMasterIP": "130.149.158.143", - "deleteAfterRun": False, + "prometheus_url": "http://130.149.158.143:30041", + "deleteAfterRun":False, } + resources.update(to_automate) - # TODO: hyperparams. - - # import an use the runner runner = BenchmarkRunner( - benchmark_cls=OptunaKubernetesBenchmark, resource_definition=resource_definition) + benchmark_cls=OptunaKubernetesBenchmark, resources=resources) runner.run() + diff --git a/experiments/optuna_kubernetes/optuna_kubernetes_rcpu.py b/experiments/optuna_kubernetes/optuna_kubernetes_rcpu.py new file mode 100644 index 0000000..14518f2 --- /dev/null +++ b/experiments/optuna_kubernetes/optuna_kubernetes_rcpu.py @@ -0,0 +1,44 @@ +import logging +from os import path +from time import sleep +from experiments.optuna_kubernetes.optuna_kubernetes_benchmark import OptunaKubernetesBenchmark +from ml_benchmark.benchmark_runner import BenchmarkRunner +from urllib.request import urlopen +from ml_benchmark.utils.yml_parser import YMLParser + +if __name__ == "__main__": + metricsIP = urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip() + + # read in base configuration + resources = YMLParser.parse(path.join(path.dirname(__file__),"resource_definition.yml")) + # TODO: XXX remove this hardcoded values + to_automate = { + "metricsIP": metricsIP, + 
"dockerImageTag": "tawalaya/optuna-trial:latest", + "dockerImageBuilder": "docker", + #force random namespaces to reduce conflicts + # "kubernetesNamespace": "optuna-study", + "kubernetesContext": "admin@smile", + "kubernetesMasterIP": "130.149.158.143", + "prometheus_url": "http://130.149.158.143:30041", + "deleteAfterRun":True, + "epochs": 50, + } + resources.update(to_automate) + + repetions = 2 + for trials in [6,12,18]: + for cpu in range(2,8): + for i in range(1,repetions+1): + sleep(3) + logging.info(f"Starting Run {i} with 3x{cpu} vCPUs with n_trails {trials}") + try: + resources["trials"] = trials + resources["workerCpu"] = (cpu/2.0) + resources["goal"] = f"rcpu{cpu}-{trials}-{i}" + runner = BenchmarkRunner(benchmark_cls=OptunaKubernetesBenchmark, resources=resources) + runner.run() + sleep(7) + runner = None + except Exception as e: + logging.warning(f'Failed Run {i} with 3x{cpu} vCPUs with n_trails {trials} - {e}') diff --git a/experiments/optuna_kubernetes/optuna_kubernetes_rnode.py b/experiments/optuna_kubernetes/optuna_kubernetes_rnode.py new file mode 100644 index 0000000..f771655 --- /dev/null +++ b/experiments/optuna_kubernetes/optuna_kubernetes_rnode.py @@ -0,0 +1,43 @@ +import logging +from os import path +from time import sleep +from experiments.optuna_kubernetes.optuna_kubernetes_benchmark import OptunaKubernetesBenchmark +from ml_benchmark.benchmark_runner import BenchmarkRunner +from urllib.request import urlopen +from ml_benchmark.utils.yml_parser import YMLParser + +if __name__ == "__main__": + metricsIP = urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip() + + # read in base configuration + resources = YMLParser.parse(path.join(path.dirname(__file__),"resource_definition.yml")) + # TODO: XXX remove this hardcoded values + to_automate = { + "metricsIP": metricsIP, + "dockerImageTag": "tawalaya/optuna-trial:latest", + "dockerImageBuilder": "docker", + #force random namespaces to reduce conflicts + # "kubernetesNamespace": 
"optuna-study", + "kubernetesContext": "admin@smile", + "kubernetesMasterIP": "130.149.158.143", + "prometheus_url": "http://130.149.158.143:30041", + "deleteAfterRun":True, + "epochs": 50, + } + resources.update(to_automate) + + repetions = 2 + for i in range(1,repetions+1): + for n in range(1,7): + sleep(3) + logging.info(f"Starting Run {i} with {n} nodes with n_trails 100") + try: + resources["trials"] = 100 + resources["workerCount"] = n + resources["goal"] = f"rnode{n}-100-{i}" + runner = BenchmarkRunner(benchmark_cls=OptunaKubernetesBenchmark, resources=resources) + runner.run() + sleep(7) + runner = None + except Exception as e: + logging.warning(f'Failed Run {i} with {n} nodes and n_trails 100 - {e}') diff --git a/experiments/optuna_kubernetes/resource_definition.yml b/experiments/optuna_kubernetes/resource_definition.yml new file mode 100644 index 0000000..542f7f8 --- /dev/null +++ b/experiments/optuna_kubernetes/resource_definition.yml @@ -0,0 +1,25 @@ + +workerCpu: 3.25 +workerMemory: 6 +workerCount: 1 +trials: 6 +epochs: 5 +metricsIP: auto ##urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip(), +kubernetesMasterIP: minikube ##subprocess.check_output("minikube ip", shell=True).decode("utf-8").strip("\n") +dockerImageTag: tawalaya/optuna-trial:latest +dockerImageBuilder: docker +kubernetesContext: "minikube" +deleteAfterRun: True +hyperparameter: + learning_rate: + start: 1e-4 + end: 1e-2 + step_size: 1e-3 + weight_decay: + start: 1e-6 + end: 1e-4 + step_size: 1e-5 + # hidden_layer_config: + # start: [10] + # end: [100, 100, 100] + # step_size: [10, 1] diff --git a/experiments/optuna_minikube/hyperparameter_space.yml b/experiments/optuna_minikube/hyperparameter_space.yml new file mode 100644 index 0000000..1e72da6 --- /dev/null +++ b/experiments/optuna_minikube/hyperparameter_space.yml @@ -0,0 +1,9 @@ +# generated file - do not edit +learning_rate: + end: 0.01 + start: 0.0001 + step_size: 0.001 +weight_decay: + end: 0.0001 + start: 
1.0e-06 + step_size: 1.0e-05 diff --git a/experiments/optuna_minikube/ops/manifests/trial/job.yml b/experiments/optuna_minikube/ops/manifests/trial/job.yml index 202fd2d..a3e111e 100644 --- a/experiments/optuna_minikube/ops/manifests/trial/job.yml +++ b/experiments/optuna_minikube/ops/manifests/trial/job.yml @@ -23,5 +23,10 @@ spec: value: "postgresql://postgresadmin:admin123@postgres:5432/postgresdb" - name: "METRICS_STORAGE_HOST" value: "$metrics_ip" + # injects the kuberntes node name into eacah pod + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName restartPolicy: OnFailure diff --git a/experiments/optuna_minikube/optuna_minikube_benchmark.py b/experiments/optuna_minikube/optuna_minikube_benchmark.py index 3816f97..33efbfb 100644 --- a/experiments/optuna_minikube/optuna_minikube_benchmark.py +++ b/experiments/optuna_minikube/optuna_minikube_benchmark.py @@ -10,6 +10,7 @@ from ml_benchmark.utils.image_build_wrapper import builder_from_string from ml_benchmark.workload.mnist.mnist_task import MnistTask from ml_benchmark.utils.yaml_template_filler import YamlTemplateFiller +from ml_benchmark.utils.yml_parser import YMLParser class OptunaMinikubeBenchmark(Benchmark): @@ -35,12 +36,21 @@ def __init__(self, resources: dict) -> None: self.workerCount = resources.get("workerCount", 4) self.delete_after_run = resources.get("deleteAfterRun", True) self.metrics_ip = resources.get("metricsIP") + self.hyperparameter = resources.get("hyperparameter") def deploy(self) -> None: """ Deploy DB """ + # TODO: deal with exsiting resources... + + #generate hyperparameter file from resouces def. 
+ + if self.hyperparameter: + f = path.join(path.dirname(__file__),"hyperparameter_space.yml") + YamlTemplateFiller.as_yaml(f, self.hyperparameter) + try: resp = client.CoreV1Api().create_namespace( client.V1Namespace(metadata=client.V1ObjectMeta(name=self.namespace))) @@ -132,7 +142,7 @@ def _watch_trials(self): """ w = watch.Watch() c = client.BatchV1Api() - for e in w.stream(c.list_namespaced_job, namespace=self.namespace, timeout_seconds=10): + for e in w.stream(c.list_namespaced_job, namespace=self.namespace, timeout_seconds=100): if "object" in e and e["object"].status.completion_time is not None: w.stop() return @@ -198,18 +208,11 @@ def _watch_db(self): # The basic config for the workload. For testing purposes set epochs to one. # For benchmarking take the default value of 100 # your ressources the optimization should run on - resources = { - "workerCpu": 2, - "workerMemory": 2, - "workerCount": 4, + resources = YMLParser.parse("experiments/optuna_minikube/resource_definition.yml") + to_automate = { "metricsIP": urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip(), - "kubernetesMasterIP": subprocess.check_output("minikube ip", shell=True).decode("utf-8").strip("\n"), - "dockerImageTag": "tawalaya/optuna-trial:latest", - "dockerImageBuilder": "minikube", - "kubernetesNamespace": "optuna-study", - "kubernetesContext": "minikube", - "deleteAfterRun": True, - } + "kubernetesMasterIP": subprocess.check_output("minikube ip", shell=True).decode("utf-8").strip("\n")} + resources.update(to_automate) # TODO: hyperparams. 
diff --git a/experiments/optuna_minikube/optuna_trial.py b/experiments/optuna_minikube/optuna_trial.py index 1e5f653..5453677 100644 --- a/experiments/optuna_minikube/optuna_trial.py +++ b/experiments/optuna_minikube/optuna_trial.py @@ -3,30 +3,48 @@ from time import sleep import optuna from ml_benchmark.workload.mnist.mnist_task import MnistTask +from utils import generate_search_space +from optuna.study import MaxTrialsCallback +from optuna.trial import TrialState -#TODO: can we extract this to a point were we can use a config to drive this? def optuna_trial(trial): - task = MnistTask(config_init={"epochs": 5}) + epochs = int(os.environ.get("EPOCHS", 5)) + task = MnistTask(config_init={"epochs": epochs}) objective = task.create_objective() - lr = trial.suggest_float("learning_rate", 1e-3, 0.1, log=True) + # optuna doesnt care, these lines of code just get hyperparameters from the search space in grid search + lr = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True) decay = trial.suggest_float("weight_decay", 1e-6, 1e-4, log=True) - objective.set_hyperparameters({"learning_rate": lr, "weight_decay": decay}) + # hidden_layer_config = trial.suggest_int("hidden_layer_config", 1, 4) + objective.set_hyperparameters( + {"learning_rate": lr, "weight_decay": decay})#, "hidden_layer_config": hidden_layer_config}) objective.train() validation_scores = objective.validate() return validation_scores["macro avg"]["f1-score"] - -if __name__ == "__main__": +def main(): try: study_name = os.environ.get("STUDY_NAME", "Test-Study") database_conn = os.environ.get("DB_CONN") + n_trials = int(os.environ.get("N_TRIALS", 2)) + search_space = generate_search_space(os.path.join(os.path.dirname(__file__),"hyperparameter_space.yml")) + print(search_space) study = optuna.create_study( - study_name=study_name, storage=database_conn, direction="maximize", load_if_exists=True) - study.optimize(optuna_trial, n_trials=6) + study_name=study_name, storage=database_conn, direction="maximize", 
load_if_exists=True, + sampler=optuna.samplers.GridSampler(search_space)) + study.optimize(optuna_trial, + callbacks=[MaxTrialsCallback(n_trials, states=(TrialState.COMPLETE,))], + ) ##TODO:XXX We need to make this a configurable parameter!!! # TODO: add small wait to avoid missing metrics sleep(5) + return True + except Exception as e: + print(e) + return False + +if __name__ == "__main__": + if main(): sys.exit(0) - except Exception: + else: sys.exit(1) diff --git a/experiments/optuna_minikube/resource_definition.yml b/experiments/optuna_minikube/resource_definition.yml new file mode 100644 index 0000000..3c8d03d --- /dev/null +++ b/experiments/optuna_minikube/resource_definition.yml @@ -0,0 +1,24 @@ + +workerCpu: 2 +workerMemory: 2 +workerCount: 4 +metricsIP: auto ##urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip(), +kubernetesMasterIP: minikube ##subprocess.check_output("minikube ip", shell=True).decode("utf-8").strip("\n") +dockerImageTag: tawalaya/optuna-trial:latest +dockerImageBuilder: minikube +kubernetesNamespace: optuna-study +kubernetesContext: "minikube" +deleteAfterRun: True +hyperparameter: + learning_rate: + start: 1e-4 + end: 1e-2 + step_size: 1e-5 + weight_decay: + start: 1e-6 + end: 1e-4 + step_size: 1e-5 + hidden_layer_config: + start: [10] + end: [100, 100, 100] + step_size: [10, 1] diff --git a/experiments/optuna_minikube/test_trail.py b/experiments/optuna_minikube/test_trail.py new file mode 100644 index 0000000..41e9b07 --- /dev/null +++ b/experiments/optuna_minikube/test_trail.py @@ -0,0 +1,28 @@ + +from distutils import file_util +import os +from time import sleep +from experiments.optuna_minikube.optuna_trial import main +from ml_benchmark.metrics_storage import MetricsStorage + + +def test_trail(): + metrics_storage = MetricsStorage() + try: + metrics_storage.start_db() + sleep(5) + os.environ["METRICS_STORAGE_HOST"] = MetricsStorage.host + os.environ["DB_CONN"] = MetricsStorage.connection_string + 
os.environ["N_TRIALS"] = "10" + os.environ["EPOCHS"] = "1" + + f = main() + assert f + + lats = metrics_storage.get_latency_results() + assert len(lats) >= int(os.environ["N_TRIALS"])*2 #(validate+train) + finally: + metrics_storage.stop_db() + +#TODO: do the same for the container .... +# def test_trail_container(): \ No newline at end of file diff --git a/experiments/optuna_minikube/utils.py b/experiments/optuna_minikube/utils.py new file mode 100644 index 0000000..d0946a5 --- /dev/null +++ b/experiments/optuna_minikube/utils.py @@ -0,0 +1,22 @@ +import numpy as np +from ml_benchmark.utils.yml_parser import YMLParser +import itertools + +def generate_search_space(yaml_file_path): + search_space = YMLParser.parse(yaml_file_path) + modified_search_space = {} + hidden_layer_config = [] + for key, value in search_space.items(): + if isinstance(value["start"], list): + combinations = [] + numbers = range(value["start"][0], value["end"][-1], value["step_size"][0]) + for r in range(len(value["end"])): + r = r + 1 + for combination in itertools.combinations(set(numbers), r): + combinations.append(list(combination)) + + + modified_search_space[key] = combinations + else: + modified_search_space[key] = np.arange(value["start"], value["end"], value["step_size"]) + return modified_search_space diff --git a/experiments/simple_raytune/ops/configs/prometheus.yml b/experiments/simple_raytune/ops/configs/prometheus.yml new file mode 100644 index 0000000..4261afe --- /dev/null +++ b/experiments/simple_raytune/ops/configs/prometheus.yml @@ -0,0 +1,12 @@ +global: + scrape_interval: 1m + +scrape_configs: + - job_name: "prometheus" + scrape_interval: 1m + static_configs: + - targets: ["localhost:9090"] + + - job_name: "node" + static_configs: + - targets: ["node-exporter:9100"] \ No newline at end of file diff --git a/experiments/simple_raytune/prometheus.yml b/experiments/simple_raytune/prometheus.yml new file mode 100644 index 0000000..885089a --- /dev/null +++ 
b/experiments/simple_raytune/prometheus.yml @@ -0,0 +1,42 @@ +version: '3.8' + +networks: + monitoring: + driver: bridge + +services: + node-exporter: + image: prom/node-exporter:latest + container_name: node-exporter + restart: unless-stopped + volumes: + - /proc:/host/proc:ro + - /sys:/host/sys:ro + - /:/rootfs:ro + command: + - '--path.procfs=/host/proc' + - '--path.rootfs=/rootfs' + - '--path.sysfs=/host/sys' + - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)' + expose: + - 9100 + networks: + - monitoring + + prometheus: + image: prom/prometheus:latest + container_name: prometheus + restart: unless-stopped + volumes: + - ./ops/configs/:/configs:ro + command: + - '--config.file=/configs/prometheus.yml' + - '--web.console.libraries=/etc/prometheus/console_libraries' + - '--web.console.templates=/etc/prometheus/consoles' + - '--web.enable-lifecycle' + expose: + - 9090 + networks: + - monitoring + ports: + - "9090:9090" \ No newline at end of file diff --git a/experiments/simple_raytune/raytune_benchmark.py b/experiments/simple_raytune/raytune_benchmark.py index d5ac1ca..8000f76 100644 --- a/experiments/simple_raytune/raytune_benchmark.py +++ b/experiments/simple_raytune/raytune_benchmark.py @@ -99,7 +99,9 @@ def undeploy(self): # For benchmarking take the default value of 100 # your ressources the optimization should run on - resources = {"workerCpu": 12} + resources = {"workerCpu": 8, + "prometheus_url": "http://localhost:9090" # Assuming u used docker-compose up + } # Add your hyperparameter setting procedure here # your hyperparameter grid you want to search over diff --git a/ml_benchmark/__init__.py b/ml_benchmark/__init__.py index f628f98..c2bddfe 100644 --- a/ml_benchmark/__init__.py +++ b/ml_benchmark/__init__.py @@ -1,7 +1,14 @@ +# from pathlib import Path + __version__ = "develop" install_requires = [ "scikit-learn==0.24.2", "tqdm==4.62.3", "SQLAlchemy==1.4.31", "docker==4.4.2", - "psycopg2-binary"], + "psycopg2-binary", + 
"prometheus-api-client==0.5.1", + "ruamel.yaml==0.17.21"], test_install_requires = ["pytest==7.1.2", "pytest-cov==3.0.0"] URL = "https://github.com/gebauerm/ml_benchmark" +# this_directory = Path(__file__).parent.parent +# long_description = (this_directory / "README.md").read_text() +long_description = "FIXME" diff --git a/ml_benchmark/benchmark_runner.py b/ml_benchmark/benchmark_runner.py index d290b0b..27a117c 100644 --- a/ml_benchmark/benchmark_runner.py +++ b/ml_benchmark/benchmark_runner.py @@ -9,19 +9,18 @@ import docker import numpy as np import torch +import logging -from ml_benchmark.latency_tracker import Latency, LatencyTracker +from ml_benchmark.latency_tracker import LatencyTracker from ml_benchmark.metrics_storage import MetricsStorage - +from ml_benchmark.resource_tracker import ResourceTracker +from ml_benchmark.metrics import Latency class Benchmark(ABC): """ This class serves as an Interface for a benchmark. All neccessary methods have to be implemented in the subclass that is using the interface. Make sure to use the predefined static variables. Your benchmark will most likely not run properly if the variables value remains to be "None". - - Args: - ABC (_type_): Abstract Base Class """ # TODO: objective and grid are not allowed to be in the benchmark @@ -118,8 +117,10 @@ def __init__( self.rundate = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") benchmark_path = os.path.abspath(os.path.dirname(inspect.getabsfile(benchmark_cls))) self.bench_name = f"{benchmark_cls.__name__}" + self.bench_goal = resources.get("goal", "debug") self.benchmark_folder = os.path.join(benchmark_path, f"benchmark__{self.bench_name}") self.create_benchmark_folder(self.benchmark_folder) + self.resources = resources # add input and output size to the benchmark. 
self.benchmark = benchmark_cls(resources) @@ -130,6 +131,11 @@ def __init__( # prepare tracker self.metrics_storage = MetricsStorage() self.latency_tracker = LatencyTracker(MetricsStorage.connection_string) + if "prometheus_url" in resources: + self.resource_tracker = ResourceTracker(resources["prometheus_url"]) + else: + logging.warning("No Prometheus URL provided. Resource Tracker will not be used.") + self.resource_tracker = None def run(self): """ @@ -139,28 +145,46 @@ def run(self): Raises: ValueError: _description_ """ - run_process = [ - self.benchmark.deploy, self.benchmark.setup, self.benchmark.run, - self.benchmark.collect_run_results, - self.benchmark.test, self.benchmark.collect_benchmark_metrics] benchmark_results = None try: self.metrics_storage.start_db() + + # Deploy the SUT + with Latency(self.benchmark.deploy) as latency: + self.benchmark.deploy() + self.latency_tracker.track(latency) + + # RUN the benchmark + run_process = [ + self.benchmark.setup, self.benchmark.run, + self.benchmark.collect_run_results, + self.benchmark.test, self.benchmark.collect_benchmark_metrics] + + if self.resource_tracker is not None: + self.resource_tracker.start() + for benchmark_fun in run_process: with Latency(benchmark_fun) as latency: benchmark_fun() self.latency_tracker.track(latency) + + # Get the results of the benchmark benchmark_results = self.metrics_storage.get_benchmark_results() # just to be save we wait a bit before killing shit. 
- sleep(5) - self.metrics_storage.stop_db() except (docker.errors.APIError, AttributeError, ValueError, RuntimeError) as e: print(e) raise ValueError("No Results obtained, Benchmark failed.") finally: + sleep(5) + if self.resource_tracker is not None: + self.resource_tracker.stop() + self.resource_tracker = None + + self.metrics_storage.stop_db() + # Undeploy the SUT try: self.benchmark.undeploy() except Exception: @@ -171,6 +195,8 @@ def run(self): except Exception: pass + # TODO: move to finally block to ensure that results are always caputres if possible? + # persist the results self.save_benchmark_results(benchmark_results) def _set_all_seeds(self): @@ -191,7 +217,7 @@ def save_benchmark_results(self, benchmark_results): benchmark_results (_type_): _description_ """ benchmark_config_dict = dict( - resources=self.benchmark.resources, + resources=self.resources, ) benchmark_result_dict = dict( benchmark_metrics=benchmark_results, @@ -200,7 +226,7 @@ def save_benchmark_results(self, benchmark_results): with open( os.path.join( self.benchmark_folder, - f"benchmark_results__{self.rundate}__id.json"), "w" + f"benchmark_results__{self.rundate}__{self.bench_goal}.json"), "w" ) as f: json.dump(benchmark_result_dict, f) print("Results saved!") diff --git a/ml_benchmark/config.py b/ml_benchmark/config.py index c45fc9d..6582d1f 100644 --- a/ml_benchmark/config.py +++ b/ml_benchmark/config.py @@ -27,7 +27,7 @@ def to_dict(self): class MLPHyperparameter: learning_rate: float = 1e-3 weight_decay: float = 1e-6 - hidden_layer_config: list = field(default_factory=lambda: [50, 20]) + hidden_layer_config: list = field(default_factory=lambda: [15]) def to_dict(self): return asdict(self) diff --git a/ml_benchmark/decorators.py b/ml_benchmark/decorators.py new file mode 100644 index 0000000..a9f1e08 --- /dev/null +++ b/ml_benchmark/decorators.py @@ -0,0 +1,49 @@ + +from ml_benchmark.latency_tracker import LatencyTracker +from ml_benchmark.metrics import Latency +from 
ml_benchmark.results_tracker import ResultTracker + + +def validation_latency_decorator(func): + """ + A Decorator to record the latency of the decorated function. Once it is recorded the LatencyTracker + writes the result into the postgres database. + + We assume that that the decorated function returns a dictionary with the following keys: + - "macro avg": the macro average of the validation with the keys: + - "f1-score": the f1-score + + """ + def result_func(*args, **kwargs): + func.__self__ = args[0] + with Latency(func) as latency: + result = func(*args, **kwargs) + latency_tracker = LatencyTracker() + tracker = ResultTracker() + + latency_tracker.track(latency) + #XXX this locks us into the f1-score, we probably want to track all callification metrics not just f1-score. MG please help :) + tracker.track(func, result) + func.__self__ = None + return result + + return result_func + +def latency_decorator(func): + """A Decorator to record the latency of the decorated function. Once it is recorded the LatencyTracker + writes the result into the postgres databse. 
+ + Decorators overwrite a decorated function once the code is passed to the compier + + Args: + func (_type_): _description_ + """ + def latency_func(*args, **kwargs): + func.__self__ = args[0] + with Latency(func) as latency: + result = func(*args, **kwargs) + latency_tracker = LatencyTracker() + latency_tracker.track(latency) + func.__self__ = None + return result + return latency_func \ No newline at end of file diff --git a/ml_benchmark/latency_tracker.py b/ml_benchmark/latency_tracker.py index 0f98094..0ad71b1 100644 --- a/ml_benchmark/latency_tracker.py +++ b/ml_benchmark/latency_tracker.py @@ -1,12 +1,9 @@ import os from abc import ABC, abstractmethod - import psycopg2 from sqlalchemy import MetaData, Table, create_engine, insert - from ml_benchmark.config import MetricsStorageConfig -from ml_benchmark.metrics import Latency class Tracker(ABC): @@ -40,7 +37,7 @@ def __init__(self, connection_string: str = None) -> None: def _create_engine(self, connection_string): try: - engine = create_engine(connection_string, echo=True) + engine = create_engine(connection_string, echo=False) except psycopg2.Error: raise ConnectionError("Could not create an Engine for the Postgres DB.") return engine @@ -84,25 +81,6 @@ def shape_connection_string(self, host): return f"postgresql://{user}:{password}@{host}:{port}/{db}" -def latency_decorator(func): - """A Decorator to record the latency of the decorated function. Once it is recorded the LatencyTracker - writes the result into the postgres databse. 
- - Decorators overwrite a decorated function once the code is passed to the compier - - Args: - func (_type_): _description_ - """ - def latency_func(*args, **kwargs): - func.__self__ = args[0] - with Latency(func) as latency: - result = func(*args, **kwargs) - latency_tracker = LatencyTracker() - latency_tracker.track(latency) - func.__self__ = None - return result - return latency_func - if __name__ == "__main__": @@ -114,7 +92,7 @@ def latency_func(*args, **kwargs): storage = MetricsStorage() result = [] - + from ml_benchmark.decorators import latency_decorator class Test: metrics_storage_address = MetricsStorage.connection_string diff --git a/ml_benchmark/metrics.py b/ml_benchmark/metrics.py index 59947a7..6adea31 100644 --- a/ml_benchmark/metrics.py +++ b/ml_benchmark/metrics.py @@ -1,3 +1,5 @@ +import json +import logging import os from datetime import datetime, timedelta from uuid import uuid4 @@ -10,9 +12,7 @@ class Metric: """ Metric Parentclass. Creates a unique identifier for every metric and gathers basic information. 
""" - process_id = os.getpid() - hostname = socket.gethostname() - metric_id = f"id_{uuid4()}__pid_{process_id}__hostname_{hostname}" + metric_id = "" def add_to_id(self, id_addition): self.metric_id = self.metric_id + f"__{id_addition}" @@ -21,6 +21,89 @@ def to_dict(self): return asdict(self) +class NodeUsage(Metric): + def __init__(self, node_id): + super().__init__() + self.node_id = node_id + + self.add_to_id(f"{self.node_id}") + + self.timestamp = None + self.cpu_usage = None + self.memory_usage = None + self.network_usage = None + self.accelerator_usage = None + self.wattage = None + self.processes = None + + def to_dict(self): + node_dict = dict( + metric_id=self.metric_id, + timestamp=self.timestamp, + cpu_usage=self.cpu_usage, + memory_usage=self.memory_usage, + network_usage=self.network_usage, + wattage=self.wattage, + processes=int(self.processes), + ) + if self.accelerator_usage: + node_dict["accelerator_usage"] = self.accelerator_usage + + return {key: _convert_datetime_to_unix(value) for key, value in node_dict.items()} + + def __repr__(self): + return str(self.to_dict()) + + +class Result(Metric): + def __init__(self, objective): + super().__init__() + + # add fingerprinting data to self + self.fp = _fingerprint(self,objective) + self.timestamp = datetime.now().ctime() + self.value = None + self.measure = None + + self.hyperparameters = None + self.classification_metrics = None + + def to_dict(self): + return dict( + metric_id=self.metric_id, + timestamp=self.timestamp, + value=self.value, + hyperparameters=json.dumps(self.hyperparameters, indent=None), + classification_metrics=json.dumps(self.classification_metrics,indent=None), + measure=self.measure, + **self.fp + ) + + + +def _fingerprint(metric,func): + process_id = os.getpid() + # inject the NODE_NAME (from the environment) - should be availble in containerized environments + if os.getenv("NODE_NAME"): + hostname = f'{os.getenv("NODE_NAME")}_{socket.gethostname()}' + else: + hostname = 
f'BARE_{socket.gethostname()}' + metric.add_to_id(f"id_{uuid4()}__pid_{process_id}__hostname_{hostname}") + + obj_hash = 0 + try: + obj_hash = hash(func.__self__) + except AttributeError as e: + logging.warn(f"fingerprinting error {e}") + raise AttributeError("Functions need to be part of a class in order to measure their latency. {e}") + + return { + "process_id": process_id, + "hostname": hostname, + "obj_hash": obj_hash, + } + + class Latency(Metric): def __init__(self, func) -> None: @@ -41,13 +124,13 @@ def __init__(self, func) -> None: AttributeError: _description_ """ super().__init__() - self.function_name: str = func.__name__ - try: - self.obj_hash = hash(func.__self__) - except AttributeError as e: - print(e) - raise AttributeError("Functions need to be part of a class in order to measure their latency.") - self.add_to_id(f"function-name_{self.function_name}__objHash_{self.obj_hash}") + #TODO: make each id filed also availible as a column + fp = _fingerprint(self,func) + obj_hash = fp["obj_hash"] + function_name: str = func.__name__ + self.function_name = function_name + self.add_to_id(f"function-name_{function_name}__objHash_{obj_hash}") + self.start_time: float = None self.end_time: float = None self.duration_sec: float = None @@ -61,7 +144,7 @@ def to_dict(self): duration_sec=self.duration_sec ) # latency_dict.update(super().to_dict()) - latency_dict = {key: self._convert_times_to_float(value) for key, value in latency_dict.items()} + latency_dict = {key: _convert_times_to_float(value) for key, value in latency_dict.items()} return latency_dict @@ -84,8 +167,15 @@ def __exit__(self, *args): def _calculate_duration(self): self.duration_sec = self.end_time - self.start_time - def _convert_times_to_float(self, value): - if isinstance(value, timedelta): - return value.total_seconds() - else: - return str(value) + +def _convert_times_to_float(value): + if isinstance(value, timedelta): + return value.total_seconds() + else: + return str(value) + +def 
_convert_datetime_to_unix(value): + if isinstance(value, datetime): + return value.ctime() + else: + return str(value) \ No newline at end of file diff --git a/ml_benchmark/metrics_storage.py b/ml_benchmark/metrics_storage.py index 9de9ccc..1e08056 100644 --- a/ml_benchmark/metrics_storage.py +++ b/ml_benchmark/metrics_storage.py @@ -1,9 +1,14 @@ +from abc import abstractmethod +import logging import time import docker -from sqlalchemy import create_engine, MetaData, Table, Column, String, Float, select +from docker.errors import APIError +from sqlalchemy import create_engine, MetaData, Table, Column, String, Float, select, Integer, insert, BigInteger +import psycopg2 +import os from ml_benchmark.config import MetricsStorageConfig - +from ml_benchmark.metrics import Metric class MetricsStorage: @@ -15,6 +20,7 @@ class MetricsStorage: connection_string = MetricsStorageConfig.connection_string def __init__(self, connection_string: str = None) -> None: + """ The MetricsStorage serves as the representation of the databse. It sets up a postgres database in a docker container and creates tables for every recorded metric. @@ -25,30 +31,46 @@ def __init__(self, connection_string: str = None) -> None: Args: connection_string (str, optional): _description_. Defaults to None. 
""" + logging.basicConfig() + logging.getLogger('sqlalchemy').setLevel(logging.ERROR) + logging.getLogger('sqlalchemy.engine').setLevel(logging.ERROR) + + self.meta = None + self.client = None + self.engine = None if connection_string: self.connection_string = connection_string self.latency = None + self.resources = None def start_db(self): self.setup_db() self.engine = create_engine(self.connection_string) self.create_metrics_table() - self.create_resource_table() return self def setup_db(self): self.client = docker.from_env() - self.client.containers.run( - "postgres:14.1", detach=True, - environment=[ - f"POSTGRES_PASSWORD={self.password}", f"POSTGRES_DB={self.db}", f"POSTGRES_USER={self.user}"], - ports={f'{self.port}/tcp': self.port}, - name="postgres", - remove=True) + try: + self.client.containers.run( + "postgres:14.1", detach=True, + environment=[ + f"POSTGRES_PASSWORD={self.password}", f"POSTGRES_DB={self.db}", f"POSTGRES_USER={self.user}"], + ports={f'{self.port}/tcp': self.port}, + name="postgres", + remove=True) + except APIError as e: + if e.status_code == 409: + #TODO: we maybe want to drop the database in these cases + logging.info("Postgres is already running") + else: + raise e + container = self.client.containers.get("postgres") # checks if db is up while "accepting connections" not in container.exec_run("pg_isready").output.decode(): time.sleep(2) + #TODO: should have a timeout condition print("DB-Container Running") def stop_db(self): @@ -63,7 +85,8 @@ def create_metrics_table(self): self.create_latency_table() self.create_resource_table() self.create_classification_metrics_table() - self.meta.create_all(self.engine) + self.meta.create_all(self.engine,checkfirst=True) + def create_latency_table(self): self.latency = Table( @@ -73,32 +96,146 @@ def create_latency_table(self): Column("start_time", String), Column("end_time", String), Column("duration_sec", Float) + #TODO add fingerprint ) def create_resource_table(self): - pass + self.resources = 
Table( + "resources", self.meta, + Column("metric_id", String, primary_key=True), + Column("timestamp", String, primary_key=True), + Column("cpu_usage", Float), + Column("memory_usage", Float), + Column("network_usage", Float), + Column("accelerator_usage", Float), + Column("wattage", Float), + Column("processes", Integer), + ) def create_classification_metrics_table(self): - pass + self.classification_metrics = Table( + "classification_metrics", self.meta, + Column("metric_id", String, primary_key=True), + Column("timestamp", String, primary_key=True), + Column("value", Float), + Column("measure", String), + Column("hyperparameters", String), + Column("classification_metrics", String), + Column("process_id", Integer, nullable=True), + Column("hostname", String), + Column("obj_hash", BigInteger, nullable=True), + ) def get_benchmark_results(self): latency = self.get_latency_results() resources = self.get_resource_results() classification = self.get_classification_results() return dict(latency=latency, resources=resources, classification=classification) - - def get_latency_results(self): + + def _get_table_results(self,table): result_list = [] with self.engine.connect() as conn: - stmt = select(self.latency) + stmt = select(table) cursor = conn.execute(stmt) cursor = cursor.mappings().all() for row in cursor: result_list.append(dict(row)) return result_list + def get_latency_results(self): + return self._get_table_results(self.latency) + def get_resource_results(self): - pass + return self._get_table_results(self.resources) def get_classification_results(self): + return self._get_table_results(self.classification_metrics) + + +class StoreStrategy(object): + """ + Interface for swapping out different implementations of the resource store, e.g., a database, a file, etc. + """ + @abstractmethod + def setup(self, **kwargs): + """ + Setup the resource store, e.g., create a database connection. 
 + """ pass + + @abstractmethod + def store(self, node_usage:Metric, **kwargs): + """ + Store the node usage in the resource store. + """ + pass + +#global store engine used as a singleton to save +engine=None + +class MetricsStorageStrategy(StoreStrategy): + + def __init__(self): + self.engine = None + + def setup(self, **kwargs): + if self.engine: + return + + #reuse the global engine if it exists + # global engine + # if engine: + # self.engine = engine + + self.engine = self._create_engine(**kwargs) + # engine = self.engine + + def _get_connection_string(self, **kwargs): + # XXX: list order is implicitly a priority + connection_string_actions_registry = [ + ("env", os.environ.get("METRICS_STORAGE_HOST", None)), + ("args",kwargs.get("connection_string",None)) + ] + for method, value in connection_string_actions_registry: + if value: + logging.debug(f"Tracker Connection String retrieved from: {method} using {value}") + return self.shape_connection_string(value) + logging.warn("No Method was succsessful. 
Setting Tracker URL to current Host.") + return MetricsStorageConfig.connection_string + + def shape_connection_string(self, host): + user = MetricsStorageConfig.user + password = MetricsStorageConfig.password + port = MetricsStorageConfig.port + db = MetricsStorageConfig.db + return f"postgresql://{user}:{password}@{host}:{port}/{db}" + + def _create_engine(self, **kwargs): + connection_string = self._get_connection_string(**kwargs) + try: + engine = create_engine(connection_string, echo=False) + except psycopg2.Error: + raise ConnectionError("Could not create an Engine for the Postgres DB.") + return engine + + def store(self, data:Metric, **kwargs): + try: + metadata = MetaData(bind=self.engine) + node_usage = Table(kwargs.get("table_name","metrics"), metadata, autoload_with=self.engine) + with self.engine.connect() as conn: + stmt = insert(node_usage).values(data.to_dict()) + conn.execute(stmt) + except Exception as e: + logging.warn(f"Could not store the data in the Metrics DB {data} - {e}") + +class LoggingStoreStrategy(StoreStrategy): + + def __init__(self): + self.log = [] + + def setup(self, **kwargs): + pass + + def store(self, data,**kwargs): + logging.info("Storing data: {}".format(data.to_dict())) + self.log.append(data) diff --git a/ml_benchmark/resource_tracker.py b/ml_benchmark/resource_tracker.py index fec04ce..f4f3fc9 100644 --- a/ml_benchmark/resource_tracker.py +++ b/ml_benchmark/resource_tracker.py @@ -1,5 +1,151 @@ +import datetime +import logging +from threading import Timer + +from prometheus_api_client import PrometheusConnect + +from ml_benchmark.metrics import NodeUsage +from ml_benchmark.metrics_storage import MetricsStorageStrategy + + +class RepeatTimer(Timer): + + def run(self): + while not self.finished.wait(self.interval): + self.function(*self.args, **self.kwargs) + + class ResourceTracker: - pass -# For trials open a seperate subprocess next to the objective, which polls resources every few seconds -# Handle it over a decorator as 
well + # update every 2 seconds ... maybe make this tuneable + UPDATE_INTERVAL = 2 + + def __init__(self, prometheus_url, resouce_store=MetricsStorageStrategy ): + if prometheus_url is None: + raise ValueError("Prometheus URL is required.") + self.prometheus_url = prometheus_url + self.prm = PrometheusConnect(url=self.prometheus_url, disable_ssl=True) + + if not self.prm.check_prometheus_connection(): + raise ValueError("Could not connect to Prometheus.") + + self.store = resouce_store() + self.store.setup() + + self.timer = RepeatTimer(self.UPDATE_INTERVAL, self.update) + + self._check_metrics() + + self.namespace = None + + def _check_metrics(self): + available = set(self.prm.all_metrics()) + + #check node_exporter metrics - cpu/memory + required = {"node_memory_MemFree_bytes", "node_memory_MemTotal_bytes", "node_cpu_seconds_total","scaph_host_power_microwatts","scaph_process_power_consumption_microwatts"} + if not required.issubset(available): + raise ValueError("Prometheus does not provide the required metrics.") + + #check if prometheus is managing a kubernetes cluster + if "container_network_transmit_bytes_total" in available: + self.network_metric = "container_network" + elif "node_network_transmit_bytes_total" in available: + self.network_metric = "node_network" + else: + raise ValueError("Prometheus does not provide a vaild network metric.") + + if "kube_node_info" in available: + info = self.prm.get_current_metric_value("kube_node_info") + self.node_map = dict(map(lambda x: (x["internal_ip"], x["node"]), map(lambda x: x["metric"], info))) + else: + self.node_map = {} + + + def update(self): + try: + self.track() + except Exception as e: + logging.exception("Error while updating resource tracker. %s", e) + + def _query(self): + """ + Query Prometheus for the current resource usage. + """ + # ? 
is there a better way to map nodes using the node_exporter + memory = 'avg by (instance) (node_memory_MemFree_bytes/node_memory_MemTotal_bytes)' + cpu = '100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[2m])*100))' + + ##needs mapping + network = f'sum by (instance) (rate({self.network_metric}_receive_bytes_total[2m])+rate({self.network_metric}_transmit_bytes_total[2m]))' + #TODO: reduce measurments to only the ones we care about - dose currently not work with scaph_process_power_consumption_microwatts + #if we can we collect the power consumption from the scaph_host_power_microwatts metric only for the used namespace + # if self.namespace: + # wattage = f'sum by (node) (scaph_process_power_consumption_microwatts{{namespace="{self.namespace}"}})' + # processes = f'count by (node) (scaph_process_power_consumption_microwatts{{namespace="{self.namespace}"}})' + # else : + wattage = f'sum by (node) (scaph_host_power_microwatts)' + processes = 'count by (node) (scaph_process_power_consumption_microwatts)' + + mem_result = self.prm.custom_query(memory) + cpu_result = self.prm.custom_query(cpu) + network_result = self.prm.custom_query(network) + wattage_result = self.prm.custom_query(wattage) + processes_result = self.prm.custom_query(processes) + + logging.debug("Got results from Prometheus.", mem_result, cpu_result, network_result) + + # assert len(mem_result) == len(cpu_result) == len(network_result) + + #grab the data per instance + mem_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), mem_result)) + cpu_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), cpu_result)) + network_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), network_result)) + wattage_result = dict(map(lambda x: (self._try_norm(x["metric"]["node"]), float(x["value"][1])), wattage_result)) + processes_result = dict(map(lambda x: 
(self._try_norm(x["metric"]["node"]), float(x["value"][1])), processes_result)) + + logging.debug("Processed Prometheus Results", mem_result, cpu_result, network_result, wattage_result, processes_result) + + # assert mem_result.keys() == cpu_result.keys() == network_result.keys() + + #merge the data + data = [] + for instance in mem_result: + n = NodeUsage(instance) + n.timestamp = datetime.datetime.now() + n.cpu_usage = cpu_result.get(instance, 0) + n.memory_usage = mem_result.get(instance, 0) + n.network_usage = network_result.get(instance, 0) + if instance in wattage_result: + n.wattage = wattage_result[instance] + n.processes = processes_result[instance] + else: + n.wattage = -1 + n.processes = -1 + + data.append(n) + # logging.debug("Added node usage for %s", instance) + + return data + + def track(self): + data = self._query() + + #insert the data + for n in data: + self.store.store(n,table_name="resources") + + def _try_norm(self, instance: str): + if instance in self.node_map: + return self.node_map[instance] + elif instance[:instance.find(":")] in self.node_map: + return self.node_map[instance[:instance.find(":")]] + else: + return instance + + def start(self): + logging.debug("Starting resource tracker.") + self.timer.start() + + def stop(self): + logging.debug("Stopping resource tracker.") + self.timer.cancel() diff --git a/ml_benchmark/results_tracker.py b/ml_benchmark/results_tracker.py new file mode 100644 index 0000000..2d670b2 --- /dev/null +++ b/ml_benchmark/results_tracker.py @@ -0,0 +1,24 @@ +import logging +from ml_benchmark.latency_tracker import Tracker #TODO: move to utils +from ml_benchmark.metrics import Result +from ml_benchmark.metrics_storage import MetricsStorageStrategy + +class ResultTracker(Tracker): + def __init__(self,store=MetricsStorageStrategy): + self.store = store() + self.store.setup() + + def track(self, objective_function, result): + r = Result(objective=objective_function) + + r.value = result["macro avg"]["f1-score"] + 
r.measure = "f1-score" + + r.hyperparameters = objective_function.__self__.get_hyperparameters() + r.classification_metrics = result + + try: + self.store.store(r,table_name="classification_metrics") + logging.info("Stored result") + except Exception as e: + logging.error(f"failed to store result {e}") diff --git a/ml_benchmark/utils/yaml_template_filler.py b/ml_benchmark/utils/yaml_template_filler.py index 95ec831..6bb40b2 100644 --- a/ml_benchmark/utils/yaml_template_filler.py +++ b/ml_benchmark/utils/yaml_template_filler.py @@ -22,3 +22,15 @@ def load_and_fill_yaml_template(yaml_path: str, yaml_values: dict) -> dict: with open(yaml_path, "r") as f: job_template = Template(f.read()) return yaml.safe_load_all(job_template.substitute(yaml_values)) + + @staticmethod + def as_yaml(yaml_path: str,obj : object) -> None: + """Safely writes an object to a YAML-File. + Args: + yaml_path (str): filename to write yaml to + obj (any): object to save as yaml + """ + with open(yaml_path, "w") as f: + f.write("# generated file - do not edit\n") + yaml.dump(obj, f) + \ No newline at end of file diff --git a/ml_benchmark/utils/yml_parser.py b/ml_benchmark/utils/yml_parser.py index 8e57bdd..321fa46 100644 --- a/ml_benchmark/utils/yml_parser.py +++ b/ml_benchmark/utils/yml_parser.py @@ -1,4 +1,4 @@ -from yaml import load, Loader +import ruamel.yaml class YMLParser: @@ -6,7 +6,7 @@ class YMLParser: @staticmethod def parse(hyperparameter_file_path): with open(hyperparameter_file_path, "r") as f: - hyper_dict = load(f, Loader=Loader) + hyper_dict = ruamel.yaml.safe_load(f) return hyper_dict diff --git a/ml_benchmark/workload/mnist/mlp.py b/ml_benchmark/workload/mnist/mlp.py index cb703b0..4d1c972 100644 --- a/ml_benchmark/workload/mnist/mlp.py +++ b/ml_benchmark/workload/mnist/mlp.py @@ -42,7 +42,7 @@ def _construct_layer(self, input_size, hidden_layer_config, output_size): def forward(self, x): for layer in self.layers[:-1]: - x = self.relu(layer(x)) + x = layer(x) x = 
self.layers[-1](x) return x diff --git a/ml_benchmark/workload/mnist/mlp_objective.py b/ml_benchmark/workload/mnist/mlp_objective.py index 0169b27..169ffc0 100644 --- a/ml_benchmark/workload/mnist/mlp_objective.py +++ b/ml_benchmark/workload/mnist/mlp_objective.py @@ -1,7 +1,7 @@ import torch import tqdm from ml_benchmark.config import MLPHyperparameter -from ml_benchmark.latency_tracker import latency_decorator +from ml_benchmark.decorators import latency_decorator, validation_latency_decorator from ml_benchmark.workload.mnist.mlp import MLP from ml_benchmark.workload.objective import Objective from sklearn.metrics import classification_report @@ -11,6 +11,7 @@ class MLPObjective(Objective): def __init__(self, epochs, train_loader, val_loader, test_loader, input_size, output_size) -> None: + super().__init__() self.train_loader = train_loader self.val_laoder = val_loader self.test_loader = test_loader @@ -26,6 +27,9 @@ def set_hyperparameters(self, hyperparameters: dict): print(self.hyperparameters) self.hyperparameters.update(hyperparameters) print(self.hyperparameters) + + def get_hyperparameters(self) -> dict: + return self.hyperparameters def set_device(self, device_str: str = None): if device_str: @@ -50,7 +54,7 @@ def train(self): epoch_losses.append(sum(batch_losses)/len(batch_losses)) return {"train_loss": epoch_losses} - @latency_decorator + @validation_latency_decorator def validate(self): self.model.eval() self.model = self.model.to(self.device) diff --git a/ml_benchmark/workload/objective.py b/ml_benchmark/workload/objective.py index 709d275..a441741 100644 --- a/ml_benchmark/workload/objective.py +++ b/ml_benchmark/workload/objective.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod -import os +from numpy import random +from datetime import datetime from ml_benchmark.config import MetricsStorageConfig @@ -10,6 +11,24 @@ class Objective(ABC): Interface for a training, validation and test procedure of a model. 
""" + def __init__(self) -> None: + self._unique_id = random.randint(0, 1000000) + self._created_at = datetime.now() + + @abstractmethod + def set_hyperparameters(self, hyperparameters: dict): + """ + Set the hyperparameters of the objective. + """ + pass + + @abstractmethod + def get_hyperparameters(self) -> dict: + """ + Get the hyperparameters of the objective. + """ + pass + @abstractmethod def train(self): pass diff --git a/setup.py b/setup.py index e8b67f0..37c485d 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,8 @@ def main(): python_requires=">=3.6", include_package_data=True, extras_require={"test": package.test_install_requires}, - long_description="/README.md" + long_description=package.long_description, + long_description_content_type="text/markdown" ) diff --git a/test/conftest.py b/test/conftest.py index c93523c..5aab816 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,8 @@ import pytest -from ml_benchmark.latency_tracker import latency_decorator +import requests +import os + +from ml_benchmark.decorators import latency_decorator, validation_latency_decorator from ml_benchmark.workload.objective import Objective @@ -13,11 +16,29 @@ def __init__(self) -> None: def train(self): pass - @latency_decorator + def get_hyperparameters(self) -> dict: + return {"test":True} + + def set_hyperparameters(self, hyperparameters: dict): + pass + + @validation_latency_decorator def validate(self): - return 0.5 + return {"macro avg":{"f1-score":0.5}} @latency_decorator def test(self): return {"score": 0.5} return TestObjective + +@pytest.fixture +def prometeus_url(): + url = os.environ.get("PROMETHEUS_URL", "http://localhost:9090") + try: + resp = requests.get(url) + if resp.status_code != 200: + pytest.skip("Prometheus is availible") + except Exception: + pytest.skip("Could not connect to Prometheus.") + + return url \ No newline at end of file diff --git a/test/test_ml_benchmark/test.yaml b/test/test_ml_benchmark/test.yaml new file mode 100644 index 
0000000..968d6d9 --- /dev/null +++ b/test/test_ml_benchmark/test.yaml @@ -0,0 +1,13 @@ +kubernetesContext: "minikube" +metricsIP: auto +kubernetesMasterIP: minikube +deleteAfterRun: true +hyperparameter: + learning_rate: + start: 1e-4 + end: 1e-2 + step_size: 1e-5 + hidden_layer_config: + start: [10] + end: [100, 100, 100] + step_size: [10, 1] diff --git a/test/test_ml_benchmark/test_latency_tracker.py b/test/test_ml_benchmark/test_latency_tracker.py index 15292cb..58f8855 100644 --- a/test/test_ml_benchmark/test_latency_tracker.py +++ b/test/test_ml_benchmark/test_latency_tracker.py @@ -1,7 +1,7 @@ from ml_benchmark.metrics_storage import MetricsStorage import docker import json - +import os def test_latency_decorator(objective): objective = objective() @@ -18,3 +18,20 @@ def test_latency_decorator(objective): metrics_storage.stop_db() assert isinstance(json.dumps(result), str) + +def test_latency_decorator_using_env(objective): + objective = objective() + metrics_storage = MetricsStorage() + + try: + metrics_storage.start_db() + os.environ["METRICS_STORAGE_HOST"] = MetricsStorage.host + objective.train() + objective.validate() + objective.test() + result = metrics_storage.get_benchmark_results() + metrics_storage.stop_db() + except docker.errors.APIError: + metrics_storage.stop_db() + + assert isinstance(json.dumps(result), str) \ No newline at end of file diff --git a/test/test_ml_benchmark/test_metrics.py b/test/test_ml_benchmark/test_metrics.py new file mode 100644 index 0000000..8fbeab2 --- /dev/null +++ b/test/test_ml_benchmark/test_metrics.py @@ -0,0 +1,36 @@ +import imp +import logging +from ml_benchmark.metrics_storage import MetricsStorage +from ml_benchmark.resource_tracker import ResourceTracker +from ml_benchmark.results_tracker import ResultTracker + +from ml_benchmark.workload.mnist.mnist_task import MnistTask +from time import sleep + +def test_metrics(prometeus_url): + task = MnistTask({"epochs": 1}) + objective = task.create_objective() + 
metrics_storage = MetricsStorage() + resourceTracker = ResourceTracker(prometheus_url=prometeus_url) + try: + metrics_storage.start_db() + sleep(2) + resourceTracker.start() + objective.set_hyperparameters({"learning_rate":1e-3}) + objective.train() + score = objective.validate() + objective.test() + + sleep(15) + + result = metrics_storage.get_benchmark_results() + logging.info(result) + + assert len(result["latency"]) > 0 + assert len(result["classification"]) > 0 + assert len(result["resources"]) > 0 + except Exception as e: + assert False, e + finally: + resourceTracker.stop() + metrics_storage.stop_db() diff --git a/test/test_ml_benchmark/test_resouce_tracker.py b/test/test_ml_benchmark/test_resouce_tracker.py new file mode 100644 index 0000000..14a61f8 --- /dev/null +++ b/test/test_ml_benchmark/test_resouce_tracker.py @@ -0,0 +1,26 @@ + +import logging +from ml_benchmark.resource_tracker import ResourceTracker +from ml_benchmark.metrics_storage import LoggingStoreStrategy + + +def test_resouce_tracker(prometeus_url): + import time + logging.basicConfig(level=logging.DEBUG) + rt = ResourceTracker(prometheus_url=prometeus_url, resouce_store=LoggingStoreStrategy) + rt.start() + time.sleep(ResourceTracker.UPDATE_INTERVAL * 15) + rt.stop() + print(rt.store.log) + assert rt.store.log != [] + +def test_resouce_tracker_with_namespace(prometeus_url): + import time + logging.basicConfig(level=logging.DEBUG) + rt = ResourceTracker(prometheus_url=prometeus_url, resouce_store=LoggingStoreStrategy) + rt.namespace = "optuna-study" + rt.start() + time.sleep(ResourceTracker.UPDATE_INTERVAL * 15) + rt.stop() + print(rt.store.log) + assert rt.store.log != [] \ No newline at end of file diff --git a/test/test_ml_benchmark/yaml_test.py b/test/test_ml_benchmark/yaml_test.py new file mode 100644 index 0000000..9e60b78 --- /dev/null +++ b/test/test_ml_benchmark/yaml_test.py @@ -0,0 +1,16 @@ +import logging +from os import path +from ml_benchmark.utils.yml_parser import YMLParser 
+from ml_benchmark.utils.yaml_template_filler import YamlTemplateFiller + +def test(): + resources = YMLParser.parse(path.join(path.dirname(__file__),"test.yaml")) + assert resources["deleteAfterRun"] + + print(resources["hyperparameter"]) + + YamlTemplateFiller.as_yaml(path.join(path.dirname(__file__),"hyperparameter_space.yml"), resources["hyperparameter"]) + params = YMLParser.parse(path.join(path.dirname(__file__),"hyperparameter_space.yml")) + assert params == resources["hyperparameter"] + + \ No newline at end of file