From 8bd5b9fa926d1693275c20cfda0068aa6621fa8f Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 26 Sep 2024 10:29:06 -0400 Subject: [PATCH 1/9] Using the interceptor instead of direct workflow insertion --- extra_requirements/amd-requirements.txt | 1 + flowcept/flowcept_api/flowcept_controller.py | 26 +++++++++++++------ .../flowceptor/adapters/base_interceptor.py | 13 +++++----- .../instrumentation/decorators/__init__.py | 2 +- tests/api/flowcept_api_test.py | 19 ++++++++++++++ 5 files changed, 45 insertions(+), 16 deletions(-) diff --git a/extra_requirements/amd-requirements.txt b/extra_requirements/amd-requirements.txt index e69de29b..73030b11 100644 --- a/extra_requirements/amd-requirements.txt +++ b/extra_requirements/amd-requirements.txt @@ -0,0 +1 @@ +# On the machines that have AMD GPUs, we use the official AMD ROCM library to capture GPU runtime data. Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/ \ No newline at end of file diff --git a/flowcept/flowcept_api/flowcept_controller.py b/flowcept/flowcept_api/flowcept_controller.py index 82139024..e17d821c 100644 --- a/flowcept/flowcept_api/flowcept_controller.py +++ b/flowcept/flowcept_api/flowcept_controller.py @@ -64,14 +64,9 @@ def __init__( interceptors = [interceptors] self._interceptors: List[BaseInterceptor] = interceptors - if workflow_id or workflow_args or workflow_name: - wf_obj = WorkflowObject( - workflow_id, workflow_name, used=workflow_args - ) - Flowcept.db.insert_or_update_workflow(wf_obj) - Flowcept.current_workflow_id = wf_obj.workflow_id - else: - Flowcept.current_workflow_id = None + self.workflow_id = workflow_id + self.workflow_name = workflow_name + self.workflow_args = workflow_args self.is_started = False @@ -91,6 +86,21 @@ def start(self): interceptor.start(bundle_exec_id=self._bundle_exec_id) self.logger.debug(f"...Flowceptor {key} started ok!") + if ( + self.workflow_id + or self.workflow_args + or self.workflow_name + ) and interceptor.kind == "instrumentation": + wf_obj = WorkflowObject( + self.workflow_id, + self.workflow_name, + used=self.workflow_args, + ) + interceptor.send_workflow_message(wf_obj) + Flowcept.current_workflow_id = wf_obj.workflow_id + else: + Flowcept.current_workflow_id = None + if self._start_doc_inserter: self.logger.debug("Flowcept Consumer starting...") diff --git a/flowcept/flowceptor/adapters/base_interceptor.py b/flowcept/flowceptor/adapters/base_interceptor.py index a0c2f157..ee097fa4 100644 --- a/flowcept/flowceptor/adapters/base_interceptor.py +++ b/flowcept/flowceptor/adapters/base_interceptor.py @@ -1,4 +1,5 @@ from abc import ABCMeta, abstractmethod +from uuid import uuid4 from flowcept.commons.flowcept_dataclasses.workflow_object import ( WorkflowObject, @@ -24,7 +25,7 @@ # in the code. 
https://github.com/ORNL/flowcept/issues/109 # class BaseInterceptor(object, metaclass=ABCMeta): class BaseInterceptor(object): - def __init__(self, plugin_key=None): + def __init__(self, plugin_key=None, kind=None): self.logger = FlowceptLogger() if ( plugin_key is not None @@ -38,6 +39,7 @@ def __init__(self, plugin_key=None): self.telemetry_capture = TelemetryCapture() self._saved_workflows = set() self._generated_workflow_id = False + self.kind = kind def prepare_task_msg(self, *args, **kwargs) -> TaskObject: raise NotImplementedError() @@ -79,12 +81,8 @@ def callback(self, *args, **kwargs): raise NotImplementedError() def send_workflow_message(self, workflow_obj: WorkflowObject): - wf_id = workflow_obj.workflow_id - if wf_id is None: - self.logger.warning( - f"Workflow_id is empty, we can't save this workflow_obj: {workflow_obj}" - ) - return + wf_id = workflow_obj.workflow_id or str(uuid4()) + workflow_obj.workflow_id = wf_id if wf_id in self._saved_workflows: return self._saved_workflows.add(wf_id) @@ -105,6 +103,7 @@ def send_workflow_message(self, workflow_obj: WorkflowObject): if ENRICH_MESSAGES: workflow_obj.enrich(self.settings.key if self.settings else None) self.intercept(workflow_obj.to_dict()) + return wf_id def intercept(self, obj_msg): self._mq_dao.buffer.append(obj_msg) diff --git a/flowcept/instrumentation/decorators/__init__.py b/flowcept/instrumentation/decorators/__init__.py index 1d68cb1a..362bdbc7 100644 --- a/flowcept/instrumentation/decorators/__init__.py +++ b/flowcept/instrumentation/decorators/__init__.py @@ -6,7 +6,7 @@ # Perhaps we should have a BaseAdaptor that would work for both and # observability and instrumentation adapters. This would be a major refactor # in the code. https://github.com/ORNL/flowcept/issues/109 -instrumentation_interceptor = BaseInterceptor() +instrumentation_interceptor = BaseInterceptor(kind="instrumentation") # TODO This above is bad because I am reusing the same BaseInterceptor both # for adapter-based observability + traditional instrumentation via @decorator # I'm just setting _registered_workflow to avoid the auto wf register that diff --git a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py index fde9a203..0a911fad 100644 --- a/tests/api/flowcept_api_test.py +++ b/tests/api/flowcept_api_test.py @@ -34,6 +34,12 @@ def test_simple_workflow(self): print("workflow_id", Flowcept.current_workflow_id) + print( + Flowcept.db.query( + filter={"workflow_id": Flowcept.current_workflow_id} + ) + ) + assert ( len( Flowcept.db.query( @@ -51,3 +57,16 @@ def test_simple_workflow(self): ) == 1 ) + + @unittest.skip("Test only for dev.") + def test_continuous_run(self): + import numpy as np + from time import sleep + + with Flowcept(workflow_name="continuous_workflow_test"): + print(Flowcept.current_workflow_id) + while True: + n = np.random.rand() + o1 = sum_one(n) + o2 = mult_two(o1) + sleep(1) From 6976d156859a1340c62dd038df799a47e572009d Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Thu, 26 Sep 2024 11:44:40 -0400 Subject: [PATCH 2/9] Storing timestamps as dates in UTC --- .../flowcept_dataclasses/task_object.py | 16 +++++++++++ .../flowceptor/consumers/document_inserter.py | 28 +++++++++++++------ 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/flowcept/commons/flowcept_dataclasses/task_object.py b/flowcept/commons/flowcept_dataclasses/task_object.py index 13a1f35f..68bd4b2d 100644 --- a/flowcept/commons/flowcept_dataclasses/task_object.py +++ b/flowcept/commons/flowcept_dataclasses/task_object.py @@ -132,6 
+132,22 @@ def to_dict(self): def serialize(self): return msgpack.dumps(self.to_dict()) + @staticmethod + def enrich_task_dict(task_dict: dict): + attributes = { + "campaign_id": CAMPAIGN_ID, + "node_name": NODE_NAME, + "login_name": LOGIN_NAME, + "public_ip": PUBLIC_IP, + "private_ip": PRIVATE_IP, + "hostname": HOSTNAME, + } + for key, fallback_value in attributes.items(): + if ( + key not in task_dict or task_dict[key] is None + ) and fallback_value is not None: + task_dict[key] = fallback_value + # @staticmethod # def deserialize(serialized_data) -> 'TaskObject': # dict_obj = msgpack.loads(serialized_data) diff --git a/flowcept/flowceptor/consumers/document_inserter.py b/flowcept/flowceptor/consumers/document_inserter.py index 5aced530..c0fb0a64 100644 --- a/flowcept/flowceptor/consumers/document_inserter.py +++ b/flowcept/flowceptor/consumers/document_inserter.py @@ -1,8 +1,11 @@ +from datetime import datetime from time import time, sleep from threading import Thread, Event, Lock from typing import Dict from uuid import uuid4 +import pytz + import flowcept.commons from flowcept.commons.daos.autoflush_buffer import AutoflushBuffer from flowcept.commons.flowcept_dataclasses.workflow_object import ( @@ -17,6 +20,7 @@ MONGO_ADAPTIVE_BUFFER_SIZE, JSON_SERIALIZER, MONGO_REMOVE_EMPTY_FIELDS, + ENRICH_MESSAGES, ) from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao @@ -107,10 +111,6 @@ def flush_function(buffer, doc_dao, logger=flowcept.commons.logger): logger.info(f"Flushed {len(buffer)} msgs to DocDB!") def _handle_task_message(self, message: Dict): - # if "utc_timestamp" in message: - # dt = datetime.fromtimestamp(message["utc_timestamp"]) - # message["timestamp"] = dt.utcnow() - # if DEBUG_MODE: # message["debug"] = True if "task_id" not in message: @@ -121,11 +121,21 @@ def _handle_task_message(self, message: Dict): if wf_id: message["workflow_id"] = wf_id - if not any( - time_field in message - for time_field in TaskObject.get_time_field_names() - ): - message["registered_at"] = time() + has_time_fields = False + for time_field in TaskObject.get_time_field_names(): + if time_field in message: + has_time_fields = True + message[time_field] = datetime.fromtimestamp( + message[time_field], pytz.utc + ) + + if not has_time_fields: + message["registered_at"] = datetime.fromtimestamp( + time(), pytz.utc + ) + + if ENRICH_MESSAGES: + TaskObject.enrich_task_dict(message) message.pop("type") From 603fa6a7ec6bd36575ce1b86cb3eb1a0cff75cbd Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 27 Sep 2024 13:10:38 -0400 Subject: [PATCH 3/9] Addressing notebook errors --- .github/workflows/run-tests.yml | 2 +- flowcept/flowcept_api/flowcept_controller.py | 6 +- notebooks/mlflow.ipynb | 127 ++++++++++++++++--- notebooks/tensorboard.ipynb | 34 ++++- tests/adapters/test_mlflow.py | 35 +++-- 5 files changed, 171 insertions(+), 33 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 82cc8d63..738b6fff 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -33,7 +33,7 @@ jobs: python flowcept/flowcept_webserver/app.py & sleep 3 export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml - pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb + pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb --ignore=notebooks/mlflow.ipynb - name: Shut down compose run: docker compose -f deployment/compose-full.yml 
down - name: Start Docker Compose with Kafka diff --git a/flowcept/flowcept_api/flowcept_controller.py b/flowcept/flowcept_api/flowcept_controller.py index e17d821c..4cacd3c8 100644 --- a/flowcept/flowcept_api/flowcept_controller.py +++ b/flowcept/flowcept_api/flowcept_controller.py @@ -64,7 +64,7 @@ def __init__( interceptors = [interceptors] self._interceptors: List[BaseInterceptor] = interceptors - self.workflow_id = workflow_id + self.current_workflow_id = workflow_id self.workflow_name = workflow_name self.workflow_args = workflow_args @@ -87,12 +87,12 @@ def start(self): self.logger.debug(f"...Flowceptor {key} started ok!") if ( - self.workflow_id + self.current_workflow_id or self.workflow_args or self.workflow_name ) and interceptor.kind == "instrumentation": wf_obj = WorkflowObject( - self.workflow_id, + self.current_workflow_id, self.workflow_name, used=self.workflow_args, ) diff --git a/notebooks/mlflow.ipynb b/notebooks/mlflow.ipynb index e8b9d5ab..4d262a14 100644 --- a/notebooks/mlflow.ipynb +++ b/notebooks/mlflow.ipynb @@ -24,7 +24,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "b2cd32bb-8f92-4026-a15b-8cee25189499", "metadata": { "tags": [] @@ -36,12 +36,54 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "a15ac32f-7bbc-4a14-829c-594467102947", "metadata": { "tags": [] }, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024/09/27 10:59:42 INFO mlflow.store.db.utils: Creating initial MLflow database tables...\n", + "2024/09/27 10:59:42 INFO mlflow.store.db.utils: Updating database tables\n", + "INFO [alembic.runtime.migration] Context impl SQLiteImpl.\n", + "INFO [alembic.runtime.migration] Will assume non-transactional DDL.\n", + "INFO [alembic.runtime.migration] Running upgrade -> 451aebb31d03, add metric step\n", + "INFO [alembic.runtime.migration] Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags\n", + "INFO [alembic.runtime.migration] Running upgrade 90e64c465722 -> 181f10493468, allow nulls for metric values\n", + "INFO [alembic.runtime.migration] Running upgrade 181f10493468 -> df50e92ffc5e, Add Experiment Tags Table\n", + "INFO [alembic.runtime.migration] Running upgrade df50e92ffc5e -> 7ac759974ad8, Update run tags with larger limit\n", + "INFO [alembic.runtime.migration] Running upgrade 7ac759974ad8 -> 89d4b8295536, create latest metrics table\n", + "INFO [89d4b8295536_create_latest_metrics_table_py] Migration complete!\n", + "INFO [alembic.runtime.migration] Running upgrade 89d4b8295536 -> 2b4d017a5e9b, add model registry tables to db\n", + "INFO [2b4d017a5e9b_add_model_registry_tables_to_db_py] Adding registered_models and model_versions tables to database.\n", + "INFO [2b4d017a5e9b_add_model_registry_tables_to_db_py] Migration complete!\n", + "INFO [alembic.runtime.migration] Running upgrade 2b4d017a5e9b -> cfd24bdc0731, Update run status constraint with killed\n", + "INFO [alembic.runtime.migration] Running upgrade cfd24bdc0731 -> 0a8213491aaa, drop_duplicate_killed_constraint\n", + "INFO [alembic.runtime.migration] Running upgrade 0a8213491aaa -> 728d730b5ebd, add registered model tags table\n", + "INFO [alembic.runtime.migration] Running upgrade 728d730b5ebd -> 27a6a02d2cf1, add model version tags table\n", + "INFO [alembic.runtime.migration] Running upgrade 27a6a02d2cf1 -> 84291f40a231, add run_link to model_version\n", + "INFO [alembic.runtime.migration] Running upgrade 84291f40a231 -> a8c4a736bde6, allow 
nulls for run_id\n", + "INFO [alembic.runtime.migration] Running upgrade a8c4a736bde6 -> 39d1c3be5f05, add_is_nan_constraint_for_metrics_tables_if_necessary\n", + "INFO [alembic.runtime.migration] Running upgrade 39d1c3be5f05 -> c48cb773bb87, reset_default_value_for_is_nan_in_metrics_table_for_mysql\n", + "INFO [alembic.runtime.migration] Running upgrade c48cb773bb87 -> bd07f7e963c5, create index on run_uuid\n", + "INFO [alembic.runtime.migration] Running upgrade bd07f7e963c5 -> 0c779009ac13, add deleted_time field to runs table\n", + "INFO [alembic.runtime.migration] Running upgrade 0c779009ac13 -> cc1f77228345, change param value length to 500\n", + "INFO [alembic.runtime.migration] Running upgrade cc1f77228345 -> 97727af70f4d, Add creation_time and last_update_time to experiments table\n", + "INFO [alembic.runtime.migration] Running upgrade 97727af70f4d -> 3500859a5d39, Add Model Aliases table\n", + "INFO [alembic.runtime.migration] Running upgrade 3500859a5d39 -> 7f2a7d5fae7d, add datasets inputs input_tags tables\n", + "INFO [alembic.runtime.migration] Running upgrade 7f2a7d5fae7d -> 2d6e25af4d3e, increase max param val length from 500 to 8000\n", + "INFO [alembic.runtime.migration] Running upgrade 2d6e25af4d3e -> acf3f17fdcc7, add storage location field to model versions\n", + "INFO [alembic.runtime.migration] Running upgrade acf3f17fdcc7 -> 867495a8f9d4, add trace tables\n", + "INFO [alembic.runtime.migration] Running upgrade 867495a8f9d4 -> 5b0e9adcef9c, add cascade deletion to trace tables foreign keys\n", + "INFO [alembic.runtime.migration] Running upgrade 5b0e9adcef9c -> 4465047574b1, increase max dataset schema size\n", + "INFO [alembic.runtime.migration] Context impl SQLiteImpl.\n", + "INFO [alembic.runtime.migration] Will assume non-transactional DDL.\n" + ] + } + ], "source": [ "## This cell Resets MLFlow Database\n", "! 
rm -f {SQLITE_PATH}\n", @@ -53,7 +95,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "id": "3ebd2157-dd51-494b-834a-5d6ba2034197", "metadata": { "tags": [] @@ -75,7 +117,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "id": "c08c39cf-8489-47eb-aff3-d96768afc7e9", "metadata": {}, "outputs": [], @@ -110,10 +152,19 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "id": "7edd571f-9507-496f-8926-6dcc608b2358", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[flowcept][DEBUG][MAC132633][pid=74491][thread=8670187328][function=_build_logger][flowcept's base log is set up!]\n", + "DEBUG [flowcept] flowcept's base log is set up!\n" + ] + } + ], "source": [ "from flowcept import MLFlowInterceptor\n", "interceptor = MLFlowInterceptor()" @@ -121,7 +172,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "c03d7ef6-41cb-4be5-8cda-1aa8ddd33a5a", "metadata": { "tags": [] @@ -143,10 +194,21 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "id": "b57ee95c-a061-4f2d-b43a-d7e02d66bda8", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "from flowcept import Flowcept\n", "flowcept = Flowcept(interceptor)\n", @@ -165,12 +227,21 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 12, "id": "f1c0f6d7-7d47-4c61-9d5b-e5bd398326cd", "metadata": {}, - "outputs": [], - "source": [ - "mlflow_run_id = run_mlflow_workflow(interceptor.settings.file_path)\n", + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Generated training metadata.\n", + "MLflow task id=e78450bf8e274e649e92b12f0e6a3fd8\n" + ] + } + ], + "source": [ + "mlflow_run_id = run_mlflow_workflow(interceptor.settings.file_path, batch_size=18)\n", "print(f\"MLflow task id={mlflow_run_id}\")" ] }, @@ -253,7 +324,9 @@ "tags": [] }, "source": [ - "### Get the tasks executed in my experiment in the last 60 minutes" + "### Get the tasks executed in my experiment in the last 60 minutes\n", + "\n", + "This example assumes that you have run the Dask notebook example before. If you haven't run it, just ignore these queries." 
] }, { @@ -337,6 +410,16 @@ "### Now run a new MLFlow task using the batch_sizes generated by the Dask workflow" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "072bdd5d-05e5-4387-be0c-e70c902b06d7", + "metadata": {}, + "outputs": [], + "source": [ + "batch_sizes = batch_sizes if len(batch_sizes) else [32] # To use this if you haven't executed the Dask workflow first" + ] + }, { "cell_type": "code", "execution_count": null, @@ -345,7 +428,7 @@ "outputs": [], "source": [ "for batch_size in batch_sizes:\n", - " mlflow_task = run_mlflow_workflow(interceptor.settings.file_path, batch_size=batch_size)\n", + " mlflow_task = run_mlflow_workflow(batch_size=batch_size)\n", " print(mlflow_task)" ] }, @@ -358,7 +441,7 @@ }, "outputs": [], "source": [ - "sleep(10)" + "sleep(15)" ] }, { @@ -385,6 +468,16 @@ "docs" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "9d7d75ed-9dd1-41b3-9276-65d052570771", + "metadata": {}, + "outputs": [], + "source": [ + "assert len(docs)" + ] + }, { "cell_type": "markdown", "id": "02448f6c-3f55-443f-95fd-51ed3c633265", diff --git a/notebooks/tensorboard.ipynb b/notebooks/tensorboard.ipynb index b69f84f7..d0694201 100644 --- a/notebooks/tensorboard.ipynb +++ b/notebooks/tensorboard.ipynb @@ -259,6 +259,14 @@ "epochs_params" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "7c268ab5-a9f8-4da8-ab83-37aad729000e", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "markdown", "id": "48eb62a2-f9af-45a1-8291-e3c25b6a01a0", @@ -314,7 +322,31 @@ "tags": [] }, "source": [ - "## Start training" + "### Now start a train using the `epochs_params` generated by the Dask workflow.\n", + "\n", + "This example assumes that you have run the Dask notebook example before. If you haven't run it, `epochs_params` will be empty." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "19a4adc3-55c0-4d1e-bb83-a8b48351481e", + "metadata": {}, + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'epochs_params' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[1], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m epochs_params \u001b[38;5;241m=\u001b[39m epochs_params \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(\u001b[43mepochs_params\u001b[49m) \u001b[38;5;28;01melse\u001b[39;00m {\u001b[38;5;241m1\u001b[39m}\n", + "\u001b[0;31mNameError\u001b[0m: name 'epochs_params' is not defined" + ] + } + ], + "source": [ + "epochs_params = epochs_params if len(epochs_params) else {1}" ] }, { diff --git a/tests/adapters/test_mlflow.py b/tests/adapters/test_mlflow.py index 5993d501..62ea9291 100644 --- a/tests/adapters/test_mlflow.py +++ b/tests/adapters/test_mlflow.py @@ -1,6 +1,6 @@ import unittest from time import sleep - +import numpy as np from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept import MLFlowInterceptor, Flowcept from flowcept.commons.utils import ( @@ -15,7 +15,7 @@ def __init__(self, *args, **kwargs): self.interceptor = MLFlowInterceptor() self.logger = FlowceptLogger() - def test_pure_run_mlflow(self): + def test_pure_run_mlflow(self, epochs=10, batch_size=64): import uuid import mlflow @@ -27,11 +27,11 @@ def test_pure_run_mlflow(self): experiment_name + str(uuid.uuid4()) ) with mlflow.start_run(experiment_id=experiment_id) as run: - mlflow.log_params({"number_epochs": 10}) - mlflow.log_params({"batch_size": 64}) - + mlflow.log_params({"number_epochs": epochs}) + mlflow.log_params({"batch_size": batch_size}) + # Actual training code would come here self.logger.debug("\nTrained model") - mlflow.log_metric("loss", 0.04) + mlflow.log_metric("loss", np.random.random()) return run.info.run_uuid @@ -61,11 +61,6 @@ def test_check_state_manager(self): self.interceptor.state_manager.add_element_id(run_uuid) def test_observer_and_consumption(self): - # if os.path.exists(self.interceptor.settings.file_path): - # os.remove(self.interceptor.settings.file_path) - # - # with open(self.interceptor.settings.file_path, 'w+') as f: - # f.write("") with Flowcept(self.interceptor): run_uuid = self.test_pure_run_mlflow() @@ -79,6 +74,24 @@ def test_observer_and_consumption(self): {"task_id": run_uuid}, ) + @unittest.skip("Skipping this test as we need to debug it further.") + def test_multiple_tasks(self): + + run_ids = [] + with Flowcept(self.interceptor): + for i in range(1, 10): + run_ids.append(self.test_pure_run_mlflow(epochs=i*10, batch_size=i*2)) + sleep(3) + + for run_id in run_ids: + # assert evaluate_until( + # lambda: self.interceptor.state_manager.has_element_id(run_id), + # ) + + assert assert_by_querying_tasks_until( + {"task_id": run_id}, max_trials=60, max_time=120, + ) + if __name__ == "__main__": unittest.main() From e664cad6a156ce231718f7961f29492cf161476d Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 27 Sep 2024 15:45:50 -0400 Subject: [PATCH 4/9] Changes to facilitate turn off/on instrum. 
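Instrumentation capture is now controlled by a single `enabled` flag under the
new `instrumentation` section of the settings file (read into
`INSTRUMENTATION_ENABLED` in flowcept/configs.py), replacing the old
`project.register_instrumented_tasks` option. When the flag is off and no
adapter interceptor is passed explicitly, the Flowcept controller becomes a
no-op (start()/stop() return early) and @flowcept_task simply calls the
wrapped function. A minimal usage sketch, with the imports and decorator
pattern taken from tests/api/flowcept_api_test.py (the workflow name and input
value are illustrative only):

    from flowcept import Flowcept, flowcept_task

    @flowcept_task
    def sum_one(n):
        # Captured as a task only when instrumentation.enabled is true in the
        # settings file; otherwise the decorator is a pass-through.
        return n + 1

    with Flowcept(workflow_name="instrumentation_toggle_example"):
        print(sum_one(1))

The matching settings entry is the `enabled: true` line added to
resources/sample_settings.yaml in this patch.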
--- flowcept/configs.py | 11 ++++++----- flowcept/flowcept_api/flowcept_controller.py | 16 ++++++++++------ .../instrumentation/decorators/flowcept_task.py | 4 ++-- resources/sample_settings.yaml | 1 + 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/flowcept/configs.py b/flowcept/configs.py index 90b995fe..4c891dc1 100644 --- a/flowcept/configs.py +++ b/flowcept/configs.py @@ -126,9 +126,6 @@ TELEMETRY_CAPTURE = settings["project"].get("telemetry_capture", None) REGISTER_WORKFLOW = settings["project"].get("register_workflow", True) -REGISTER_INSTRUMENTED_TASKS = settings["project"].get( - "register_instrumented_tasks", True -) ################################## # GPU TELEMETRY CAPTURE SETTINGS # @@ -247,8 +244,9 @@ # Web Server # ###################### -WEBSERVER_HOST = settings["web_server"].get("host", "0.0.0.0") -WEBSERVER_PORT = int(settings["web_server"].get("port", "5000")) +_webserver_settings = settings.get("web_server", {}) +WEBSERVER_HOST = _webserver_settings.get("host", "0.0.0.0") +WEBSERVER_PORT = int(_webserver_settings.get("port", 5000)) ###################### # ANALYTICS # @@ -260,6 +258,9 @@ #### INSTRUMENTATION = settings.get("instrumentation", None) +INSTRUMENTATION_ENABLED = False +if INSTRUMENTATION: + INSTRUMENTATION_ENABLED = INSTRUMENTATION.get("enabled", False) ################# Enabled ADAPTERS diff --git a/flowcept/flowcept_api/flowcept_controller.py b/flowcept/flowcept_api/flowcept_controller.py index 4cacd3c8..1e870546 100644 --- a/flowcept/flowcept_api/flowcept_controller.py +++ b/flowcept/flowcept_api/flowcept_controller.py @@ -9,7 +9,8 @@ from flowcept.commons import logger from flowcept.commons.daos.document_db_dao import DocumentDBDao from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao -from flowcept.configs import MQ_INSTANCES +from flowcept.configs import MQ_INSTANCES, INSTRUMENTATION, \ + INSTRUMENTATION_ENABLED from flowcept.flowcept_api.db_api import DBAPI from flowcept.flowceptor.consumers.document_inserter import DocumentInserter from flowcept.commons.flowcept_logger import FlowceptLogger @@ -53,10 +54,15 @@ def __init__( self._bundle_exec_id = id(self) else: self._bundle_exec_id = bundle_exec_id + self.enabled = True + self.is_started = False if isinstance(interceptors, str): self._interceptors = None else: if interceptors is None: + if not INSTRUMENTATION_ENABLED: + self.enabled = False + return interceptors = [ flowcept.instrumentation.decorators.instrumentation_interceptor ] @@ -68,11 +74,9 @@ def __init__( self.workflow_name = workflow_name self.workflow_args = workflow_args - self.is_started = False - def start(self): - if self.is_started: - self.logger.warning("Consumer is already started!") + if self.is_started or not self.enabled: + self.logger.warning("Consumer may be already started or instrumentation is not set") return self if self._interceptors and len(self._interceptors): @@ -129,7 +133,7 @@ def start(self): return self def stop(self): - if not self.is_started: + if not self.is_started or not self.enabled: self.logger.warning("Consumer is already stopped!") return sleep_time = 1 diff --git a/flowcept/instrumentation/decorators/flowcept_task.py b/flowcept/instrumentation/decorators/flowcept_task.py index 92554509..d538de18 100644 --- a/flowcept/instrumentation/decorators/flowcept_task.py +++ b/flowcept/instrumentation/decorators/flowcept_task.py @@ -11,7 +11,7 @@ from flowcept.commons.utils import replace_non_serializable from flowcept.configs import ( REPLACE_NON_JSON_SERIALIZABLE, - REGISTER_INSTRUMENTED_TASKS, 
+ INSTRUMENTATION_ENABLED, ) @@ -120,7 +120,7 @@ def flowcept_task(func=None, **decorator_kwargs): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): - if not REGISTER_INSTRUMENTED_TASKS: + if not INSTRUMENTATION_ENABLED: return func(*args, **kwargs) args_handler = decorator_kwargs.get( diff --git a/resources/sample_settings.yaml b/resources/sample_settings.yaml index 5352a4e0..9c64ab46 100644 --- a/resources/sample_settings.yaml +++ b/resources/sample_settings.yaml @@ -18,6 +18,7 @@ project: machine_info: true instrumentation: + enabled: true torch: mode: telemetry_and_tensor_inspection # tensor_inspection, telemetry, telemetry_and_tensor_inspection, full, ~ save_models: True From 5d5a2b568073d1536dfec4f3f240aa9df0caaf91 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 27 Sep 2024 15:46:11 -0400 Subject: [PATCH 5/9] Making pandas and other deps more flexible --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 951e1f44..2b54d0b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,9 +2,10 @@ redis==4.4.2 psutil==5.9.5 py-cpuinfo==9.0.0 pymongo==4.3.3 -pandas==2.0.3 +pandas omegaconf flask requests flask_restful Werkzeug +msgpack From 66472a9c7e5935072cd48036ca17655a0f574dc6 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 27 Sep 2024 21:45:41 -0400 Subject: [PATCH 6/9] Small changes in configs.py --- .gitignore | 1 + README.md | 3 +-- extra_requirements/amd-requirements.txt | 10 +++++++++- flowcept/configs.py | 8 ++++---- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 78fde49f..f322f23c 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ test.py **/*dump* time.txt tmp/ +deployment/data diff --git a/README.md b/README.md index 01439189..2e6e0391 100644 --- a/README.md +++ b/README.md @@ -100,8 +100,7 @@ If you are doing extensive performance evaluation experiments using this softwar ## Install AMD GPU Lib On the machines that have AMD GPUs, we use the official AMD ROCM library to capture GPU runtime data. -Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions -in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/ +See [AMD Requirements](extra_requirements/amd-requirements.txt). ## See also diff --git a/extra_requirements/amd-requirements.txt b/extra_requirements/amd-requirements.txt index 73030b11..4b6c7b79 100644 --- a/extra_requirements/amd-requirements.txt +++ b/extra_requirements/amd-requirements.txt @@ -1 +1,9 @@ -# On the machines that have AMD GPUs, we use the official AMD ROCM library to capture GPU runtime data. Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/ \ No newline at end of file +# On the machines that have AMD GPUs, we use the official AMD ROCM library to capture GPU runtime data. Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/ + +# Here is a summary: + +# 1. Install the AMD drivers on the machine (check if they are available already under `/opt/rocm-*`). +# 2. Suppose it is /opt/rocm-5.7.1. Then, make sure it has a share/amd_smi subdirectory and pyproject.toml or setup.py in it. +# 3. 
Copy the amd_smi to your home directory: `cp -r /opt/rocm-5.7.1/share/amd_smi ~` +# 4. cd ~/amd_smi +# 5. In your python environment, do a pip install . diff --git a/flowcept/configs.py b/flowcept/configs.py index 4c891dc1..82db4165 100644 --- a/flowcept/configs.py +++ b/flowcept/configs.py @@ -45,7 +45,7 @@ ########################## FLOWCEPT_USER = settings["experiment"].get("user", "blank_user") -CAMPAIGN_ID = settings["experiment"].get("campaign_id", "super_campaign") +CAMPAIGN_ID = settings["experiment"].get("campaign_id", os.environ.get("CAMPAIGN_ID", "super_campaign")) ###################### # MQ Settings # @@ -81,9 +81,9 @@ ###################### # MongoDB Settings # ###################### -MONGO_URI = settings["mongodb"].get("uri", None) -MONGO_HOST = settings["mongodb"].get("host", "localhost") -MONGO_PORT = int(settings["mongodb"].get("port", "27017")) +MONGO_URI = settings["mongodb"].get("uri", os.environ.get("MONGO_URI", None)) +MONGO_HOST = settings["mongodb"].get("host", os.environ.get("MONGO_HOST", "localhost")) +MONGO_PORT = int(settings["mongodb"].get("port", os.environ.get("MONGO_PORT", "27017"))) MONGO_DB = settings["mongodb"].get("db", PROJECT_NAME) MONGO_CREATE_INDEX = settings["mongodb"].get("create_collection_index", True) From 5463638dea2f3398203edf61eb1f00c26f9c4240 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Tue, 15 Oct 2024 15:01:06 -0400 Subject: [PATCH 7/9] Minor updates before merge --- extra_requirements/amd-requirements.txt | 7 +++++-- flowcept/commons/daos/document_db_dao.py | 1 - tests/api/flowcept_api_test.py | 18 +++++++++++++++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/extra_requirements/amd-requirements.txt b/extra_requirements/amd-requirements.txt index 4b6c7b79..2b39d7de 100644 --- a/extra_requirements/amd-requirements.txt +++ b/extra_requirements/amd-requirements.txt @@ -3,7 +3,10 @@ # Here is a summary: # 1. Install the AMD drivers on the machine (check if they are available already under `/opt/rocm-*`). -# 2. Suppose it is /opt/rocm-5.7.1. Then, make sure it has a share/amd_smi subdirectory and pyproject.toml or setup.py in it. -# 3. Copy the amd_smi to your home directory: `cp -r /opt/rocm-5.7.1/share/amd_smi ~` +# 2. Suppose it is /opt/rocm-6.2.0. Then, make sure it has a share/amd_smi subdirectory and pyproject.toml or setup.py in it. +# 3. Copy the amd_smi to your home directory: `cp -r /opt/rocm-6.2.0/share/amd_smi ~` # 4. cd ~/amd_smi # 5. In your python environment, do a pip install . 
+ +# Current code is compatible with this version: amdsmi==24.6.2+2b02a07 +# Which was installed using Frontier's /opt/rocm-6.2.0/share/amd_smi diff --git a/flowcept/commons/daos/document_db_dao.py b/flowcept/commons/daos/document_db_dao.py index f7f9c575..910bceed 100644 --- a/flowcept/commons/daos/document_db_dao.py +++ b/flowcept/commons/daos/document_db_dao.py @@ -6,7 +6,6 @@ import pickle import zipfile -import pymongo from bson import ObjectId from bson.json_util import dumps from pymongo import MongoClient, UpdateOne diff --git a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py index 0a911fad..ff68c933 100644 --- a/tests/api/flowcept_api_test.py +++ b/tests/api/flowcept_api_test.py @@ -1,4 +1,5 @@ import unittest +from time import sleep from flowcept import ( Flowcept, @@ -17,6 +18,16 @@ def mult_two(n): return n * 2 +@flowcept_task +def sum_one_(x): + return {"y": x + 1} + + +@flowcept_task +def mult_two_(y): + return {"z": y * 2} + + class FlowceptAPITest(unittest.TestCase): def test_simple_workflow(self): assert Flowcept.services_alive() @@ -26,6 +37,7 @@ def test_simple_workflow(self): o1 = sum_one(n) o2 = mult_two(o1) print(o2) + sleep(10) assert assert_by_querying_tasks_until( {"workflow_id": Flowcept.current_workflow_id}, @@ -67,6 +79,6 @@ def test_continuous_run(self): print(Flowcept.current_workflow_id) while True: n = np.random.rand() - o1 = sum_one(n) - o2 = mult_two(o1) - sleep(1) + o1 = sum_one_(x=n) + o2 = mult_two_(**o1) + sleep(10) From 2d8987c41da4c93cc4c03dbf4027a8b8151a5d94 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Tue, 15 Oct 2024 15:06:20 -0400 Subject: [PATCH 8/9] Code formatsg --- flowcept/configs.py | 12 +++++++++--- flowcept/flowcept_api/flowcept_controller.py | 11 ++++++++--- tests/adapters/test_mlflow.py | 10 ++++++---- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/flowcept/configs.py b/flowcept/configs.py index 82db4165..b872af98 100644 --- a/flowcept/configs.py +++ b/flowcept/configs.py @@ -45,7 +45,9 @@ ########################## FLOWCEPT_USER = settings["experiment"].get("user", "blank_user") -CAMPAIGN_ID = settings["experiment"].get("campaign_id", os.environ.get("CAMPAIGN_ID", "super_campaign")) +CAMPAIGN_ID = settings["experiment"].get( + "campaign_id", os.environ.get("CAMPAIGN_ID", "super_campaign") +) ###################### # MQ Settings # @@ -82,8 +84,12 @@ # MongoDB Settings # ###################### MONGO_URI = settings["mongodb"].get("uri", os.environ.get("MONGO_URI", None)) -MONGO_HOST = settings["mongodb"].get("host", os.environ.get("MONGO_HOST", "localhost")) -MONGO_PORT = int(settings["mongodb"].get("port", os.environ.get("MONGO_PORT", "27017"))) +MONGO_HOST = settings["mongodb"].get( + "host", os.environ.get("MONGO_HOST", "localhost") +) +MONGO_PORT = int( + settings["mongodb"].get("port", os.environ.get("MONGO_PORT", "27017")) +) MONGO_DB = settings["mongodb"].get("db", PROJECT_NAME) MONGO_CREATE_INDEX = settings["mongodb"].get("create_collection_index", True) diff --git a/flowcept/flowcept_api/flowcept_controller.py b/flowcept/flowcept_api/flowcept_controller.py index 1e870546..a348d910 100644 --- a/flowcept/flowcept_api/flowcept_controller.py +++ b/flowcept/flowcept_api/flowcept_controller.py @@ -9,8 +9,11 @@ from flowcept.commons import logger from flowcept.commons.daos.document_db_dao import DocumentDBDao from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao -from flowcept.configs import MQ_INSTANCES, INSTRUMENTATION, \ - INSTRUMENTATION_ENABLED +from flowcept.configs import ( + 
MQ_INSTANCES, + INSTRUMENTATION, + INSTRUMENTATION_ENABLED, +) from flowcept.flowcept_api.db_api import DBAPI from flowcept.flowceptor.consumers.document_inserter import DocumentInserter from flowcept.commons.flowcept_logger import FlowceptLogger @@ -76,7 +79,9 @@ def __init__( def start(self): if self.is_started or not self.enabled: - self.logger.warning("Consumer may be already started or instrumentation is not set") + self.logger.warning( + "Consumer may be already started or instrumentation is not set" + ) return self if self._interceptors and len(self._interceptors): diff --git a/tests/adapters/test_mlflow.py b/tests/adapters/test_mlflow.py index 62ea9291..3be4ffc9 100644 --- a/tests/adapters/test_mlflow.py +++ b/tests/adapters/test_mlflow.py @@ -61,7 +61,6 @@ def test_check_state_manager(self): self.interceptor.state_manager.add_element_id(run_uuid) def test_observer_and_consumption(self): - with Flowcept(self.interceptor): run_uuid = self.test_pure_run_mlflow() # sleep(3) @@ -76,11 +75,12 @@ def test_observer_and_consumption(self): @unittest.skip("Skipping this test as we need to debug it further.") def test_multiple_tasks(self): - run_ids = [] with Flowcept(self.interceptor): for i in range(1, 10): - run_ids.append(self.test_pure_run_mlflow(epochs=i*10, batch_size=i*2)) + run_ids.append( + self.test_pure_run_mlflow(epochs=i * 10, batch_size=i * 2) + ) sleep(3) for run_id in run_ids: @@ -89,7 +89,9 @@ def test_multiple_tasks(self): # ) assert assert_by_querying_tasks_until( - {"task_id": run_id}, max_trials=60, max_time=120, + {"task_id": run_id}, + max_trials=60, + max_time=120, ) From 1009824172e9259ebc678e87f00fe5c5613f8668 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Tue, 15 Oct 2024 15:47:13 -0400 Subject: [PATCH 9/9] Adding AMD info to README.md --- README.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2e6e0391..e18f1108 100644 --- a/README.md +++ b/README.md @@ -100,8 +100,19 @@ If you are doing extensive performance evaluation experiments using this softwar ## Install AMD GPU Lib On the machines that have AMD GPUs, we use the official AMD ROCM library to capture GPU runtime data. -See [AMD Requirements](extra_requirements/amd-requirements.txt). +Unfortunately, this library is not available as a pypi/conda package, so you must manually install it. See instructions in the link: https://rocm.docs.amd.com/projects/amdsmi/en/latest/ + +Here is a summary: + +1. Install the AMD drivers on the machine (check if they are available already under `/opt/rocm-*`). +2. Suppose it is /opt/rocm-6.2.0. Then, make sure it has a share/amd_smi subdirectory and pyproject.toml or setup.py in it. +3. Copy the amd_smi to your home directory: `cp -r /opt/rocm-6.2.0/share/amd_smi ~` +4. cd ~/amd_smi +5. In your python environment, do a pip install . + +Current code is compatible with this version: amdsmi==24.6.2+2b02a07 +Which was installed using Frontier's /opt/rocm-6.2.0/share/amd_smi ## See also
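
Note on configuration, related to patch 6 above: CAMPAIGN_ID, MONGO_URI,
MONGO_HOST, and MONGO_PORT now fall back to environment variables of the same
names when the corresponding keys are absent from the settings file. A rough
sketch of relying on that fallback (the hostname and campaign name are examples
only, and the mongodb host/port keys are assumed to be omitted from the
settings file so the environment values actually apply):

    import os

    # Must be set before flowcept (and therefore flowcept.configs) is imported.
    os.environ["MONGO_HOST"] = "my-mongo.example.org"
    os.environ["MONGO_PORT"] = "27017"
    os.environ["CAMPAIGN_ID"] = "demo_campaign"

    from flowcept import Flowcept
    print(Flowcept.services_alive())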