diff --git a/.github/workflows/create-release-n-publish.yml b/.github/workflows/create-release-n-publish.yml index dc8bef4c..bf674c6b 100644 --- a/.github/workflows/create-release-n-publish.yml +++ b/.github/workflows/create-release-n-publish.yml @@ -2,7 +2,7 @@ name: Release and Publish on: push: # It has to be push, otherwise error happens in code below. branches: [ "main" ] - # branches: [ "main", "dev" ] # use this only to test the CI + # branches: [ "main", "dev" ] # use this only to test the CI. If testing this CI, consider commenting out the automatic version updates and manually adjust the patch version. #branches: [ "disabled" ] jobs: build: @@ -24,7 +24,7 @@ jobs: - name: Update version.py run: | export PYTHONPATH=$PYTHONPATH:flowcept - export BRANCH_NAME="${{ steps.branch-name.outputs.current_branch }}" + export BRANCH_NAME="${{ steps.branch-name.outputs.current_branch }}" python .github/workflows/version_bumper.py - name: Commit new version run: | @@ -108,15 +108,10 @@ jobs: - name: Run Docker Compose run: docker compose -f deployment/compose.yml up -d - name: Test with pytest - run: | - mkdir -p ~/.flowcept - cp resources/sample_settings.yaml ~/.flowcept/settings.yaml - export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml - cat $FLOWCEPT_SETTINGS_PATH - pytest --ignore=tests/decorator_tests/ml_tests/llm_tests + run: pytest --ignore=tests/decorator_tests/ml_tests/llm_tests - name: Test notebooks run: | - export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml - python flowcept/flowcept_webserver/app.py & + # export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml + python src/flowcept/flowcept_webserver/app.py & sleep 3 pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb diff --git a/.github/workflows/run-tests-kafka.yml b/.github/workflows/run-tests-kafka.yml index c5c044f4..fa33b7e1 100644 --- a/.github/workflows/run-tests-kafka.yml +++ b/.github/workflows/run-tests-kafka.yml @@ -20,11 +20,11 @@ jobs: 
python-version: "3.10" cache: "pip" - - name: Copy settings file - run: | - mkdir ~/.flowcept - cp resources/sample_settings.yaml ~/.flowcept - mv ~/.flowcept/sample_settings.yaml ~/.flowcept/settings.yaml +# - name: Copy settings file +# run: | +# mkdir ~/.flowcept +# cp resources/sample_settings.yaml ~/.flowcept +# mv ~/.flowcept/sample_settings.yaml ~/.flowcept/settings.yaml - name: Install package and dependencies run: | @@ -58,7 +58,7 @@ jobs: python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")' python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()' - python flowcept/flowcept_webserver/app.py & + python src/flowcept/flowcept_webserver/app.py & sleep 3 - export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml + # export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index e821a028..e0c37869 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -20,13 +20,24 @@ jobs: - name: Show OS Info run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"' - - name: Copy settings file - run: | - mkdir ~/.flowcept - cp resources/sample_settings.yaml ~/.flowcept - mv ~/.flowcept/sample_settings.yaml ~/.flowcept/settings.yaml +# - name: Copy settings file +# run: | +# mkdir ~/.flowcept +# cp resources/sample_settings.yaml ~/.flowcept +# mv ~/.flowcept/sample_settings.yaml ~/.flowcept/settings.yaml + + - name: Start docker compose with redis + run: docker compose -f deployment/compose.yml up -d + + - name: Upgrade pip + run: python -m pip install 
--upgrade pip - - name: Install package and dependencies + - name: Install default dependencies and run simple test + run: | + pip install . + python examples/instrumentation/simple_script.py + + - name: Install all dependencies run: | python -m pip install --upgrade pip python -m pip install .[all] @@ -34,9 +45,6 @@ jobs: - name: List installed packages run: pip list - - name: Start docker compose with redis - run: docker compose -f deployment/compose.yml up -d - - name: Test with pytest and redis run: | pytest --ignore=tests/decorator_tests/ml_tests/llm_tests @@ -45,7 +53,7 @@ jobs: run: | python src/flowcept/flowcept_webserver/app.py & sleep 3 - export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml + # export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb - name: Shut down docker compose diff --git a/README.md b/README.md index e1348200..813c8a6d 100644 --- a/README.md +++ b/README.md @@ -51,18 +51,19 @@ pip install flowcept[dev] # To install dev dependencies. You do not need to install any optional dependency to run Flowcept without any adapter, e.g., if you want to use simple instrumentation (see below). In this case, you need to remove the adapter part from the [settings.yaml](resources/settings.yaml) file. -2. Start MongoDB and Redis: +2. Start the Database and MQ System: -To enable the full advantages of FlowCept, one needs to start a Redis and MongoDB instances. -FlowCept uses Redis as its message queue system and Mongo for its persistent database. -For convenience, we set up a [docker-compose file](deployment/compose.yml) deployment file for this. Run `docker-compose -f deployment/compose.yml up`. +To use FlowCept, one needs to start a database and a MQ system. Currently, FlowCept supports MongoDB as its database and it supports both Redis and Kafka as the MQ system. -3. 
Define the settings (e.g., routes and ports) accordingly in the [settings.yaml](resources/settings.yaml) file. -You may need to set the environment variable `FLOWCEPT_SETTINGS_PATH` with the absolute path to the settings file. +For convenience, the default needed services can be started using the [docker-compose](deployment/compose.yml) deployment file. +You can start them using `$> docker-compose -f deployment/compose.yml up`. -4. Start the observation using the Controller API, as shown in the [Jupyter Notebooks](notebooks). +3. Optionally, define custom settings (e.g., routes and ports) accordingly in a settings.yaml file. There is a sample file [here](resources/sample_settings.yaml), which can be used as a basis. +Then, set an environment variable `FLOWCEPT_SETTINGS_PATH` with the absolute path to the yaml file. +If you do not follow this step, the default values defined [here](resources/sample_settings.yaml) will be used. + +4. See the [Jupyter Notebooks](notebooks) and [Examples directory](examples) for utilization examples. -5. To use FlowCept's Query API, see utilization examples in the notebooks. 
### Simple Example with Decorators Instrumentation @@ -90,8 +91,6 @@ with Flowcept(workflow_name='test_workflow'): print(Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})) ``` - - ## Performance Tuning for Performance Evaluation In the settings.yaml file, the following variables might impact interception performance: diff --git a/examples/instrumentation/simple_script.py b/examples/instrumentation/simple_script.py new file mode 100644 index 00000000..8c25838b --- /dev/null +++ b/examples/instrumentation/simple_script.py @@ -0,0 +1,20 @@ +from flowcept import Flowcept, flowcept_task + +@flowcept_task +def sum_one(n): + return n + 1 + + +@flowcept_task +def mult_two(n): + return n * 2 + + +with Flowcept(workflow_name='test_workflow'): + n = 3 + o1 = sum_one(n) + o2 = mult_two(o1) + print(o2) + +print(Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})) + diff --git a/pyproject.toml b/pyproject.toml index c78cc9e8..c58ce844 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,3 +102,9 @@ ignore = ["D200", "D212"] [tool.ruff.lint.pydocstyle] convention = "numpy" + +[tool.hatch.build.targets.wheel] +packages = ["src/flowcept"] + +[tool.hatch.build.targets.wheel.force-include] +"resources/sample_settings.yaml" = "resources/sample_settings.yaml" diff --git a/src/flowcept/__init__.py b/src/flowcept/__init__.py index a8dc26f7..a12fa51b 100644 --- a/src/flowcept/__init__.py +++ b/src/flowcept/__init__.py @@ -1,73 +1,52 @@ """Flowcept package.""" -import flowcept - from flowcept.configs import SETTINGS_PATH - from flowcept.version import __version__ - -from flowcept.commons.vocabulary import Vocabulary - - from flowcept.flowcept_api.flowcept_controller import Flowcept -from flowcept.flowcept_api.task_query_api import TaskQueryAPI from flowcept.instrumentation.decorators.flowcept_task import flowcept_task from flowcept.commons.flowcept_dataclasses.workflow_object import ( WorkflowObject, ) -# These resp_ai imports below are adding 
long wait in flowcept imports! -# try: -# from flowcept.instrumentation.decorators.responsible_ai import ( -# #model_explainer, -# #model_profiler, -# ) -# except: -# pass -if Vocabulary.Settings.ZAMBEZE_KIND in flowcept.configs.ADAPTERS: - try: - from flowcept.flowceptor.adapters.zambeze.zambeze_interceptor import ( - ZambezeInterceptor, +def __getattr__(name): + if name == "MLFlowInterceptor": + from flowcept.flowceptor.adapters.mlflow.mlflow_interceptor import ( + MLFlowInterceptor, ) - except Exception as _exp: - flowcept.commons.logger.error( - flowcept.commons.get_adapter_exception_msg(Vocabulary.Settings.ZAMBEZE_KIND) + + return MLFlowInterceptor + elif name == "FlowceptDaskSchedulerAdapter": + from flowcept.flowceptor.adapters.dask.dask_plugins import ( + FlowceptDaskSchedulerAdapter, + ) + + return FlowceptDaskSchedulerAdapter + elif name == "FlowceptDaskWorkerAdapter": + from flowcept.flowceptor.adapters.dask.dask_plugins import ( + FlowceptDaskWorkerAdapter, ) - flowcept.commons.logger.exception(_exp) -if Vocabulary.Settings.TENSORBOARD_KIND in flowcept.configs.ADAPTERS: - try: + return FlowceptDaskWorkerAdapter + elif name == "TensorboardInterceptor": from flowcept.flowceptor.adapters.tensorboard.tensorboard_interceptor import ( TensorboardInterceptor, ) - except Exception as _exp: - flowcept.commons.logger.error( - flowcept.commons.get_adapter_exception_msg(Vocabulary.Settings.TENSORBOARD_KIND) - ) - flowcept.commons.logger.exception(_exp) -if Vocabulary.Settings.MLFLOW_KIND in flowcept.configs.ADAPTERS: - try: - from flowcept.flowceptor.adapters.mlflow.mlflow_interceptor import ( - MLFlowInterceptor, - ) - except Exception as _exp: - flowcept.commons.logger.error( - flowcept.commons.get_adapter_exception_msg(Vocabulary.Settings.MLFLOW_KIND) + return TensorboardInterceptor + elif name == "ZambezeInterceptor": + from flowcept.flowceptor.adapters.zambeze.zambeze_interceptor import ( + ZambezeInterceptor, ) - flowcept.commons.logger.exception(_exp) -if 
Vocabulary.Settings.DASK_KIND in flowcept.configs.ADAPTERS: - try: - from flowcept.flowceptor.adapters.dask.dask_plugins import ( - FlowceptDaskSchedulerAdapter, - FlowceptDaskWorkerAdapter, - ) - except Exception as _exp: - flowcept.commons.get_adapter_exception_msg(Vocabulary.Settings.DASK_KIND) - flowcept.commons.logger.exception(_exp) + return ZambezeInterceptor + elif name == "TaskQueryAPI": + from flowcept.flowcept_api.task_query_api import TaskQueryAPI + + return TaskQueryAPI + raise AttributeError(f"module '{__name__}' has no attribute '{name}'") + __all__ = [ "FlowceptDaskWorkerAdapter", @@ -75,9 +54,9 @@ "MLFlowInterceptor", "TensorboardInterceptor", "ZambezeInterceptor", + "TaskQueryAPI", "WorkflowObject", "flowcept_task", - "TaskQueryAPI", "Flowcept", "__version__", "SETTINGS_PATH", diff --git a/src/flowcept/commons/utils.py b/src/flowcept/commons/utils.py index 87247cb7..da3ff80c 100644 --- a/src/flowcept/commons/utils.py +++ b/src/flowcept/commons/utils.py @@ -7,15 +7,15 @@ import os import platform import subprocess - +import types import numpy as np import flowcept.commons +from flowcept import configs from flowcept.configs import ( PERF_LOG, SETTINGS_PATH, ) -from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.flowcept_dataclasses.task_object import Status @@ -47,8 +47,7 @@ def perf_log(func_name, t0: float): """Configure the performance log.""" if PERF_LOG: t1 = time() - logger = FlowceptLogger() - logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}") + flowcept.commons.logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}") return t1 return None @@ -223,6 +222,18 @@ def get_gpu_vendor(): return None +def get_current_config_values(): + """Get current config values.""" + _vars = {} + for var_name in dir(configs): + if not var_name.startswith("_"): + val = getattr(configs, var_name) + if not isinstance(val, types.ModuleType): + _vars[var_name] = val + _vars["ADAPTERS"] = list(_vars.get("ADAPTERS", [])) + return _vars + + class 
GenericJSONDecoder(json.JSONDecoder): """JSON decoder class.""" diff --git a/src/flowcept/configs.py b/src/flowcept/configs.py index 6b288614..d3f29c8c 100644 --- a/src/flowcept/configs.py +++ b/src/flowcept/configs.py @@ -12,31 +12,28 @@ ######################## PROJECT_NAME = os.getenv("PROJECT_NAME", "flowcept") -SETTINGS_PATH = os.getenv("FLOWCEPT_SETTINGS_PATH", None) -SETTINGS_DIR = os.path.expanduser(f"~/.{PROJECT_NAME}") -if SETTINGS_PATH is None: - SETTINGS_PATH = os.path.join(SETTINGS_DIR, "settings.yaml") +_SETTINGS_DIR = os.path.expanduser(f"~/.{PROJECT_NAME}") +SETTINGS_PATH = os.getenv("FLOWCEPT_SETTINGS_PATH", f"{_SETTINGS_DIR}/settings.yaml") if not os.path.exists(SETTINGS_PATH): - raise Exception( - f"Settings file {SETTINGS_PATH} was not found. " - f"You should either define the " - f"environment variable FLOWCEPT_SETTINGS_PATH with its path or " - f"install Flowcept's package to create the directory " - f"~/.flowcept with the file in it.\n" - "A sample settings file is found in the 'resources' directory " - "under the project's root path." - ) - -settings = OmegaConf.load(SETTINGS_PATH) + SETTINGS_PATH = None + import importlib.resources + + with importlib.resources.open_text("resources", "sample_settings.yaml") as f: + settings = OmegaConf.load(f) + +else: + settings = OmegaConf.load(SETTINGS_PATH) + ######################## # Log Settings # ######################## + LOG_FILE_PATH = settings["log"].get("log_path", "default") if LOG_FILE_PATH == "default": - LOG_FILE_PATH = os.path.join(SETTINGS_DIR, f"{PROJECT_NAME}.log") + LOG_FILE_PATH = f"{PROJECT_NAME}.log" # Possible values below are the typical python logging levels. 
LOG_FILE_LEVEL = settings["log"].get("log_file_level", "debug").upper() @@ -54,6 +51,7 @@ ###################### # MQ Settings # ###################### + MQ_URI = settings["mq"].get("uri", None) MQ_INSTANCES = settings["mq"].get("instances", None) @@ -83,6 +81,7 @@ ###################### # MongoDB Settings # ###################### + MONGO_URI = settings["mongodb"].get("uri", os.environ.get("MONGO_URI", None)) MONGO_HOST = settings["mongodb"].get("host", os.environ.get("MONGO_HOST", "localhost")) MONGO_PORT = int(settings["mongodb"].get("port", os.environ.get("MONGO_PORT", "27017"))) @@ -226,7 +225,7 @@ ###################### # Web Server # ###################### - +settings.setdefault("web_server", {}) _webserver_settings = settings.get("web_server", {}) WEBSERVER_HOST = _webserver_settings.get("host", "0.0.0.0") WEBSERVER_PORT = int(_webserver_settings.get("port", 5000)) @@ -237,16 +236,16 @@ ANALYTICS = settings.get("analytics", None) +#################### +# INSTRUMENTATION # +#################### -#### - -INSTRUMENTATION = settings.get("instrumentation", None) -INSTRUMENTATION_ENABLED = False -if INSTRUMENTATION: - INSTRUMENTATION_ENABLED = INSTRUMENTATION.get("enabled", False) - -################# Enabled ADAPTERS +INSTRUMENTATION = settings.get("instrumentation", {}) +INSTRUMENTATION_ENABLED = INSTRUMENTATION.get("enabled", False) +#################### +# Enabled ADAPTERS # +#################### ADAPTERS = set() for adapter in settings.get("adapters", set()): diff --git a/src/flowcept/instrumentation/decorators/flowcept_torch.py b/src/flowcept/instrumentation/decorators/flowcept_torch.py index 7c779526..f8224c91 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_torch.py +++ b/src/flowcept/instrumentation/decorators/flowcept_torch.py @@ -23,11 +23,6 @@ ) -# class FrequencyCount: -# counter = 0 -# MAX = INSTRUMENTATION["torch"]["max_frequency"] - - def _inspect_torch_tensor(tensor: torch.Tensor): _id = id(tensor) tensor_inspection = {"id": _id} @@ 
-58,7 +53,7 @@ def _inspect_torch_tensor(tensor: torch.Tensor): def full_torch_task(func=None): - """Get pytorch task.""" + """Generate pytorch task.""" def decorator(func): @wraps(func) @@ -215,8 +210,12 @@ def wrapper(*args, **kwargs): def torch_task(): - """Get the torch task.""" - mode = INSTRUMENTATION["torch"]["mode"] + """Pick the torch_task function.""" + torch_instrumentation = INSTRUMENTATION.get("torch") + if torch_instrumentation is None: + return lambda _: _ + + mode = torch_instrumentation.get("mode", None) if mode is None: return lambda _: _ if "telemetry" in mode and TELEMETRY_CAPTURE is None: diff --git a/src/flowcept/version.py b/src/flowcept/version.py index 7303c4fe..3330f7b6 100644 --- a/src/flowcept/version.py +++ b/src/flowcept/version.py @@ -4,4 +4,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.6.7" +__version__ = "0.6.8" diff --git a/tests/adapters/test_dask.py b/tests/adapters/test_dask.py index ee0c2b78..30624486 100644 --- a/tests/adapters/test_dask.py +++ b/tests/adapters/test_dask.py @@ -5,7 +5,7 @@ from dask.distributed import Client, LocalCluster -from flowcept import Flowcept, TaskQueryAPI +from flowcept import Flowcept from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.utils import ( assert_by_querying_tasks_until, diff --git a/tests/adapters/test_mlflow.py b/tests/adapters/test_mlflow.py index e0ea7990..73abd3e3 100644 --- a/tests/adapters/test_mlflow.py +++ b/tests/adapters/test_mlflow.py @@ -6,7 +6,8 @@ import mlflow from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept import MLFlowInterceptor, Flowcept +from flowcept import MLFlowInterceptor +from flowcept import Flowcept from flowcept.commons.utils import ( assert_by_querying_tasks_until, evaluate_until, @@ -25,7 +26,8 @@ def setUpClass(cls): TestMLFlow.interceptor = MLFlowInterceptor() if 
os.path.exists(TestMLFlow.interceptor.settings.file_path): os.remove(TestMLFlow.interceptor.settings.file_path) - open(TestMLFlow.interceptor.settings.file_path, "w") + with open(TestMLFlow.interceptor.settings.file_path, "w") as f: + f.write("") sleep(1) mlflow.set_tracking_uri(f"sqlite:///{TestMLFlow.interceptor.settings.file_path}") mlflow.delete_experiment(mlflow.create_experiment("starter")) diff --git a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py index b94b835c..2127418a 100644 --- a/tests/api/flowcept_api_test.py +++ b/tests/api/flowcept_api_test.py @@ -4,7 +4,7 @@ Flowcept, flowcept_task, ) -from flowcept.commons.utils import assert_by_querying_tasks_until +from flowcept.commons.utils import assert_by_querying_tasks_until, get_current_config_values @flowcept_task @@ -28,6 +28,11 @@ def mult_two_(y): class FlowceptAPITest(unittest.TestCase): + + def test_configs(self): + current_configs = get_current_config_values() + assert "LOG_FILE_PATH" in current_configs + def test_simple_workflow(self): assert Flowcept.services_alive()