From cb0be2b0872af4aa04daebf24b6d7c236dcfe38c Mon Sep 17 00:00:00 2001 From: zlatsic Date: Mon, 23 Sep 2024 17:52:02 +0200 Subject: [PATCH] ex 3 condense all into a single module, wip shortening it --- examples/0003/README.md | 6 +- examples/0003/conf/event.yaml | 42 ++++- .../{controller/module.py => controller.py} | 178 ++++++++++++++++-- .../modules/controller/model/__init__.py | 34 ---- .../modules/controller/model/anomaly.py | 55 ------ .../modules/controller/model/common.py | 77 -------- .../modules/controller/model/forecast.py | 54 ------ 7 files changed, 195 insertions(+), 251 deletions(-) rename examples/0003/src_py/air_supervision/modules/{controller/module.py => controller.py} (57%) delete mode 100644 examples/0003/src_py/air_supervision/modules/controller/model/__init__.py delete mode 100644 examples/0003/src_py/air_supervision/modules/controller/model/anomaly.py delete mode 100644 examples/0003/src_py/air_supervision/modules/controller/model/common.py delete mode 100644 examples/0003/src_py/air_supervision/modules/controller/model/forecast.py diff --git a/examples/0003/README.md b/examples/0003/README.md index 19f5e13..0a1ebeb 100644 --- a/examples/0003/README.md +++ b/examples/0003/README.md @@ -19,7 +19,7 @@ the main GUI app is on 23023 (credentials are `user` and `pass`). [```AnomalyModule```](src_py/air_supervision/modules/controller/anomaly.py) and [```ForecastModule```](src_py/air_supervision/modules/controller/forecast.py) run in parallel. Both are inherited from -[```GenericReadingsModule```](src_py/air_supervision/modules/controller/module.py). +[```GenericReadingsModule```](src_py/air_supervision/modules/controller.py). Both recieve new data from the device with event types: ```('gui', 'system', 'timeseries', 'reading')```. The received data is being saved in their @@ -58,7 +58,7 @@ current_model = 'linear' ``` -[```FitLock```](src_py/air_supervision/modules/controller/module.py) class is +[```FitLock```](src_py/air_supervision/modules/controller.py) class is used to manage which model is the current one,and in which state it is(is it fitted or not or is it even defined, that way we cant send predict actions beforehand). @@ -68,7 +68,7 @@ model with a same name in backend. When we get confim message, we fit that model and prepare data for prediction process. When -[```ReadingsHandler```](src_py/air_supervision/modules/controller/module.py) +[```ReadingsHandler```](src_py/air_supervision/modules/controller.py) is ready and current model is defined and is fitted (we also get a confirm message from AIMM), we send a batch of data to AIMM for prediction. AIMM will return predicted values,that we then send to the adapter. diff --git a/examples/0003/conf/event.yaml b/examples/0003/conf/event.yaml index f5e7115..d923abc 100644 --- a/examples/0003/conf/event.yaml +++ b/examples/0003/conf/event.yaml @@ -14,22 +14,44 @@ backend: timeseries: [] modules: - module: air_supervision.modules.readings - - module: air_supervision.modules.controller.module + - module: air_supervision.modules.controller model_family: anomaly - supported_models: - - Forest - - SVM - - Cluster batch_size: 48 min_readings: 24 - - module: air_supervision.modules.controller.module + models: + SVM: + contamination: 0.3 + svm1: 1 + svm2: 2 + Cluster: + contamination: 0.3 + cluster1: 1 + cluster2: 3 + Forest: + contamination: 0.3 + other_test_p: 1 + third: 4 + Forest2: + contamination: 0.3 + other_test_p: 1 + third: 4 + - module: air_supervision.modules.controller model_family: forecast - supported_models: - - MultiOutputSVR - - Linear - - Constant batch_size: 48 min_readings: 24 + models: + MultiOutputSVR: + C: 2000 + svm1: 1 + svm2: 2 + Linear: + contamination: 0.3 + cluster1: 1 + cluster2: 3 + Constant: + contamination: 0.3 + other_test_p: 1 + third: 4 - module: air_supervision.modules.enable_all eventer_server: host: 127.0.0.1 diff --git a/examples/0003/src_py/air_supervision/modules/controller/module.py b/examples/0003/src_py/air_supervision/modules/controller.py similarity index 57% rename from examples/0003/src_py/air_supervision/modules/controller/module.py rename to examples/0003/src_py/air_supervision/modules/controller.py index 77beaaa..bc81f41 100644 --- a/examples/0003/src_py/air_supervision/modules/controller/module.py +++ b/examples/0003/src_py/air_supervision/modules/controller.py @@ -1,6 +1,10 @@ +import abc +import csv from datetime import datetime +from enum import Enum +from itertools import count -from air_supervision.modules.controller import model +import numpy from hat.event import common from typing import Any import hat.aio @@ -9,9 +13,6 @@ import logging mlog = logging.getLogger(__name__) -json_schema_id = None -json_schema_repo = None -_source_id = 0 class FitLock: @@ -19,15 +20,9 @@ def __init__(self): self.lock = True self.current_model = None - def get_current_model(self): - return self.current_model - def can_fit(self): return not self.lock - def can_predict(self): - return not self.lock - def created(self, model): self.current_model = model @@ -44,9 +39,9 @@ def __init__(self, conf, engine, source): self._engine = engine self._source = source self._model_family = conf["model_family"] - self._supported_models = conf["supported_models"] self._batch_size = conf["batch_size"] self._min_readings = conf["min_readings"] + self._models_conf = conf["models"] self._subscription = hat.event.common.create_subscription([ ("user_action", self._model_family, "*"), @@ -54,6 +49,8 @@ def __init__(self, conf, engine, source): ("gui", "system", "timeseries", "reading"), ]) + self._model_prefix = f"air_supervision.aimm.{self._model_family}" + self._async_group = hat.aio.Group() self._readings = [] @@ -83,6 +80,17 @@ async def process(self, source, event): events = list(self._process_user_action(event)) return events + async def register_with_action_id( + self, + model_type, + request_id, + return_type, + events + ): + await self._engine.register(self._source, events) + self._request_ids[request_id] = (return_type, model_type) + + def _process_aimm(self, event): msg_type = event.type[1] if msg_type == "state": @@ -111,16 +119,16 @@ def _process_action(self, event): type_, model_name = self._request_ids[request_id] - if type_ == model.ReturnType.CREATE: + if type_ == ReturnType.CREATE: self._lock.created(model_name) self._async_group.spawn(self._models[model_name].fit) yield self._message(model_name, "new_current_model") params = self._models[model_name].hyperparameters yield self._message(params, "setting") - elif type_ == model.ReturnType.FIT: + elif type_ == ReturnType.FIT: self._lock.fitted() - elif type_ == model.ReturnType.PREDICT: + elif type_ == ReturnType.PREDICT: yield from self._process_predict(event) else: del self._request_ids[request_id] @@ -137,7 +145,7 @@ def _process_predict(self, event): ) def _process_reading(self, event): - yield self._message(self._supported_models, "supported_models") + yield self._message(list(self._models_conf), "supported_models") if not self._lock.can_fit(): return row = self._transform_row( @@ -200,11 +208,14 @@ def _process_model_change(self, event): yield self._message(received_model_name, "new_current_model") self._lock.changed(received_model_name) - new_model = model.factory( - self._model_family, received_model_name, self + new_model = Model( + self._model_family, + self, + f"{self._model_prefix}.{received_model_name}", + self._models_conf[received_model_name], ) - self._models[new_model.model_type] = new_model + self._models[new_model.model_class] = new_model self._async_group.spawn(new_model.create_instance) def _message(self, data, type_name): @@ -214,6 +225,137 @@ def _message(self, data, type_name): info = common.ModuleInfo(create=Controller) +request_id_counter = count(0) + + +class ReturnType(Enum): + CREATE = 1 + FIT = 2 + PREDICT = 3 + + +class Model(abc.ABC): + def __init__(self, model_family, module, model_class, hyperparameters): + self._model_family = model_family + self._module = module + + self._id = None + self._model_class = model_class + + self._hyperparameters = hyperparameters + self._executor = hat.aio.create_executor() + + @property + def model_class(self): + return self._model_class + + @property + def hyperparameters(self): + return self._hyperparameters + + def set_id(self, model_id): + self._id = model_id + + async def fit(self, **kwargs): + """Method used to invoke model fitting. + + Args: + **kwargs: matches concrete model's hyperparameters""" + if not self._id or self._model_family not in ("anomaly", "forecast"): + return + + dataset_fn = _ext_forecast_dataset + if self._model_family == "anomaly": + dataset_fn = _ext_anomaly_dataset + data = { + "args": await self._executor(dataset_fn), + "kwargs": kwargs, + "request_id": str(next(request_id_counter)), + } + await self._register_event( + ("aimm", "fit", self._id), data, ReturnType.FIT + ) + + async def create_instance(self): + event_type = ("aimm", "create_instance") + data = { + "model_type": self._model_class, + "args": [], + "kwargs": self.hyperparameters, + "request_id": str(next(request_id_counter)), + } + + await self._register_event(event_type, data, ReturnType.CREATE) + + async def predict(self, model_input): + event_type = ("aimm", "predict", self._id) + data = { + "args": model_input, + "kwargs": {}, + "request_id": str(next(request_id_counter)), + } + + await self._register_event(event_type, data, ReturnType.PREDICT) + + async def _register_event(self, event_type, data, return_type): + request_id = data["request_id"] + await self._module.register_with_action_id( + self._model_class, + request_id, + return_type, + [ + hat.event.common.RegisterEvent( + type=event_type, + source_timestamp=None, + payload=hat.event.common.EventPayloadJson(data), + ) + ] + ) + + +def _ext_line_generator(): + with open("dataset/ambient_temperature_system_failure.csv", "r") as f: + reader = csv.reader(f, delimiter="\t") + for i, line in enumerate(reader): + if not i: + continue + yield line + + +def _ext_forecast_dataset(): + values = [] + for line in _ext_line_generator(): + raw_value = float(line[0].split(",")[1]) + values.append((float(raw_value) - 32) * 5 / 9) + + x, y = [], [] + for i in range(48, len(values) - 24, 24): + x.append(values[i - 48 : i]) + y.append(values[i : i + 24]) + + x, y = numpy.array(x), numpy.array(y) + + fit_start = int(len(x) * 0.25) + return [x[fit_start:].tolist(), y[fit_start:].tolist()] + + +def _ext_anomaly_dataset(): + train_data = [] + for line in _ext_line_generator(): + timestamp = datetime.strptime( + line[0].split(",")[0], "%Y-%m-%d %H:%M:%S" + ) + value = float(line[0].split(",")[1]) + value = (float(value) - 32) * 5 / 9 + train_data.append([ + value, + timestamp.hour, + int((timestamp.hour >= 7) & (timestamp.hour <= 22)), + timestamp.weekday(), + int(timestamp.weekday() < 5), + ]) + fit_start = int(len(train_data) * 0.25) + return [train_data[fit_start:], None] def _register_event(event_type, payload, source_timestamp=None): diff --git a/examples/0003/src_py/air_supervision/modules/controller/model/__init__.py b/examples/0003/src_py/air_supervision/modules/controller/model/__init__.py deleted file mode 100644 index 8caaa63..0000000 --- a/examples/0003/src_py/air_supervision/modules/controller/model/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -from air_supervision.modules.controller.model.common import ReturnType -from air_supervision.modules.controller.model import anomaly, forecast - - -_type_prefix = "air_supervision.aimm" -_anomaly_prefix = f"{_type_prefix}.anomaly" -_forecast_prefix = f"{_type_prefix}.forecast" - - -def factory(model_type, model_name, module): - if model_type == "anomaly": - params = { - "SVM": {"contamination": 0.3, "svm1": 1, "svm2": 2}, - "Cluster": {"contamination": 0.3, "cluster1": 1, "cluster2": 3}, - "Forest": {"contamination": 0.3, "other_test_p": 1, "third": 4}, - "Forest2": {"contamination2": 0.3, "other_test_p": 1, "third": 4}, - }[model_name] - return anomaly.AnomalyModel( - module, f"{_anomaly_prefix}.{model_name}", params - ) - elif model_type == "forecast": - params = { - "MultiOutputSVR": {"C": 2000, "svm1": 1, "svm2": 2}, - "Linear": {"contamination": 0.3, "cluster1": 1, "cluster2": 3}, - "Constant": {"contamination2": 0.3, "other_test_p": 1, "third": 4}, - }[model_name] - return forecast.ForecastModel( - module, f"{_forecast_prefix}.{model_name}", params - ) - else: - raise ValueError(f"incorrect model type {model_type}") - - -__all__ = ["factory"] diff --git a/examples/0003/src_py/air_supervision/modules/controller/model/anomaly.py b/examples/0003/src_py/air_supervision/modules/controller/model/anomaly.py deleted file mode 100644 index 14301a9..0000000 --- a/examples/0003/src_py/air_supervision/modules/controller/model/anomaly.py +++ /dev/null @@ -1,55 +0,0 @@ -from air_supervision.modules.controller.model.common import ( - GenericModel, - ReturnType, - request_id_counter, -) -from datetime import datetime -from hat import aio -import csv - - -class AnomalyModel(GenericModel): - def __init__(self, module, model_type, hyperparameters): - super().__init__(module, model_type, hyperparameters) - self._executor = aio.create_executor() - - async def fit(self, **kwargs): - if not self._id: - return - event_type = ("aimm", "fit", self._id) - - train_data = await self._executor(self._ext_get_dataset) - data = { - "args": [train_data, None], - "kwargs": kwargs, - "request_id": str(next(request_id_counter)), - } - await self._register_event(event_type, data, ReturnType.FIT) - - def _ext_get_dataset(self): - train_data = [] - - with open("dataset/ambient_temperature_system_failure.csv", "r") as f: - reader = csv.reader(f, delimiter="\t") - for i, line in enumerate(reader): - if not i: - continue - timestamp = datetime.strptime( - line[0].split(",")[0], "%Y-%m-%d %H:%M:%S" - ) - value = float(line[0].split(",")[1]) - - value = (float(value) - 32) * 5 / 9 - - train_data.append( - [ - value, - timestamp.hour, - int((timestamp.hour >= 7) & (timestamp.hour <= 22)), - timestamp.weekday(), - int(timestamp.weekday() < 5), - ] - ) - - fit_start = int(len(train_data) * 0.25) - return train_data[fit_start:] diff --git a/examples/0003/src_py/air_supervision/modules/controller/model/common.py b/examples/0003/src_py/air_supervision/modules/controller/model/common.py deleted file mode 100644 index 094bd62..0000000 --- a/examples/0003/src_py/air_supervision/modules/controller/model/common.py +++ /dev/null @@ -1,77 +0,0 @@ -import abc -import hat.aio -import hat.event.common -from enum import Enum -from itertools import count - - -request_id_counter = count(0) - - -class ReturnType(Enum): - CREATE = 1 - FIT = 2 - PREDICT = 3 - - -class GenericModel(abc.ABC): - def __init__(self, module, model_type, hyperparameters): - self._module = module - - self._id = None - self._model_type = model_type - - self._hyperparameters = hyperparameters - - @property - def model_type(self): - return self._model_type - - @property - def hyperparameters(self): - return self._hyperparameters - - def set_id(self, model_id): - self._id = model_id - - @abc.abstractmethod - async def fit(self, **kwargs): - """Method used to invoke model fitting. - - Args: - **kwargs: matches concrete model's hyperparameters""" - - async def create_instance(self): - event_type = ("aimm", "create_instance") - data = { - "model_type": self.model_type, - "args": [], - "kwargs": self.hyperparameters, - "request_id": str(next(request_id_counter)), - } - - await self._register_event(event_type, data, ReturnType.CREATE) - - async def predict(self, model_input): - event_type = ("aimm", "predict", self._id) - data = { - "args": model_input, - "kwargs": {}, - "request_id": str(next(request_id_counter)), - } - - await self._register_event(event_type, data, ReturnType.PREDICT) - - async def _register_event(self, event_type, data, return_type): - await self._module._engine.register( - self._module._source, - [ - hat.event.common.RegisterEvent( - type=event_type, - source_timestamp=None, - payload=hat.event.common.EventPayloadJson(data), - ) - ], - ) - request_id = data["request_id"] - self._module._request_ids[request_id] = (return_type, self.model_type) diff --git a/examples/0003/src_py/air_supervision/modules/controller/model/forecast.py b/examples/0003/src_py/air_supervision/modules/controller/model/forecast.py deleted file mode 100644 index 4c933c1..0000000 --- a/examples/0003/src_py/air_supervision/modules/controller/model/forecast.py +++ /dev/null @@ -1,54 +0,0 @@ -from air_supervision.modules.controller.model.common import ( - GenericModel, - ReturnType, - request_id_counter, -) -from hat import aio -import csv -import numpy -import logging - - -mlog = logging.getLogger(__name__) - - -class ForecastModel(GenericModel): - def __init__(self, module, model_type, hyperparameters): - super().__init__(module, model_type, hyperparameters) - self._executor = aio.create_executor() - - async def fit(self, **kwargs): - if not self._id: - return - event_type = ("aimm", "fit", self._id) - - x, y = await self._executor(self._ext_get_dataset) - data = { - "args": [x.tolist(), y.tolist()], - "kwargs": kwargs, - "request_id": str(next(request_id_counter)), - } - await self._register_event(event_type, data, ReturnType.FIT) - - def _ext_get_dataset(self): - values = [] - - with open("dataset/ambient_temperature_system_failure.csv", "r") as f: - reader = csv.reader(f, delimiter="\t") - for i, line in enumerate(reader): - if not i: - continue - value = float(line[0].split(",")[1]) - value = (float(value) - 32) * 5 / 9 - - values.append(value) - - x, y = [], [] - for i in range(48, len(values) - 24, 24): - x.append(values[i - 48 : i]) - y.append(values[i : i + 24]) - - x, y = numpy.array(x), numpy.array(y) - - fit_start = int(len(x) * 0.25) - return x[fit_start:], y[fit_start:]