From 3b326b2b92ea759ea9285c2f7d7fd2f19483633f Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 16 Nov 2022 15:16:44 -0500 Subject: [PATCH 01/20] Minor changes in version bumper --- .github/workflows/create-release-n-publish.yml | 2 +- .github/workflows/version_bumper.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/create-release-n-publish.yml b/.github/workflows/create-release-n-publish.yml index 51c3b539..e3afe727 100644 --- a/.github/workflows/create-release-n-publish.yml +++ b/.github/workflows/create-release-n-publish.yml @@ -45,7 +45,7 @@ jobs: curl --data "{\"tag_name\": \"v${CURRENT_VERSION}\", \"target_commitish\": \"${TARGET}\", \"name\": \"v${CURRENT_VERSION}\", - \"body\": \"Release of version ${CURRENT_VERSION}\", + \"body\": \"Release of version ${CURRENT_VERSION}. Run `pip install flowcept==${CURRENT_VERSION}` to install this version.\", \"make_latest\": \"true\", \"draft\": false, \"prerelease\": false}" \ diff --git a/.github/workflows/version_bumper.py b/.github/workflows/version_bumper.py index 9127458a..a0534efa 100644 --- a/.github/workflows/version_bumper.py +++ b/.github/workflows/version_bumper.py @@ -32,7 +32,7 @@ f.write( f"""# WARNING: CHANGE THIS FILE MANUALLY ONLY TO RESOLVE CONFLICTS! # This file is supposed to be automatically modified by the CI Bot. -# The expected format is: v.. +# The expected format is: .. # See .github/workflows/version_bumper.py __version__ = "{new_version}" """ From 338917aea06dbce806f4db1ddf099456f0f0faf3 Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Wed, 16 Nov 2022 20:17:00 +0000 Subject: [PATCH 02/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flowcept/version.py b/flowcept/version.py index 7d6cf04e..17561cb6 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -1,5 +1,5 @@ # WARNING: CHANGE THIS FILE MANUALLY ONLY TO RESOLVE CONFLICTS! # This file is supposed to be automatically modified by the CI Bot. -# The expected format is: v.. +# The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.68dev" +__version__ = "0.0.69dev" From bf6549347abb6d5d04810950061dc432cc65b11d Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Wed, 16 Nov 2022 20:19:45 +0000 Subject: [PATCH 03/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index 17561cb6..29d7ddfb 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.69dev" +__version__ = "0.0.70renan-mlflow-checks" From 293e4fb1de7fb5ee2337dff168c285eb7d962eaa Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 16 Nov 2022 18:00:31 -0500 Subject: [PATCH 04/20] checkpoint of mlflow interception --- .github/workflows/run-unit-tests.yml | 6 +-- deployment/compose.yml | 20 ++++++++++ extra_requirements/mlflow-requirements.txt | 3 +- .../flowceptor/plugins/abstract_flowceptor.py | 4 ++ .../flowceptor/plugins/mlflow/mlflow_dao.py | 35 ++++++++++++++++ .../plugins/mlflow/mlflow_dataclasses.py | 16 ++++++++ .../plugins/mlflow/mlflow_interceptor.py | 21 ++++++++-- .../mlflow_interceptor_state_manager.py | 27 +++++++++++++ .../plugins/settings_data_classes.py | 2 + .../plugins/zambeze/zambeze_interceptor.py | 9 ++++- pyproject.toml | 2 +- resources/settings.yaml | 2 + tests/plugins/test_mlflow.py | 4 +- tests/plugins/test_mlflow_db.py | 40 +++++++++++++++++++ tests/plugins/test_zambeze.py | 11 +++-- 15 files changed, 186 insertions(+), 16 deletions(-) create mode 100644 deployment/compose.yml create mode 100644 flowcept/flowceptor/plugins/mlflow/mlflow_dao.py create mode 100644 flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py create mode 100644 flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py create mode 100644 tests/plugins/test_mlflow_db.py diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 92048ced..e35786a3 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -27,10 +27,8 @@ jobs: pip install -r extra_requirements/$requirements_file; done; fi - - name: Run Redis - run: docker run -p 6379:6379 --name redis -d redis - - name: Run RabbitMQ for Zambeze Plugin tests - run: docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq:3.11-management + - name: Run Docker Compose + run: docker compose -f deployment/compose.yml up - name: Test with pytest run: | pytest diff --git a/deployment/compose.yml b/deployment/compose.yml new file mode 100644 index 00000000..88afaef1 --- /dev/null +++ b/deployment/compose.yml @@ -0,0 +1,20 @@ +version: '3.8' +services: + flowcept_redis: + container_name: flowcept_redis + image: redis + ports: + - 6379:6379 + + mlflow_interceptor_redis: + container_name: mlflow_interceptor_redis + image: redis + ports: + - 60379:6379 + + zambeze_rabbitmq: + container_name: zambeze_rabbitmq + image: rabbitmq:3.11-management + ports: + - 5672:5672 + - 15672:15672 diff --git a/extra_requirements/mlflow-requirements.txt b/extra_requirements/mlflow-requirements.txt index 31c9dcfe..235cc21d 100644 --- a/extra_requirements/mlflow-requirements.txt +++ b/extra_requirements/mlflow-requirements.txt @@ -1,5 +1,6 @@ mlflow-skinny -sqlalchemy +SQLAlchemy>=1.4.42 alembic sqlparse watchdog>=2.1.9 + diff --git a/flowcept/flowceptor/plugins/abstract_flowceptor.py b/flowcept/flowceptor/plugins/abstract_flowceptor.py index bc6bbd7f..a6eb7cc2 100644 --- a/flowcept/flowceptor/plugins/abstract_flowceptor.py +++ b/flowcept/flowceptor/plugins/abstract_flowceptor.py @@ -59,6 +59,10 @@ def intercept(self, message: dict): def observe(self): raise NotImplementedError() + @abstractmethod + def callback(self, *args, **kwargs): + raise NotImplementedError() + def post_intercept(self, intercepted_message: dict): print( f"Going to send to Redis an intercepted message:" diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py b/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py new file mode 100644 index 00000000..74c2b19c --- /dev/null +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py @@ -0,0 +1,35 @@ +from sqlalchemy import create_engine +from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run +from flowcept.flowceptor.plugins.settings_data_classes import ( + MLFlowSettings, +) + + +class MLFlowDAO: + + _LIMIT = 10 + # TODO: This should not at all be hard coded. + # This value needs to be greater than the amount of + # runs inserted in the Runs table at each data observation + + def __init__(self, mlflow_settings: MLFlowSettings): + self._engine = MLFlowDAO._get_db_engine(mlflow_settings.file_path) + + @staticmethod + def _get_db_engine(sqlite_file): + try: + db_uri = f"sqlite:///{sqlite_file}" + engine = create_engine(db_uri) + return engine + except Exception: + raise Exception(f"Could not create DB engine with uri: {db_uri}") + + def get_runs(self): + + sql = ( + f"SELECT {Run.fields} FROM" + f" runs ORDER BY end_time LIMIT {MLFlowDAO._LIMIT}" + ) + conn = self._engine.connect() + results = conn.execute(sql).fetchall() + return results diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py b/flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py new file mode 100644 index 00000000..7fe2cd2c --- /dev/null +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass, fields + + +@dataclass +class Run: + + run_uuid: str + name: str + user_id: str + start_time: int + end_time: int + + @classmethod + @property + def fields(cls): + return ", ".join([field.name for field in fields(cls)]) diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py index 675e3b36..f5d23ccf 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py @@ -2,26 +2,39 @@ import os import time from watchdog.observers import Observer -from flowcept.flowceptor.plugins.abstract_flowceptor import AbstractFlowceptor +from flowcept.flowceptor.plugins.abstract_flowceptor import ( + AbstractFlowceptor, +) +from flowcept.flowceptor.plugins.mlflow.mlflow_dao import MLFlowDAO +from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run from flowcept.flowceptor.plugins.mlflow.interception_event_handler import ( InterceptionEventHandler, ) class MLFlowInterceptor(AbstractFlowceptor): + def __init__(self, plugin_key): + super().__init__(plugin_key) + self.dao = MLFlowDAO(self.settings) + def intercept(self, message: dict): super().post_intercept(message) - @staticmethod - def callback(interceptor_instance: "MLFlowInterceptor"): + def callback(self): """ function that decides what do to when a change is identified. If it's an interesting change, it calls self.intercept; otherwise, let it go.... """ + + runs = self.dao.get_runs() + + for run in runs: + Run(**run) + # TODO get latest info - interceptor_instance.intercept({"nothing": "yet"}) + self.intercept({"nothing": "yet"}) def observe(self): event_handler = InterceptionEventHandler( diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py new file mode 100644 index 00000000..1529577c --- /dev/null +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py @@ -0,0 +1,27 @@ +from redis import Redis +from flowcept.flowceptor.plugins.settings_data_classes import ( + MLFlowSettings, +) + + +class MLFlowInterceptorStateManager: + + _SET_NAME = "runs" + + def __init__(self, mlflow_settings: MLFlowSettings): + self._db = Redis( + host=mlflow_settings.redis_host, + port=mlflow_settings.redis_port, + db=0, + ) + + def clear_set(self): + self._db.delete(MLFlowInterceptorStateManager._SET_NAME) + + def add_run(self, run_id: str): + self._db.sadd(MLFlowInterceptorStateManager._SET_NAME, run_id) + + def has_run(self, run_id): + return self._db.sismember( + MLFlowInterceptorStateManager._SET_NAME, run_id + ) diff --git a/flowcept/flowceptor/plugins/settings_data_classes.py b/flowcept/flowceptor/plugins/settings_data_classes.py index 0641fd94..4f63c50b 100644 --- a/flowcept/flowceptor/plugins/settings_data_classes.py +++ b/flowcept/flowceptor/plugins/settings_data_classes.py @@ -28,6 +28,8 @@ class MLFlowSettings: log_params: List[str] log_metrics: List[str] watch_interval_sec: int + redis_port: int kind: str = "mlflow" observer_type: str = "db" observer_subtype: str = "sqlite" + redis_host: str = "mlflow_interceptor_redis" diff --git a/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py b/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py index bc0a18ed..7de31f9d 100644 --- a/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py +++ b/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py @@ -2,7 +2,9 @@ import sys import json from datetime import datetime -from flowcept.flowceptor.plugins.abstract_flowceptor import AbstractFlowceptor +from flowcept.flowceptor.plugins.abstract_flowceptor import ( + AbstractFlowceptor, +) class ZambezeInterceptor(AbstractFlowceptor): @@ -13,7 +15,10 @@ def intercept(self, message: dict): now = datetime.now() dt_string = now.strftime("%d/%m/%Y %H:%M:%S") # TODO: make constants - intercepted_message = {"time": dt_string, "application_msg": {}} + intercepted_message = { + "time": dt_string, + "application_msg": {}, + } for key in self.settings.keys_to_intercept: if key in message: intercepted_message["application_msg"][key] = message[key] diff --git a/pyproject.toml b/pyproject.toml index 7795d0bf..5b3fb8a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,3 @@ [tool.black] -line-length = 79 +line-length = 78 target-version = ['py37'] diff --git a/resources/settings.yaml b/resources/settings.yaml index c7d9eaa0..3b866069 100644 --- a/resources/settings.yaml +++ b/resources/settings.yaml @@ -6,6 +6,8 @@ plugins: file_path: mlflow.db log_params: ['*'] log_metrics: ['*'] + redis_host: localhost + redis_port: 60379 watch_interval_sec: 2 zambeze1: diff --git a/tests/plugins/test_mlflow.py b/tests/plugins/test_mlflow.py index a201df06..e18bc759 100644 --- a/tests/plugins/test_mlflow.py +++ b/tests/plugins/test_mlflow.py @@ -2,7 +2,9 @@ import threading import time -from flowcept.flowcept_consumer.consumer import consume_intercepted_messages +from flowcept.flowcept_consumer.consumer import ( + consume_intercepted_messages, +) from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor import ( MLFlowInterceptor, ) diff --git a/tests/plugins/test_mlflow_db.py b/tests/plugins/test_mlflow_db.py new file mode 100644 index 00000000..26217abd --- /dev/null +++ b/tests/plugins/test_mlflow_db.py @@ -0,0 +1,40 @@ +import unittest +from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor import ( + MLFlowInterceptor, +) + +from flowcept.flowceptor.plugins.mlflow.mlflow_dao import MLFlowDAO +from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run + +# fmt: off +from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor_state_manager \ + import MLFlowInterceptorStateManager + + +class MLFlowDB(unittest.TestCase): + def __init__(self, *args, **kwargs): + super(MLFlowDB, self).__init__(*args, **kwargs) + interceptor = MLFlowInterceptor("mlflow1") + self.dao = MLFlowDAO(interceptor.settings) + self.mlflow_state = MLFlowInterceptorStateManager(interceptor.settings) + self.mlflow_state.clear_set() + + def test_check_db(self): + self.mlflow_state.add_run("f783309ac32f473b94fa48aa6d484306") + self.mlflow_state.add_run("b885f7b3f05e4afe8f008a146fa09ec6") + self.mlflow_state.add_run("22f19b78539b464fbc3a83f79c670e7f") + + runs = self.dao.get_runs() + + # Check if the last runs have been checked. + for run_tuple in runs: + run = Run(**run_tuple) + if not self.mlflow_state.has_run(run.run_uuid): + print("We need to intercept this! " + run.run_uuid) + self.mlflow_state.add_run(run.run_uuid) + + print() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/plugins/test_zambeze.py b/tests/plugins/test_zambeze.py index d36e3f8d..a3cd212a 100644 --- a/tests/plugins/test_zambeze.py +++ b/tests/plugins/test_zambeze.py @@ -3,12 +3,16 @@ import threading import pika -from flowcept.flowcept_consumer.consumer import consume_intercepted_messages +from flowcept.flowcept_consumer.consumer import ( + consume_intercepted_messages, +) from flowcept.flowceptor.plugins.zambeze.zambeze_interceptor import ( ZambezeInterceptor, ) -from flowcept.flowceptor.plugins.zambeze.zambeze_message import ZambezeMessage +from flowcept.flowceptor.plugins.zambeze.zambeze_message import ( + ZambezeMessage, +) class TestZambeze(unittest.TestCase): @@ -18,7 +22,8 @@ def __init__(self, *args, **kwargs): self._connection = pika.BlockingConnection( pika.ConnectionParameters( - self.interceptor.settings.host, self.interceptor.settings.port + self.interceptor.settings.host, + self.interceptor.settings.port, ) ) self._channel = self._connection.channel() From 785547631664b2b7ac58f2809da6fef568482f25 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 16 Nov 2022 18:14:01 -0500 Subject: [PATCH 05/20] Add code format --- tests/plugins/test_zambeze.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/plugins/test_zambeze.py b/tests/plugins/test_zambeze.py index a3cd212a..cda834ea 100644 --- a/tests/plugins/test_zambeze.py +++ b/tests/plugins/test_zambeze.py @@ -27,7 +27,9 @@ def __init__(self, *args, **kwargs): ) ) self._channel = self._connection.channel() - self._channel.queue_declare(queue=self.interceptor.settings.queue_name) + self._channel.queue_declare( + queue=self.interceptor.settings.queue_name + ) threading.Thread(target=self.interceptor.observe, daemon=True).start() threading.Thread( target=consume_intercepted_messages, daemon=True From 1b48e44a0c5237bb19d536be266af46ffc614777 Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Wed, 16 Nov 2022 23:14:27 +0000 Subject: [PATCH 06/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index 29d7ddfb..21232dbc 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.70renan-mlflow-checks" +__version__ = "0.0.71renan-mlflow-checks" From 95fb66b878d2a213407dc01f79d56c3cb7b8cfcd Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 17 Nov 2022 07:53:21 -0500 Subject: [PATCH 07/20] checkpoint --- .github/workflows/code-formatting.yml | 3 +- .../workflows/create-release-n-publish.yml | 2 +- .github/workflows/run-unit-tests.yml | 2 +- .../flowceptor/plugins/abstract_flowceptor.py | 8 ++-- .../flowceptor/plugins/mlflow/mlflow_dao.py | 2 +- .../plugins/mlflow/mlflow_interceptor.py | 2 +- .../mlflow_interceptor_state_manager.py | 2 +- .../plugins/settings_data_classes.py | 35 --------------- .../plugins/settings_dataclasses.py | 45 +++++++++++++++++++ ...beze_message.py => zambeze_dataclasses.py} | 0 .../plugins/zambeze/zambeze_interceptor.py | 2 +- resources/settings.yaml | 5 ++- tests/plugins/test_mlflow.py | 2 +- tests/plugins/test_mlflow_db.py | 2 +- tests/plugins/test_zambeze.py | 4 +- 15 files changed, 63 insertions(+), 53 deletions(-) delete mode 100644 flowcept/flowceptor/plugins/settings_data_classes.py create mode 100644 flowcept/flowceptor/plugins/settings_dataclasses.py rename flowcept/flowceptor/plugins/zambeze/{zambeze_message.py => zambeze_dataclasses.py} (100%) diff --git a/.github/workflows/code-formatting.yml b/.github/workflows/code-formatting.yml index 5c119877..0310346b 100644 --- a/.github/workflows/code-formatting.yml +++ b/.github/workflows/code-formatting.yml @@ -1,5 +1,4 @@ name: Code Formatting - on: [push, pull_request] permissions: @@ -9,7 +8,7 @@ jobs: build: runs-on: ubuntu-latest - + if: "!contains(github.event.head_commit.message, 'CI Bot')" steps: - uses: actions/checkout@v3 - name: Set up Python 3.10 diff --git a/.github/workflows/create-release-n-publish.yml b/.github/workflows/create-release-n-publish.yml index e3afe727..3b7b0dcc 100644 --- a/.github/workflows/create-release-n-publish.yml +++ b/.github/workflows/create-release-n-publish.yml @@ -45,7 +45,7 @@ jobs: curl --data "{\"tag_name\": \"v${CURRENT_VERSION}\", \"target_commitish\": \"${TARGET}\", \"name\": \"v${CURRENT_VERSION}\", - \"body\": \"Release of version ${CURRENT_VERSION}. Run `pip install flowcept==${CURRENT_VERSION}` to install this version.\", + \"body\": \"Release of version ${CURRENT_VERSION}.\nRun `pip install flowcept==${CURRENT_VERSION}` to install this version.\", \"make_latest\": \"true\", \"draft\": false, \"prerelease\": false}" \ diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index e35786a3..c727c406 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -6,7 +6,7 @@ jobs: build: runs-on: ubuntu-latest - + if: "!contains(github.event.head_commit.message, 'CI Bot')" steps: - uses: actions/checkout@v3 - name: Set up Python 3.10 diff --git a/flowcept/flowceptor/plugins/abstract_flowceptor.py b/flowcept/flowceptor/plugins/abstract_flowceptor.py index a6eb7cc2..25bec671 100644 --- a/flowcept/flowceptor/plugins/abstract_flowceptor.py +++ b/flowcept/flowceptor/plugins/abstract_flowceptor.py @@ -12,9 +12,9 @@ REDIS_PORT, REDIS_CHANNEL, ) -from flowcept.flowceptor.plugins.settings_data_classes import ( +from flowcept.flowceptor.plugins.settings_dataclasses import ( ZambezeSettings, - KeyValuesToFilter, + KeyValue, MLFlowSettings, ) @@ -26,7 +26,7 @@ def __init__(self, plugin_key): @staticmethod def __get_settings(plugin_key): - # TODO: use factory pattern + # TODO: use the factory pattern with open(SETTINGS_PATH) as f: data = yaml.load(f, Loader=yaml.FullLoader) settings = data[Vocabulary.Settings.PLUGINS][plugin_key] @@ -36,7 +36,7 @@ def __get_settings(plugin_key): ): settings_obj: ZambezeSettings = ZambezeSettings(**settings) settings_obj.key_values_to_filter = [ - KeyValuesToFilter(**item) + KeyValue(**item) for item in settings_obj.key_values_to_filter ] return settings_obj diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py b/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py index 74c2b19c..1ae68959 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py @@ -1,6 +1,6 @@ from sqlalchemy import create_engine from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run -from flowcept.flowceptor.plugins.settings_data_classes import ( +from flowcept.flowceptor.plugins.settings_dataclasses import ( MLFlowSettings, ) diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py index f5d23ccf..2e8670bb 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py @@ -14,7 +14,7 @@ class MLFlowInterceptor(AbstractFlowceptor): - def __init__(self, plugin_key): + def __init__(self, plugin_key="mlflow"): super().__init__(plugin_key) self.dao = MLFlowDAO(self.settings) diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py index 1529577c..fec2444b 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py @@ -1,5 +1,5 @@ from redis import Redis -from flowcept.flowceptor.plugins.settings_data_classes import ( +from flowcept.flowceptor.plugins.settings_dataclasses import ( MLFlowSettings, ) diff --git a/flowcept/flowceptor/plugins/settings_data_classes.py b/flowcept/flowceptor/plugins/settings_data_classes.py deleted file mode 100644 index 4f63c50b..00000000 --- a/flowcept/flowceptor/plugins/settings_data_classes.py +++ /dev/null @@ -1,35 +0,0 @@ -from dataclasses import dataclass -from typing import Any, List - - -@dataclass -class KeyValuesToFilter: - key: str - value: Any - - -@dataclass -class ZambezeSettings: - - host: str - port: int - queue_name: str - key_values_to_filter: List[KeyValuesToFilter] - keys_to_intercept: List[str] - kind: str = "zambeze" - observer_type: str = "message_broker" - observer_subtype: str = "rabbit_mq" - - -@dataclass -class MLFlowSettings: - - file_path: str - log_params: List[str] - log_metrics: List[str] - watch_interval_sec: int - redis_port: int - kind: str = "mlflow" - observer_type: str = "db" - observer_subtype: str = "sqlite" - redis_host: str = "mlflow_interceptor_redis" diff --git a/flowcept/flowceptor/plugins/settings_dataclasses.py b/flowcept/flowceptor/plugins/settings_dataclasses.py new file mode 100644 index 00000000..aa5c218d --- /dev/null +++ b/flowcept/flowceptor/plugins/settings_dataclasses.py @@ -0,0 +1,45 @@ +import abc +from dataclasses import dataclass +from typing import Any, List + + +@dataclass +class KeyValue: + key: str + value: Any + + +@dataclass +class AbstractSettings(abc.ABC): + + kind: str + observer_type: str + observer_subtype: str + + +@dataclass +class ZambezeSettings(AbstractSettings): + + host: str + port: int + queue_name: str + key_values_to_filter: List[KeyValue] + keys_to_intercept: List[str] + kind = "zambeze" + observer_type = "message_broker" + observer_subtype = "rabbit_mq" + + +@dataclass +class MLFlowSettings(AbstractSettings): + + file_path: str + log_params: List[str] + log_metrics: List[str] + watch_interval_sec: int + redis_port: int + redis_host: str + kind = "mlflow" + observer_type = "db" + observer_subtype = "sqlite" + diff --git a/flowcept/flowceptor/plugins/zambeze/zambeze_message.py b/flowcept/flowceptor/plugins/zambeze/zambeze_dataclasses.py similarity index 100% rename from flowcept/flowceptor/plugins/zambeze/zambeze_message.py rename to flowcept/flowceptor/plugins/zambeze/zambeze_dataclasses.py diff --git a/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py b/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py index 7de31f9d..b06e8e21 100644 --- a/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py +++ b/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py @@ -8,7 +8,7 @@ class ZambezeInterceptor(AbstractFlowceptor): - def __init__(self, plugin_key): + def __init__(self, plugin_key="zambeze"): super().__init__(plugin_key) def intercept(self, message: dict): diff --git a/resources/settings.yaml b/resources/settings.yaml index 3b866069..05e2bceb 100644 --- a/resources/settings.yaml +++ b/resources/settings.yaml @@ -1,5 +1,6 @@ plugins: - mlflow1: + # For each key below, you can have multiple instances. Like mlflow1, mlflow2; zambeze1, zambeze2. + mlflow: kind: mlflow observer_type: db observer_subtype: sqlite @@ -10,7 +11,7 @@ plugins: redis_port: 60379 watch_interval_sec: 2 - zambeze1: + zambeze: kind: zambeze observer_type: message_broker observer_subtype: rabbit_mq diff --git a/tests/plugins/test_mlflow.py b/tests/plugins/test_mlflow.py index e18bc759..50555f15 100644 --- a/tests/plugins/test_mlflow.py +++ b/tests/plugins/test_mlflow.py @@ -13,7 +13,7 @@ class TestMLFlow(unittest.TestCase): def __init__(self, *args, **kwargs): super(TestMLFlow, self).__init__(*args, **kwargs) - self.interceptor = MLFlowInterceptor("mlflow1") + self.interceptor = MLFlowInterceptor() threading.Thread(target=self.interceptor.observe, daemon=True).start() threading.Thread( diff --git a/tests/plugins/test_mlflow_db.py b/tests/plugins/test_mlflow_db.py index 26217abd..e1a002f0 100644 --- a/tests/plugins/test_mlflow_db.py +++ b/tests/plugins/test_mlflow_db.py @@ -14,7 +14,7 @@ class MLFlowDB(unittest.TestCase): def __init__(self, *args, **kwargs): super(MLFlowDB, self).__init__(*args, **kwargs) - interceptor = MLFlowInterceptor("mlflow1") + interceptor = MLFlowInterceptor() self.dao = MLFlowDAO(interceptor.settings) self.mlflow_state = MLFlowInterceptorStateManager(interceptor.settings) self.mlflow_state.clear_set() diff --git a/tests/plugins/test_zambeze.py b/tests/plugins/test_zambeze.py index cda834ea..80ed4e88 100644 --- a/tests/plugins/test_zambeze.py +++ b/tests/plugins/test_zambeze.py @@ -10,7 +10,7 @@ from flowcept.flowceptor.plugins.zambeze.zambeze_interceptor import ( ZambezeInterceptor, ) -from flowcept.flowceptor.plugins.zambeze.zambeze_message import ( +from flowcept.flowceptor.plugins.zambeze.zambeze_dataclasses import ( ZambezeMessage, ) @@ -18,7 +18,7 @@ class TestZambeze(unittest.TestCase): def __init__(self, *args, **kwargs): super(TestZambeze, self).__init__(*args, **kwargs) - self.interceptor = ZambezeInterceptor("zambeze1") + self.interceptor = ZambezeInterceptor() self._connection = pika.BlockingConnection( pika.ConnectionParameters( From 81f5500d97a2843a5c254ce427a55aa106406d63 Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Thu, 17 Nov 2022 12:53:38 +0000 Subject: [PATCH 08/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index 21232dbc..ffb10f49 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.71renan-mlflow-checks" +__version__ = "0.0.72renan-mlflow-checks" From 03007aa09335545717428b7f8f156c46dbe22797 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 17 Nov 2022 09:08:56 -0500 Subject: [PATCH 09/20] Making local state manager more generic --- .github/workflows/code-formatting.yml | 4 +-- bin/clean_directory.sh | 12 ++++++++ .../flowceptor/plugins/abstract_flowceptor.py | 13 ++++---- .../abstract_interceptor_state_manager.py | 30 +++++++++++++++++++ .../mlflow_interceptor_state_manager.py | 28 +++-------------- .../plugins/settings_dataclasses.py | 1 - tests/plugins/test_mlflow_db.py | 17 ++++++----- tests/plugins/test_zambeze.py | 3 +- 8 files changed, 65 insertions(+), 43 deletions(-) create mode 100644 bin/clean_directory.sh create mode 100644 flowcept/flowceptor/plugins/abstract_interceptor_state_manager.py diff --git a/.github/workflows/code-formatting.yml b/.github/workflows/code-formatting.yml index 0310346b..e5ff531b 100644 --- a/.github/workflows/code-formatting.yml +++ b/.github/workflows/code-formatting.yml @@ -23,5 +23,5 @@ jobs: run: black --check . - name: Run flake8 checks run: | - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - flake8 . --count --max-complexity=10 --max-line-length=79 --statistics + flake8 . --count --show-source --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --show-source --max-complexity=10 --max-line-length=79 --statistics diff --git a/bin/clean_directory.sh b/bin/clean_directory.sh new file mode 100644 index 00000000..a88758b0 --- /dev/null +++ b/bin/clean_directory.sh @@ -0,0 +1,12 @@ +# Make sure you run this from the directory root +rm -rf .pytest_cache \ + .build \ + .dist \ + build \ + dist \ + *egg* \ + mlruns \ + mlflow.db + +rm -rf **/*mlruns* +rm -rf */*/*mlruns* diff --git a/flowcept/flowceptor/plugins/abstract_flowceptor.py b/flowcept/flowceptor/plugins/abstract_flowceptor.py index 25bec671..1cc10be5 100644 --- a/flowcept/flowceptor/plugins/abstract_flowceptor.py +++ b/flowcept/flowceptor/plugins/abstract_flowceptor.py @@ -2,7 +2,7 @@ import os import json import yaml -import redis +from redis import Redis from flowcept.commons.vocabulary import Vocabulary from flowcept.configs import ( @@ -16,16 +16,18 @@ ZambezeSettings, KeyValue, MLFlowSettings, + AbstractSettings, ) class AbstractFlowceptor(object, metaclass=ABCMeta): def __init__(self, plugin_key): - self.settings = AbstractFlowceptor.__get_settings(plugin_key) - self._redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0) + self.plugin_key = plugin_key + self.settings = AbstractFlowceptor.get_settings(self.plugin_key) + self._redis = Redis(host=REDIS_HOST, port=REDIS_PORT, db=0) @staticmethod - def __get_settings(plugin_key): + def get_settings(plugin_key: str) -> AbstractSettings: # TODO: use the factory pattern with open(SETTINGS_PATH) as f: data = yaml.load(f, Loader=yaml.FullLoader) @@ -36,8 +38,7 @@ def __get_settings(plugin_key): ): settings_obj: ZambezeSettings = ZambezeSettings(**settings) settings_obj.key_values_to_filter = [ - KeyValue(**item) - for item in settings_obj.key_values_to_filter + KeyValue(**item) for item in settings_obj.key_values_to_filter ] return settings_obj elif ( diff --git a/flowcept/flowceptor/plugins/abstract_interceptor_state_manager.py b/flowcept/flowceptor/plugins/abstract_interceptor_state_manager.py new file mode 100644 index 00000000..9e2fc843 --- /dev/null +++ b/flowcept/flowceptor/plugins/abstract_interceptor_state_manager.py @@ -0,0 +1,30 @@ +from abc import ABCMeta +from redis import Redis + +from flowcept.flowceptor.plugins.abstract_flowceptor import AbstractFlowceptor + + +class AbstractInterceptorStateManager(object, metaclass=ABCMeta): + def __init__(self, plugin_key: str): + self.set_name = plugin_key + settings = AbstractFlowceptor.get_settings(plugin_key) + if not hasattr(settings, "redis_host"): + raise Exception( + f"This plugin setting {plugin_key} " + f"does not have a Redis Host." + ) + + self._db = Redis( + host=settings.redis_host, + port=settings.redis_port, + db=0, + ) + + def clear_set(self): + self._db.delete(self.set_name) + + def add_element_id(self, element_id: str): + self._db.sadd(self.set_name, element_id) + + def has_element_id(self, element_id) -> bool: + return self._db.sismember(self.set_name, element_id) diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py index fec2444b..37e8af5a 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py @@ -1,27 +1,7 @@ -from redis import Redis -from flowcept.flowceptor.plugins.settings_dataclasses import ( - MLFlowSettings, +from flowcept.flowceptor.plugins.abstract_interceptor_state_manager import ( + AbstractInterceptorStateManager, ) -class MLFlowInterceptorStateManager: - - _SET_NAME = "runs" - - def __init__(self, mlflow_settings: MLFlowSettings): - self._db = Redis( - host=mlflow_settings.redis_host, - port=mlflow_settings.redis_port, - db=0, - ) - - def clear_set(self): - self._db.delete(MLFlowInterceptorStateManager._SET_NAME) - - def add_run(self, run_id: str): - self._db.sadd(MLFlowInterceptorStateManager._SET_NAME, run_id) - - def has_run(self, run_id): - return self._db.sismember( - MLFlowInterceptorStateManager._SET_NAME, run_id - ) +class MLFlowInterceptorStateManager(AbstractInterceptorStateManager): + pass diff --git a/flowcept/flowceptor/plugins/settings_dataclasses.py b/flowcept/flowceptor/plugins/settings_dataclasses.py index aa5c218d..6bae7773 100644 --- a/flowcept/flowceptor/plugins/settings_dataclasses.py +++ b/flowcept/flowceptor/plugins/settings_dataclasses.py @@ -42,4 +42,3 @@ class MLFlowSettings(AbstractSettings): kind = "mlflow" observer_type = "db" observer_subtype = "sqlite" - diff --git a/tests/plugins/test_mlflow_db.py b/tests/plugins/test_mlflow_db.py index e1a002f0..00470af2 100644 --- a/tests/plugins/test_mlflow_db.py +++ b/tests/plugins/test_mlflow_db.py @@ -3,7 +3,6 @@ MLFlowInterceptor, ) -from flowcept.flowceptor.plugins.mlflow.mlflow_dao import MLFlowDAO from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run # fmt: off @@ -15,23 +14,25 @@ class MLFlowDB(unittest.TestCase): def __init__(self, *args, **kwargs): super(MLFlowDB, self).__init__(*args, **kwargs) interceptor = MLFlowInterceptor() - self.dao = MLFlowDAO(interceptor.settings) - self.mlflow_state = MLFlowInterceptorStateManager(interceptor.settings) + self.dao = interceptor.dao + self.mlflow_state = MLFlowInterceptorStateManager( + interceptor.plugin_key + ) self.mlflow_state.clear_set() def test_check_db(self): - self.mlflow_state.add_run("f783309ac32f473b94fa48aa6d484306") - self.mlflow_state.add_run("b885f7b3f05e4afe8f008a146fa09ec6") - self.mlflow_state.add_run("22f19b78539b464fbc3a83f79c670e7f") + self.mlflow_state.add_element_id("f783309ac32f473b94fa48aa6d484306") + self.mlflow_state.add_element_id("b885f7b3f05e4afe8f008a146fa09ec6") + self.mlflow_state.add_element_id("22f19b78539b464fbc3a83f79c670e7f") runs = self.dao.get_runs() # Check if the last runs have been checked. for run_tuple in runs: run = Run(**run_tuple) - if not self.mlflow_state.has_run(run.run_uuid): + if not self.mlflow_state.has_element_id(run.run_uuid): print("We need to intercept this! " + run.run_uuid) - self.mlflow_state.add_run(run.run_uuid) + self.mlflow_state.add_element_id(run.run_uuid) print() diff --git a/tests/plugins/test_zambeze.py b/tests/plugins/test_zambeze.py index 80ed4e88..289a5728 100644 --- a/tests/plugins/test_zambeze.py +++ b/tests/plugins/test_zambeze.py @@ -6,12 +6,11 @@ from flowcept.flowcept_consumer.consumer import ( consume_intercepted_messages, ) - from flowcept.flowceptor.plugins.zambeze.zambeze_interceptor import ( ZambezeInterceptor, ) from flowcept.flowceptor.plugins.zambeze.zambeze_dataclasses import ( - ZambezeMessage, + ZambezeMessage ) From 3177f1e26d5796150aeac8d81a39670151f11a85 Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Thu, 17 Nov 2022 14:09:15 +0000 Subject: [PATCH 10/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index ffb10f49..ae5c7705 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.72renan-mlflow-checks" +__version__ = "0.0.73renan-mlflow-checks" From ee4d21e4e0d3ed515dfeb5d578c1c721d57bb853 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 17 Nov 2022 09:10:15 -0500 Subject: [PATCH 11/20] Code formatting --- tests/plugins/test_zambeze.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/plugins/test_zambeze.py b/tests/plugins/test_zambeze.py index 289a5728..9769d5a1 100644 --- a/tests/plugins/test_zambeze.py +++ b/tests/plugins/test_zambeze.py @@ -10,7 +10,7 @@ ZambezeInterceptor, ) from flowcept.flowceptor.plugins.zambeze.zambeze_dataclasses import ( - ZambezeMessage + ZambezeMessage, ) From 7a928a11fb9e6135654a7f26a1e5e6056f44306e Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Thu, 17 Nov 2022 14:10:33 +0000 Subject: [PATCH 12/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index ae5c7705..717c8582 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.73renan-mlflow-checks" +__version__ = "0.0.74renan-mlflow-checks" From 1d29377ceb09bee97924d3ddbd6f84e4574716b2 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 17 Nov 2022 09:24:05 -0500 Subject: [PATCH 13/20] adding -d in docker compose gh actions --- .github/workflows/run-unit-tests.yml | 2 +- .../plugins/mlflow/mlflow_interceptor_state_manager.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index c727c406..6d2bda47 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -28,7 +28,7 @@ jobs: done; fi - name: Run Docker Compose - run: docker compose -f deployment/compose.yml up + run: docker compose -f deployment/compose.yml up -d - name: Test with pytest run: | pytest diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py index 37e8af5a..a1262143 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py @@ -4,4 +4,7 @@ class MLFlowInterceptorStateManager(AbstractInterceptorStateManager): + # TODO: check if we really need this class. + # Only reason we would need this is if we want to have specific methods + # for this class. pass From bfe4d1eb3a22f9155e8655d14818bb9fdd726553 Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Thu, 17 Nov 2022 14:24:26 +0000 Subject: [PATCH 14/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index 717c8582..0a045de0 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.74renan-mlflow-checks" +__version__ = "0.0.75renan-mlflow-checks" From d94bbdc79e2912e3df53fe73da2ebfbdcbefecbc Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 17 Nov 2022 09:45:22 -0500 Subject: [PATCH 15/20] Refactoring state manager --- .../flowceptor/plugins/abstract_flowceptor.py | 5 ++- ...anager.py => interceptor_state_manager.py} | 13 +++--- .../plugins/mlflow/mlflow_interceptor.py | 13 ++++-- .../mlflow_interceptor_state_manager.py | 6 +-- .../plugins/settings_dataclasses.py | 1 + flowcept/flowceptor/plugins/utils.py | 0 tests/plugins/test_mlflow_db.py | 41 ------------------- tests/plugins/test_mlflow_state.py | 33 +++++++++++++++ 8 files changed, 56 insertions(+), 56 deletions(-) rename flowcept/flowceptor/plugins/{abstract_interceptor_state_manager.py => interceptor_state_manager.py} (61%) delete mode 100644 flowcept/flowceptor/plugins/utils.py delete mode 100644 tests/plugins/test_mlflow_db.py create mode 100644 tests/plugins/test_mlflow_state.py diff --git a/flowcept/flowceptor/plugins/abstract_flowceptor.py b/flowcept/flowceptor/plugins/abstract_flowceptor.py index 1cc10be5..046067c4 100644 --- a/flowcept/flowceptor/plugins/abstract_flowceptor.py +++ b/flowcept/flowceptor/plugins/abstract_flowceptor.py @@ -32,6 +32,8 @@ def get_settings(plugin_key: str) -> AbstractSettings: with open(SETTINGS_PATH) as f: data = yaml.load(f, Loader=yaml.FullLoader) settings = data[Vocabulary.Settings.PLUGINS][plugin_key] + settings["key"] = plugin_key + settings_obj: AbstractSettings = None if ( settings[Vocabulary.Settings.KIND] == Vocabulary.Settings.ZAMBEZE_KIND @@ -40,7 +42,6 @@ def get_settings(plugin_key: str) -> AbstractSettings: settings_obj.key_values_to_filter = [ KeyValue(**item) for item in settings_obj.key_values_to_filter ] - return settings_obj elif ( settings[Vocabulary.Settings.KIND] == Vocabulary.Settings.MLFLOW_KIND @@ -50,7 +51,7 @@ def get_settings(plugin_key: str) -> AbstractSettings: settings_obj.file_path = os.path.join( PROJECT_DIR_PATH, settings_obj.file_path ) - return settings_obj + return settings_obj @abstractmethod def intercept(self, message: dict): diff --git a/flowcept/flowceptor/plugins/abstract_interceptor_state_manager.py b/flowcept/flowceptor/plugins/interceptor_state_manager.py similarity index 61% rename from flowcept/flowceptor/plugins/abstract_interceptor_state_manager.py rename to flowcept/flowceptor/plugins/interceptor_state_manager.py index 9e2fc843..f02aad12 100644 --- a/flowcept/flowceptor/plugins/abstract_interceptor_state_manager.py +++ b/flowcept/flowceptor/plugins/interceptor_state_manager.py @@ -1,16 +1,15 @@ -from abc import ABCMeta from redis import Redis -from flowcept.flowceptor.plugins.abstract_flowceptor import AbstractFlowceptor +from flowcept.flowceptor.plugins.settings_dataclasses import AbstractSettings -class AbstractInterceptorStateManager(object, metaclass=ABCMeta): - def __init__(self, plugin_key: str): - self.set_name = plugin_key - settings = AbstractFlowceptor.get_settings(plugin_key) +class InterceptorStateManager(object): + def __init__(self, settings: AbstractSettings): + self.set_name = settings.key + if not hasattr(settings, "redis_host"): raise Exception( - f"This plugin setting {plugin_key} " + f"This plugin setting {settings.key} " f"does not have a Redis Host." ) diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py index 2e8670bb..64de3c3f 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py @@ -5,6 +5,10 @@ from flowcept.flowceptor.plugins.abstract_flowceptor import ( AbstractFlowceptor, ) +from flowcept.flowceptor.plugins.interceptor_state_manager import ( + InterceptorStateManager, +) + from flowcept.flowceptor.plugins.mlflow.mlflow_dao import MLFlowDAO from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run @@ -16,6 +20,7 @@ class MLFlowInterceptor(AbstractFlowceptor): def __init__(self, plugin_key="mlflow"): super().__init__(plugin_key) + self.state_manager = InterceptorStateManager(self.settings) self.dao = MLFlowDAO(self.settings) def intercept(self, message: dict): @@ -28,10 +33,12 @@ def callback(self): let it go.... """ - runs = self.dao.get_runs() + run_tuples = self.dao.get_runs() - for run in runs: - Run(**run) + for run_tuple in run_tuples: + run = Run(**run_tuple) + print(run) + # if run.run_uuid in : # TODO get latest info self.intercept({"nothing": "yet"}) diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py index a1262143..2fd5a3ce 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor_state_manager.py @@ -1,9 +1,9 @@ -from flowcept.flowceptor.plugins.abstract_interceptor_state_manager import ( - AbstractInterceptorStateManager, +from flowcept.flowceptor.plugins.interceptor_state_manager import ( + InterceptorStateManager, ) -class MLFlowInterceptorStateManager(AbstractInterceptorStateManager): +class MLFlowInterceptorStateManager(InterceptorStateManager): # TODO: check if we really need this class. # Only reason we would need this is if we want to have specific methods # for this class. diff --git a/flowcept/flowceptor/plugins/settings_dataclasses.py b/flowcept/flowceptor/plugins/settings_dataclasses.py index 6bae7773..b59022c5 100644 --- a/flowcept/flowceptor/plugins/settings_dataclasses.py +++ b/flowcept/flowceptor/plugins/settings_dataclasses.py @@ -12,6 +12,7 @@ class KeyValue: @dataclass class AbstractSettings(abc.ABC): + key: str kind: str observer_type: str observer_subtype: str diff --git a/flowcept/flowceptor/plugins/utils.py b/flowcept/flowceptor/plugins/utils.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/plugins/test_mlflow_db.py b/tests/plugins/test_mlflow_db.py deleted file mode 100644 index 00470af2..00000000 --- a/tests/plugins/test_mlflow_db.py +++ /dev/null @@ -1,41 +0,0 @@ -import unittest -from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor import ( - MLFlowInterceptor, -) - -from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run - -# fmt: off -from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor_state_manager \ - import MLFlowInterceptorStateManager - - -class MLFlowDB(unittest.TestCase): - def __init__(self, *args, **kwargs): - super(MLFlowDB, self).__init__(*args, **kwargs) - interceptor = MLFlowInterceptor() - self.dao = interceptor.dao - self.mlflow_state = MLFlowInterceptorStateManager( - interceptor.plugin_key - ) - self.mlflow_state.clear_set() - - def test_check_db(self): - self.mlflow_state.add_element_id("f783309ac32f473b94fa48aa6d484306") - self.mlflow_state.add_element_id("b885f7b3f05e4afe8f008a146fa09ec6") - self.mlflow_state.add_element_id("22f19b78539b464fbc3a83f79c670e7f") - - runs = self.dao.get_runs() - - # Check if the last runs have been checked. - for run_tuple in runs: - run = Run(**run_tuple) - if not self.mlflow_state.has_element_id(run.run_uuid): - print("We need to intercept this! " + run.run_uuid) - self.mlflow_state.add_element_id(run.run_uuid) - - print() - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/plugins/test_mlflow_state.py b/tests/plugins/test_mlflow_state.py new file mode 100644 index 00000000..52e84831 --- /dev/null +++ b/tests/plugins/test_mlflow_state.py @@ -0,0 +1,33 @@ +import unittest +from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor import ( + MLFlowInterceptor, +) + +from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run + + +class TestMLFlowState(unittest.TestCase): + def __init__(self, *args, **kwargs): + super(TestMLFlowState, self).__init__(*args, **kwargs) + interceptor = MLFlowInterceptor() + self.dao = interceptor.dao + self.state_manager = interceptor.state_manager + self.state_manager.clear_set() + + def test_check_db(self): + self.state_manager.add_element_id("f783309ac32f473b94fa48aa6d484306") + self.state_manager.add_element_id("b885f7b3f05e4afe8f008a146fa09ec6") + self.state_manager.add_element_id("22f19b78539b464fbc3a83f79c670e7f") + + runs = self.dao.get_runs() + + # Check if the last runs have been checked. + for run_tuple in runs: + run = Run(**run_tuple) + if not self.state_manager.has_element_id(run.run_uuid): + print("We need to intercept this! " + run.run_uuid) + self.state_manager.add_element_id(run.run_uuid) + + +if __name__ == "__main__": + unittest.main() From ef5be1b766f3f7b7cb0e2746eb6f4ae575a58c56 Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Thu, 17 Nov 2022 14:45:38 +0000 Subject: [PATCH 16/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index 0a045de0..fe8e5898 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.75renan-mlflow-checks" +__version__ = "0.0.76renan-mlflow-checks" From f52d4fe7bc58aece29eff19ae951d112e4c2fc05 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 17 Nov 2022 16:20:58 -0500 Subject: [PATCH 17/20] MLFlow tests working for params and metrics --- .../flowceptor/plugins/abstract_flowceptor.py | 15 +++- .../plugins/interceptor_state_manager.py | 8 +- .../flowceptor/plugins/mlflow/mlflow_dao.py | 73 +++++++++++++++++-- .../plugins/mlflow/mlflow_dataclasses.py | 14 ++-- .../plugins/mlflow/mlflow_interceptor.py | 26 +++---- tests/plugins/test_mlflow.py | 50 +++++++++++-- tests/plugins/test_mlflow_state.py | 33 --------- 7 files changed, 143 insertions(+), 76 deletions(-) delete mode 100644 tests/plugins/test_mlflow_state.py diff --git a/flowcept/flowceptor/plugins/abstract_flowceptor.py b/flowcept/flowceptor/plugins/abstract_flowceptor.py index 046067c4..8c277906 100644 --- a/flowcept/flowceptor/plugins/abstract_flowceptor.py +++ b/flowcept/flowceptor/plugins/abstract_flowceptor.py @@ -22,8 +22,7 @@ class AbstractFlowceptor(object, metaclass=ABCMeta): def __init__(self, plugin_key): - self.plugin_key = plugin_key - self.settings = AbstractFlowceptor.get_settings(self.plugin_key) + self.settings = AbstractFlowceptor.get_settings(plugin_key) self._redis = Redis(host=REDIS_HOST, port=REDIS_PORT, db=0) @staticmethod @@ -55,6 +54,12 @@ def get_settings(plugin_key: str) -> AbstractSettings: @abstractmethod def intercept(self, message: dict): + """ + Method that intercepts the identified data + :param message: + :return: + """ + raise NotImplementedError() @abstractmethod @@ -63,9 +68,15 @@ def observe(self): @abstractmethod def callback(self, *args, **kwargs): + """ + Method that decides what do to when a change is identified. + If it's an interesting change, it calls self.intercept; otherwise, + let it go.... + """ raise NotImplementedError() def post_intercept(self, intercepted_message: dict): + intercepted_message["plugin_key"] = self.settings.key print( f"Going to send to Redis an intercepted message:" f"\n\t{json.dumps(intercepted_message)}" diff --git a/flowcept/flowceptor/plugins/interceptor_state_manager.py b/flowcept/flowceptor/plugins/interceptor_state_manager.py index f02aad12..22f7a93d 100644 --- a/flowcept/flowceptor/plugins/interceptor_state_manager.py +++ b/flowcept/flowceptor/plugins/interceptor_state_manager.py @@ -5,7 +5,7 @@ class InterceptorStateManager(object): def __init__(self, settings: AbstractSettings): - self.set_name = settings.key + self._set_name = settings.key if not hasattr(settings, "redis_host"): raise Exception( @@ -20,10 +20,10 @@ def __init__(self, settings: AbstractSettings): ) def clear_set(self): - self._db.delete(self.set_name) + self._db.delete(self._set_name) def add_element_id(self, element_id: str): - self._db.sadd(self.set_name, element_id) + self._db.sadd(self._set_name, element_id) def has_element_id(self, element_id) -> bool: - return self._db.sismember(self.set_name, element_id) + return self._db.sismember(self._set_name, element_id) diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py b/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py index 1ae68959..d318ecd9 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_dao.py @@ -1,5 +1,7 @@ -from sqlalchemy import create_engine -from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run +from typing import List +from sqlalchemy.engine import Row, create_engine +from textwrap import dedent +from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import RunData from flowcept.flowceptor.plugins.settings_dataclasses import ( MLFlowSettings, ) @@ -24,12 +26,69 @@ def _get_db_engine(sqlite_file): except Exception: raise Exception(f"Could not create DB engine with uri: {db_uri}") - def get_runs(self): - - sql = ( - f"SELECT {Run.fields} FROM" - f" runs ORDER BY end_time LIMIT {MLFlowDAO._LIMIT}" + def get_finished_run_uuids(self) -> List[Row]: + sql = dedent( + f""" + SELECT run_uuid + FROM + runs + WHERE + status = 'FINISHED' + ORDER BY end_time DESC + LIMIT {MLFlowDAO._LIMIT} + """ ) conn = self._engine.connect() results = conn.execute(sql).fetchall() return results + + def get_run_data(self, run_uuid: str) -> RunData: + # TODO: consider outer joins to get the run data even if there's + # no metric or param + sql = dedent( + f""" + SELECT r.run_uuid, r.start_time, r.end_time, r.status, + m.key as 'metric_key', m.value as 'metric_value', + p.key as 'parameter_key', p.value as 'parameter_value' + FROM + runs AS r, + metrics as m, + params as p + WHERE + r.run_uuid = m.run_uuid AND + m.run_uuid = p.run_uuid AND + r.run_uuid = '{run_uuid}' AND + r.status = 'FINISHED' + ORDER BY + end_time DESC, + metric_key, metric_value, + parameter_key, parameter_value + LIMIT 30 +""" + ) + conn = self._engine.connect() + result_set = conn.execute(sql).fetchall() + run_data_dict = {"metrics": {}, "parameters": {}} + for tuple_ in result_set: + tuple_dict = tuple_._asdict() + metric_key = tuple_dict.get("metric_key", None) + metric_value = tuple_dict.get("metric_value", None) + if metric_key and metric_value: + if not (metric_key in run_data_dict["metrics"]): + run_data_dict["metrics"][metric_key] = None + run_data_dict["metrics"][metric_key] = metric_value + + param_key = tuple_dict.get("parameter_key", None) + param_value = tuple_dict.get("parameter_value", None) + if param_key and param_value: + if not (param_key in run_data_dict["parameters"]): + run_data_dict["parameters"][param_key] = None + run_data_dict["parameters"][param_key] = param_value + + run_data_dict["run_uuid"] = tuple_dict["run_uuid"] + run_data_dict["start_time"] = tuple_dict["start_time"] + run_data_dict["end_time"] = tuple_dict["end_time"] + run_data_dict["status"] = tuple_dict["status"] + + run_data = RunData(**run_data_dict) + return run_data diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py b/flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py index 7fe2cd2c..ce8fb90e 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_dataclasses.py @@ -1,16 +1,12 @@ -from dataclasses import dataclass, fields +from dataclasses import dataclass @dataclass -class Run: +class RunData: run_uuid: str - name: str - user_id: str start_time: int end_time: int - - @classmethod - @property - def fields(cls): - return ", ".join([field.name for field in fields(cls)]) + metrics: dict + parameters: dict + status: str diff --git a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py index 64de3c3f..fa88148a 100644 --- a/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py +++ b/flowcept/flowceptor/plugins/mlflow/mlflow_interceptor.py @@ -9,9 +9,7 @@ InterceptorStateManager, ) - from flowcept.flowceptor.plugins.mlflow.mlflow_dao import MLFlowDAO -from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run from flowcept.flowceptor.plugins.mlflow.interception_event_handler import ( InterceptionEventHandler, ) @@ -28,20 +26,22 @@ def intercept(self, message: dict): def callback(self): """ - function that decides what do to when a change is identified. + This function is called whenever a change is identified in the data. + It decides what to do in the event of a change. If it's an interesting change, it calls self.intercept; otherwise, let it go.... """ + from time import sleep - run_tuples = self.dao.get_runs() - - for run_tuple in run_tuples: - run = Run(**run_tuple) - print(run) - # if run.run_uuid in : - - # TODO get latest info - self.intercept({"nothing": "yet"}) + sleep(5) + runs = self.dao.get_finished_run_uuids() + for run_uuid_tuple in runs: + run_uuid = run_uuid_tuple[0] + if not self.state_manager.has_element_id(run_uuid): + print(f"We need to intercept this Run: {run_uuid}") + run_data = self.dao.get_run_data(run_uuid) + self.state_manager.add_element_id(run_uuid) + self.intercept(run_data.__dict__) def observe(self): event_handler = InterceptionEventHandler( @@ -68,7 +68,7 @@ def observe(self): if __name__ == "__main__": try: - interceptor = MLFlowInterceptor("mlflow1") + interceptor = MLFlowInterceptor() interceptor.observe() while True: time.sleep(interceptor.settings.watch_interval_sec) diff --git a/tests/plugins/test_mlflow.py b/tests/plugins/test_mlflow.py index 50555f15..50870abf 100644 --- a/tests/plugins/test_mlflow.py +++ b/tests/plugins/test_mlflow.py @@ -15,13 +15,7 @@ def __init__(self, *args, **kwargs): super(TestMLFlow, self).__init__(*args, **kwargs) self.interceptor = MLFlowInterceptor() - threading.Thread(target=self.interceptor.observe, daemon=True).start() - threading.Thread( - target=consume_intercepted_messages, daemon=True - ).start() - time.sleep(3) - - def test_mlflow(self): + def test_pure_run_mlflow(self): import uuid import mlflow @@ -34,13 +28,53 @@ def test_mlflow(self): experiment_id = mlflow.create_experiment( experiment_name + str(uuid.uuid4()) ) - with mlflow.start_run(experiment_id=experiment_id): + with mlflow.start_run(experiment_id=experiment_id) as run: mlflow.log_params({"number_epochs": 10}) + mlflow.log_params({"batch_size": 64}) + print("\nTrained model") mlflow.log_metric("loss", 0.04) + return run.info.run_uuid + + def test_get_runs(self): + runs = self.interceptor.dao.get_finished_run_uuids() + assert len(runs) > 0 + for run in runs: + assert type(run[0]) == str + print(run[0]) + + def test_get_run_data(self): + run_uuid = self.test_pure_run_mlflow() + run_data = self.interceptor.dao.get_run_data(run_uuid) + assert run_data.run_uuid == run_uuid + + def test_check_state_manager(self): + self.interceptor.state_manager.clear_set() + self.interceptor.state_manager.add_element_id("dummy-value") + self.test_pure_run_mlflow() + runs = self.interceptor.dao.get_finished_run_uuids() + assert len(runs) > 0 + for run_tuple in runs: + run_uuid = run_tuple[0] + assert type(run_uuid) == str + if not self.interceptor.state_manager.has_element_id(run_uuid): + print(f"We need to intercept {run_uuid}") + self.interceptor.state_manager.add_element_id(run_uuid) + + def _init_mlflow_consumption(self): + threading.Thread(target=self.interceptor.observe, daemon=True).start() + threading.Thread( + target=consume_intercepted_messages, daemon=True + ).start() time.sleep(3) + def test_mlflow_observer_and_consumption(self): + self._init_mlflow_consumption() + run_uuid = self.test_pure_run_mlflow() + time.sleep(5) + assert self.interceptor.state_manager.has_element_id(run_uuid) is True + if __name__ == "__main__": unittest.main() diff --git a/tests/plugins/test_mlflow_state.py b/tests/plugins/test_mlflow_state.py deleted file mode 100644 index 52e84831..00000000 --- a/tests/plugins/test_mlflow_state.py +++ /dev/null @@ -1,33 +0,0 @@ -import unittest -from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor import ( - MLFlowInterceptor, -) - -from flowcept.flowceptor.plugins.mlflow.mlflow_dataclasses import Run - - -class TestMLFlowState(unittest.TestCase): - def __init__(self, *args, **kwargs): - super(TestMLFlowState, self).__init__(*args, **kwargs) - interceptor = MLFlowInterceptor() - self.dao = interceptor.dao - self.state_manager = interceptor.state_manager - self.state_manager.clear_set() - - def test_check_db(self): - self.state_manager.add_element_id("f783309ac32f473b94fa48aa6d484306") - self.state_manager.add_element_id("b885f7b3f05e4afe8f008a146fa09ec6") - self.state_manager.add_element_id("22f19b78539b464fbc3a83f79c670e7f") - - runs = self.dao.get_runs() - - # Check if the last runs have been checked. - for run_tuple in runs: - run = Run(**run_tuple) - if not self.state_manager.has_element_id(run.run_uuid): - print("We need to intercept this! " + run.run_uuid) - self.state_manager.add_element_id(run.run_uuid) - - -if __name__ == "__main__": - unittest.main() From 6495cdd71daa7ece71fbab468840a22ca5f09975 Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Thu, 17 Nov 2022 21:21:16 +0000 Subject: [PATCH 18/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index fe8e5898..76418964 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.76renan-mlflow-checks" +__version__ = "0.0.77renan-mlflow-checks" From 5c8eccefbc5c4c2e6174586105105d006a90972b Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Fri, 18 Nov 2022 13:20:24 +0000 Subject: [PATCH 19/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index 76418964..b4d0e866 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.77renan-mlflow-checks" +__version__ = "0.0.78dev" From 367179d395aa28a3bda80ec62aff88431b98e42c Mon Sep 17 00:00:00 2001 From: Flowcept CI Bot Date: Fri, 18 Nov 2022 13:22:22 +0000 Subject: [PATCH 20/20] Flowcept CI Bot: bumping version --- flowcept/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/version.py b/flowcept/version.py index b4d0e866..6b420c4d 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.0.78dev" +__version__ = "0.0.79dev"