ex 3 condense all into a single module, wip shortening it
zlatsic committed Sep 23, 2024
1 parent e8b9d12 commit cb0be2b
Showing 7 changed files with 195 additions and 251 deletions.
6 changes: 3 additions & 3 deletions examples/0003/README.md
@@ -19,7 +19,7 @@ the main GUI app is on 23023 (credentials are `user` and `pass`).
[```AnomalyModule```](src_py/air_supervision/modules/controller/anomaly.py) and
[```ForecastModule```](src_py/air_supervision/modules/controller/forecast.py)
run in parallel. Both are inherited from
[```GenericReadingsModule```](src_py/air_supervision/modules/controller/module.py).
[```GenericReadingsModule```](src_py/air_supervision/modules/controller.py).

Both receive new data from the device with event types: ```('gui', 'system',
'timeseries', 'reading')```. The received data is saved in their
@@ -58,7 +58,7 @@ current_model = 'linear'
```


[```FitLock```](src_py/air_supervision/modules/controller/module.py) class is
[```FitLock```](src_py/air_supervision/modules/controller.py) class is
used to manage which model is the current one and in which state it is
(whether it is fitted, or even defined at all; that way we can't send predict
actions prematurely).
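
A minimal sketch of such a lock, assuming the ```fitted``` and ```changed``` helpers simply toggle the flag (only ```can_fit```, ```can_predict``` and ```created``` are shown verbatim in the module below):

```python
class FitLock:
    def __init__(self):
        self.lock = True           # no usable model yet
        self.current_model = None  # name of the currently selected model

    def can_fit(self):
        return not self.lock

    def can_predict(self):
        return not self.lock

    def created(self, model):
        # AIMM confirmed instance creation - remember which model is current
        self.current_model = model

    def fitted(self):
        # AIMM confirmed fitting - predict actions may be sent (assumption)
        self.lock = False

    def changed(self, model):
        # the user switched models - lock until the new one is fitted (assumption)
        self.current_model = model
        self.lock = True
```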
@@ -68,7 +68,7 @@ model with the same name in the backend. When we get a confirm message, we fit that
model and prepare data for the prediction process.

When
[```ReadingsHandler```](src_py/air_supervision/modules/controller/module.py)
[```ReadingsHandler```](src_py/air_supervision/modules/controller.py)
is ready and the current model is defined and fitted (we also get a confirm
message from AIMM), we send a batch of data to AIMM for prediction. AIMM will
return predicted values, which we then send to the adapter.
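
Roughly, the prediction leg of that flow looks like the sketch below (simplified; the names follow the module further down, but the exact batching condition is an assumption):

```python
async def _maybe_predict(self):
    # only once a current model exists, is fitted, and a full batch is collected
    if not self._lock.can_predict():
        return
    model_name = self._lock.get_current_model()
    if model_name is None or len(self._readings) < self._batch_size:
        return
    # Model.predict registers an ('aimm', 'predict', <instance id>) event;
    # AIMM's response is routed through _process_action to _process_predict,
    # which forwards the predicted values to the adapter
    await self._models[model_name].predict([self._readings[-self._batch_size:]])
```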
42 changes: 32 additions & 10 deletions examples/0003/conf/event.yaml
@@ -14,22 +14,44 @@ backend:
timeseries: []
modules:
- module: air_supervision.modules.readings
- module: air_supervision.modules.controller.module
- module: air_supervision.modules.controller
model_family: anomaly
supported_models:
- Forest
- SVM
- Cluster
batch_size: 48
min_readings: 24
- module: air_supervision.modules.controller.module
models:
SVM:
contamination: 0.3
svm1: 1
svm2: 2
Cluster:
contamination: 0.3
cluster1: 1
cluster2: 3
Forest:
contamination: 0.3
other_test_p: 1
third: 4
Forest2:
contamination: 0.3
other_test_p: 1
third: 4
- module: air_supervision.modules.controller
model_family: forecast
supported_models:
- MultiOutputSVR
- Linear
- Constant
batch_size: 48
min_readings: 24
models:
MultiOutputSVR:
C: 2000
svm1: 1
svm2: 2
Linear:
contamination: 0.3
cluster1: 1
cluster2: 3
Constant:
contamination: 0.3
other_test_p: 1
third: 4
- module: air_supervision.modules.enable_all
eventer_server:
host: 127.0.0.1
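
Each mapping under ```models``` is passed to the controller as that model's hyperparameters. For illustration, the ```SVM``` entry above would end up in a create-instance request to AIMM roughly shaped like this (the payload layout follows ```Model.create_instance``` in the module below; the concrete values are simply the ones configured above):

```python
# hypothetical create_instance payload for the anomaly SVM entry above
{
    "model_type": "air_supervision.aimm.anomaly.SVM",
    "args": [],
    "kwargs": {"contamination": 0.3, "svm1": 1, "svm2": 2},
    "request_id": "0",  # request ids are drawn from a shared counter
}
```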
@@ -1,6 +1,10 @@
import abc
import csv
from datetime import datetime
from enum import Enum
from itertools import count

from air_supervision.modules.controller import model
import numpy
from hat.event import common
from typing import Any
import hat.aio
@@ -9,25 +13,16 @@
import logging

mlog = logging.getLogger(__name__)
json_schema_id = None
json_schema_repo = None
_source_id = 0


class FitLock:
def __init__(self):
self.lock = True
self.current_model = None

def get_current_model(self):
return self.current_model

def can_fit(self):
return not self.lock

def can_predict(self):
return not self.lock

def created(self, model):
self.current_model = model

@@ -44,16 +39,18 @@ def __init__(self, conf, engine, source):
self._engine = engine
self._source = source
self._model_family = conf["model_family"]
self._supported_models = conf["supported_models"]
self._batch_size = conf["batch_size"]
self._min_readings = conf["min_readings"]
self._models_conf = conf["models"]

        self._subscription = hat.event.common.create_subscription([
            ("user_action", self._model_family, "*"),    # user actions from the GUI (e.g. model selection)
            ("aimm", "*"),                               # state updates and request responses from AIMM
            ("gui", "system", "timeseries", "reading"),  # readings coming from the device
        ])

self._model_prefix = f"air_supervision.aimm.{self._model_family}"

self._async_group = hat.aio.Group()

self._readings = []
@@ -83,6 +80,17 @@ async def process(self, source, event):
events = list(self._process_user_action(event))
return events

async def register_with_action_id(
self,
model_type,
request_id,
return_type,
events
):
await self._engine.register(self._source, events)
self._request_ids[request_id] = (return_type, model_type)


def _process_aimm(self, event):
msg_type = event.type[1]
if msg_type == "state":
@@ -111,16 +119,16 @@ def _process_action(self, event):

type_, model_name = self._request_ids[request_id]

if type_ == model.ReturnType.CREATE:
if type_ == ReturnType.CREATE:
self._lock.created(model_name)
self._async_group.spawn(self._models[model_name].fit)

yield self._message(model_name, "new_current_model")
params = self._models[model_name].hyperparameters
yield self._message(params, "setting")
elif type_ == model.ReturnType.FIT:
elif type_ == ReturnType.FIT:
self._lock.fitted()
elif type_ == model.ReturnType.PREDICT:
elif type_ == ReturnType.PREDICT:
yield from self._process_predict(event)
else:
del self._request_ids[request_id]
@@ -137,7 +145,7 @@ def _process_predict(self, event):
)

def _process_reading(self, event):
yield self._message(self._supported_models, "supported_models")
yield self._message(list(self._models_conf), "supported_models")
if not self._lock.can_fit():
return
row = self._transform_row(
@@ -200,11 +208,14 @@ def _process_model_change(self, event):
yield self._message(received_model_name, "new_current_model")

self._lock.changed(received_model_name)
new_model = model.factory(
self._model_family, received_model_name, self
new_model = Model(
self._model_family,
self,
f"{self._model_prefix}.{received_model_name}",
self._models_conf[received_model_name],
)

self._models[new_model.model_type] = new_model
self._models[new_model.model_class] = new_model
self._async_group.spawn(new_model.create_instance)

def _message(self, data, type_name):
@@ -214,6 +225,137 @@ def _message(self, data, type_name):


info = common.ModuleInfo(create=Controller)
request_id_counter = count(0)  # source of unique request ids for AIMM events


class ReturnType(Enum):
CREATE = 1
FIT = 2
PREDICT = 3


class Model(abc.ABC):
    """Wraps one AIMM model instance: registers create/fit/predict requests."""

    def __init__(self, model_family, module, model_class, hyperparameters):
self._model_family = model_family
self._module = module

self._id = None
self._model_class = model_class

self._hyperparameters = hyperparameters
self._executor = hat.aio.create_executor()

@property
def model_class(self):
return self._model_class

@property
def hyperparameters(self):
return self._hyperparameters

def set_id(self, model_id):
self._id = model_id

    async def fit(self, **kwargs):
        """Method used to invoke model fitting.

        Args:
            **kwargs: matches the concrete model's hyperparameters"""
if not self._id or self._model_family not in ("anomaly", "forecast"):
return

dataset_fn = _ext_forecast_dataset
if self._model_family == "anomaly":
dataset_fn = _ext_anomaly_dataset
data = {
"args": await self._executor(dataset_fn),
"kwargs": kwargs,
"request_id": str(next(request_id_counter)),
}
await self._register_event(
("aimm", "fit", self._id), data, ReturnType.FIT
)

async def create_instance(self):
event_type = ("aimm", "create_instance")
data = {
"model_type": self._model_class,
"args": [],
"kwargs": self.hyperparameters,
"request_id": str(next(request_id_counter)),
}

await self._register_event(event_type, data, ReturnType.CREATE)

async def predict(self, model_input):
event_type = ("aimm", "predict", self._id)
data = {
"args": model_input,
"kwargs": {},
"request_id": str(next(request_id_counter)),
}

await self._register_event(event_type, data, ReturnType.PREDICT)

async def _register_event(self, event_type, data, return_type):
request_id = data["request_id"]
await self._module.register_with_action_id(
self._model_class,
request_id,
return_type,
[
hat.event.common.RegisterEvent(
type=event_type,
source_timestamp=None,
payload=hat.event.common.EventPayloadJson(data),
)
]
)


def _ext_line_generator():
    # the tab delimiter keeps each comma-separated row in a single field;
    # callers split it themselves - the header row (i == 0) is skipped
    with open("dataset/ambient_temperature_system_failure.csv", "r") as f:
        reader = csv.reader(f, delimiter="\t")
        for i, line in enumerate(reader):
            if not i:
                continue
            yield line


def _ext_forecast_dataset():
    # temperature readings, converted from Fahrenheit to Celsius
    values = []
    for line in _ext_line_generator():
        raw_value = float(line[0].split(",")[1])
        values.append((float(raw_value) - 32) * 5 / 9)

    # sliding daily windows: 48 past samples as input, the next 24 as target
    x, y = [], []
    for i in range(48, len(values) - 24, 24):
        x.append(values[i - 48 : i])
        y.append(values[i : i + 24])

    x, y = numpy.array(x), numpy.array(y)

    # skip the first quarter of the windows, fit on the rest
    fit_start = int(len(x) * 0.25)
    return [x[fit_start:].tolist(), y[fit_start:].tolist()]


def _ext_anomaly_dataset():
    # one row per reading: value (Celsius), hour of day, working-hours flag,
    # weekday index and workday flag
    train_data = []
    for line in _ext_line_generator():
        timestamp = datetime.strptime(
            line[0].split(",")[0], "%Y-%m-%d %H:%M:%S"
        )
        value = float(line[0].split(",")[1])
        value = (float(value) - 32) * 5 / 9
        train_data.append([
            value,
            timestamp.hour,
            int((timestamp.hour >= 7) & (timestamp.hour <= 22)),
            timestamp.weekday(),
            int(timestamp.weekday() < 5),
        ])
    # skip the first quarter of the rows; no target values (second element is None)
    fit_start = int(len(train_data) * 0.25)
    return [train_data[fit_start:], None]


def _register_event(event_type, payload, source_timestamp=None):

This file was deleted.

