From a104a584a3d98646d1688e6230eddb8bcb2b09a1 Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 23 Sep 2024 15:51:08 -0400
Subject: [PATCH 1/6] Refactor to simplify Flowcept API

---
 .github/workflows/run-tests-kafka.yml         |  5 +-
 README.md                                     | 29 +++--------
 flowcept/__init__.py                          |  5 +-
 ...consumer_api.py => flowcept_controller.py} | 51 +++++++++++++++----
 .../decorators/flowcept_task.py               |  4 ++
 .../decorators/responsible_ai.py              |  4 +-
 flowcept/main.py                              |  4 +-
 flowcept/version.py                           |  2 +-
 tests/adapters/dask_test_utils.py             |  6 ++-
 tests/adapters/test_dask.py                   |  9 ++--
 tests/adapters/test_dask_with_context_mgmt.py |  4 +-
 tests/adapters/test_mlflow.py                 |  4 +-
 tests/adapters/test_tensorboard.py            |  4 +-
 tests/adapters/test_zambeze.py                |  4 +-
 tests/api/dbapi_test.py                       | 50 +++++++++---------
 tests/api/flowcept_api_test.py                | 44 ++++++++--------
 .../flowcept_task_decorator_test.py           |  9 ++--
 tests/decorator_tests/ml_tests/dl_trainer.py  |  5 +-
 .../ml_tests/llm_tests/llm_trainer.py         |  5 +-
 19 files changed, 134 insertions(+), 114 deletions(-)
 rename flowcept/flowcept_api/{consumer_api.py => flowcept_controller.py} (73%)

diff --git a/.github/workflows/run-tests-kafka.yml b/.github/workflows/run-tests-kafka.yml
index 33243e05..f2ec9b0b 100644
--- a/.github/workflows/run-tests-kafka.yml
+++ b/.github/workflows/run-tests-kafka.yml
@@ -1,5 +1,8 @@
 name: All tests on Kafka MQ
-on: [push]
+on:
+  pull_request:
+    branches: [ "dev", "main" ]
+    types: [opened, synchronize, reopened]
 # branches: [ "disabled" ]

 jobs:
diff --git a/README.md b/README.md
index 8ad1196c..bf43e0b7 100644
--- a/README.md
+++ b/README.md
@@ -56,38 +56,25 @@ You may need to set the environment variable `FLOWCEPT_SETTINGS_PATH` with the a
 In addition to existing adapters to Dask, MLFlow, and others (it's extensible for any system that generates data), FlowCept also offers instrumentation via @decorators.
 ```python
-from uuid import uuid4
-
-from flowcept import (
-    FlowceptConsumerAPI,
-    WorkflowObject,
-    DBAPI,
-    flowcept_task,
-    INSTRUMENTATION
-)
-
+from flowcept import Flowcept, flowcept_task

 @flowcept_task
-def sum_one(n, workflow_id=None):
+def sum_one(n):
     return n + 1


 @flowcept_task
-def mult_two(n, workflow_id=None):
+def mult_two(n):
     return n * 2

-db = DBAPI()
-wf_id = str(uuid4())
-with FlowceptConsumerAPI(INSTRUMENTATION):
-    # The next line is optional
-    db.insert_or_update_workflow(WorkflowObject(workflow_id=wf_id))
+with Flowcept(workflow_name='test_workflow'):
     n = 3
-    o1 = sum_one(n, workflow_id=wf_id)
-    o2 = mult_two(o1, workflow_id=wf_id)
-
-print(db.query(filter={"workflow_id": wf_id}))
+    o1 = sum_one(n)
+    o2 = mult_two(o1)
+    print(o2)

+print(Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}))
 ```
diff --git a/flowcept/__init__.py b/flowcept/__init__.py
index d4da399e..4438052a 100644
--- a/flowcept/__init__.py
+++ b/flowcept/__init__.py
@@ -7,13 +7,10 @@

 from flowcept.commons.vocabulary import Vocabulary

-from flowcept.flowcept_api.consumer_api import FlowceptConsumerAPI
+from flowcept.flowcept_api.flowcept_controller import Flowcept
 from flowcept.flowcept_api.task_query_api import TaskQueryAPI
-from flowcept.flowcept_api.db_api import DBAPI
 from flowcept.instrumentation.decorators.flowcept_task import flowcept_task

-INSTRUMENTATION = FlowceptConsumerAPI.INSTRUMENTATION
-
 from flowcept.commons.flowcept_dataclasses.workflow_object import (
     WorkflowObject,
 )
diff --git a/flowcept/flowcept_api/consumer_api.py b/flowcept/flowcept_api/flowcept_controller.py
similarity index 73%
rename from flowcept/flowcept_api/consumer_api.py
rename to flowcept/flowcept_api/flowcept_controller.py
index 5263446d..cb9aee96 100644
--- a/flowcept/flowcept_api/consumer_api.py
+++ b/flowcept/flowcept_api/flowcept_controller.py
@@ -1,19 +1,25 @@
 from typing import List, Union
 from time import sleep

+from flowcept.commons.flowcept_dataclasses.workflow_object import (
+    WorkflowObject,
+)
+
 import flowcept.instrumentation.decorators
 from flowcept.commons import logger
 from flowcept.commons.daos.document_db_dao import DocumentDBDao
 from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao
 from flowcept.configs import MQ_INSTANCES
+from flowcept.flowcept_api.db_api import DBAPI
 from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
 from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor


-# TODO: :code-reorg: This may not be considered an API anymore as it's doing critical things for the good functioning of the system.
-class FlowceptConsumerAPI(object):
-    INSTRUMENTATION = "instrumentation"
+class Flowcept(object):
+    db = DBAPI()
+
+    current_workflow_id = None

     def __init__(
         self,
@@ -22,7 +28,23 @@ def __init__(
         ] = None,
         bundle_exec_id=None,
         start_doc_inserter=True,
+        workflow_id: str = None,
+        workflow_name: str = None,
+        workflow_args: str = None,
     ):
+        """
+        Flowcept controller.
+
+        This class controls the interceptors, including instrumentation.
+        When used for instrumentation, one instance of this class per
+        workflow is assumed.
+
+        Parameters
+        ----------
+        interceptors - List of Flowcept interceptors. If None, the instrumentation interceptor is used. If a string is passed, this controller starts no interceptor of its own: the adapter's interceptors (e.g., Dask's scheduler/worker plugins) are expected to run in external processes, and only the consumers are managed here.
+        bundle_exec_id - A way to group interceptors.
+        start_doc_inserter - Whether to start consuming MQ messages and inserting them into the DB.
+        """
         self.logger = FlowceptLogger()

         self._document_inserters: List[DocumentInserter] = []
@@ -31,13 +53,24 @@ def __init__(
             self._bundle_exec_id = id(self)
         else:
             self._bundle_exec_id = bundle_exec_id
-        if interceptors == FlowceptConsumerAPI.INSTRUMENTATION:
-            interceptors = (
-                flowcept.instrumentation.decorators.instrumentation_interceptor
+        if isinstance(interceptors, str):
+            self._interceptors = None
+        else:
+            if interceptors is None:
+                interceptors = [
+                    flowcept.instrumentation.decorators.instrumentation_interceptor
+                ]
+            elif not isinstance(interceptors, list):
+                interceptors = [interceptors]
+            self._interceptors: List[BaseInterceptor] = interceptors
+
+        if workflow_id or workflow_args or workflow_name:
+            wf_obj = WorkflowObject(
+                workflow_id, workflow_name, used=workflow_args
             )
-        if interceptors is not None and type(interceptors) != list:
-            interceptors = [interceptors]
-        self._interceptors: List[BaseInterceptor] = interceptors
+            Flowcept.db.insert_or_update_workflow(wf_obj)
+            Flowcept.current_workflow_id = wf_obj.workflow_id
+
         self.is_started = False

     def start(self):
diff --git a/flowcept/instrumentation/decorators/flowcept_task.py b/flowcept/instrumentation/decorators/flowcept_task.py
index 92841931..92554509 100644
--- a/flowcept/instrumentation/decorators/flowcept_task.py
+++ b/flowcept/instrumentation/decorators/flowcept_task.py
@@ -1,6 +1,7 @@
 from time import time
 from functools import wraps
 import flowcept.commons
+from flowcept import Flowcept
 from flowcept.commons.flowcept_dataclasses.task_object import (
     TaskObject,
     Status,
@@ -25,6 +26,9 @@ def default_args_handler(task_message: TaskObject, *args, **kwargs):
         "workflow_id", None
     )
     args_handled.update(kwargs)
+    task_message.workflow_id = (
+        task_message.workflow_id or Flowcept.current_workflow_id
+    )
     if REPLACE_NON_JSON_SERIALIZABLE:
         args_handled = replace_non_serializable(args_handled)
     return args_handled
diff --git a/flowcept/instrumentation/decorators/responsible_ai.py b/flowcept/instrumentation/decorators/responsible_ai.py
index 5bda3519..18d19f97 100644
--- a/flowcept/instrumentation/decorators/responsible_ai.py
+++ b/flowcept/instrumentation/decorators/responsible_ai.py
@@ -1,7 +1,7 @@
 from functools import wraps
 import numpy as np
 from torch import nn
-from flowcept import DBAPI
+from flowcept import Flowcept
 from flowcept.commons.utils import replace_non_serializable
 from flowcept.configs import REPLACE_NON_JSON_SERIALIZABLE, INSTRUMENTATION

@@ -110,7 +110,7 @@ def wrapper(*args, **kwargs):
             if INSTRUMENTATION.get("torch", False) and INSTRUMENTATION[
                 "torch"
             ].get("save_models", False):
-                obj_id = DBAPI().save_torch_model(
+                obj_id = Flowcept.db.save_torch_model(
                     model, custom_metadata=ret["responsible_ai_metadata"]
                 )
                 ret["object_id"] = obj_id
diff --git a/flowcept/main.py b/flowcept/main.py
index 7e1d69d1..f6870e9c 100644
--- a/flowcept/main.py
+++ b/flowcept/main.py
@@ -1,7 +1,7 @@
 import sys

 from flowcept import (
-    FlowceptConsumerAPI,
+    Flowcept,
     ZambezeInterceptor,
     MLFlowInterceptor,
     TensorboardInterceptor,
@@ -36,7 +36,7 @@ def main():
         )
         interceptors.append(interceptor)

-    consumer = FlowceptConsumerAPI(interceptors)
+    consumer = Flowcept(interceptors)
     consumer.start()

diff --git a/flowcept/version.py b/flowcept/version.py
index 070d2b22..41cdf622 100644
--- a/flowcept/version.py
+++ b/flowcept/version.py
@@ -2,4 +2,4 @@
 # This file is supposed to be automatically modified by the CI Bot.
 # The expected format is: <Major>.<Minor>.<Patch>
 # See .github/workflows/version_bumper.py
-__version__ = "0.4.1"
+__version__ = "0.5.0"
diff --git a/tests/adapters/dask_test_utils.py b/tests/adapters/dask_test_utils.py
index 31500495..3d1a422a 100644
--- a/tests/adapters/dask_test_utils.py
+++ b/tests/adapters/dask_test_utils.py
@@ -1,7 +1,7 @@
 from dask.distributed import Client, LocalCluster
 from distributed import Status

-from flowcept import FlowceptConsumerAPI
+from flowcept import Flowcept


 def close_dask(client, cluster):
@@ -29,7 +29,9 @@ def setup_local_dask_cluster(consumer=None, n_workers=1, exec_bundle=None):
     )

     if consumer is None or not consumer.is_started:
-        consumer = FlowceptConsumerAPI(bundle_exec_id=exec_bundle).start()
+        consumer = Flowcept(
+            interceptors="dask", bundle_exec_id=exec_bundle
+        ).start()

     cluster = LocalCluster(n_workers=n_workers)
     scheduler = cluster.scheduler
diff --git a/tests/adapters/test_dask.py b/tests/adapters/test_dask.py
index c5d66a7d..ee0c2b78 100644
--- a/tests/adapters/test_dask.py
+++ b/tests/adapters/test_dask.py
@@ -5,7 +5,7 @@

 from dask.distributed import Client, LocalCluster

-from flowcept import FlowceptConsumerAPI, TaskQueryAPI, DBAPI
+from flowcept import Flowcept, TaskQueryAPI
 from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.commons.utils import (
     assert_by_querying_tasks_until,
@@ -51,12 +51,10 @@ def forced_error_func(x):
 class TestDask(unittest.TestCase):
     client: Client = None
     cluster: LocalCluster = None
-    consumer: FlowceptConsumerAPI = None
+    consumer: Flowcept = None

     def __init__(self, *args, **kwargs):
         super(TestDask, self).__init__(*args, **kwargs)
-        self.query_api = TaskQueryAPI()
-        self.db_api = DBAPI()
         self.logger = FlowceptLogger()

     @classmethod
@@ -176,7 +174,8 @@ def test_observer_and_consumption(self):
             and len(docs[0]["generated"]) > 0,
         )
         assert evaluate_until(
-            lambda: self.db_api.get_workflow(workflow_id=wf_id) is not None,
+            lambda: TestDask.consumer.db.get_workflow(workflow_id=wf_id)
+            is not None,
             msg="Checking if workflow object was saved in db",
         )
         print("All conditions met!")
diff --git a/tests/adapters/test_dask_with_context_mgmt.py b/tests/adapters/test_dask_with_context_mgmt.py
index 05a1636c..13d34418 100644
--- a/tests/adapters/test_dask_with_context_mgmt.py
+++ b/tests/adapters/test_dask_with_context_mgmt.py
@@ -4,7 +4,7 @@
 from dask.distributed import Client
 from distributed import LocalCluster

-from flowcept import FlowceptConsumerAPI
+from flowcept import Flowcept
 from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.commons.utils import assert_by_querying_tasks_until
 from flowcept.flowceptor.adapters.dask.dask_plugins import (
@@ -38,7 +38,7 @@ def test_workflow(self):
         client.register_plugin(FlowceptDaskWorkerAdapter())
         register_dask_workflow(client)

-        with FlowceptConsumerAPI():
+        with Flowcept("dask"):
             i1 = np.random.random()
             o1 = client.submit(dummy_func1, i1)
             self.logger.debug(o1.result())
diff --git a/tests/adapters/test_mlflow.py b/tests/adapters/test_mlflow.py
index 6c3dcec7..5993d501 100644
--- a/tests/adapters/test_mlflow.py
+++ b/tests/adapters/test_mlflow.py
@@ -2,7 +2,7 @@
 from time import sleep

 from flowcept.commons.flowcept_logger import FlowceptLogger
-from flowcept import MLFlowInterceptor, FlowceptConsumerAPI
+from flowcept import MLFlowInterceptor, Flowcept
 from flowcept.commons.utils import (
     assert_by_querying_tasks_until,
     evaluate_until,
@@ -67,7 +67,7 @@ def test_observer_and_consumption(self):
         # with open(self.interceptor.settings.file_path, 'w+') as f:
         #     f.write("")

-        with FlowceptConsumerAPI(self.interceptor):
+        with Flowcept(self.interceptor):
             run_uuid = self.test_pure_run_mlflow()
         # sleep(3)
diff --git a/tests/adapters/test_tensorboard.py b/tests/adapters/test_tensorboard.py
index 01d53fe6..347efe72 100644
--- a/tests/adapters/test_tensorboard.py
+++ b/tests/adapters/test_tensorboard.py
@@ -5,7 +5,7 @@

 from flowcept.configs import MONGO_INSERTION_BUFFER_TIME
 from flowcept.commons.flowcept_logger import FlowceptLogger
-from flowcept import TensorboardInterceptor, FlowceptConsumerAPI, TaskQueryAPI
+from flowcept import TensorboardInterceptor, Flowcept, TaskQueryAPI
 from flowcept.commons.utils import (
     assert_by_querying_tasks_until,
     evaluate_until,
@@ -142,7 +142,7 @@ def run(run_dir, hparams):
     def test_observer_and_consumption(self):
         self.reset_log_dir()

-        with FlowceptConsumerAPI(self.interceptor):
+        with Flowcept(self.interceptor):
             wf_id = self.run_tensorboard_hparam_tuning()
         self.logger.debug("Done training. Sleeping some time...")
         watch_interval_sec = MONGO_INSERTION_BUFFER_TIME
diff --git a/tests/adapters/test_zambeze.py b/tests/adapters/test_zambeze.py
index 6fd1482c..ad43efcf 100644
--- a/tests/adapters/test_zambeze.py
+++ b/tests/adapters/test_zambeze.py
@@ -7,7 +7,7 @@
 from pika.exceptions import AMQPConnectionError

 from flowcept.commons.flowcept_logger import FlowceptLogger
-from flowcept import ZambezeInterceptor, FlowceptConsumerAPI, TaskQueryAPI
+from flowcept import ZambezeInterceptor, Flowcept, TaskQueryAPI
 from flowcept.flowceptor.adapters.zambeze.zambeze_dataclasses import (
     ZambezeMessage,
 )
@@ -35,7 +35,7 @@ def __init__(self, *args, **kwargs):
             print(f"An error occurred: {e}")
             return

-        self.consumer = FlowceptConsumerAPI(interceptor)
+        self.consumer = Flowcept(interceptor)
         self._channel = self._connection.channel()
         self._queue_names = interceptor.settings.queue_names
         self._channel.queue_declare(queue=self._queue_names[0])
diff --git a/tests/api/dbapi_test.py b/tests/api/dbapi_test.py
index 588e93ff..93d4a366 100644
--- a/tests/api/dbapi_test.py
+++ b/tests/api/dbapi_test.py
@@ -2,10 +2,7 @@
 from uuid import uuid4

 from flowcept.commons.flowcept_dataclasses.task_object import TaskObject
-from flowcept.commons.flowcept_dataclasses.workflow_object import (
-    WorkflowObject,
-)
-from flowcept.flowcept_api.db_api import DBAPI
+from flowcept import Flowcept, WorkflowObject

 from flowcept.flowceptor.telemetry_capture import TelemetryCapture

@@ -20,17 +17,16 @@ def __str__(self):

 class WorkflowDBTest(unittest.TestCase):
     def test_wf_dao(self):
-        dbapi = DBAPI()
-
         workflow1_id = str(uuid4())

         wf1 = WorkflowObject()
         wf1.workflow_id = workflow1_id

-        assert dbapi.insert_or_update_workflow(wf1)
+        assert Flowcept.db.insert_or_update_workflow(wf1)

         wf1.custom_metadata = {"test": "abc"}

-        assert dbapi.insert_or_update_workflow(wf1)
+        assert Flowcept.db.insert_or_update_workflow(wf1)

-        wf_obj = dbapi.get_workflow(workflow_id=workflow1_id)
+        wf_obj = Flowcept.db.get_workflow(workflow_id=workflow1_id)
         assert wf_obj is not None
         print(wf_obj)

@@ -41,56 +37,56 @@ def test_wf_dao(self):
         wf2.workflow_id = wf2_id
         tel = TelemetryCapture()

-        assert dbapi.insert_or_update_workflow(wf2)
+        assert Flowcept.db.insert_or_update_workflow(wf2)
         wf2.interceptor_ids = ["123"]
-        assert dbapi.insert_or_update_workflow(wf2)
+        assert Flowcept.db.insert_or_update_workflow(wf2)
         wf2.interceptor_ids = ["1234"]
-        assert dbapi.insert_or_update_workflow(wf2)
-        wf_obj = dbapi.get_workflow(wf2_id)
+        assert Flowcept.db.insert_or_update_workflow(wf2)
+        wf_obj = Flowcept.db.get_workflow(wf2_id)
         assert len(wf_obj.interceptor_ids) == 2

         wf2.machine_info = {"123": tel.capture_machine_info()}
-        assert dbapi.insert_or_update_workflow(wf2)
-        wf_obj = dbapi.get_workflow(wf2_id)
+        assert Flowcept.db.insert_or_update_workflow(wf2)
+        wf_obj = Flowcept.db.get_workflow(wf2_id)
         assert wf_obj

         wf2.machine_info = {"1234": tel.capture_machine_info()}
-        assert dbapi.insert_or_update_workflow(wf2)
-        wf_obj = dbapi.get_workflow(wf2_id)
+        assert Flowcept.db.insert_or_update_workflow(wf2)
+        wf_obj = Flowcept.db.get_workflow(wf2_id)
         assert len(wf_obj.machine_info) == 2

     def test_save_blob(self):
-        dbapi = DBAPI()
         import pickle

         obj = pickle.dumps(OurObject())
-        obj_id = dbapi.save_object(object=obj)
+        obj_id = Flowcept.db.save_object(object=obj)
         print(obj_id)

-        obj_docs = dbapi.query(filter={"object_id": obj_id}, type="object")
+        obj_docs = Flowcept.db.query(
+            filter={"object_id": obj_id}, type="object"
+        )
         loaded_obj = pickle.loads(obj_docs[0]["data"])
         assert type(loaded_obj) == OurObject

     def test_dump(self):
-        dbapi = DBAPI()
-
         wf_id = str(uuid4())

-        c0 = dbapi._dao.count()
+        c0 = Flowcept.db._dao.count()
         for i in range(10):
             t = TaskObject()
             t.workflow_id = wf_id
             t.task_id = str(uuid4())
-            dbapi.insert_or_update_task(t)
+            Flowcept.db.insert_or_update_task(t)
         _filter = {"workflow_id": wf_id}
-        assert dbapi.dump_to_file(
+        assert Flowcept.db.dump_to_file(
             filter=_filter,
         )
-        assert dbapi.dump_to_file(filter=_filter, should_zip=True)
-        assert dbapi.dump_to_file(
+        assert Flowcept.db.dump_to_file(filter=_filter, should_zip=True)
+        assert Flowcept.db.dump_to_file(
             filter=_filter, output_file="dump_test.json"
         )

-        dbapi._dao.delete_with_filter(_filter)
-        c1 = dbapi._dao.count()
+        Flowcept.db._dao.delete_with_filter(_filter)
+        c1 = Flowcept.db._dao.count()
         assert c0 == c1
diff --git a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py
index 4e2531d2..fde9a203 100644
--- a/tests/api/flowcept_api_test.py
+++ b/tests/api/flowcept_api_test.py
@@ -1,49 +1,53 @@
 import unittest
-from time import sleep
-from uuid import uuid4

 from flowcept import (
-    FlowceptConsumerAPI,
-    WorkflowObject,
-    DBAPI,
-    INSTRUMENTATION,
+    Flowcept,
     flowcept_task,
 )
 from flowcept.commons.utils import assert_by_querying_tasks_until


 @flowcept_task
-def sum_one(n, workflow_id=None):
+def sum_one(n):
     return n + 1


 @flowcept_task
-def mult_two(n, workflow_id=None):
+def mult_two(n):
     return n * 2


 class FlowceptAPITest(unittest.TestCase):
     def test_simple_workflow(self):
-        db = DBAPI()
-        assert FlowceptConsumerAPI.services_alive()
+        assert Flowcept.services_alive()

-        wf_id = str(uuid4())
-        with FlowceptConsumerAPI(INSTRUMENTATION):
-            # The next line is optional
-            db.insert_or_update_workflow(WorkflowObject(workflow_id=wf_id))
+        with Flowcept(workflow_name="test_workflow"):
             n = 3
-            o1 = sum_one(n, workflow_id=wf_id)
-            o2 = mult_two(o1, workflow_id=wf_id)
+            o1 = sum_one(n)
+            o2 = mult_two(o1)
             print(o2)

         assert assert_by_querying_tasks_until(
-            {"workflow_id": wf_id},
+            {"workflow_id": Flowcept.current_workflow_id},
             condition_to_evaluate=lambda docs: len(docs) == 2,
         )

-        print("workflow_id", wf_id)
+        print("workflow_id", Flowcept.current_workflow_id)

-        assert len(db.query(filter={"workflow_id": wf_id})) == 2
         assert (
-            len(db.query(type="workflow", filter={"workflow_id": wf_id})) == 1
+            len(
+                Flowcept.db.query(
+                    filter={"workflow_id": Flowcept.current_workflow_id}
+                )
+            )
+            == 2
+        )
+        assert (
+            len(
+                Flowcept.db.query(
+                    type="workflow",
+                    filter={"workflow_id": Flowcept.current_workflow_id},
+                )
+            )
+            == 1
         )
diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py
index 56c98dd1..9e8d0022 100644
--- a/tests/decorator_tests/flowcept_task_decorator_test.py
+++ b/tests/decorator_tests/flowcept_task_decorator_test.py
@@ -11,7 +11,7 @@

 import flowcept.commons
 import flowcept.instrumentation.decorators
-from flowcept import FlowceptConsumerAPI
+from flowcept import Flowcept

 import unittest

@@ -147,7 +147,7 @@ def decorated_function_with_self(self, x, workflow_id=None):
     def test_decorated_function(self):
         workflow_id = str(uuid.uuid4())
         # TODO :refactor-base-interceptor:
-        with FlowceptConsumerAPI(FlowceptConsumerAPI.INSTRUMENTATION):
+        with Flowcept():
             self.decorated_function_with_self(x=0.1, workflow_id=workflow_id)
             decorated_static_function(
                 df=pd.DataFrame(), workflow_id=workflow_id
@@ -169,10 +169,7 @@ def test_decorated_function_simple(
         workflow_id = str(uuid.uuid4())
         print(workflow_id)
         # TODO :refactor-base-interceptor:
-        consumer = FlowceptConsumerAPI(
-            interceptors=FlowceptConsumerAPI.INSTRUMENTATION,
-            start_doc_inserter=start_doc_inserter,
-        )
+        consumer = Flowcept(start_doc_inserter=start_doc_inserter)
         consumer.start()
         t0 = time()
         for i in range(max_tasks):
diff --git a/tests/decorator_tests/ml_tests/dl_trainer.py b/tests/decorator_tests/ml_tests/dl_trainer.py
index ade6bee6..050fe9e3 100644
--- a/tests/decorator_tests/ml_tests/dl_trainer.py
+++ b/tests/decorator_tests/ml_tests/dl_trainer.py
@@ -7,7 +7,7 @@


 from flowcept import (
-    FlowceptConsumerAPI,
+    Flowcept,
 )

 from flowcept.instrumentation.decorators.flowcept_torch import (
     register_modules,
@@ -186,8 +186,7 @@ def model_fit(
         # We are calling the consumer api here (sometimes for the second time)
         # because we are capturing at two levels: at the model.fit and at
         # every layer. Can we do it better?
-        with FlowceptConsumerAPI(
-            FlowceptConsumerAPI.INSTRUMENTATION,
+        with Flowcept(
             bundle_exec_id=workflow_id,
             start_doc_inserter=False,
         ):
diff --git a/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py b/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py
index 7af1fceb..80b8433a 100644
--- a/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py
+++ b/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py
@@ -11,7 +11,7 @@
 from datasets import load_dataset

 import flowcept
-from flowcept import FlowceptConsumerAPI
+from flowcept import Flowcept

 from flowcept.configs import N_GPUS

 from flowcept.instrumentation.decorators.flowcept_torch import (
@@ -280,8 +280,7 @@ def model_train(

     # TODO :ml-refactor: save device type and random seed: https://pytorch.org/docs/stable/notes/randomness.html
     # TODO :base-interceptor-refactor: Can we do it better?
-    with FlowceptConsumerAPI(
-        FlowceptConsumerAPI.INSTRUMENTATION,
+    with Flowcept(
         bundle_exec_id=workflow_id,
         start_doc_inserter=False,
     ):

From ce8b6f595636e821eced3024930df7f17a41f0df Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 23 Sep 2024 15:59:52 -0400
Subject: [PATCH 2/6] Fix test error

---
 flowcept/flowcept_api/flowcept_controller.py        | 2 ++
 tests/decorator_tests/ml_tests/ml_decorator_test.py | 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/flowcept/flowcept_api/flowcept_controller.py b/flowcept/flowcept_api/flowcept_controller.py
index cb9aee96..82139024 100644
--- a/flowcept/flowcept_api/flowcept_controller.py
+++ b/flowcept/flowcept_api/flowcept_controller.py
@@ -70,6 +70,8 @@ def __init__(
             )
             Flowcept.db.insert_or_update_workflow(wf_obj)
             Flowcept.current_workflow_id = wf_obj.workflow_id
+        else:
+            Flowcept.current_workflow_id = None

         self.is_started = False

diff --git a/tests/decorator_tests/ml_tests/ml_decorator_test.py b/tests/decorator_tests/ml_tests/ml_decorator_test.py
index fb0c3869..e3155643 100644
--- a/tests/decorator_tests/ml_tests/ml_decorator_test.py
+++ b/tests/decorator_tests/ml_tests/ml_decorator_test.py
@@ -2,7 +2,7 @@

 import unittest

-from flowcept import DBAPI
+from flowcept import Flowcept

 from tests.decorator_tests.ml_tests.dl_trainer import ModelTrainer, TestNet

@@ -32,7 +32,7 @@ def test_cnn_model_trainer():
             c.pop("workflow_id")

             loaded_model = TestNet(**c)
-            loaded_model = DBAPI().load_torch_model(
+            loaded_model = Flowcept.db.load_torch_model(
                 loaded_model, result["object_id"]
             )
             assert len(loaded_model(result["test_data"]))

From 921a90145b97c22be7d268c790a154f1df35ff3b Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 23 Sep 2024 16:37:18 -0400
Subject: [PATCH 3/6] Fix dask llm test

---
 .../ml_tests/llm_tests/decorator_dask_llm_test.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py b/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py
index b9b52798..db4b6f63 100644
--- a/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py
+++ b/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py
@@ -1,8 +1,8 @@
 import unittest
-
+import itertools
 import uuid

-from flowcept import WorkflowObject, DBAPI
+from flowcept import WorkflowObject, Flowcept

 from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.flowceptor.adapters.dask.dask_plugins import (
@@ -80,7 +80,6 @@ def __init__(self, *args, **kwargs):
     def test_llm(self):
         # Manually registering the DataPrep workflow (manual instrumentation)
         tokenizer = "toktok"  # basic_english, moses, toktok
-        db_api = DBAPI()
         dataset_prep_wf = WorkflowObject()
         dataset_prep_wf.workflow_id = f"prep_wikitext_tokenizer_{tokenizer}"
         dataset_prep_wf.used = {"tokenizer": tokenizer}
@@ -94,7 +93,7 @@ def test_llm(self):
             "test_data": id(test_data),
         }
         print(dataset_prep_wf)
-        db_api.insert_or_update_workflow(dataset_prep_wf)
+        Flowcept.db.insert_or_update_workflow(dataset_prep_wf)

         # Automatically registering the Dask workflow
         train_wf_id = str(uuid.uuid4())

From b9e85a2f9a0da746f1f06b49cbf9fb34342a9fa1 Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 23 Sep 2024 16:46:49 -0400
Subject: [PATCH 4/6] Fix in Kafka GHAction

---
 .github/workflows/run-tests-kafka.yml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/run-tests-kafka.yml b/.github/workflows/run-tests-kafka.yml
index f2ec9b0b..608eee77 100644
--- a/.github/workflows/run-tests-kafka.yml
+++ b/.github/workflows/run-tests-kafka.yml
@@ -23,6 +23,8 @@ jobs:
       run: |
         python -m pip install --upgrade pip
         pip install -e .[fulldev]
+    - name: Pip list
+      run: pip list
    - name: Run Docker Compose
      run: docker compose -f deployment/compose-kafka.yml up -d
    - name: Wait 1 min
@@ -32,7 +34,7 @@
        export MQ_TYPE=kafka
        export MQ_PORT=9092
        python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_TYPE={MQ_PORT}")'
-       python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
+       python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
    - name: Run Tests with Kafka
      run: |
        export MQ_TYPE=kafka
        export MQ_PORT=9092
@@ -44,7 +46,7 @@
        export MQ_TYPE=kafka
        export MQ_PORT=9092
        python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_TYPE={MQ_PORT}")'
-       python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
+       python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
        python flowcept/flowcept_webserver/app.py &
        sleep 3

From a92d70b3007466ea858685270f0f3fcb97121c1e Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 23 Sep 2024 17:06:51 -0400
Subject: [PATCH 5/6] Fix notebooks for new Flowcept api

---
 notebooks/analytics.ipynb     |  5 ++--
 notebooks/dask.ipynb          | 46 +++++++++++++++++------------------
 notebooks/dask_from_CLI.ipynb | 10 ++++----
 notebooks/mlflow.ipynb        | 10 ++++----
 notebooks/sample_data.csv     | 10 ++++++++
 notebooks/tensorboard.ipynb   | 10 ++++----
 notebooks/zambeze.ipynb       | 10 ++++----
 7 files changed, 55 insertions(+), 46 deletions(-)
 create mode 100644 notebooks/sample_data.csv

diff --git a/notebooks/analytics.ipynb b/notebooks/analytics.ipynb
index 48a288fa..0bc2ea70 100644
--- a/notebooks/analytics.ipynb
+++ b/notebooks/analytics.ipynb
@@ -32,8 +32,7 @@
     "    \"\"\"\n",
     "    import json\n",
     "    from uuid import uuid4\n",
-    "    from flowcept import DBAPI\n",
-    "    db_api = DBAPI()\n",
+    "    from flowcept import Flowcept\n",
     "    test_data_path = '../tests/api/sample_data_with_telemetry_and_rai.json' # This sample data contains a workflow composed of 9 tasks.\n",
     "    with open(test_data_path) as f:\n",
     "        base_data = json.loads(f.read())\n",
@@ -47,7 +46,7 @@
     "        new_doc[\"workflow_id\"] = wf_id\n",
     "        docs.append(new_doc)\n",
     "    \n",
-    "    inserted_ids = db_api._dao.insert_many(docs)\n",
+    "    inserted_ids = Flowcept.db._dao.insert_many(docs)\n",
     "    assert len(inserted_ids) == len(base_data)\n",
     "    return wf_id"
diff --git a/notebooks/dask.ipynb b/notebooks/dask.ipynb
index 071de54e..ff81a146 100644
--- a/notebooks/dask.ipynb
+++ b/notebooks/dask.ipynb
@@ -14,7 +14,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": "null",
    "id": "e7f738dd-4e78-4707-a0b4-b7ddc729a635",
    "metadata": {
     "tags": []
    },
@@ -30,7 +30,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": "null",
    "id": "0d399983-63f2-4f0d-acdc-6e3ff4abbb4d",
    "metadata": {
     "tags": []
    },
@@ -44,7 +44,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": "null",
    "id": "50721f98-6f40-4bd9-83f1-56e83e75aa8b",
    "metadata": {
     "tags": []
    },
@@ -80,7 +80,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": "null",
    "id": "47a4cf2c-4b5c-4fe5-9973-cda2734b0623",
    "metadata": {
     "tags": []
    },
@@ -114,27 +114,27 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": "null",
    "id": "b1e311c3-ca2a-4cf5-9a38-6742c91a0035",
    "metadata": {
     "tags": []
    },
"outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept('dask')" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "fe609b49-28cf-4f2c-9027-ee7bc51fb86a", "metadata": { "tags": [] }, "outputs": [], "source": [ - "consumer.start()" + "flowcept.start()" ] }, { @@ -149,7 +149,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "702f3c58-2a52-4763-87d9-fd7062192e48", "metadata": { "tags": [] @@ -172,7 +172,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "e843f2c8-4566-46f2-95de-34d17bd4c061", "metadata": { "tags": [] @@ -194,7 +194,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "84307c0a-6ef5-428d-bf01-fd921e148c86", "metadata": { "tags": [] @@ -212,7 +212,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "e211229d-ac01-48c6-81f1-efba8e72d58c", "metadata": { "tags": [] @@ -234,7 +234,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "34fbe181-c55d-4ac4-84bf-0684fb3f54ca", "metadata": { "tags": [] @@ -247,7 +247,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "72a80432-f4fd-459e-a3f2-900beeea434d", "metadata": { "tags": [] @@ -267,7 +267,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "82d8a1cc-86c8-48a6-b91e-d822c0417c1b", "metadata": { "tags": [] @@ -281,7 +281,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "2a1afa5d-3934-4188-8a35-4d221bd58550", "metadata": { "tags": [] @@ -293,7 +293,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "f0a5d746-1157-4591-af37-76360e7a7b1c", "metadata": { "tags": [] @@ -315,19 +315,19 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "45291c13-4fcf-47b4-9b9f-de0050b1b076", "metadata": { "tags": [] }, "outputs": [], "source": [ - "consumer.stop()" + "flowcept.stop()" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "7a3d4d67-315a-46dd-a41e-b15174dc9784", "metadata": { "tags": [] @@ -339,7 +339,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "fb51cbaf-2127-4fe3-8fb6-e1be9d009f7e", "metadata": { "tags": [] @@ -366,7 +366,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/notebooks/dask_from_CLI.ipynb b/notebooks/dask_from_CLI.ipynb index dc8bfe9e..2aed4697 100644 --- a/notebooks/dask_from_CLI.ipynb +++ b/notebooks/dask_from_CLI.ipynb @@ -51,9 +51,9 @@ }, "outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI()\n", - "consumer.start()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept()\n", + "flowcept.start()" ] }, { @@ -161,7 +161,7 @@ "metadata": {}, "outputs": [], "source": [ - "consumer.stop()" + "flowcept.stop()" ] } ], @@ -181,7 +181,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/notebooks/mlflow.ipynb b/notebooks/mlflow.ipynb index 7f5a84fa..e8b9d5ab 100644 --- a/notebooks/mlflow.ipynb +++ 
b/notebooks/mlflow.ipynb @@ -148,9 +148,9 @@ "metadata": {}, "outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI(interceptor)\n", - "consumer.start()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept(interceptor)\n", + "flowcept.start()" ] }, { @@ -402,7 +402,7 @@ }, "outputs": [], "source": [ - "consumer.stop()" + "flowcept.stop()" ] } ], @@ -422,7 +422,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/notebooks/sample_data.csv b/notebooks/sample_data.csv new file mode 100644 index 00000000..86cfa453 --- /dev/null +++ b/notebooks/sample_data.csv @@ -0,0 +1,10 @@ +,used.max_epochs,generated.loss,generated.accuracy,generated.responsible_ai_metadata.shap_sum,generated.responsible_ai_metadata.flops,generated.responsible_ai_metadata.params,generated.responsible_ai_metadata.max_width,generated.responsible_ai_metadata.depth,generated.responsible_ai_metadata.n_fc_layers,generated.responsible_ai_metadata.n_cv_layers,telemetry_diff.cpu.times_avg.user,telemetry_diff.cpu.times_avg.system,telemetry_diff.cpu.times_avg.idle,telemetry_diff.process.memory.rss,telemetry_diff.process.memory.vms,telemetry_diff.process.memory.pfaults,telemetry_diff.process.memory.pageins,telemetry_diff.process.cpu_times.user,telemetry_diff.process.cpu_times.system,telemetry_diff.memory.virtual.available,telemetry_diff.memory.virtual.used,telemetry_diff.memory.virtual.free,telemetry_diff.memory.virtual.active,telemetry_diff.memory.virtual.inactive,telemetry_diff.memory.virtual.wired,telemetry_diff.memory.swap.total,telemetry_diff.memory.swap.used,telemetry_diff.memory.swap.free,telemetry_diff.memory.swap.sin,telemetry_diff.memory.swap.sout,telemetry_diff.disk.disk_usage.free,used.conv_in_outs_sum,used.conv_kernel_sizes_sum,used.conv_pool_sizes_sum,used.fc_in_outs_sum,used.softmax_dims_sum,telemetry_diff.network.activity,telemetry_diff.disk.activity,telemetry_diff.process.activity +0,1.0,0.014728536784648895,40.75,0.0,21880192.0,162990.0,100.0,12.0,5.0,7.0,397.4499999999971,155.3100000000013,16.699999999953434,2720251904.0,4253073408.0,7128964.0,39.0,314.642169344,60.419478912,-779124736.0,-900841472.0,119111680.0,-967688192.0,-746061824.0,66846720.0,1073741824.0,577437696.0,496304128.0,371359744.0,5390336.0,-1067229184.0,41.0,29.0,2.0,220.0,1.0,156472.4375,171472968.0,1997778.0 +1,1.0,0.040325844478607174,11.35,0.0,47275136.0,359840.0,400.0,16.0,9.0,7.0,411.41999999999825,159.40999999999985,17.17000000004191,2675638272.0,4349198336.0,7289754.0,39.0,326.14157747200005,62.117328288,-930316288.0,-887881728.0,12861440.0,-937263104.0,-792707072.0,49381376.0,1073741824.0,577437696.0,496304128.0,373030912.0,5390336.0,-1067397120.0,41.0,29.0,2.0,1620.0,3.0,162635.8125,172406277.0,2044184.0 +2,1.0,0.05815730080604553,11.35,0.0,5405073024.0,42184840.0,4000.0,24.0,17.0,7.0,1179.7599999999948,401.4300000000003,50.68000000005122,2231304192.0,3327492096.0,17076525.0,39.0,942.530430464,153.566138432,-159088640.0,-1295351808.0,751190016.0,-1227440128.0,-747388928.0,-67911680.0,1073741824.0,560660480.0,513081344.0,738394112.0,14172160.0,-1065861120.0,41.0,29.0,2.0,32020.0,7.0,369630.8125,251856855.0,4531840.0 
+3,1.0,0.018241909003257752,10.28,0.0,324195712.0,1890690.0,100.0,16.0,5.0,11.0,1862.6100000000006,579.5999999999985,114.22000000003027,2189475840.0,3818635264.0,23538438.0,39.0,1486.561319424,216.581898688,-693305344.0,-804503552.0,55394304.0,-755220480.0,-590577664.0,-49283072.0,1073741824.0,552271872.0,521469952.0,803258368.0,21512192.0,-1069760512.0,181.0,30.0,3.0,260.0,1.0,650028.625,272619547.0,6284750.0
+4,1.0,0.040312224340438844,11.35,0.0,349846656.0,2089540.0,400.0,20.0,9.0,11.0,1875.5800000000017,582.4599999999991,115.87000000005355,2155741184.0,3380838400.0,23604991.0,37.0,1496.636215552,217.575447296,-489193472.0,-862502912.0,309952512.0,-788512768.0,-642940928.0,-73990144.0,1073741824.0,552271872.0,521469952.0,819249152.0,22134784.0,-1070206976.0,181.0,30.0,3.0,1660.0,3.0,652738.75,275295002.8333333,6311004.0
+5,1.0,0.05813799858093262,11.35,0.0,5709692544.0,43930540.0,4000.0,28.0,17.0,11.0,2384.9100000000035,701.8199999999997,252.69000000000233,2042101760.0,3384459264.0,27004118.0,37.0,1888.629006592,255.444219456,-567836672.0,-731906048.0,224264192.0,-695762944.0,-632913920.0,-36143104.0,1073741824.0,552271872.0,521469952.0,884097024.0,29392896.0,-1060265984.0,181.0,30.0,3.0,32060.0,7.0,799624.5,295061285.8333333,7313349.0
+6,1.0,0.018207813382148743,10.09,0.0,1810792832.0,8485880.0,120.0,20.0,5.0,15.0,4169.770000000004,1086.2099999999991,1084.350000000035,968032256.0,2068332544.0,35022644.0,37.0,3220.7134123520004,354.79103712,-17186816.0,-623886336.0,534364160.0,-553648128.0,-393068544.0,-70238208.0,1073741824.0,543883264.0,529858560.0,1058029568.0,49823744.0,-1066872832.0,481.0,31.0,4.0,320.0,1.0,1362017.5625,347619554.0,10075678.0
+7,1.0,0.04012699522972107,11.35,0.0,1836827776.0,8687730.0,400.0,24.0,9.0,15.0,4172.880000000005,1086.9700000000012,1087.1500000000233,1305690112.0,2392899584.0,35045100.0,37.0,3222.663728896,354.923501504,-17170432.0,-623886336.0,534364160.0,-553648128.0,-393052160.0,-70238208.0,1073741824.0,543883264.0,529858560.0,1058029568.0,49823744.0,-1067188224.0,481.0,31.0,4.0,1720.0,3.0,1363559.3125,345680791.6666667,10079114.0
+8,1.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,481.0,31.0,4.0,32120.0,7.0,,,0.0
diff --git a/notebooks/tensorboard.ipynb b/notebooks/tensorboard.ipynb
index f8f9fc10..b69f84f7 100644
--- a/notebooks/tensorboard.ipynb
+++ b/notebooks/tensorboard.ipynb
@@ -302,9 +302,9 @@
    },
    "outputs": [],
    "source": [
-    "from flowcept import FlowceptConsumerAPI\n",
-    "consumer = FlowceptConsumerAPI(interceptor)\n",
-    "consumer.start()"
+    "from flowcept import Flowcept\n",
+    "flowcept = Flowcept(interceptor)\n",
+    "flowcept.start()"
    ]
   },
   {
@@ -339,7 +339,7 @@
    "outputs": [],
    "source": [
     "sleep(10)\n",
-    "consumer.stop()"
+    "flowcept.stop()"
    ]
   },
   {
@@ -385,7 +385,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.16"
+   "version": "3.9.19"
   }
  },
 "nbformat": 4,
diff --git a/notebooks/zambeze.ipynb b/notebooks/zambeze.ipynb
index 77b031ff..979e87a7 100644
--- a/notebooks/zambeze.ipynb
+++ b/notebooks/zambeze.ipynb
@@ -114,9 +114,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from flowcept import FlowceptConsumerAPI\n",
-    "consumer = FlowceptConsumerAPI(interceptor)\n",
-    "consumer.start()"
+    "from flowcept import Flowcept\n",
+    "flowcept = Flowcept(interceptor)\n",
+    "flowcept.start()"
    ]
   },
   {
@@ -205,7 +205,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "consumer.stop()"
+    "flowcept.stop()"
    ]
   }
 ],
@@ -225,7 +225,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.16"
+   "version": "3.9.19"
   }
  },
 "nbformat": 4,

From 585f40f449e89d8f340786801f639128d4d0a123 Mon Sep 17 00:00:00 2001
From: Renan Francisco Santos Souza <1754978+renan-souza@users.noreply.github.com>
Date: Mon, 23 Sep 2024 21:21:40 -0400
Subject: [PATCH 6/6] Update run-tests.yml

---
 .github/workflows/run-tests.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index 60b2f45e..82cc8d63 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -45,10 +45,10 @@ jobs:
        export MQ_TYPE=kafka
        export MQ_PORT=9092
        python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_TYPE={MQ_PORT}")'
-       python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
+       python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
    - name: Run Tests with Kafka
      run: |
        export MQ_TYPE=kafka
        export MQ_PORT=9092
        # Ignoring heavy tests. They are executed with Kafka in another GH Action.
-       pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
\ No newline at end of file
+       pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py