Skip to content

Commit

Permalink
Merge pull request #25 from ORNL/dev
Browse files Browse the repository at this point in the history
Main < Dev
  • Loading branch information
renan-souza authored Nov 18, 2022
2 parents 7999f94 + 367179d commit d11cf86
Show file tree
Hide file tree
Showing 23 changed files with 361 additions and 87 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/code-formatting.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
name: Code Formatting

on: [push, pull_request]

permissions:
Expand All @@ -9,7 +8,7 @@ jobs:
build:

runs-on: ubuntu-latest

if: "!contains(github.event.head_commit.message, 'CI Bot')"
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
Expand All @@ -24,5 +23,5 @@ jobs:
run: black --check .
- name: Run flake8 checks
run: |
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --max-complexity=10 --max-line-length=79 --statistics
flake8 . --count --show-source --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --show-source --max-complexity=10 --max-line-length=79 --statistics
2 changes: 1 addition & 1 deletion .github/workflows/create-release-n-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
curl --data "{\"tag_name\": \"v${CURRENT_VERSION}\",
\"target_commitish\": \"${TARGET}\",
\"name\": \"v${CURRENT_VERSION}\",
\"body\": \"Release of version ${CURRENT_VERSION}\",
\"body\": \"Release of version ${CURRENT_VERSION}.\nRun `pip install flowcept==${CURRENT_VERSION}` to install this version.\",
\"make_latest\": \"true\",
\"draft\": false,
\"prerelease\": false}" \
Expand Down
8 changes: 3 additions & 5 deletions .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
build:

runs-on: ubuntu-latest

if: "!contains(github.event.head_commit.message, 'CI Bot')"
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
Expand All @@ -27,10 +27,8 @@ jobs:
pip install -r extra_requirements/$requirements_file;
done;
fi
- name: Run Redis
run: docker run -p 6379:6379 --name redis -d redis
- name: Run RabbitMQ for Zambeze Plugin tests
run: docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq:3.11-management
- name: Run Docker Compose
run: docker compose -f deployment/compose.yml up -d
- name: Test with pytest
run: |
pytest
2 changes: 1 addition & 1 deletion .github/workflows/version_bumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
f.write(
f"""# WARNING: CHANGE THIS FILE MANUALLY ONLY TO RESOLVE CONFLICTS!
# This file is supposed to be automatically modified by the CI Bot.
# The expected format is: v<Major>.<Minor>.<Patch><optional: branch>
# The expected format is: <Major>.<Minor>.<Patch><optional: branch>
# See .github/workflows/version_bumper.py
__version__ = "{new_version}"
"""
Expand Down
12 changes: 12 additions & 0 deletions bin/clean_directory.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Remove build, test, and MLflow artifacts from the repository tree.
# Make sure you run this from the directory root
rm -rf .pytest_cache \
    .build \
    .dist \
    build \
    dist \
    *egg* \
    mlruns \
    mlflow.db

# Without globstar, bash expands '**' exactly like '*', so the first
# pattern below would only reach one directory level. Enable it so
# '**/*mlruns*' really matches at any depth; the explicit two-level
# pattern is kept as a belt-and-braces fallback.
shopt -s globstar
rm -rf **/*mlruns*
rm -rf */*/*mlruns*
20 changes: 20 additions & 0 deletions deployment/compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Containers required by Flowcept's plugins and unit tests.
version: '3.8'
services:

  # Redis instance used as Flowcept's main message broker / cache.
  flowcept_redis:
    container_name: flowcept_redis
    image: redis
    ports:
      - 6379:6379

  # Separate Redis used by the MLflow interceptor's state manager;
  # host port 60379 maps to the container's default 6379.
  mlflow_interceptor_redis:
    container_name: mlflow_interceptor_redis
    image: redis
    ports:
      - 60379:6379

  # RabbitMQ broker (management UI on 15672) for the Zambeze plugin tests.
  zambeze_rabbitmq:
    container_name: zambeze_rabbitmq
    image: rabbitmq:3.11-management
    ports:
      - 5672:5672
      - 15672:15672
3 changes: 2 additions & 1 deletion extra_requirements/mlflow-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mlflow-skinny
sqlalchemy
SQLAlchemy>=1.4.42
alembic
sqlparse
watchdog>=2.1.9

39 changes: 28 additions & 11 deletions flowcept/flowceptor/plugins/abstract_flowceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import json
import yaml
import redis
from redis import Redis

from flowcept.commons.vocabulary import Vocabulary
from flowcept.configs import (
Expand All @@ -12,34 +12,35 @@
REDIS_PORT,
REDIS_CHANNEL,
)
from flowcept.flowceptor.plugins.settings_data_classes import (
from flowcept.flowceptor.plugins.settings_dataclasses import (
ZambezeSettings,
KeyValuesToFilter,
KeyValue,
MLFlowSettings,
AbstractSettings,
)


class AbstractFlowceptor(object, metaclass=ABCMeta):
def __init__(self, plugin_key):
self.settings = AbstractFlowceptor.__get_settings(plugin_key)
self._redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
self.settings = AbstractFlowceptor.get_settings(plugin_key)
self._redis = Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)

@staticmethod
def __get_settings(plugin_key):
# TODO: use factory pattern
def get_settings(plugin_key: str) -> AbstractSettings:
# TODO: use the factory pattern
with open(SETTINGS_PATH) as f:
data = yaml.load(f, Loader=yaml.FullLoader)
settings = data[Vocabulary.Settings.PLUGINS][plugin_key]
settings["key"] = plugin_key
settings_obj: AbstractSettings = None
if (
settings[Vocabulary.Settings.KIND]
== Vocabulary.Settings.ZAMBEZE_KIND
):
settings_obj: ZambezeSettings = ZambezeSettings(**settings)
settings_obj.key_values_to_filter = [
KeyValuesToFilter(**item)
for item in settings_obj.key_values_to_filter
KeyValue(**item) for item in settings_obj.key_values_to_filter
]
return settings_obj
elif (
settings[Vocabulary.Settings.KIND]
== Vocabulary.Settings.MLFLOW_KIND
Expand All @@ -49,17 +50,33 @@ def __get_settings(plugin_key):
settings_obj.file_path = os.path.join(
PROJECT_DIR_PATH, settings_obj.file_path
)
return settings_obj
return settings_obj

@abstractmethod
def intercept(self, message: dict):
    """
    Method that intercepts the identified data.
    :param message: the data captured from the observed tool,
        as a plain dict ready to be forwarded (see post_intercept).
    :return: None
    """

    raise NotImplementedError()

@abstractmethod
def observe(self):
    """
    Start watching the target tool for new data.
    Concrete plugins set up their own watching mechanism here
    (e.g., a filesystem watcher or a message-queue consumer).
    """
    raise NotImplementedError()

@abstractmethod
def callback(self, *args, **kwargs):
    """
    Method that decides what to do when a change is identified.
    If it's an interesting change, it calls self.intercept; otherwise,
    let it go....
    """
    raise NotImplementedError()

def post_intercept(self, intercepted_message: dict):
intercepted_message["plugin_key"] = self.settings.key
print(
f"Going to send to Redis an intercepted message:"
f"\n\t{json.dumps(intercepted_message)}"
Expand Down
29 changes: 29 additions & 0 deletions flowcept/flowceptor/plugins/interceptor_state_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from redis import Redis

from flowcept.flowceptor.plugins.settings_dataclasses import AbstractSettings


class InterceptorStateManager(object):
    """
    Tracks which element ids a plugin interceptor has already seen,
    backed by a Redis set named after the plugin's settings key.
    """

    def __init__(self, settings: AbstractSettings):
        """
        :param settings: plugin settings; must expose redis_host and
            redis_port.
        :raises Exception: if the settings carry no Redis host.
        """
        # One Redis set per plugin: the set is named after the plugin's
        # unique key so plugins keep independent state.
        self._set_name = settings.key

        if not hasattr(settings, "redis_host"):
            raise Exception(
                f"This plugin setting {settings.key} "
                f"does not have a Redis Host."
            )

        self._db = Redis(
            host=settings.redis_host, port=settings.redis_port, db=0
        )

    def clear_set(self):
        """Forget every element id recorded so far."""
        self._db.delete(self._set_name)

    def add_element_id(self, element_id: str):
        """Record *element_id* as already processed."""
        self._db.sadd(self._set_name, element_id)

    def has_element_id(self, element_id) -> bool:
        """Return whether *element_id* was already recorded."""
        return self._db.sismember(self._set_name, element_id)
94 changes: 94 additions & 0 deletions flowcept/flowceptor/plugins/mlflow/mlflow_dao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from typing import List
from sqlalchemy.engine import Row, create_engine
from textwrap import dedent
from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import RunData
from flowcept.flowceptor.plugins.settings_dataclasses import (
MLFlowSettings,
)


class MLFlowDAO:
    """
    Data Access Object that reads finished runs directly out of
    MLflow's SQLite backend database.
    """

    _LIMIT = 10
    # TODO: This should not at all be hard coded.
    # This value needs to be greater than the amount of
    # runs inserted in the Runs table at each data observation

    def __init__(self, mlflow_settings: MLFlowSettings):
        self._engine = MLFlowDAO._get_db_engine(mlflow_settings.file_path)

    @staticmethod
    def _get_db_engine(sqlite_file):
        """
        Build a SQLAlchemy engine for the given SQLite file.
        :raises Exception: if the engine cannot be created.
        """
        # Build the uri before the try block so the error message can
        # always reference it (previously db_uri could be unbound in
        # the except clause).
        db_uri = f"sqlite:///{sqlite_file}"
        try:
            return create_engine(db_uri)
        except Exception:
            raise Exception(f"Could not create DB engine with uri: {db_uri}")

    def get_finished_run_uuids(self) -> List[Row]:
        """
        Return the uuids of the most recently finished runs,
        newest first, capped at _LIMIT rows.
        """
        sql = dedent(
            f"""
            SELECT run_uuid
            FROM
                runs
            WHERE
                status = 'FINISHED'
            ORDER BY end_time DESC
            LIMIT {MLFlowDAO._LIMIT}
            """
        )
        # Context manager ensures the connection is closed; the
        # previous code leaked one connection per call.
        with self._engine.connect() as conn:
            results = conn.execute(sql).fetchall()
        return results

    def get_run_data(self, run_uuid: str) -> RunData:
        """
        Assemble a RunData record (metrics + parameters) for one
        finished run. Returns None when no matching rows exist
        (the previous code crashed with a NameError in that case).
        """
        # TODO: consider outer joins to get the run data even if there's
        # no metric or param
        # NOTE(review): run_uuid is interpolated into the SQL text. It
        # comes from MLflow's own DB here, but a bound parameter would
        # be safer — confirm before this ever sees external input.
        sql = dedent(
            f"""
            SELECT r.run_uuid, r.start_time, r.end_time, r.status,
                m.key as 'metric_key', m.value as 'metric_value',
                p.key as 'parameter_key', p.value as 'parameter_value'
            FROM
                runs AS r,
                metrics as m,
                params as p
            WHERE
                r.run_uuid = m.run_uuid AND
                m.run_uuid = p.run_uuid AND
                r.run_uuid = '{run_uuid}' AND
                r.status = 'FINISHED'
            ORDER BY
                end_time DESC,
                metric_key, metric_value,
                parameter_key, parameter_value
            LIMIT 30
            """
        )
        with self._engine.connect() as conn:
            result_set = conn.execute(sql).fetchall()
        if not result_set:
            return None

        run_data_dict = {"metrics": {}, "parameters": {}}
        for tuple_ in result_set:
            tuple_dict = tuple_._asdict()
            metric_key = tuple_dict.get("metric_key", None)
            metric_value = tuple_dict.get("metric_value", None)
            # Compare against None (not truthiness) so metrics and
            # parameters whose value is 0 or '' are not dropped.
            if metric_key is not None and metric_value is not None:
                run_data_dict["metrics"][metric_key] = metric_value

            param_key = tuple_dict.get("parameter_key", None)
            param_value = tuple_dict.get("parameter_value", None)
            if param_key is not None and param_value is not None:
                run_data_dict["parameters"][param_key] = param_value

        # Run-level columns are identical on every joined row, so take
        # them from the first row instead of re-assigning per row.
        first_row = result_set[0]._asdict()
        run_data_dict["run_uuid"] = first_row["run_uuid"]
        run_data_dict["start_time"] = first_row["start_time"]
        run_data_dict["end_time"] = first_row["end_time"]
        run_data_dict["status"] = first_row["status"]

        return RunData(**run_data_dict)
12 changes: 12 additions & 0 deletions flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dataclasses import dataclass


@dataclass
class RunData:
    """Flat record of one finished MLflow run, as assembled by MLFlowDAO."""

    run_uuid: str  # MLflow run identifier (runs.run_uuid column)
    start_time: int  # run start timestamp as stored by MLflow — TODO confirm units
    end_time: int  # run end timestamp as stored by MLflow — TODO confirm units
    metrics: dict  # metric key -> last value seen for this run
    parameters: dict  # parameter key -> value
    status: str  # run status; only 'FINISHED' runs are fetched by the DAO
34 changes: 27 additions & 7 deletions flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,46 @@
import os
import time
from watchdog.observers import Observer
from flowcept.flowceptor.plugins.abstract_flowceptor import AbstractFlowceptor
from flowcept.flowceptor.plugins.abstract_flowceptor import (
AbstractFlowceptor,
)
from flowcept.flowceptor.plugins.interceptor_state_manager import (
InterceptorStateManager,
)

from flowcept.flowceptor.plugins.mlflow.mlflow_dao import MLFlowDAO
from flowcept.flowceptor.plugins.mlflow.interception_event_handler import (
InterceptionEventHandler,
)


class MLFlowInterceptor(AbstractFlowceptor):
def __init__(self, plugin_key="mlflow"):
super().__init__(plugin_key)
self.state_manager = InterceptorStateManager(self.settings)
self.dao = MLFlowDAO(self.settings)

def intercept(self, message: dict):
super().post_intercept(message)

@staticmethod
def callback(interceptor_instance: "MLFlowInterceptor"):
def callback(self):
"""
function that decides what to do when a change is identified.
This function is called whenever a change is identified in the data.
It decides what to do in the event of a change.
If it's an interesting change, it calls self.intercept; otherwise,
let it go....
"""
# TODO get latest info
interceptor_instance.intercept({"nothing": "yet"})
from time import sleep

sleep(5)
runs = self.dao.get_finished_run_uuids()
for run_uuid_tuple in runs:
run_uuid = run_uuid_tuple[0]
if not self.state_manager.has_element_id(run_uuid):
print(f"We need to intercept this Run: {run_uuid}")
run_data = self.dao.get_run_data(run_uuid)
self.state_manager.add_element_id(run_uuid)
self.intercept(run_data.__dict__)

def observe(self):
event_handler = InterceptionEventHandler(
Expand All @@ -48,7 +68,7 @@ def observe(self):

if __name__ == "__main__":
try:
interceptor = MLFlowInterceptor("mlflow1")
interceptor = MLFlowInterceptor()
interceptor.observe()
while True:
time.sleep(interceptor.settings.watch_interval_sec)
Expand Down
Loading

0 comments on commit d11cf86

Please sign in to comment.