Skip to content

Commit

Permalink
Merge pull request #187 from ORNL/ml_loops
Browse files Browse the repository at this point in the history
Ml loops
  • Loading branch information
renan-souza authored Dec 18, 2024
2 parents 91c988e + badb390 commit abd9d4f
Show file tree
Hide file tree
Showing 50 changed files with 1,985 additions and 1,255 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/create-release-n-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ jobs:
run: make services

- name: Test with pytest
run: pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
run: pytest

- name: Test notebooks
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run-tests-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
make tests
- name: Test notebooks
run: pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb

- name: Stop services
run: docker compose -f deployment/compose-kafka.yml down
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run-tests-py11.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
export MQ_TYPE=kafka
export MQ_PORT=9092
# Ignoring heavy tests. They are executed with Kafka in another GH Action.
pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
pytest
- name: Stop services
run: docker compose -f deployment/compose-kafka.yml down
Expand Down
7 changes: 3 additions & 4 deletions .github/workflows/run-tests-simple.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: (Without Mongo) Unit, integration, and notebook tests
name: (Without Mongo) Simple Tests
on:
push:
schedule:
Expand Down Expand Up @@ -35,7 +35,7 @@ jobs:
run: python -m pip install --upgrade pip

- name: Test examples
run: bash .github/workflows/run_examples.sh examples false # with mongo
run: bash .github/workflows/run_examples.sh examples false # without mongo

- name: Install all dependencies
run: |
Expand All @@ -46,8 +46,7 @@ jobs:
run: pip list

- name: Test with pytest and redis
run: |
make tests
run: make tests

- name: Test notebooks with pytest and redis
run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore="notebooks/dask_from_CLI.ipynb" --ignore="notebooks/analytics.ipynb"
Expand Down
39 changes: 34 additions & 5 deletions .github/workflows/run_examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,31 @@
set -e
set -o pipefail

# Display usage/help message
usage() {
echo -e "\nUsage: $0 <examples_dir> <with_mongo>\n"
echo "Arguments:"
echo " examples_dir Path to the examples directory (Mandatory)"
echo " with_mongo Boolean flag (true/false) indicating whether to include MongoDB support (Mandatory)"
echo -e "\nExample:"
echo " $0 examples true"
echo " $0 examples false"
exit 1
}

# Check if the required arguments are provided
if [[ -z "$1" || -z "$2" ]]; then
echo "Error: Missing mandatory arguments!"
usage
fi

# Function to run tests with common steps
run_test() {
test_path="${EXAMPLES_DIR}/${1}_example.py"
test_type="$1"
with_mongo="$2"
echo "Test type=${test_type}"
echo "Running $test_path"
echo "Starting $test_path"

pip uninstall flowcept -y > /dev/null 2>&1 || true # Ignore errors during uninstall

Expand All @@ -29,17 +47,28 @@ run_test() {
elif [[ "$test_type" =~ "tensorboard" ]]; then
echo "Installing tensorboard"
pip install .[tensorboard] > /dev/null 2>&1
elif [[ "$test_type" =~ "single_layer_perceptron" ]]; then
echo "Installing ml_dev dependencies"
pip install .[ml_dev] > /dev/null 2>&1
elif [[ "$test_type" =~ "llm_complex" ]]; then
echo "Installing ml_dev dependencies"
pip install .[ml_dev]
echo "Defining python path for llm_complex..."
export PYTHONPATH=$PYTHONPATH:${EXAMPLES_DIR}/llm_complex
echo $PYTHONPATH
fi

# Run the test and capture output
echo "Running $test_path ..."
python "$test_path" | tee output.log

echo "Ok, ran $test_path."
# Check for errors in the output
if grep -iq "error" output.log; then
echo "Test failed! See output.log for details."
echo "Test $test_path failed! See output.log for details."
exit 1
fi

echo "Great, no errors to run $test_path."

# Clean up the log file
rm output.log
}
Expand All @@ -51,7 +80,7 @@ echo "Using examples directory: $EXAMPLES_DIR"
echo "With Mongo? ${WITH_MONGO}"

# Define the test cases
tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard")
tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard" "single_layer_perceptron" "llm_complex/llm_main")

# Iterate over the tests and run them
for test_ in "${tests[@]}"; do
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ test.py
time.txt
tmp/
deployment/data
**/*output_data*
examples/llm_complex/input_data
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ help:
@printf "\033[32mtests\033[0m run unit tests with pytest\n"
@printf "\033[32mtests-in-container\033[0m run unit tests with pytest inside Flowcept's container\n"
@printf "\033[32mtests-in-container-mongo\033[0m run unit tests inside container with MongoDB\n"
@printf "\033[32mtests-in-container-kafka\033[0m run unit tests inside container with Kafka and MongoDB\n"
@printf "\033[32mtests-all\033[0m run all unit tests with pytest, including long-running ones\n"
@printf "\033[32mtests-notebooks\033[0m test the notebooks using pytest\n"
@printf "\033[32mclean\033[0m remove cache directories and Sphinx build output\n"
Expand Down Expand Up @@ -43,6 +44,7 @@ clean:
find . -type d -name "mlruns" -exec rm -rf {} \; 2>/dev/null || true
find . -type d -name "__pycache__" -exec rm -rf {} \; 2>/dev/null || true
find . -type d -name "*tfevents*" -exec rm -rf {} \; 2>/dev/null || true
find . -type d -name "*output_data*" -exec rm -rf {} \; 2>/dev/null || true
# sphinx-build -M clean docs docs/_build This needs to be fixed.

# Build the HTML documentation using Sphinx
Expand Down Expand Up @@ -88,7 +90,7 @@ liveness:
# Run unit tests using pytest
.PHONY: tests
tests:
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
pytest

.PHONY: tests-notebooks
tests-notebooks:
Expand Down
2 changes: 1 addition & 1 deletion deployment/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RUN export FLOWCEPT_SETTINGS_PATH=$(realpath resources/sample_settings.yaml) \
RUN conda create -n flowcept python=3.11.10 -y \
&& echo "conda activate flowcept" >> ~/.bashrc

# The following command is an overkill and will install many things you might not need. Please see pyproject.toml and modify deployment/Dockerfile in case you do not need to install "all" dependencies.
# The following command is an overkill and will install many things you might not need. Please modify this Dockerfile in case you do not need to install "all" dependencies.
RUN conda run -n flowcept pip install -e .[all]

CMD ["bash"]
2 changes: 1 addition & 1 deletion examples/dask_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def sum_list(values):

# Closing Dask and Flowcept
client.close() # This is to avoid generating errors
cluster.close() # This calls are needed closeouts to inform of workflow conclusion.
cluster.close() # This call is needed closeout to inform of workflow conclusion.

# Optionally: flowcept.stop()

Expand Down
56 changes: 56 additions & 0 deletions examples/instrumented_loop_unmanaged_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import multiprocessing
import random
from time import sleep

from flowcept import Flowcept, FlowceptLoop

if __name__ == '__main__': #

interceptor_id = Flowcept.start_instrumentation_interceptor()

event = multiprocessing.Event()
process1 = multiprocessing.Process(target=Flowcept.start_persistence, args=(interceptor_id, event))
process1.start()
sleep(1)
# Run loop
loop = FlowceptLoop(range(max_iterations := 3), workflow_id=interceptor_id)
for item in loop:
loss = random.random()
sleep(0.05)
print(item, loss)
# The following is optional, in case you want to capture values generated inside the loop.
loop.end_iter({"item": item, "loss": loss})

Flowcept.stop_instrumentation_interceptor()

event.set()
process1.join()

docs = Flowcept.db.query(filter={"workflow_id": interceptor_id})
for d in docs:
print(d)
# assert len(docs) == max_iterations+1 # The whole loop itself is a task

#
#
# @staticmethod
# def start_instrumentation_interceptor():
# instance = InstrumentationInterceptor.get_instance()
# instance_id = id(instance)
# instance.start(bundle_exec_id=instance_id)
# return instance_id
#
# @staticmethod
# def stop_instrumentation_interceptor():
# instance = InstrumentationInterceptor.get_instance()
# instance.stop()
#
# @staticmethod
# def start_persistence(interceptor_id, event):
# from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
# inserter = DocumentInserter(
# check_safe_stops=True,
# bundle_exec_id=interceptor_id,
# ).start()
# event.wait()
# inserter.stop()
1 change: 1 addition & 0 deletions examples/instrumented_simple_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ def mult_two(n):
print(o2)
docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})
print(len(docs))
assert len(docs) == 2

149 changes: 149 additions & 0 deletions examples/llm_complex/llm_dataprep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from time import time

import torch
import os

from torch.utils.data import Subset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from datasets import load_dataset


# Define a function to yield tokens from the dataset
def yield_tokens(tokenizer, data_iter):
for item in data_iter:
if len(item["text"]):
yield tokenizer(item["text"])


# Define a function to process the raw text and convert it to tensors
def data_process(tokenizer, vocab, raw_text_iter):
data = [
torch.tensor(
[vocab[token] for token in tokenizer(item["text"])],
dtype=torch.long,
)
for item in raw_text_iter
]
return torch.cat(tuple(filter(lambda t: t.numel() > 0, data)))


def get_dataset_ref(campaign_id, train_data, val_data, test_data):
dataset_ref = f"{campaign_id}_train_shape_{train_data.shape}_val_shape_{val_data.shape}_test_shape_{test_data.shape}"
return dataset_ref

def get_wiki_text_dataset(data_dir):
# Load the WikiText2 dataset
t0 = time()
train_data = torch.load(os.path.join(data_dir, "train_data.tensor"))
val_data = torch.load(os.path.join(data_dir, "val_data.tensor"))
test_data = torch.load(os.path.join(data_dir, "test_data.tensor"))
t1 = time()
t_disk_load = t1 - t0

try:
if torch.cuda.is_available():
device = torch.device("gpu")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")

t2 = time()
t_device_available = t2 - t1
train_data = train_data.to(device)
val_data = val_data.to(device)
test_data = test_data.to(device)
t_gpu_load = time() - t2
except:
raise Exception("Couldn't send data to device")

print("Train data", train_data.shape)
print("Validation data", val_data.shape)
print("Test data", test_data.shape)
return (
train_data,
val_data,
test_data,
t_disk_load,
t_device_available,
t_gpu_load,
)

def save_workflow(ntokens, train_data, val_data, test_data, dataset_ref, subset_size=None, tokenizer_type=None, campaign_id=None):
from flowcept import WorkflowObject, Flowcept
config = {
"subset_size": subset_size,
"tokenizer_type": tokenizer_type
}
dataset_prep_wf = WorkflowObject()
dataset_prep_wf.used = config
dataset_prep_wf.campaign_id = campaign_id
dataset_prep_wf.name = "generate_wikitext_dataset"

dataset_prep_wf.generated = {
"ntokens": ntokens,
"dataset_ref": dataset_ref,
"train_data_shape": list(train_data.shape),
"val_data_shape": list(val_data.shape),
"test_data_shape": list(test_data.shape),
}
Flowcept.db.insert_or_update_workflow(dataset_prep_wf)
print(dataset_prep_wf)
return dataset_prep_wf.workflow_id, dataset_ref


def dataprep_workflow(data_dir="input_data",
tokenizer_type="basic_english", # spacy, moses, toktok, revtok, subword
subset_size=None,
campaign_id=None):

os.makedirs(data_dir, exist_ok=True)

print("Downloading dataset")
dataset = load_dataset("wikitext", "wikitext-2-v1")
print("Ok, now saving it into the current directory")
dataset.save_to_disk(os.path.join(data_dir, "wikitext-2-v1.data"))

test_dataset = dataset["test"]
train_dataset = dataset["train"]
validation_dataset = dataset["validation"]

if subset_size is not None and subset_size > 0:
test_dataset = Subset(test_dataset, range(subset_size))
train_dataset = Subset(train_dataset, range(subset_size))
validation_dataset = Subset(validation_dataset, range(subset_size))

# Build the vocabulary from the training dataset
tokenizer = get_tokenizer(tokenizer_type)
vocab = build_vocab_from_iterator(yield_tokens(tokenizer, train_dataset))
vocab.set_default_index(vocab["<unk>"])
ntokens = len(vocab)

# Process the train, validation, and test datasets
train_data = data_process(tokenizer, vocab, train_dataset)
val_data = data_process(tokenizer, vocab, validation_dataset)
test_data = data_process(tokenizer, vocab, test_dataset)

train_data_path = os.path.join(data_dir, "train_data.tensor")
val_data_path = os.path.join(data_dir, "val_data.tensor")
test_data_path = os.path.join(data_dir, "test_data.tensor")

torch.save(train_data, train_data_path)
torch.save(val_data, val_data_path)
torch.save(test_data, test_data_path)

print(f"Saved files in {data_dir}. Now running some asserts.")

train_data_loaded = torch.load(train_data_path)
val_data_loaded = torch.load(val_data_path)
test_data_loaded = torch.load(test_data_path)

assert all(train_data == train_data_loaded)
assert all(val_data == val_data_loaded)
assert all(test_data == test_data_loaded)

dataset_ref = get_dataset_ref(campaign_id, train_data, val_data, test_data)
wf_id = save_workflow(ntokens, train_data, val_data, test_data, dataset_ref, subset_size=subset_size, tokenizer_type=tokenizer_type, campaign_id=campaign_id)
return wf_id, dataset_ref, ntokens

Loading

0 comments on commit abd9d4f

Please sign in to comment.