diff --git a/.github/workflows/run-tests-simple.yml b/.github/workflows/run-tests-simple.yml
index 44511b74..1a77ea98 100644
--- a/.github/workflows/run-tests-simple.yml
+++ b/.github/workflows/run-tests-simple.yml
@@ -1,4 +1,4 @@
-name: (Without Mongo) Unit, integration, and notebook tests
+name: (Without Mongo) Simple Tests
 on:
   push:
   schedule:
@@ -35,7 +35,7 @@ jobs:
         run: python -m pip install --upgrade pip

       - name: Test examples
-        run: bash .github/workflows/run_examples.sh examples false # with mongo
+        run: bash .github/workflows/run_examples.sh examples false # without mongo

       - name: Install all dependencies
         run: |
@@ -46,8 +46,7 @@ jobs:
         run: pip list

       - name: Test with pytest and redis
-        run: |
-          make tests
+        run: make tests

       - name: Test notebooks with pytest and redis
         run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore="notebooks/dask_from_CLI.ipynb" --ignore="notebooks/analytics.ipynb"
diff --git a/.github/workflows/run_examples.sh b/.github/workflows/run_examples.sh
index d83374c4..739ed0ca 100644
--- a/.github/workflows/run_examples.sh
+++ b/.github/workflows/run_examples.sh
@@ -28,7 +28,7 @@ run_test() {
   test_type="$1"
   with_mongo="$2"
   echo "Test type=${test_type}"
-  echo "Running $test_path"
+  echo "Starting $test_path"

   pip uninstall flowcept -y > /dev/null 2>&1 || true # Ignore errors during uninstall

@@ -48,22 +48,24 @@ run_test() {
     echo "Installing tensorboard"
     pip install .[tensorboard] > /dev/null 2>&1
   elif [[ "$test_type" =~ "llm_complex" ]]; then
-    echo "Installing dev dependencies"
+    echo "Installing ml_dev dependencies"
     pip install .[ml_dev]
     echo "Defining python path for llm_complex..."
     export PYTHONPATH=$PYTHONPATH:${EXAMPLES_DIR}/llm_complex
     echo $PYTHONPATH
   fi

-  # Run the test and capture output
+  echo "Running $test_path ..."
   python "$test_path" | tee output.log
-
+  echo "Ok, ran $test_path."
   # Check for errors in the output
   if grep -iq "error" output.log; then
     echo "Test failed! See output.log for details."
     exit 1
   fi
+  echo "Great, no errors while running $test_path."
+
   # Clean up the log file
   rm output.log
 }
diff --git a/examples/instrumented_simple_example.py b/examples/instrumented_simple_example.py
index f40c8aa7..da2259ac 100644
--- a/examples/instrumented_simple_example.py
+++ b/examples/instrumented_simple_example.py
@@ -18,4 +18,5 @@ def mult_two(n):
 print(o2)
 docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})
 print(len(docs))
+assert len(docs) == 2
diff --git a/examples/llm_complex/llm_model.py b/examples/llm_complex/llm_model.py
index 7a33527f..5901bbc4 100644
--- a/examples/llm_complex/llm_model.py
+++ b/examples/llm_complex/llm_model.py
@@ -12,13 +12,9 @@
 from torchtext.vocab import build_vocab_from_iterator
 from datasets import load_dataset

+from flowcept import Flowcept, FlowceptLoop, flowcept_torch
 from flowcept.configs import N_GPUS
-from flowcept.flowcept_api.flowcept_controller import Flowcept
-from flowcept.instrumentation.decorators.flowcept_loop import FlowceptLoop
-from flowcept.instrumentation.decorators.flowcept_torch import (
-    flowcept_torch,
-)


 # Define a function to batchify the data
 def batchify(data, bsz):
@@ -292,7 +288,12 @@ def model_train(
         # If the validation loss has improved, save the model's state
         if val_loss < best_val_loss:
             best_val_loss = val_loss
-            best_obj_id = Flowcept.db.save_torch_model(model, task_id=epochs_loop.current_iteration_task.get("task_id", None), workflow_id=workflow_id, custom_metadata={"best_val_loss": best_val_loss})
+            best_obj_id = Flowcept.db.save_torch_model(
+                model,
+                task_id=epochs_loop.current_iteration_task.get("task_id", None),
+                workflow_id=workflow_id,
+                custom_metadata={"best_val_loss": best_val_loss}
+            )

         epochs_loop.end_iter({"train_loss": train_loss, "val_loss": val_loss})
diff --git a/examples/llm_complex/llm_search_example.py b/examples/llm_complex/llm_search_example.py
index 6bc913dc..9aceeb1a 100644
--- a/examples/llm_complex/llm_search_example.py
+++ b/examples/llm_complex/llm_search_example.py
@@ -10,10 +10,9 @@
 from distributed import LocalCluster, Client

 from examples.llm_complex.llm_model import model_train, get_wiki_text, TransformerModel
-from flowcept import WorkflowObject
-from flowcept.commons.vocabulary import Status
+
 from flowcept.configs import MONGO_ENABLED, INSTRUMENTATION
-from flowcept.flowcept_api.flowcept_controller import Flowcept
+from flowcept import Flowcept
 from flowcept.flowceptor.adapters.dask.dask_plugins import FlowceptDaskSchedulerAdapter, \
     FlowceptDaskWorkerAdapter, register_dask_workflow
@@ -75,6 +74,7 @@ def get_dataset_ref(campaign_id, train_data, val_data, test_data):

 def dataprep_workflow(tokenizer_type="basic_english", subset_size=None, campaign_id=None):
+    from flowcept import WorkflowObject
     config = {
         "subset_size": subset_size,
         "tokenizer": tokenizer_type
@@ -137,10 +137,6 @@ def search_workflow(ntokens, train_data, val_data, test_data, exp_param_settings

 def main():
-    if not MONGO_ENABLED:
-        print("This test is only available if Mongo is enabled.")
-        sys.exit(0)
-
     _campaign_id = str(uuid.uuid4())
     print(f"Campaign id={_campaign_id}")
@@ -171,6 +167,7 @@ def main():

 def run_asserts_and_exports(campaign_id, output_dir="output_data"):
+    from flowcept.commons.vocabulary import Status
     print("Now running all asserts...")
     """
     So far, this works as follows:
@@ -300,6 +297,10 @@ def run_asserts_and_exports(campaign_id, output_dir="output_data"):

 if __name__ == "__main__":
+    if not MONGO_ENABLED:
+        print("This test is only available if Mongo is enabled.")
+        sys.exit(0)
+
     campaign_id, dataprep_wf_id, model_search_wf_id = main()
     run_asserts_and_exports(campaign_id)
     print("Alright! Congrats.")
diff --git a/notebooks/tensorboard.ipynb b/notebooks/tensorboard.ipynb
index 9b0fc16f..6512d0a8 100644
--- a/notebooks/tensorboard.ipynb
+++ b/notebooks/tensorboard.ipynb
@@ -58,6 +58,13 @@
     "    (x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()\n",
     "    x_train, x_test = x_train / 255.0, x_test / 255.0\n",
     "\n",
+    "    # Reduce the dataset size for faster debugging\n",
+    "    DEBUG_SAMPLES_TRAIN = 100  # Number of training samples to keep\n",
+    "    DEBUG_SAMPLES_TEST = 20  # Number of test samples to keep\n",
+    "\n",
+    "    x_train, y_train = x_train[:DEBUG_SAMPLES_TRAIN], y_train[:DEBUG_SAMPLES_TRAIN]\n",
+    "    x_test, y_test = x_test[:DEBUG_SAMPLES_TEST], y_test[:DEBUG_SAMPLES_TEST]\n",
+    "\n",
     "    HP_NUM_UNITS = hp.HParam(\"num_units\", hp.Discrete([16, 32]))\n",
     "    HP_DROPOUT = hp.HParam(\"dropout\", hp.RealInterval(0.1, 0.2))\n",
     "    HP_OPTIMIZER = hp.HParam(\"optimizer\", hp.Discrete([\"adam\", \"sgd\"]))\n",
diff --git a/resources/sample_settings.yaml b/resources/sample_settings.yaml
index 58d8bf75..06a2e03d 100644
--- a/resources/sample_settings.yaml
+++ b/resources/sample_settings.yaml
@@ -52,7 +52,7 @@ web_server:
   port: 5000

 sys_metadata:
-  environment_id: "frontier"
+  environment_id: "laptop"

 extra_metadata:
   place_holder: ""
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/flowcept/__init__.py b/src/flowcept/__init__.py
index 2d6b273b..8d43f7ba 100644
--- a/src/flowcept/__init__.py
+++ b/src/flowcept/__init__.py
@@ -15,17 +15,22 @@ def __getattr__(name):
         return Flowcept

     elif name == "flowcept_task":
-        from flowcept.instrumentation.decorators.flowcept_task import flowcept_task
+        from flowcept.instrumentation.flowcept_task import flowcept_task

         return flowcept_task

+    elif name == "flowcept_torch":
+        from flowcept.instrumentation.flowcept_torch import flowcept_torch
+
+        return flowcept_torch
+
     elif name == "FlowceptLoop":
-        from flowcept.instrumentation.decorators.flowcept_loop import FlowceptLoop
+        from flowcept.instrumentation.flowcept_loop import FlowceptLoop

         return FlowceptLoop

     elif name == "telemetry_flowcept_task":
-        from flowcept.instrumentation.decorators.flowcept_task import telemetry_flowcept_task
+        from flowcept.instrumentation.flowcept_task import telemetry_flowcept_task

         return telemetry_flowcept_task

@@ -77,6 +82,7 @@ def __getattr__(name):
     "FlowceptLoop",
     "telemetry_flowcept_task",
     "Flowcept",
+    "flowcept_torch",
     "WorkflowObject",
     "__version__",
     "SETTINGS_PATH",
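
Note: with the `__getattr__` hunk above, the instrumentation helpers resolve lazily from the package root, matching the flattened module layout (the `decorators` subpackage is deleted later in this patch). A minimal usage sketch, using only names visible in this diff:

    # Lazy attributes resolved by src/flowcept/__init__.py's __getattr__:
    from flowcept import Flowcept, FlowceptLoop, flowcept_task, flowcept_torch

    # Equivalent explicit imports, matching the new module paths:
    from flowcept.instrumentation.flowcept_task import flowcept_task
    from flowcept.instrumentation.flowcept_torch import flowcept_torch
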
""" - def __new__(cls, *args, **kwargs) -> "LMDBDAO": - """Singleton creator for MongoDBDAO.""" - # Check if an instance already exists - if DocumentDBDAO._instance is None: - DocumentDBDAO._instance = super(LMDBDAO, cls).__new__(cls) - return DocumentDBDAO._instance + # def __new__(cls, *args, **kwargs) -> "LMDBDAO": + # """Singleton creator for LMDBDAO.""" + # # Check if an instance already exists + # if DocumentDBDAO._instance is None: + # DocumentDBDAO._instance = super(LMDBDAO, cls).__new__(cls) + # return DocumentDBDAO._instance def __init__(self): - if not hasattr(self, "_initialized"): - self._initialized = True - self._open() + #if not hasattr(self, "_initialized"): + self._initialized = True + self._open() def _open(self): """Open LMDB environment and databases.""" diff --git a/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py b/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py index 7630558e..a180decb 100644 --- a/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py +++ b/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py @@ -12,7 +12,6 @@ import pandas as pd import pyarrow.parquet as pq import pyarrow as pa -import pymongo from bson import ObjectId from bson.json_util import dumps @@ -42,39 +41,37 @@ class MongoDBDAO(DocumentDBDAO): various collections (`tasks`, `workflows`, `objects`). """ - pymongo.ASCENDING - - def __new__(cls, *args, **kwargs) -> "MongoDBDAO": - """Singleton creator for MongoDBDAO.""" - # Check if an instance already exists - if DocumentDBDAO._instance is None: - DocumentDBDAO._instance = super(MongoDBDAO, cls).__new__(cls) - return DocumentDBDAO._instance + # def __new__(cls, *args, **kwargs) -> "MongoDBDAO": + # """Singleton creator for MongoDBDAO.""" + # # Check if an instance already exists + # if DocumentDBDAO._instance is None: + # DocumentDBDAO._instance = super(MongoDBDAO, cls).__new__(cls) + # return DocumentDBDAO._instance def __init__(self, create_indices=MONGO_CREATE_INDEX): - if not hasattr(self, "_initialized"): - from flowcept.configs import ( - MONGO_HOST, - MONGO_PORT, - MONGO_DB, - MONGO_URI, - ) + #if not hasattr(self, "_initialized"): + from flowcept.configs import ( + MONGO_HOST, + MONGO_PORT, + MONGO_DB, + MONGO_URI, + ) - self._initialized = True - self.logger = FlowceptLogger() + self._initialized = True + self.logger = FlowceptLogger() - if MONGO_URI is not None: - self._client = MongoClient(MONGO_URI) - else: - self._client = MongoClient(MONGO_HOST, MONGO_PORT) - self._db = self._client[MONGO_DB] + if MONGO_URI is not None: + self._client = MongoClient(MONGO_URI) + else: + self._client = MongoClient(MONGO_HOST, MONGO_PORT) + self._db = self._client[MONGO_DB] - self._tasks_collection = self._db["tasks"] - self._wfs_collection = self._db["workflows"] - self._obj_collection = self._db["objects"] + self._tasks_collection = self._db["tasks"] + self._wfs_collection = self._db["workflows"] + self._obj_collection = self._db["objects"] - if create_indices: - self._create_indices() + if create_indices: + self._create_indices() def _create_indices(self): # Creating task collection indices: diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py index c14665e6..56d4b4b4 100644 --- a/src/flowcept/flowcept_api/flowcept_controller.py +++ b/src/flowcept/flowcept_api/flowcept_controller.py @@ -9,7 +9,7 @@ ) from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao -from flowcept.configs import MQ_INSTANCES, INSTRUMENTATION_ENABLED, DATABASES, MONGO_ENABLED +from flowcept.configs import 
diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py
index c14665e6..56d4b4b4 100644
--- a/src/flowcept/flowcept_api/flowcept_controller.py
+++ b/src/flowcept/flowcept_api/flowcept_controller.py
@@ -9,7 +9,7 @@
 )
 from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao
-from flowcept.configs import MQ_INSTANCES, INSTRUMENTATION_ENABLED, DATABASES, MONGO_ENABLED
+from flowcept.configs import MQ_INSTANCES, INSTRUMENTATION_ENABLED, MONGO_ENABLED
 from flowcept.flowcept_api.db_api import DBAPI
 from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor
@@ -54,12 +54,12 @@ def __init__(
             bundle_exec_id - A way to group interceptors.

-            enable_persistence - Whether you want to persist the messages in one of the DBs defined in
+            start_persistence - Whether you want to persist the messages in one of the DBs defined in
             the `databases` settings.
         """
         self.logger = FlowceptLogger()
         self._enable_persistence = start_persistence
-        self._db_inserters: List = []  # TODO: typing
+        self._db_inserters: List = []
         if bundle_exec_id is None:
             self._bundle_exec_id = id(self)
         else:
@@ -131,17 +131,12 @@ def start(self):

     def _init_persistence(self, mq_host=None, mq_port=None):
         from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
-
-        for database in DATABASES:
-            if DATABASES[database].get("enabled", False):
-                self._db_inserters.append(
-                    DocumentInserter(
-                        check_safe_stops=True,
-                        bundle_exec_id=self._bundle_exec_id,
-                        mq_host=mq_host,
-                        mq_port=mq_port,
-                    ).start()
-                )
+        self._db_inserters.append(DocumentInserter(
+            check_safe_stops=True,
+            bundle_exec_id=self._bundle_exec_id,
+            mq_host=mq_host,
+            mq_port=mq_port,
+        ).start())

     def stop(self):
         """Stop it."""
@@ -165,8 +160,8 @@ def stop(self):
                 interceptor.stop()
         if len(self._db_inserters):
             self.logger.info("Stopping DB Inserters...")
-            for database_inserter in self._db_inserters:
-                database_inserter.stop(bundle_exec_id=self._bundle_exec_id)
+            for db_inserter in self._db_inserters:
+                db_inserter.stop(bundle_exec_id=self._bundle_exec_id)
         self.is_started = False
         self.logger.debug("All stopped!")
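
Note: as the corrected docstring above states, the constructor flag is named `start_persistence`, and `_init_persistence` now always creates a single `DocumentInserter` instead of iterating over `DATABASES`. A hedged sketch of the flag's use; `run_my_workload` is a hypothetical placeholder:

    from flowcept import Flowcept

    # Assumption based on the code above: with start_persistence=False,
    # no DocumentInserter is started, so captured messages stay in the MQ
    # and are not flushed to any DocDB.
    with Flowcept(start_persistence=False):
        run_my_workload()  # hypothetical user function
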
diff --git a/src/flowcept/flowceptor/consumers/document_inserter.py b/src/flowcept/flowceptor/consumers/document_inserter.py
index 81d4df71..5eb1fd9e 100644
--- a/src/flowcept/flowceptor/consumers/document_inserter.py
+++ b/src/flowcept/flowceptor/consumers/document_inserter.py
@@ -110,17 +110,16 @@ def _set_buffer_size(self):
     def flush_function(buffer, doc_daos, logger):
         """Flush it."""
         logger.info(
-            f"Current Doc buffer size: {len(buffer)}, " f"Gonna flush {len(buffer)} msgs to DocDB!"
+            f"Current Doc buffer size: {len(buffer)}, " f"Gonna flush {len(buffer)} msgs to DocDBs!"
         )
         for dao in doc_daos:
             dao.insert_and_update_many_tasks(buffer, TaskObject.task_id_field())
             logger.debug(
-                f"DocDao={id(dao)}; Flushed {len(buffer)} msgs to DocDB!"
+                f"DocDao={id(dao)},DocDaoClass={dao.__class__.__name__};"
+                f" Flushed {len(buffer)} msgs to this DocDB!"
             )

     # TODO: add name
     def _handle_task_message(self, message: Dict):
-        # if DEBUG_MODE:
-        #     message["debug"] = True
         if "task_id" not in message:
             message["task_id"] = str(uuid4())
@@ -135,7 +134,7 @@ def _handle_task_message(self, message: Dict):
             message.pop("type")

         self.logger.debug(
-            f"Received following msg in DocInserter:" f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t"
+            f"Received following Task msg in DocInserter:" f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t"
         )
         if REMOVE_EMPTY_FIELDS:
             remove_empty_fields_from_dict(message)
@@ -145,7 +144,7 @@ def _handle_workflow_message(self, message: Dict):
         message.pop("type")
         self.logger.debug(
-            f"Received following msg in DocInserter:" f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t"
+            f"Received following Workflow msg in DocInserter: \n\t[BEGIN_MSG]{message}\n[END_MSG]\t"
         )
         if REMOVE_EMPTY_FIELDS:
             remove_empty_fields_from_dict(message)
diff --git a/src/flowcept/instrumentation/decorators/__init__.py b/src/flowcept/instrumentation/decorators/__init__.py
deleted file mode 100644
index 57aafba8..00000000
--- a/src/flowcept/instrumentation/decorators/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""Decorators subpackage."""
diff --git a/src/flowcept/instrumentation/decorators/flowcept_loop.py b/src/flowcept/instrumentation/flowcept_loop.py
similarity index 100%
rename from src/flowcept/instrumentation/decorators/flowcept_loop.py
rename to src/flowcept/instrumentation/flowcept_loop.py
diff --git a/src/flowcept/instrumentation/decorators/flowcept_task.py b/src/flowcept/instrumentation/flowcept_task.py
similarity index 96%
rename from src/flowcept/instrumentation/decorators/flowcept_task.py
rename to src/flowcept/instrumentation/flowcept_task.py
index 1974e80f..1e91867a 100644
--- a/src/flowcept/instrumentation/decorators/flowcept_task.py
+++ b/src/flowcept/instrumentation/flowcept_task.py
@@ -85,7 +85,8 @@ def wrapper(*args, **kwargs):
         result = func(*args, **kwargs)
         task_dict = dict(
             type="task",
-            # workflow_id=kwargs.pop("workflow_id", None),
+            # Users must set workflow_id explicitly in kwargs; this avoids the overhead of looking it up.
+            # workflow_id=kwargs.pop("workflow_id", None),
             activity_id=func.__name__,
             used=kwargs,
             generated=result,
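
Note: the reworded comment above means `@flowcept_task` does not look up a workflow id on its own; callers pass `workflow_id` explicitly, as the tests below do. A minimal sketch combining names from this patch:

    from flowcept import Flowcept, flowcept_task

    @flowcept_task
    def mult_two(n, workflow_id=None):
        return n * 2

    with Flowcept():
        # Explicit workflow_id: captured via used=kwargs, with no lookup overhead.
        mult_two(2, workflow_id=Flowcept.current_workflow_id)
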
diff --git a/src/flowcept/instrumentation/decorators/flowcept_torch.py b/src/flowcept/instrumentation/flowcept_torch.py
similarity index 99%
rename from src/flowcept/instrumentation/decorators/flowcept_torch.py
rename to src/flowcept/instrumentation/flowcept_torch.py
index 771df144..a0c64ea4 100644
--- a/src/flowcept/instrumentation/decorators/flowcept_torch.py
+++ b/src/flowcept/instrumentation/flowcept_torch.py
@@ -199,7 +199,8 @@ def _our_parent_forward(self, *args, **kwargs):
             "parent_task_id": self._parent_task_id,
             # "custom_metadata": {"subtype": "parent_forward"},
             "type": "task",
-            "status": "FINISHED",  # it's ok. if an error happens, it will break before sending it
+            # This is OK: if an error happens, it will break before sending it.
+            "status": "FINISHED",
         }
         y = super().forward(*args, **kwargs)
         forward_task["generated"] = _inspect_torch_tensor(y)
diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py
index b3237a8a..a95f400d 100644
--- a/tests/decorator_tests/flowcept_task_decorator_test.py
+++ b/tests/decorator_tests/flowcept_task_decorator_test.py
@@ -6,17 +6,15 @@
 import pandas as pd

 from time import time, sleep
-import flowcept.instrumentation.decorators
 from flowcept import Flowcept
-
+import flowcept
 import unittest

 from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.commons.utils import assert_by_querying_tasks_until
 from flowcept.commons.vocabulary import Status
-from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor
-from flowcept.instrumentation.decorators.flowcept_loop import FlowceptLoop
-from flowcept.instrumentation.decorators.flowcept_task import (
+from flowcept.instrumentation.flowcept_loop import FlowceptLoop
+from flowcept.instrumentation.flowcept_task import (
     flowcept_task,
     lightweight_flowcept_task,
 )
@@ -189,13 +187,16 @@ def test_lightweight_decorated_function(self):
         lightweight_decorated_static_function2(workflow_id=workflow_id)
         lightweight_decorated_static_function3(x=0.1, workflow_id=workflow_id)

-        sleep(3)
+        sleep(1)
         assert assert_by_querying_tasks_until(
             filter={"workflow_id": workflow_id},
             condition_to_evaluate=lambda docs: len(docs) == 3,
             max_time=60,
-            max_trials=60,
+            max_trials=30,
         )
+        tasks = Flowcept.db.query({"workflow_id": workflow_id})
+        for t in tasks:
+            assert t["task_id"]

     def test_decorated_function(self):
         # Compare this with the test_lightweight_decorated_function;
@@ -379,7 +380,6 @@ def test_flowcept_loop_generator(self):
         epochs = range(0, number_of_epochs)
         with Flowcept():
             loop = FlowceptLoop(items=epochs, loop_name="epochs", item_name="epoch")
-            loop._interceptor = InstrumentationInterceptor.get_instance().start(id(self))
             for e in loop:
                 sleep(0.05)
                 loss = random.random()
diff --git a/tests/decorator_tests/ml_tests/dl_trainer.py b/tests/decorator_tests/ml_tests/dl_trainer.py
index ea4e1e89..a2aace15 100644
--- a/tests/decorator_tests/ml_tests/dl_trainer.py
+++ b/tests/decorator_tests/ml_tests/dl_trainer.py
@@ -12,7 +12,7 @@

 import threading

-from flowcept.instrumentation.decorators.flowcept_torch import flowcept_torch
+from flowcept import flowcept_torch

 thread_state = threading.local()
diff --git a/tests/decorator_tests/ml_tests/ml_decorator_test.py b/tests/decorator_tests/ml_tests/ml_decorator_test.py
index ded171d3..7463b010 100644
--- a/tests/decorator_tests/ml_tests/ml_decorator_test.py
+++ b/tests/decorator_tests/ml_tests/ml_decorator_test.py
@@ -1,11 +1,9 @@
-import uuid
-
 import unittest

 from torch import nn

 from flowcept import Flowcept
-from flowcept.configs import MONGO_ENABLED, INSTRUMENTATION
+from flowcept.configs import MONGO_ENABLED

 from tests.decorator_tests.ml_tests.dl_trainer import ModelTrainer, MyNet
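
Note: for reference, the `FlowceptLoop` API exercised by `test_flowcept_loop_generator` above and by `llm_model.py` earlier in this patch, reduced to a minimal sketch (the random loss is a stand-in for real work):

    import random

    from flowcept import Flowcept, FlowceptLoop

    with Flowcept():
        loop = FlowceptLoop(items=range(3), loop_name="epochs", item_name="epoch")
        for epoch in loop:
            loss = random.random()  # stand-in for a real training step
            loop.end_iter({"loss": loss})  # attach per-iteration outputs to the loop task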