Commit
Merge pull request #175 from ORNL/dev
Dev
renan-souza authored Nov 27, 2024
2 parents 06cf474 + 0853840 commit 770a5aa
Showing 21 changed files with 239 additions and 102 deletions.
91 changes: 91 additions & 0 deletions .github/workflows/run-tests-py11.yml
@@ -0,0 +1,91 @@
name: Tests on py11
on: [pull_request]

jobs:

  build:
    runs-on: ubuntu-latest
    timeout-minutes: 40
    if: "!contains(github.event.head_commit.message, 'CI Bot')"

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python 3.11
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - name: Show OS Info
        run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"'

      - name: Start docker compose with redis
        run: make services

      - name: Upgrade pip
        run: python -m pip install --upgrade pip

      - name: Show Python version
        run: python --version && pip --version

      - name: Install default dependencies and run simple test
        run: |
          pip install .
          python examples/simple_instrumented_script.py
      - name: Install Dask dependencies alone and run a simple Dask test
        run: |
          pip uninstall flowcept -y
          pip install .[dask]
          python examples/dask_example.py
      - name: Install MLFlow dependencies alone and run a simple MLFlow test
        run: |
          pip uninstall flowcept -y
          pip install .[mlflow]
          python examples/mlflow_example.py
      - name: Install Tensorboard dependencies alone and run a simple Tensorboard test
        run: |
          pip uninstall flowcept -y
          pip install .[tensorboard]
          python examples/tensorboard_example.py
      - name: Install all dependencies
        run: |
          python -m pip install --upgrade pip
          python -m pip install .[all]
      - name: List installed packages
        run: pip list

      - name: Test with pytest and redis
        run: |
          make tests
      - name: Test notebooks with pytest and redis
        run: make tests-notebooks

      - name: Shut down docker compose
        run: make services-stop

      - name: Start docker compose with kafka
        run: docker compose -f deployment/compose-kafka.yml up -d

      - name: Wait for one minute
        run: sleep 60

      - name: Check liveness
        run: |
          export MQ_TYPE=kafka
          export MQ_PORT=9092
          python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
          python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
      - name: Run tests with kafka
        run: |
          export MQ_TYPE=kafka
          export MQ_PORT=9092
          # Ignoring heavy tests. They are executed with Kafka in another GH Action.
          pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
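For contributors reproducing the Kafka liveness check locally, here is a minimal Python sketch of what the "Check liveness" step does (assumptions: a broker is already running, e.g. via deployment/compose-kafka.yml, and flowcept reads MQ_TYPE/MQ_PORT from the environment at import time, as the exported variables in the steps above suggest):

# liveness_check.py -- hypothetical local helper mirroring the "Check liveness" step.
import os

# Point flowcept at the Kafka message queue before importing it.
os.environ["MQ_TYPE"] = "kafka"
os.environ["MQ_PORT"] = "9092"

from flowcept.configs import MQ_TYPE, MQ_PORT
from flowcept import Flowcept

print(f"MQ_TYPE={MQ_TYPE}")
print(f"MQ_PORT={MQ_PORT}")
assert Flowcept.services_alive()  # Fails if the MQ or other services are unreachable.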
26 changes: 14 additions & 12 deletions .github/workflows/run-tests.yml
@@ -1,5 +1,8 @@
 name: Unit, integration, and notebook tests
-on: [push]
+on:
+  push:
+  schedule:
+    - cron: '0 12 * * *' # Runs every day at 12 PM UTC (7 AM EST)
 
 jobs:
 
@@ -21,30 +24,30 @@ jobs:
         run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"'
 
       - name: Start docker compose with redis
-        run: docker compose -f deployment/compose.yml up -d
+        run: make services
 
       - name: Upgrade pip
         run: python -m pip install --upgrade pip
 
       - name: Install default dependencies and run simple test
-        run: |
+        run: |
           pip install .
-          python examples/instrumentation/simple_script.py
+          python examples/simple_instrumented_script.py
       - name: Install Dask dependencies alone and run a simple Dask test
-        run: |
+        run: |
           pip uninstall flowcept -y
           pip install .[dask]
           python examples/dask_example.py
       - name: Install MLFlow dependencies alone and run a simple MLFlow test
-        run: |
+        run: |
           pip uninstall flowcept -y
           pip install .[mlflow]
           python examples/mlflow_example.py
       - name: Install Tensorboard dependencies alone and run a simple Tensorboard test
-        run: |
+        run: |
           pip uninstall flowcept -y
           pip install .[tensorboard]
           python examples/tensorboard_example.py
@@ -59,13 +62,13 @@ jobs:

      - name: Test with pytest and redis
        run: |
-          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
+          make tests
      - name: Test notebooks with pytest and redis
-        run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
+        run: make tests-notebooks
 
      - name: Shut down docker compose
-        run: docker compose -f deployment/compose.yml down
+        run: make services-stop
 
      - name: Start docker compose with kafka
        run: docker compose -f deployment/compose-kafka.yml up -d
@@ -84,5 +87,4 @@ jobs:
         run: |
           export MQ_TYPE=kafka
           export MQ_PORT=9092
-          # Ignoring heavy tests. They are executed with Kafka in another GH Action.
-          pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
+          make tests
23 changes: 23 additions & 0 deletions Makefile
@@ -2,22 +2,36 @@
 help:
 	@printf "\nCommands:\n"
 	@printf "\033[32mchecks\033[0m run ruff linter and formatter checks\n"
+	@printf "\033[32mreformat\033[0m run ruff linter and formatter\n"
 	@printf "\033[32mclean\033[0m remove cache directories and Sphinx build output\n"
 	@printf "\033[32mdocs\033[0m build HTML documentation using Sphinx\n"
 	@printf "\033[32mservices\033[0m run services using Docker\n"
 	@printf "\033[32mservices-stop\033[0m stop the running Docker services\n"
 	@printf "\033[32mtests\033[0m run unit tests with pytest\n"
+	@printf "\033[32mtests-all\033[0m run all unit tests with pytest, including very long-running ones\n"
+	@printf "\033[32mtests-notebooks\033[0m tests the notebooks, using pytest\n"
 
 
 # Run linter and formatter checks using ruff
 checks:
 	ruff check src
 	ruff format --check src
 
+reformat:
+	ruff check src
+	ruff format src
+
 # Remove cache directories and Sphinx build output
 clean:
 	rm -rf .ruff_cache
 	rm -rf .pytest_cache
+	rm -rf mlruns
+	rm -rf mnist_data
+	rm -rf tensorboard_events
+	rm -f docs_dump_tasks_*
+	rm -f dump_test.json
+	rm -f flowcept.log
+	rm -f mlflow.db
 	sphinx-build -M clean docs docs/_build
 
 # Build the HTML documentation using Sphinx
@@ -34,5 +48,14 @@ services-stop:
 	docker compose --file deployment/compose.yml down --volumes
 
 # Run unit tests using pytest
+.PHONY: tests
 tests:
 	pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
+
+.PHONY: tests-notebooks
+tests-notebooks:
+	pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
+
+.PHONY: tests-all
+tests-all:
+	pytest
1 change: 0 additions & 1 deletion deployment/compose-full.yml
@@ -1,4 +1,3 @@
-version: '3.8'
 name: flowcept
 services:
   flowcept_redis:
1 change: 0 additions & 1 deletion deployment/compose-kafka.yml
@@ -1,4 +1,3 @@
-version: '3.8'
 name: flowcept
 services:
   flowcept_redis:
1 change: 0 additions & 1 deletion deployment/compose.yml
@@ -1,4 +1,3 @@
-version: '3.8'
 name: flowcept
 services:
   flowcept_redis:
34 changes: 22 additions & 12 deletions examples/dask_example.py
@@ -23,6 +23,8 @@ def sum_list(values):
 scheduler = cluster.scheduler
 client = Client(scheduler.address)
 
+client.forward_logging()
+
 # Registering Flowcept's worker and scheduler adapters
 scheduler.add_plugin(FlowceptDaskSchedulerAdapter(scheduler))
 client.register_plugin(FlowceptDaskWorkerAdapter())
@@ -32,29 +34,37 @@ def sum_list(values):
print(f"workflow_id={wf_id}")

# Start Flowcept's Dask observer
flowcept = Flowcept("dask").start()
t1 = client.submit(add, 1, 2)
t2 = client.submit(multiply, 3, 4)
t3 = client.submit(add, t1.result(), t2.result())
t4 = client.submit(sum_list, [t1, t2, t3])
result = t4.result()
print("Result:", result)

# Closing Dask and Flowcept
client.close()
cluster.close()
flowcept.stop()

with Flowcept("dask"): # Optionally: Flowcept("dask").start()

t1 = client.submit(add, 1, 2)
t2 = client.submit(multiply, 3, 4)
t3 = client.submit(add, t1.result(), t2.result())
t4 = client.submit(sum_list, [t1, t2, t3])
result = t4.result()
print("Result:", result)
assert result == 30

# Closing Dask and Flowcept
client.close() # This is to avoid generating errors
cluster.close() # This calls are needed closeouts to inform of workflow conclusion.

# Optionally: flowcept.stop()

# Querying Flowcept's database about this run
print(f"t1_key={t1.key}")
print("Getting first task only:")
task1 = Flowcept.db.query(filter={"task_id": t1.key})[0]
assert task1["workflow_id"] == wf_id
print(task1)
print("\n\n")
print("Getting all tasks from this workflow:")
all_tasks = Flowcept.db.query(filter={"workflow_id": wf_id})
assert len(all_tasks) == 4
assert all(t.get("finished") is True for t in all_tasks)
assert all_tasks[-1]["generated"]["arg0"] == 30, "Checking if the last result was saved."
print(all_tasks)
print("\n\n")
print("Getting workflow info:")
wf_info = Flowcept.db.query(filter={"workflow_id": wf_id}, type="workflow")[0]
assert wf_info["workflow_id"] == wf_id
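As the example's inline comments note, the observer can also be scoped with explicit calls instead of the context manager; a minimal sketch of that form, using the same "dask" adapter key (the task submissions are elided here):

# Explicit start/stop form of the same pattern (sketch, not part of this diff).
flowcept = Flowcept("dask").start()
try:
    pass  # ... submit Dask tasks here, as in the example above ...
finally:
    flowcept.stop()  # The closeout that marks the observed workflow as concluded.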
File renamed without changes.
4 changes: 0 additions & 4 deletions src/flowcept/commons/__init__.py
@@ -1,5 +1 @@
"""Commons subpackage."""

from flowcept.commons.flowcept_logger import FlowceptLogger

logger = FlowceptLogger()
26 changes: 11 additions & 15 deletions src/flowcept/commons/utils.py
@@ -10,11 +10,9 @@
 import types
 import numpy as np
 
-import flowcept.commons
 from flowcept import configs
-from flowcept.configs import (
-    PERF_LOG,
-)
+from flowcept.commons.flowcept_logger import FlowceptLogger
+from flowcept.configs import PERF_LOG
 from flowcept.commons.flowcept_dataclasses.task_object import Status


@@ -42,11 +40,11 @@ def get_utc_minutes_ago(minutes_ago=1):
     return rounded.timestamp()
 
 
-def perf_log(func_name, t0: float):
+def perf_log(func_name, t0: float, logger=FlowceptLogger()):
     """Configure the performance log."""
     if PERF_LOG:
         t1 = time()
-        flowcept.commons.logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}")
+        logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}")
         return t1
     return None

@@ -71,6 +69,7 @@ def assert_by_querying_tasks_until(
"""Assert by query."""
from flowcept.flowcept_api.task_query_api import TaskQueryAPI

logger = FlowceptLogger()
query_api = TaskQueryAPI()
start_time = time()
trials = 0
@@ -79,24 +78,20 @@
         docs = query_api.query(filter)
         if condition_to_evaluate is None:
             if docs is not None and len(docs):
-                flowcept.commons.logger.debug("Query conditions have been met! :D")
+                logger.debug("Query conditions have been met! :D")
                 return True
         else:
             try:
                 if condition_to_evaluate(docs):
-                    flowcept.commons.logger.debug("Query conditions have been met! :D")
+                    logger.debug("Query conditions have been met! :D")
                     return True
             except Exception:
                 pass
 
         trials += 1
-        flowcept.commons.logger.debug(
-            f"Task Query condition not yet met. Trials={trials}/{max_trials}."
-        )
+        logger.debug(f"Task Query condition not yet met. Trials={trials}/{max_trials}.")
         sleep(1)
-    flowcept.commons.logger.debug(
-        "We couldn't meet the query conditions after all trials or timeout! :("
-    )
+    logger.debug("We couldn't meet the query conditions after all trials or timeout! :(")
     return False


@@ -109,6 +104,7 @@ def chunked(iterable, size):
 # TODO: consider reusing this function in the function assert_by_querying_task_collections_until
 def evaluate_until(evaluation_condition: Callable, max_trials=30, max_time=60, msg=""):
     """Evaluate something."""
+    logger = FlowceptLogger()
     start_time = time()
     trials = 0

@@ -117,7 +113,7 @@ def evaluate_until(evaluation_condition: Callable, max_trials=30, max_time=60, msg=""):
return True # Condition met

trials += 1
flowcept.commons.logger.debug(f"Condition not yet met. Trials={trials}/{max_trials}. {msg}")
logger.debug(f"Condition not yet met. Trials={trials}/{max_trials}. {msg}")
sleep(1)

return False # Condition not met within max_trials or max_time
Expand Down
Loading

0 comments on commit 770a5aa

Please sign in to comment.