Skip to content

Commit

Permalink
Merge pull request #140 from ORNL/generic_mq
Browse files Browse the repository at this point in the history
Dev < Generic mq < Flowcept API
  • Loading branch information
renan-souza authored Sep 24, 2024
2 parents 73597ea + 585f40f commit 492b7cb
Show file tree
Hide file tree
Showing 29 changed files with 202 additions and 170 deletions.
11 changes: 8 additions & 3 deletions .github/workflows/run-tests-kafka.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
name: All tests on Kafka MQ
on: [push]
on:
pull_request:
branches: [ "dev", "main" ]
types: [opened, synchronize, reopened]
# branches: [ "disabled" ]

jobs:
Expand All @@ -20,6 +23,8 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -e .[fulldev]
- name: Pip list
run: pip list
- name: Run Docker Compose
run: docker compose -f deployment/compose-kafka.yml up -d
- name: Wait 1 min
Expand All @@ -29,7 +34,7 @@ jobs:
export MQ_TYPE=kafka
export MQ_PORT=9092
python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_TYPE={MQ_PORT}")'
python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
- name: Run Tests with Kafka
run: |
export MQ_TYPE=kafka
Expand All @@ -41,7 +46,7 @@ jobs:
export MQ_TYPE=kafka
export MQ_PORT=9092
python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_TYPE={MQ_PORT}")'
python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
python flowcept/flowcept_webserver/app.py &
sleep 3
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ jobs:
export MQ_TYPE=kafka
export MQ_PORT=9092
python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_TYPE={MQ_PORT}")'
python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
- name: Run Tests with Kafka
run: |
export MQ_TYPE=kafka
export MQ_PORT=9092
# Ignoring heavy tests. They are executed with Kafka in another GH Action.
pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
29 changes: 8 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,25 @@ You may need to set the environment variable `FLOWCEPT_SETTINGS_PATH` with the a
In addition to the existing adapters for Dask, MLFlow, and others (FlowCept is extensible to any system that generates data), FlowCept also offers instrumentation via @decorators.

```python
from uuid import uuid4

from flowcept import (
FlowceptConsumerAPI,
WorkflowObject,
DBAPI,
flowcept_task,
INSTRUMENTATION
)

from flowcept import Flowcept, flowcept_task

@flowcept_task
def sum_one(n, workflow_id=None):
def sum_one(n):
return n + 1


@flowcept_task
def mult_two(n, workflow_id=None):
def mult_two(n):
return n * 2


db = DBAPI()
wf_id = str(uuid4())
with FlowceptConsumerAPI(INSTRUMENTATION):
# The next line is optional
db.insert_or_update_workflow(WorkflowObject(workflow_id=wf_id))
with Flowcept(workflow_name='test_workflow'):
n = 3
o1 = sum_one(n, workflow_id=wf_id)
o2 = mult_two(o1, workflow_id=wf_id)

print(db.query(filter={"workflow_id": wf_id}))
o1 = sum_one(n)
o2 = mult_two(o1)
print(o2)

print(Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}))
```


Expand Down
5 changes: 1 addition & 4 deletions flowcept/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
from flowcept.commons.vocabulary import Vocabulary


from flowcept.flowcept_api.consumer_api import FlowceptConsumerAPI
from flowcept.flowcept_api.flowcept_controller import Flowcept
from flowcept.flowcept_api.task_query_api import TaskQueryAPI
from flowcept.flowcept_api.db_api import DBAPI
from flowcept.instrumentation.decorators.flowcept_task import flowcept_task

INSTRUMENTATION = FlowceptConsumerAPI.INSTRUMENTATION

from flowcept.commons.flowcept_dataclasses.workflow_object import (
WorkflowObject,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
from typing import List, Union
from time import sleep

from flowcept.commons.flowcept_dataclasses.workflow_object import (
WorkflowObject,
)

import flowcept.instrumentation.decorators
from flowcept.commons import logger
from flowcept.commons.daos.document_db_dao import DocumentDBDao
from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao
from flowcept.configs import MQ_INSTANCES
from flowcept.flowcept_api.db_api import DBAPI
from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor


# TODO: :code-reorg: This may no longer be considered an API, since it performs tasks that are critical to the correct functioning of the system.
class FlowceptConsumerAPI(object):
INSTRUMENTATION = "instrumentation"
class Flowcept(object):
db = DBAPI()

current_workflow_id = None

def __init__(
self,
Expand All @@ -22,7 +28,23 @@ def __init__(
] = None,
bundle_exec_id=None,
start_doc_inserter=True,
workflow_id: str = None,
workflow_name: str = None,
workflow_args: str = None,
):
"""
Flowcept controller.
This class controls the interceptors, including instrumentation.
If using for instrumentation, we assume one instance of this class
per workflow is being utilized.
Parameters
----------
interceptors - a Flowcept interceptor or a list of Flowcept interceptors. If None, the instrumentation interceptor will be used. If a string is passed, no interceptor will be started. # TODO: improve clarity for the documentation.
bundle_exec_id - A way to group interceptors.
start_doc_inserter - Whether you want to start consuming MQ messages to inject in the DB.
"""
self.logger = FlowceptLogger()

self._document_inserters: List[DocumentInserter] = []
Expand All @@ -31,13 +53,26 @@ def __init__(
self._bundle_exec_id = id(self)
else:
self._bundle_exec_id = bundle_exec_id
if interceptors == FlowceptConsumerAPI.INSTRUMENTATION:
interceptors = (
flowcept.instrumentation.decorators.instrumentation_interceptor
if isinstance(interceptors, str):
self._interceptors = None
else:
if interceptors is None:
interceptors = [
flowcept.instrumentation.decorators.instrumentation_interceptor
]
elif not isinstance(interceptors, list):
interceptors = [interceptors]
self._interceptors: List[BaseInterceptor] = interceptors

if workflow_id or workflow_args or workflow_name:
wf_obj = WorkflowObject(
workflow_id, workflow_name, used=workflow_args
)
if interceptors is not None and type(interceptors) != list:
interceptors = [interceptors]
self._interceptors: List[BaseInterceptor] = interceptors
Flowcept.db.insert_or_update_workflow(wf_obj)
Flowcept.current_workflow_id = wf_obj.workflow_id
else:
Flowcept.current_workflow_id = None

self.is_started = False

def start(self):
Expand Down
4 changes: 4 additions & 0 deletions flowcept/instrumentation/decorators/flowcept_task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from time import time
from functools import wraps
import flowcept.commons
from flowcept import Flowcept
from flowcept.commons.flowcept_dataclasses.task_object import (
TaskObject,
Status,
Expand All @@ -25,6 +26,9 @@ def default_args_handler(task_message: TaskObject, *args, **kwargs):
"workflow_id", None
)
args_handled.update(kwargs)
task_message.workflow_id = (
task_message.workflow_id or Flowcept.current_workflow_id
)
if REPLACE_NON_JSON_SERIALIZABLE:
args_handled = replace_non_serializable(args_handled)
return args_handled
Expand Down
4 changes: 2 additions & 2 deletions flowcept/instrumentation/decorators/responsible_ai.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from functools import wraps
import numpy as np
from torch import nn
from flowcept import DBAPI
from flowcept import Flowcept
from flowcept.commons.utils import replace_non_serializable
from flowcept.configs import REPLACE_NON_JSON_SERIALIZABLE, INSTRUMENTATION

Expand Down Expand Up @@ -110,7 +110,7 @@ def wrapper(*args, **kwargs):
if INSTRUMENTATION.get("torch", False) and INSTRUMENTATION[
"torch"
].get("save_models", False):
obj_id = DBAPI().save_torch_model(
obj_id = Flowcept.db.save_torch_model(
model, custom_metadata=ret["responsible_ai_metadata"]
)
ret["object_id"] = obj_id
Expand Down
4 changes: 2 additions & 2 deletions flowcept/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys

from flowcept import (
FlowceptConsumerAPI,
Flowcept,
ZambezeInterceptor,
MLFlowInterceptor,
TensorboardInterceptor,
Expand Down Expand Up @@ -36,7 +36,7 @@ def main():
)
interceptors.append(interceptor)

consumer = FlowceptConsumerAPI(interceptors)
consumer = Flowcept(interceptors)
consumer.start()


Expand Down
2 changes: 1 addition & 1 deletion flowcept/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# This file is supposed to be automatically modified by the CI Bot.
# The expected format is: <Major>.<Minor>.<Patch>
# See .github/workflows/version_bumper.py
__version__ = "0.4.1"
__version__ = "0.5.0"
5 changes: 2 additions & 3 deletions notebooks/analytics.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
" \"\"\"\n",
" import json\n",
" from uuid import uuid4\n",
" from flowcept import DBAPI\n",
" db_api = DBAPI()\n",
" from flowcept import Flowcept\n",
" test_data_path = '../tests/api/sample_data_with_telemetry_and_rai.json' # This sample data contains a workflow composed of 9 tasks.\n",
" with open(test_data_path) as f:\n",
" base_data = json.loads(f.read())\n",
Expand All @@ -47,7 +46,7 @@
" new_doc[\"workflow_id\"] = wf_id\n",
" docs.append(new_doc)\n",
" \n",
" inserted_ids = db_api._dao.insert_many(docs)\n",
" inserted_ids = Flowcept.db._dao.insert_many(docs)\n",
" assert len(inserted_ids) == len(base_data)\n",
" return wf_id"
]
Expand Down
Loading

0 comments on commit 492b7cb

Please sign in to comment.