Merge pull request #137 from ORNL/generic_mq
Enabling kafka
renan-souza authored Sep 23, 2024
2 parents a949285 + 0d76c4f commit 73597ea
Showing 43 changed files with 771 additions and 364 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/create-release-n-publish.yml
@@ -102,6 +102,8 @@ jobs:
- name: Install dev dependencies
run: |
pip install -r extra_requirements/dev-requirements.txt
- name: Run Docker Compose
run: docker compose -f deployment/compose-full.yml up -d
- name: Test with pytest
run: |
mkdir -p ~/.flowcept
49 changes: 49 additions & 0 deletions .github/workflows/run-tests-kafka.yml
@@ -0,0 +1,49 @@
name: All tests on Kafka MQ
on: [push]
# branches: [ "disabled" ]

jobs:

build:
runs-on: ubuntu-latest
timeout-minutes: 60
if: "!contains(github.event.head_commit.message, 'CI Bot')"
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.9
uses: actions/setup-python@v3
with:
python-version: "3.9"
- name: Check python version
run: python --version
- name: Install our dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[fulldev]
- name: Run Docker Compose
run: docker compose -f deployment/compose-kafka.yml up -d
- name: Wait 1 min
run: sleep 60
- name: Check liveness
run: |
export MQ_TYPE=kafka
export MQ_PORT=9092
python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
- name: Run Tests with Kafka
run: |
export MQ_TYPE=kafka
export MQ_PORT=9092
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
- name: Test notebooks
run: |
pip install -e .[full]
export MQ_TYPE=kafka
export MQ_PORT=9092
python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
python flowcept/flowcept_webserver/app.py &
sleep 3
export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
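
The liveness checks above work because exporting `MQ_TYPE` and `MQ_PORT` overrides what `flowcept.configs` resolves. A minimal sketch of that kind of resolution, assuming environment variables take precedence and that the Redis values shown in the compose files are the defaults (the fallback values here are assumptions, not FlowCept's actual code):

```python
# Hypothetical sketch of env-var-driven config resolution; the real
# flowcept.configs may also consult settings.yaml for these values.
import os

MQ_TYPE = os.environ.get("MQ_TYPE", "redis")      # assumed default: redis
MQ_PORT = int(os.environ.get("MQ_PORT", "6379"))  # assumed default: Redis port
```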
31 changes: 25 additions & 6 deletions .github/workflows/run-tests.yml
@@ -19,17 +19,36 @@ jobs:
- name: Install our dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[full]
pip install -r extra_requirements/dev-requirements.txt
- name: Run Docker Compose
run: docker compose -f deployment/compose.yml up -d
- name: Test with pytest
pip install -e .[fulldev] # Installs all dependencies, for all adapters, plus the dev dependencies.
- name: Pip list
run: pip list
- name: Start Docker Compose with Redis
run: docker compose -f deployment/compose-full.yml up -d
- name: Test with pytest with Redis
run: |
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
- name: Test notebooks
- name: Test notebooks with Redis
run: |
pip install -e .
python flowcept/flowcept_webserver/app.py &
sleep 3
export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
- name: Shut down compose
run: docker compose -f deployment/compose-full.yml down
- name: Start Docker Compose with Kafka
run: docker compose -f deployment/compose-kafka.yml up -d
- name: Wait 1 min
run: sleep 60
- name: Check liveness
run: |
export MQ_TYPE=kafka
export MQ_PORT=9092
python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
python -c 'from flowcept import FlowceptConsumerAPI; assert FlowceptConsumerAPI.services_alive()'
- name: Run Tests with Kafka
run: |
export MQ_TYPE=kafka
export MQ_PORT=9092
# Ignoring heavy tests. They are executed with Kafka in another GH Action.
pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
2 changes: 1 addition & 1 deletion .github/workflows/test-python-310-macos.yml
@@ -35,7 +35,7 @@ jobs:
- name: Run Docker Compose
run: |
docker compose version
docker compose -f deployment/compose.yml up -d
docker compose -f deployment/compose-full.yml up -d
- name: Test with pytest
run: |
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests/
2 changes: 1 addition & 1 deletion .github/workflows/test-python-310.yml
@@ -25,7 +25,7 @@ jobs:
pip install -e .[full]
pip install -r extra_requirements/dev-requirements.txt
- name: Run Docker Compose
run: docker compose -f deployment/compose.yml up -d
run: docker compose -f deployment/compose-full.yml up -d
- name: Test with pytest
run: |
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
2 changes: 1 addition & 1 deletion .github/workflows/test-python-311.yml
@@ -28,7 +28,7 @@ jobs:
pip install -e .[full]
pip install -r extra_requirements/dev-requirements.txt
- name: Run Docker Compose
run: docker compose -f deployment/compose.yml up -d
run: docker compose -f deployment/compose-full.yml up -d
- name: Test with pytest
run: |
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests/
41 changes: 41 additions & 0 deletions README.md
@@ -51,6 +51,47 @@ You may need to set the environment variable `FLOWCEPT_SETTINGS_PATH` with the a

5. To use FlowCept's Query API, see utilization examples in the notebooks.

### Simple Example with Decorators Instrumentation

In addition to the existing adapters for Dask, MLFlow, and others (it is extensible to any system that generates data), FlowCept also offers instrumentation via Python decorators, as in the example below.

```python
from uuid import uuid4

from flowcept import (
FlowceptConsumerAPI,
WorkflowObject,
DBAPI,
flowcept_task,
INSTRUMENTATION
)


@flowcept_task
def sum_one(n, workflow_id=None):
return n + 1


@flowcept_task
def mult_two(n, workflow_id=None):
return n * 2


db = DBAPI()
wf_id = str(uuid4())
with FlowceptConsumerAPI(INSTRUMENTATION):
# The next line is optional
db.insert_or_update_workflow(WorkflowObject(workflow_id=wf_id))
n = 3
o1 = sum_one(n, workflow_id=wf_id)
o2 = mult_two(o1, workflow_id=wf_id)

print(db.query(filter={"workflow_id": wf_id}))

```
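
Each call to a `@flowcept_task`-decorated function above should be captured as a task document tied to `wf_id`, so the final `db.query` call is expected to return records for both `sum_one` and `mult_two`.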



## Performance Tuning for Performance Evaluation

In the settings.yaml file, the following variables might impact interception performance:
32 changes: 32 additions & 0 deletions deployment/compose-full.yml
@@ -0,0 +1,32 @@
version: '3.8'
name: flowcept
services:
flowcept_redis:
container_name: flowcept_redis
image: redis
ports:
- 6379:6379

flowcept_mongo:
container_name: flowcept_mongo
image: mongo:latest
# volumes:
# - /Users/rsr/Downloads/mongo_data/db:/data/db
ports:
- 27017:27017


# # This is just for the cases where one does not want to use the same Redis instance for caching and messaging, but
# # it's not required to have separate instances.
# # local_interceptor_cache:
# # container_name: local_interceptor_cache
# # image: redis
# # ports:
# # - 60379:6379

zambeze_rabbitmq:
container_name: zambeze_rabbitmq
image: rabbitmq:3.11-management
ports:
- 5672:5672
- 15672:15672
57 changes: 57 additions & 0 deletions deployment/compose-kafka.yml
@@ -0,0 +1,57 @@
version: '3.8'
name: flowcept
services:
flowcept_redis:
container_name: flowcept_redis
image: redis
ports:
- 6379:6379

flowcept_mongo:
container_name: flowcept_mongo
image: mongo:latest
# volumes:
# - /Users/rsr/Downloads/mongo_data/db:/data/db
ports:
- 27017:27017

zookeeper:
image: confluentinc/cp-zookeeper:6.1.1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

# reachable on 9092 from the host and on 29092 from inside docker compose
kafka:
image: confluentinc/cp-kafka:6.1.1
depends_on:
- zookeeper
ports:
- '9092:9092'
expose:
- '29092'
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_MIN_INSYNC_REPLICAS: '1'

init-kafka:
image: confluentinc/cp-kafka:6.1.1
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic interception --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
7 changes: 1 addition & 6 deletions deployment/compose.yml
@@ -16,6 +16,7 @@ services:
- 27017:27017



# # This is just for the cases where one does not want to use the same Redis instance for caching and messaging, but
# # it's not required to have separate instances.
# # local_interceptor_cache:
@@ -24,9 +25,3 @@ services:
# # ports:
# # - 60379:6379

zambeze_rabbitmq:
container_name: zambeze_rabbitmq
image: rabbitmq:3.11-management
ports:
- 5672:5672
- 15672:15672
1 change: 0 additions & 1 deletion extra_requirements/data_augmentation-requirements.txt

This file was deleted.

7 changes: 3 additions & 4 deletions extra_requirements/dev-requirements.txt
@@ -1,11 +1,10 @@
pytest==6.2.4
flake8==5.0.4
black==23.1.0
numpy==1.23.4
numpy<2.0.0
bokeh==2.4.2
jupyterlab==3.6.1
nbmake==1.4
cluster_experiment_utils
jupyterlab
nbmake
# Pytorch models stuff:
torch
torchvision
1 change: 1 addition & 0 deletions extra_requirements/kafka-requirements.txt
@@ -0,0 +1 @@
confluent-kafka==2.5.3
2 changes: 1 addition & 1 deletion extra_requirements/mlflow-requirements.txt
@@ -1,4 +1,4 @@
mlflow-skinny==2.1.1
mlflow-skinny>2.1.1,<=2.16.2
SQLAlchemy==1.4.42
alembic==1.8.1
watchdog==2.2.1
4 changes: 2 additions & 2 deletions extra_requirements/tensorboard-requirements.txt
@@ -1,3 +1,3 @@
tensorboard==2.13.0
tensorflow==2.13.0
tensorboard
tensorflow
tbparse==0.0.7
6 changes: 6 additions & 0 deletions flowcept/__init__.py
@@ -1,12 +1,18 @@
import flowcept

from flowcept.configs import SETTINGS_PATH

from flowcept.version import __version__

from flowcept.commons.vocabulary import Vocabulary


from flowcept.flowcept_api.consumer_api import FlowceptConsumerAPI
from flowcept.flowcept_api.task_query_api import TaskQueryAPI
from flowcept.flowcept_api.db_api import DBAPI
from flowcept.instrumentation.decorators.flowcept_task import flowcept_task

INSTRUMENTATION = FlowceptConsumerAPI.INSTRUMENTATION

from flowcept.commons.flowcept_dataclasses.workflow_object import (
WorkflowObject,
4 changes: 4 additions & 0 deletions flowcept/analytics/data_augmentation.py
@@ -4,10 +4,12 @@
import pandas as pd

from h2o.automl import H2OAutoML
from typing_extensions import deprecated

h2o.init()


@deprecated
def train_model(
df,
x_cols: List[str],
@@ -26,6 +28,7 @@ def train_model(
return aml


@deprecated
def augment_df_linearly(df, N, cols_to_augment, seed=1234):
np.random.seed(seed)
new_df = df.copy()
@@ -48,6 +51,7 @@ def augment_df_linearly(df, N, cols_to_augment, seed=1234):
return appended_df


@deprecated
def augment_data(df, N, augmentation_model: H2OAutoML, x_cols, y_col):
new_df = augment_df_linearly(df, N, x_cols)
h2odf = h2o.H2OFrame(new_df.loc[new_df["original"] == 0][x_cols])
26 changes: 2 additions & 24 deletions flowcept/commons/daos/autoflush_buffer.py
@@ -1,29 +1,7 @@
from queue import Queue
from typing import Union, List, Dict, Callable
from typing import Callable

import msgpack
from redis import Redis
from redis.client import PubSub
from threading import Thread, Lock, Event
from time import time, sleep

import flowcept.commons
from flowcept.commons.daos.keyvalue_dao import KeyValueDAO
from flowcept.commons.utils import perf_log
from threading import Thread, Event
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.configs import (
REDIS_HOST,
REDIS_PORT,
REDIS_CHANNEL,
REDIS_PASSWORD,
JSON_SERIALIZER,
REDIS_BUFFER_SIZE,
REDIS_INSERTION_BUFFER_TIME,
PERF_LOG,
REDIS_URI,
)

from flowcept.commons.utils import GenericJSONEncoder


class AutoflushBuffer:
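
The class body is collapsed in this rendering, but the retained imports (`Queue`, `Callable`, `Thread`, `Event`) suggest the buffer became broker-agnostic: instead of talking to Redis directly, it flushes through an injected callback. A generic sketch of that pattern, under stated assumptions and not FlowCept's actual implementation (every name below is hypothetical):

```python
from threading import Thread, Event
from typing import Callable


class AutoflushBufferSketch:
    """Generic sketch: buffer items in memory and flush them through a
    callback when the buffer fills up or a time interval elapses."""

    def __init__(self, flush_function: Callable, max_size: int = 1000,
                 flush_interval_s: float = 1.0):
        self._buffer = []
        self._flush_function = flush_function  # e.g., publish a batch to the MQ
        self._max_size = max_size
        self._stop_event = Event()
        self._thread = Thread(target=self._flush_periodically,
                              args=(flush_interval_s,), daemon=True)
        self._thread.start()

    def append(self, item):
        self._buffer.append(item)
        if len(self._buffer) >= self._max_size:
            self._flush()  # size-based flush

    def _flush(self):
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._flush_function(batch)

    def _flush_periodically(self, interval_s: float):
        # Event.wait returns True once stop() sets the event, ending the loop.
        while not self._stop_event.wait(interval_s):
            self._flush()

    def stop(self):
        self._stop_event.set()
        self._thread.join()
        self._flush()  # final flush of anything left over
```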