Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements in JSON serialization #195

Merged
merged 3 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ services-stop-mongo:
build:
bash deployment/build-image.sh

# To use run, you must run make services first.
run:
docker run --rm -v $(shell pwd):/flowcept -e KVDB_HOST=flowcept_redis -e MQ_HOST=flowcept_redis -e MONGO_HOST=flowcept_mongo --network flowcept_default -it flowcept

Expand Down
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
[![Code Formatting](https://github.com/ORNL/flowcept/actions/workflows/run-checks.yml/badge.svg)](https://github.com/ORNL/flowcept/actions/workflows/run-checks.yml)
[![License: MIT](https://img.shields.io/github/license/ORNL/flowcept)](LICENSE)

# FlowCept
# Flowcept

FlowCept is a runtime data integration system that captures and queries workflow provenance with minimal or no code changes. It unifies data across diverse workflows and tools, enabling integrated analysis and insights, especially in federated environments. Designed for scenarios involving critical data from multiple workflows, FlowCept seamlessly integrates data at runtime, providing a unified view for end-to-end monitoring and analysis and enhanced support for Machine Learning (ML) workflows.
Flowcept is a runtime data integration system that captures and queries workflow provenance with minimal or no code changes. It unifies data across diverse workflows and tools, enabling integrated analysis and insights, especially in federated environments. Designed for scenarios involving critical data from multiple workflows, Flowcept seamlessly integrates data at runtime, providing a unified view for end-to-end monitoring and analysis and enhanced support for Machine Learning (ML) workflows.

Other capabilities include:

Expand Down Expand Up @@ -41,37 +41,37 @@ Refer to [Contributing](CONTRIBUTING.md) for adding new adapters. Note: The term

## Install and Setup:

1. Install FlowCept:
1. Install Flowcept:

`pip install .[all]` in this directory (or `pip install flowcept[all]`) if you want to install all dependencies.

For convenience, this will install all dependencies for all adapters. But it can install dependencies for adapters you will not use. For this reason, you may want to install like this: `pip install .[extra_dep1,extra_dep2]` for the adapters we have implemented, e.g., `pip install .[dask]`.
Currently, the optional dependencies available are:

```
pip install flowcept[mongo] # To install FlowCept with MongoDB
pip install flowcept[mongo] # To install Flowcept with MongoDB
pip install flowcept[mlflow] # To install mlflow's adapter.
pip install flowcept[dask] # To install dask's adapter.
pip install flowcept[tensorboard] # To install tensorboaard's adapter.
pip install flowcept[kafka] # To utilize Kafka as the MQ, instead of Redis.
pip install flowcept[nvidia] # To capture NVIDIA GPU runtime information.
pip install flowcept[analytics] # For extra analytics features.
pip install flowcept[dev] # To install FlowCept's developer dependencies.
pip install flowcept[dev] # To install Flowcept's developer dependencies.
```

You do not need to install any optional dependency to run Flowcept without any adapter, e.g., if you want to use simple instrumentation (see below). In this case, you need to remove the adapter part from the [settings.yaml](resources/settings.yaml) file.

2. Start the MQ System:

To use FlowCept, one needs to start a MQ system `$> make services`. This will start up Redis but see other options in the [deployment](deployment) directory and see [Data Persistence](#data-persistence) notes below.
To use Flowcept, one needs to start a MQ system `$> make services`. This will start up Redis but see other options in the [deployment](deployment) directory and see [Data Persistence](#data-persistence) notes below.

3. Optionally, define custom settings (e.g., routes and ports) accordingly in a settings.yaml file. There is a sample file [here](resources/sample_settings.yaml), which can be used as basis. Then, set an environment variable `FLOWCEPT_SETTINGS_PATH` with the absolute path to the yaml file. If you do not follow this step, the default values defined [here](resources/sample_settings.yaml) will be used.

4. See the [Jupyter Notebooks](notebooks) and [Examples directory](examples) for utilization examples.

## Installing and Running with Docker

To use containers instead of installing FlowCept's dependencies on your host system, we provide a [Dockerfile](deployment/Dockerfile) alongside a [docker-compose.yml](deployment/compose.yml) for dependent services (e.g., Redis, MongoDB).
To use containers instead of installing Flowcept's dependencies on your host system, we provide a [Dockerfile](deployment/Dockerfile) alongside a [docker-compose.yml](deployment/compose.yml) for dependent services (e.g., Redis, MongoDB).

#### Notes:
- As seen in the steps below, there are [Makefile](Makefile) commands to build and run the image. Please use them instead of running the Docker commands to build and run the image.
Expand Down Expand Up @@ -102,7 +102,7 @@ To use containers instead of installing FlowCept's dependencies on your host sys

### Simple Example with Decorators Instrumentation

In addition to existing adapters to Dask, MLFlow, and others (it's extensible for any system that generates data), FlowCept also offers instrumentation via @decorators.
In addition to existing adapters to Dask, MLFlow, and others (it's extensible for any system that generates data), Flowcept also offers instrumentation via @decorators.

```python
from flowcept import Flowcept, flowcept_task
Expand All @@ -128,16 +128,16 @@ print(Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}))

## Data Persistence

FlowCept uses an ephemeral message queue (MQ) with a pub/sub system to flush observed data. For optional data persistence, you can choose between:
Flowcept uses an ephemeral message queue (MQ) with a pub/sub system to flush observed data. For optional data persistence, you can choose between:

- [LMDB](https://lmdb.readthedocs.io/) (default): A lightweight, file-based database requiring no external services (but note it might require `gcc`). Ideal for simple tests or cases needing basic data persistence without query capabilities. Data stored in LMDB can be loaded into tools like Pandas for complex analysis. FlowCept's database API provides methods to export data in LMDB into Pandas DataFrames.
- [MongoDB](https://www.mongodb.com/): A robust, service-based database with advanced query capabilities. Required to use FlowCept's Query API (i.e., `flowcept.Flowcept.db`) to run more complex queries and other features like ML model management or runtime queries (i.e., query while writing). To use MongoDB, initialize the service with `make services-mongo`.
- [LMDB](https://lmdb.readthedocs.io/) (default): A lightweight, file-based database requiring no external services (but note it might require `gcc`). Ideal for simple tests or cases needing basic data persistence without query capabilities. Data stored in LMDB can be loaded into tools like Pandas for complex analysis. Flowcept's database API provides methods to export data in LMDB into Pandas DataFrames.
- [MongoDB](https://www.mongodb.com/): A robust, service-based database with advanced query capabilities. Required to use Flowcept's Query API (i.e., `flowcept.Flowcept.db`) to run more complex queries and other features like ML model management or runtime queries (i.e., query while writing). To use MongoDB, initialize the service with `make services-mongo`.

You can use both of them, meaning that the data pruducers will write data into both, none of them, or each of them. All is customizable in the settings file.

If data persistence is disabled, captured data is sent to the MQ without any default consumer subscribing to persist it. In this case, querying the data requires creating a custom consumer to subscribe to the MQ.

However, for querying, FlowCept Database API uses only one at a time. If both are enabled in the settings file, MongoDB will be used. If none is enable, an error is thrown.
However, for querying, Flowcept Database API uses only one at a time. If both are enabled in the settings file, MongoDB will be used. If none is enable, an error is thrown.

Data stored in MongoDB and LMDB are interchangeable. You can switch between them by transferring data from one to the other as needed.

Expand Down Expand Up @@ -179,11 +179,11 @@ Which was installed using Frontier's /opt/rocm-6.2.0/share/amd_smi

## Torch Dependencies

Some unit tests utilize `torch==2.2.2`, `torchtext=0.17.2`, and `torchvision==0.17.2`. They are only really needed to run some tests and will be installed if you run `pip install flowcept[ml_dev]` or `pip install flowcept[all]`. If you want to use FlowCept with Torch, please adapt torch dependencies according to your project's dependencies.
Some unit tests utilize `torch==2.2.2`, `torchtext=0.17.2`, and `torchvision==0.17.2`. They are only really needed to run some tests and will be installed if you run `pip install flowcept[ml_dev]` or `pip install flowcept[all]`. If you want to use Flowcept with Torch, please adapt torch dependencies according to your project's dependencies.

## Cite us

If you used FlowCept in your research, consider citing our paper.
If you used Flowcept in your research, consider citing our paper.

```
Towards Lightweight Data Integration using Multi-workflow Provenance and Data Observability
Expand All @@ -208,7 +208,7 @@ R. Souza, T. Skluzacek, S. Wilkinson, M. Ziatdinov, and R. da Silva

## Disclaimer & Get in Touch

Please note that this a research software. We encourage you to give it a try and use it with your own stack. We are continuously working on improving documentation and adding more examples and notebooks, but we are still far from a good documentation covering the whole system. If you are interested in working with FlowCept in your own scientific project, we can give you a jump start if you reach out to us. Feel free to [create an issue](https://github.com/ORNL/flowcept/issues/new), [create a new discussion thread](https://github.com/ORNL/flowcept/discussions/new/choose) or drop us an email (we trust you'll find a way to reach out to us :wink:).
Please note that this a research software. We encourage you to give it a try and use it with your own stack. We are continuously working on improving documentation and adding more examples and notebooks, but we are still far from a good documentation covering the whole system. If you are interested in working with Flowcept in your own scientific project, we can give you a jump start if you reach out to us. Feel free to [create an issue](https://github.com/ORNL/flowcept/issues/new), [create a new discussion thread](https://github.com/ORNL/flowcept/discussions/new/choose) or drop us an email (we trust you'll find a way to reach out to us :wink:).

## Acknowledgement

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ dev = [
"pika",
"pytest",
"ruff",
"pyyaml"
"pyyaml",
"psutil>=6.1.1"
]
# Torch and some other ml-specific libs, only used for dev purposes, require the following specific versions.
ml_dev = [
Expand Down
9 changes: 7 additions & 2 deletions src/flowcept/commons/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,13 @@ def replace_non_serializable(obj):
else:
return obj
else:
# Replace non-serializable values with id()
return f"{obj.__class__.__name__}_instance_id_{id(obj)}"
if hasattr(obj, "to_dict"):
return obj.to_dict()
elif hasattr(obj, "to_flowcept_dict"):
return obj.to_flowcept_dict()
else:
# Replace non-serializable values with id()
return f"{obj.__class__.__name__}_instance_id_{id(obj)}"


def get_gpu_vendor():
Expand Down
2 changes: 1 addition & 1 deletion src/flowcept/flowcept_api/flowcept_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def _init_persistence(self, mq_host=None, mq_port=None):
def stop(self):
"""Stop it."""
if not self.is_started or not self.enabled:
self.logger.warning("Flowcept is already stopped!")
self.logger.warning("Flowcept is already stopped or may never have been started!")
return
if self._interceptors and len(self._interceptors):
for interceptor in self._interceptors:
Expand Down
25 changes: 0 additions & 25 deletions src/flowcept/flowceptor/adapters/dask/dask_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,6 @@ def get_run_spec_data(task_msg: TaskObject, run_spec):
task_msg.used.update(kwargs)
task_msg.used.pop("workflow_id", None)

#
# arg_val = _get_arg("args")
# if arg_val is not None:
# picked_args = pickle.loads(arg_val)
# # pickled_args is always a tuple
# i = 0
# for arg in picked_args:
# task_msg.used[f"arg{i}"] = arg
# i += 1
#
# arg_val = _get_arg("kwargs")
# if arg_val is not None:
# picked_kwargs = pickle.loads(arg_val)
# if "workflow_id" in picked_kwargs:
# task_msg.workflow_id = picked_kwargs.pop("workflow_id")
# if len(picked_kwargs):
# task_msg.used.update(picked_kwargs)

# arg_val = _get_arg("task") # This happens in case of client.map
# if arg_val is not None and type(arg_val) == tuple:
# task_obj = _parse_dask_tuple(arg_val)
# if "workflow_id" in task_obj:
# task_msg.workflow_id = task_obj.pop("workflow_id")
# task_msg.used = task_obj["value"]

if REPLACE_NON_JSON_SERIALIZABLE:
task_msg.used = replace_non_serializable(task_msg.used)

Expand Down
18 changes: 17 additions & 1 deletion tests/misc_tests/log_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import os.path
import unittest

from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.configs import PROJECT_NAME
from flowcept.configs import PROJECT_NAME, LOG_FILE_LEVEL, LOG_FILE_PATH


class TestLog(unittest.TestCase):
Expand Down Expand Up @@ -32,3 +33,18 @@ def test_log(self):
_logger.v = "test_val"
self.assertEqual(_logger2.v, "test_val")

# def test_file_empty(self):
# if os.path.exists(LOG_FILE_PATH):
# os.remove(LOG_FILE_PATH)
# assert not os.path.exists(LOG_FILE_PATH)
#
# _logger = FlowceptLogger()
# _logger.debug("Debug Test Msg")
# _logger.error("Error Test Msg")
# _logger.critical("Critical Test Msg")
#
# if LOG_FILE_LEVEL == "DISABLE":
# assert not os.path.exists(LOG_FILE_PATH), "Log file exists, but it shouldn't."
# else:
# assert os.path.exists(LOG_FILE_PATH), "Log file does not exist, but it should."

Loading