diff --git a/.gitignore b/.gitignore index cbbe3d5f..c45751b3 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ test.py time.txt tmp/ deployment/data +examples/llm_complex/output_data diff --git a/examples/llm_complex/instrumented_llm_dask_example2.py b/examples/llm_complex/instrumented_llm_dask_example2.py deleted file mode 100644 index 430d95f5..00000000 --- a/examples/llm_complex/instrumented_llm_dask_example2.py +++ /dev/null @@ -1,510 +0,0 @@ -# The code in this file is based on: -# https://blog.paperspace.com/build-a-language-model-using-pytorch/ -import math -import sys -import uuid -from functools import wraps -from time import time, sleep -from types import MethodType - -import pandas as pd -import torch -import torch.nn as nn -import torch.optim as optim -from distributed import LocalCluster, Client -from torch.nn import Embedding, Linear, TransformerEncoder, TransformerEncoderLayer, Dropout -from torch.utils.data import Subset -from torchtext.data.utils import get_tokenizer -from torchtext.vocab import build_vocab_from_iterator -from datasets import load_dataset - -from flowcept.commons.vocabulary import Status -from flowcept.configs import N_GPUS, MONGO_ENABLED, INSTRUMENTATION -from flowcept.flowcept_api.flowcept_controller import Flowcept -from flowcept.flowceptor.adapters.dask.dask_plugins import FlowceptDaskSchedulerAdapter, \ - FlowceptDaskWorkerAdapter, register_dask_workflow -from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor -from flowcept.instrumentation.decorators.flowcept_loop import FlowceptLoop - -from flowcept.instrumentation.decorators.flowcept_torch import ( - flowcept_torch, -) -from flowcept.instrumentation.decorators.responsible_ai import model_profiler - -TORCH_CAPTURE = INSTRUMENTATION.get("torch").get("what") - - -# Define a function to batchify the data -def batchify(data, bsz): - nbatch = data.size(0) // bsz - data = data.narrow(0, 0, nbatch * bsz) - data = data.view(bsz, -1).t().contiguous() - return data - - -# Define a function to yield tokens from the dataset -def yield_tokens(tokenizer, data_iter): - for item in data_iter: - if len(item["text"]): - yield tokenizer(item["text"]) - - -# Define a function to process the raw text and convert it to tensors -def data_process(tokenizer, vocab, raw_text_iter): - data = [ - torch.tensor( - [vocab[token] for token in tokenizer(item["text"])], - dtype=torch.long, - ) - for item in raw_text_iter - ] - return torch.cat(tuple(filter(lambda t: t.numel() > 0, data))) - - -def get_batch(source, i, bptt=35): - seq_len = min(bptt, len(source) - 1 - i) - data = source[i : i + seq_len] - target = source[i + 1 : i + 1 + seq_len].view(-1) - return data, target - - -def get_wiki_text( - tokenizer_type="basic_english", - subset_size=None -): # spacy, moses, toktok, revtok, subword - # Load the WikiText2 dataset - dataset = load_dataset("wikitext", "wikitext-2-v1") - test_dataset = dataset["test"] - train_dataset = dataset["train"] - validation_dataset = dataset["validation"] - - if subset_size is not None and subset_size > 0: - test_dataset = Subset(test_dataset, range(subset_size)) - train_dataset = Subset(train_dataset, range(subset_size)) - validation_dataset = Subset(validation_dataset, range(subset_size)) - - # Build the vocabulary from the training dataset - tokenizer = get_tokenizer(tokenizer_type) - vocab = build_vocab_from_iterator(yield_tokens(tokenizer, train_dataset)) - vocab.set_default_index(vocab[""]) - ntokens = len(vocab) - - # Process the train, 
validation, and test datasets - train_data = data_process(tokenizer, vocab, train_dataset) - val_data = data_process(tokenizer, vocab, validation_dataset) - test_data = data_process(tokenizer, vocab, test_dataset) - - device = torch.device("cpu") - if torch.cuda.is_available(): - device = torch.device("gpu") - elif torch.backends.mps.is_available(): - device = torch.device("mps") - - if device.type != 'cpu': - train_data = train_data.to(device) - val_data = val_data.to(device) - test_data = test_data.to(device) - - print("Train data", train_data.shape) - print("Validation data", val_data.shape) - print("Test data", test_data.shape) - return ntokens, train_data, val_data, test_data - - -@flowcept_torch -class TransformerModel(nn.Module): - - def __init__( - self, - ntokens, - emsize, - nhead, - nhid, - nlayers, - dropout=0.5, - pos_encoding_max_len=5000, - parent_task_id=None, # These arguments seem unused but are used in the wrapper - parent_workflow_id=None, - custom_metadata: dict = None, - get_profile: bool = False, - save_workflow: bool = True, - *args, - **kwargs - ): - super().__init__(*args, **kwargs) - self.model_type = "Transformer" - self.src_mask = None - self.pos_encoder = PositionalEncoding( - emsize, - dropout, - max_len=pos_encoding_max_len, - ) - encoder_layers = TransformerEncoderLayer(emsize, nhead, nhid, dropout) - self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers) - self.encoder = Embedding(ntokens, emsize) - self.decoder = Linear(emsize, ntokens) - self.d_model = emsize - - # ##Generate a mask for the input sequence - def _generate_square_subsequent_mask(self, sz): - mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1) - ## Change all the zeros to negative infinity and all the ones to zeros as follows: - mask = mask.float().masked_fill(mask == 0, float("-inf")).masked_fill(mask == 1, float(0.0)) - return mask - - def forward(self, src): - if self.src_mask is None or self.src_mask.size(0) != len(src): - device = src.device - mask = self._generate_square_subsequent_mask(len(src)).to(device) - self.src_mask = mask - - src = self.encoder(src) * math.sqrt(self.d_model) - src = self.pos_encoder(src) - output = self.transformer_encoder(src, self.src_mask) - output = self.decoder(output) - return output - -# Define the PositionalEncoding class -class PositionalEncoding(nn.Module): - def __init__( - self, - emsize, - dropout=0.1, - max_len=5000, - ): - super(PositionalEncoding, self).__init__() - self.dropout = Dropout(p=dropout) - - pe = torch.zeros(max_len, emsize) - position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) - div_term = torch.exp(torch.arange(0, emsize, 2).float() * (-math.log(10000.0) / emsize)) - pe[:, 0::2] = torch.sin(position * div_term) - pe[:, 1::2] = torch.cos(position * div_term) - pe = pe.unsqueeze(0).transpose(0, 1) - self.register_buffer("pe", pe) - - def forward(self, x): - x = x + self.pe[: x.size(0), :] - return self.dropout(x) - -def train_epoch(ntokens, model, train_data, criterion, optimizer, bptt=35): - model.train() # Set the model to training mode - total_loss = 0.0 # Initialize the total loss to 0 - - # Iterate through the mini-batches of data - for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)): - data, targets = get_batch( - train_data, i, bptt - ) # Get the input data and targets for the current mini-batch - optimizer.zero_grad() # Reset the gradients to zero before the next backward pass - output = model(data) # Forward pass: compute the output of the model given the input data - - 
loss = criterion( - output.view(-1, ntokens), targets - ) # Calculate the loss between the model output and the targets - loss.backward() # Backward pass: compute the gradients of the loss with respect to the model parameters - optimizer.step() # Update the model parameters using the computed gradients - total_loss += loss.item() # Accumulate the total loss - - return total_loss / (batch + 1) # Return the average loss per mini-batch - - -def evaluate(ntokens, model, data_source, criterion, bptt=35): - model.eval() # Set the model to evaluation mode - total_loss = 0.0 # Initialize the total loss to 0 - - # Use torch.no_grad() to disable gradient calculation during evaluation - with torch.no_grad(): - # Iterate through the mini-batches of data - for i in range(0, data_source.size(0) - 1, bptt): - data, targets = get_batch( - data_source, i, bptt - ) # Get the input data and targets for the current mini-batch - output = model( - data - ) # Forward pass: compute the output of the model given the input data - loss = criterion( - output.view(-1, ntokens), targets - ) # Calculate the loss between the model output and the targets - total_loss += loss.item() # Accumulate the total loss - - return total_loss / (i + 1) # Return the average loss per mini-batch - - -def model_train( - ntokens, - train_data, - val_data, - test_data, - batch_size, - eval_batch_size, - epochs, - emsize, - nhead, - nhid, - nlayers, - dropout, - lr, - pos_encoding_max_len, - workflow_id=None, - *args, - **kwargs -): - try: - from distributed.worker import thread_state - main_task_id = thread_state.key if hasattr(thread_state, "key") else None - except: - main_task_id = None - torch.manual_seed(0) # TODO: parametrize and save it - #from distributed.worker import thread_state - #dask_task_id = thread_state.key if hasattr(thread_state, "key") else None - train_data = batchify(train_data, batch_size) - val_data = batchify(val_data, eval_batch_size) - test_data = batchify(test_data, eval_batch_size) - - if torch.cuda.is_available(): - device = torch.device("gpu") - elif torch.backends.mps.is_available(): - device = torch.device("mps") - else: - device = torch.device("cpu") - - model = TransformerModel( - ntokens, - emsize, - nhead, - nhid, - nlayers, - dropout, - pos_encoding_max_len, - parent_workflow_id=workflow_id, - get_profile=True, - custom_metadata={"model_step": "train", "cuda_visible": N_GPUS}, - ).to(device) - criterion = nn.CrossEntropyLoss() - optimizer = optim.Adam(model.parameters(), lr=lr) - best_val_loss = float("inf") # Initialize the best validation loss to infinity - # Iterate through the epochs - t0 = time() - - epochs_loop = FlowceptLoop(range(1, epochs + 1), "epochs_loop", "epoch", parent_task_id=main_task_id) - for epoch in epochs_loop: - print(f"Starting training for epoch {epoch}/{epochs}") - # Train the model on the training data and calculate the training loss - model.set_parent_task_id(epochs_loop.current_iteration_task.get("task_id", None)) - train_loss = train_epoch(ntokens, model, train_data, criterion, optimizer, batch_size) - - # Evaluate the model on the validation data and calculate the validation loss - val_loss = evaluate(ntokens, model, val_data, criterion, eval_batch_size) - - # Print the training and validation losses for the current epoch - print(f"Epoch: {epoch}, Train loss: {train_loss:.2f}, Validation loss: {val_loss:.2f}") # TODO revisit loop because var epoch here is none? 
- - # If the validation loss has improved, save the model's state - if val_loss < best_val_loss: - best_val_loss = val_loss - torch.save(model.state_dict(), "transformer_wikitext2.pth") - - best_obj_id = Flowcept.db.save_torch_model(model, task_id=epochs_loop.current_iteration_task.get("task_id", None), workflow_id=workflow_id, custom_metadata={"best_val_loss": best_val_loss}) - - epochs_loop.end_iter({"train_loss": train_loss, "val_loss": val_loss}) - - print("Finished training") - t1 = time() - - # Load the best model's state - best_m = TransformerModel( - ntokens, - emsize, - nhead, - nhid, - nlayers, - dropout, - parent_workflow_id=workflow_id, - custom_metadata={ - "model_step": "test", - "cuda_visible": N_GPUS, - }, - parent_task_id=main_task_id - ).to(device) - print("Loading model") # TODO: load from db - torch_loaded = torch.load("transformer_wikitext2.pth") - best_m.load_state_dict(torch_loaded) - #best_m.update_parent_task_id(task_id) - print("Evaluating") - # Evaluate the best model on the test dataset - test_loss = evaluate(ntokens, best_m, test_data, criterion, eval_batch_size) - print(f"Test loss: {test_loss:.2f}") - with open("time.txt", "w") as f: - f.write(str(t1 - t0)) - return { - "test_loss": test_loss, - "train_loss": train_loss, - "val_loss": val_loss, - "training_time": t1 - t0, - "best_obj_id": best_obj_id - } - -if __name__ == "__main__": - - if not MONGO_ENABLED: - print("This test is only available if Mongo is enabled.") - sys.exit(0) - - subset_size = None - EPOCHS = 1 - tokenizer_type = "basic_english" - conf = { - "batch_size": 20, - "eval_batch_size": 10, - "emsize": 200, - "nhid": 200, - "nlayers": 2, # 2 - "nhead": 2, - "dropout": 0.2, - "epochs": EPOCHS, - "lr": 0.1, - "pos_encoding_max_len": 5000, - } - - ntokens, train_data, val_data, test_data = get_wiki_text(tokenizer_type=tokenizer_type, subset_size=subset_size) - conf["ntokens"] = ntokens - conf_prov = conf.copy() - conf_prov["tokenizer_type"] = tokenizer_type - conf_prov["subset_size"] = subset_size - conf.update({ - "train_data": train_data, - "val_data": val_data, - "test_data": test_data, - }) - - confs = [] - confs.append(conf) - MAX_RUNS = 1 - - cluster = LocalCluster(n_workers=1) - scheduler = cluster.scheduler - client = Client(scheduler.address) - - client.forward_logging() - - # Registering Flowcept's worker and scheduler adapters - scheduler.add_plugin(FlowceptDaskSchedulerAdapter(scheduler)) - client.register_plugin(FlowceptDaskWorkerAdapter()) - - # Registering a Dask workflow in Flowcept's database - main_wf_id = register_dask_workflow(client, used=conf_prov) - print(f"workflow_id={main_wf_id}") - - # Start Flowcept's Dask observer - with Flowcept("dask") as f: - for conf in confs[:MAX_RUNS]: # Edit here to enable more runs - conf["workflow_id"] = main_wf_id - t = client.submit(model_train, **conf) - print(t.result()) - - print("Done main loop. Closing dask...") - client.close() # This is to avoid generating errors - cluster.close() # These calls are needed closeouts to inform of workflow conclusion. - print("Closed Dask. Closing Flowcept...") - - print("Closed.") - print("Now running all asserts...") - """ - So far, this works as follows: - Workflows: - Main workflow -> - Module Layer Forward Train Workflow - Module Layer Forward Test Workflow - - Tasks: - Main workflow . Main model_train task -> - Main workflow . Epochs Whole Loop - Main workflow . Loop Iteration Task - Module Layer Forward Train Workflow . Parent module forward tasks - Module Layer Forward Train Workflow . 
Children modules forward - Module Layer Forward Test Workflow . Parent module forward tasks - Module Layer Forward Test Workflow . Children modules forward tasks - """ - workflows_data = [] - main_wf = Flowcept.db.query({"workflow_id": main_wf_id}, collection="workflows")[0] - assert main_wf["used"]["subset_size"] == subset_size - workflows_data.append(main_wf) - n_tasks_expected = 0 - model_train_tasks = Flowcept.db.query({"workflow_id": main_wf_id, "activity_id": "model_train"}) - assert len(model_train_tasks) == MAX_RUNS - for t in model_train_tasks: - n_tasks_expected += 1 - assert t["status"] == Status.FINISHED.value - - whole_loop = Flowcept.db.query({"parent_task_id": t["task_id"], "custom_metadata.subtype": "whole_loop"})[0] - assert whole_loop["status"] == Status.FINISHED.value - n_tasks_expected += 1 - iteration_tasks = Flowcept.db.query({"parent_task_id": whole_loop["task_id"], "activity_id": "epochs_loop_iteration"}) - assert len(iteration_tasks) == EPOCHS # number of epochs - - iteration_ids = set() - for iteration_task in iteration_tasks: - n_tasks_expected += 1 - iteration_ids.add(iteration_task["task_id"]) - assert iteration_task["status"] == Status.FINISHED.value - - if "parent" in TORCH_CAPTURE: - - parent_module_wfs = Flowcept.db.query({"parent_workflow_id": main_wf_id}, collection="workflows") - assert len(parent_module_wfs) == 2 # train and test - - for parent_module_wf in parent_module_wfs: - workflows_data.append(parent_module_wf) - parent_module_wf_id = parent_module_wf["workflow_id"] - - parent_forwards = Flowcept.db.query({"workflow_id": parent_module_wf_id, "activity_id": "TransformerModel"}) - - assert len(parent_forwards) - - for parent_forward in parent_forwards: - n_tasks_expected += 1 - assert parent_forward["workflow_id"] == parent_module_wf_id - assert parent_forward["used"] - assert parent_forward["generated"] - assert parent_forward["status"] == Status.FINISHED.value - if parent_module_wf['custom_metadata']['model_step'] == 'test': - assert parent_forward["parent_task_id"] == t["task_id"] - elif parent_module_wf['custom_metadata']['model_step'] == 'train': - assert parent_module_wf["custom_metadata"]["model_profile"] - assert parent_forward["parent_task_id"] in iteration_ids # TODO: improve to test exact value - - if "children" in TORCH_CAPTURE: - children_forwards = Flowcept.db.query({"parent_task_id": parent_forward["task_id"]}) - assert len(children_forwards) == 4 # there are four children submodules - for child_forward in children_forwards: - n_tasks_expected += 1 - assert child_forward["status"] == Status.FINISHED.value - assert child_forward["workflow_id"] == parent_module_wf_id - - # TODO: The wrapper is conflicting with the init arguments, that's why we need to copy & remove extra args. Needs to debug to improve. 
- model_args = t["used"].copy() - model_args.pop("batch_size", None) - model_args.pop("eval_batch_size", None) - model_args.pop("epochs", None) - model_args.pop("lr", None) - model_args.pop("train_data", None) - model_args.pop("test_data", None) - model_args.pop("val_data", None) - - loaded_model = TransformerModel(**model_args, save_workflow=False) - - loaded_model, doc = Flowcept.db.load_torch_model(loaded_model, t["generated"]["best_obj_id"]) - print("Best model in this configs", doc) - - print("Exporting data to JSON on disk.") - tasks_data = Flowcept.db.get_tasks_recursive(main_wf_id) - assert len(tasks_data) == n_tasks_expected - assert len(workflows_data) - - df_tasks = pd.DataFrame(tasks_data) - df_workflows = pd.DataFrame(workflows_data) - df_tasks.to_json("tasks.json", orient="records", lines=True) - df_workflows.to_json("workflows.json", orient="records", lines=True) - print("Alright! Congrats.") - diff --git a/pyproject.toml b/pyproject.toml index 5224a6ab..4f3c6961 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,8 @@ dependencies = [ "py-cpuinfo", "redis", "requests", - "lmdb" + "lmdb", + "pyarrow" ] authors = [{name = "Oak Ridge National Laboratory"}] description = "Capture and query workflow provenance data using data observability" diff --git a/resources/sample_settings.yaml b/resources/sample_settings.yaml index 0d22339f..58d8bf75 100644 --- a/resources/sample_settings.yaml +++ b/resources/sample_settings.yaml @@ -25,7 +25,8 @@ telemetry_capture: instrumentation: enabled: true torch: - mode: telemetry_and_tensor_inspection # tensor_inspection, telemetry, telemetry_and_tensor_inspection, full, ~ + what: parent_and_children # parent_only, parent_and_children, ~ + children_mode: lightweight # lightweight, tensor_inspection, telemetry, telemetry_and_tensor_inspection save_models: True experiment: @@ -75,7 +76,7 @@ databases: path: flowcept_lmdb mongodb: - enabled: false + enabled: true host: localhost port: 27017 db: flowcept diff --git a/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py b/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py index 57248389..2711a2ee 100644 --- a/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py +++ b/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py @@ -267,6 +267,14 @@ def dump_to_file(self, collection_name, filter, output_file, export_format, shou """ raise NotImplementedError + @abstractmethod + def get_tasks_recursive(self, workflow_id): + raise NotImplementedError + + @abstractmethod + def dump_tasks_to_file_recursive(self, workflow_id, output_file="tasks.parquet"): + raise NotImplementedError + @abstractmethod def save_object( self, diff --git a/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py b/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py index e066baa3..b29ba606 100644 --- a/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py +++ b/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py @@ -309,7 +309,13 @@ def object_query(self, filter): """Query objects collection.""" raise NotImplementedError - def dump_to_file(self, collection_name, filter, output_file, export_format, should_zip): + def get_tasks_recursive(self, workflow_id): + raise NotImplementedError + + def dump_tasks_to_file_recursive(self, workflow_id): + raise NotImplementedError + + def dump_to_file(self, collection, filter, output_file, export_format, should_zip): """Dump data to file.""" raise NotImplementedError diff --git a/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py b/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py index 65ca6a6c..84313631 
100644 --- a/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py +++ b/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py @@ -1,5 +1,5 @@ """Document DB interaction module.""" - +import os from typing import List, Dict, Tuple, Any import io import json @@ -9,6 +9,10 @@ import zipfile import pandas as pd +import pyarrow.parquet as pq +import pyarrow as pa +import pymongo + from bson import ObjectId from bson.json_util import dumps from pymongo import MongoClient, UpdateOne @@ -37,6 +41,8 @@ class MongoDBDAO(DocumentDBDAO): various collections (`tasks`, `workflows`, `objects`). """ + pymongo.ASCENDING + def __new__(cls, *args, **kwargs) -> "MongoDBDAO": """Singleton creator for MongoDBDAO.""" # Check if an instance already exists @@ -76,11 +82,15 @@ def _create_indices(self): self._tasks_collection.create_index(TaskObject.task_id_field(), unique=True) if TaskObject.workflow_id_field() not in existing_indices: self._tasks_collection.create_index(TaskObject.workflow_id_field()) + if "parent_task_id" not in existing_indices: + self._tasks_collection.create_index("parent_task_id") # Creating workflow collection indices: existing_indices = [list(x["key"].keys())[0] for x in self._wfs_collection.list_indexes()] if WorkflowObject.workflow_id_field() not in existing_indices: self._wfs_collection.create_index(WorkflowObject.workflow_id_field(), unique=True) + if "parent_workflow_id" not in existing_indices: + self._wfs_collection.create_index("parent_workflow_id") # Creating objects collection indices: existing_indices = [list(x["key"].keys())[0] for x in self._obj_collection.list_indexes()] @@ -424,16 +434,16 @@ def to_df(self, collection="tasks", filter=None) -> pd.DataFrame: def dump_to_file( self, - collection_name="tasks", + collection="tasks", filter=None, output_file=None, export_format="json", should_zip=False, ): """Dump it to file.""" - if collection_name == "tasks": + if collection == "tasks": _collection = self._tasks_collection - elif collection_name == "workflows": + elif collection == "workflows": _collection = self._wfs_collection else: msg = "Only tasks and workflows " @@ -443,7 +453,7 @@ def dump_to_file( raise Exception("Sorry, only JSON is currently supported.") if output_file is None: - output_file = f"docs_dump_{collection_name}_{get_utc_now_str()}" + output_file = f"docs_dump_{collection}_{get_utc_now_str()}" output_file += ".zip" if should_zip else ".json" try: @@ -727,26 +737,69 @@ def close(self): setattr(self, "_initialized", False) self._client.close() - def get_tasks_recursive(self, workflow_id): try: result = [] parent_tasks = self._tasks_collection.find({"workflow_id": workflow_id}, projection={"_id": 0}) for parent_task in parent_tasks: result.append(parent_task) - self._get_children_tasks_recursive(parent_task["task_id"], result) + self._get_children_tasks_iterative(parent_task["task_id"], result) return result except Exception as e: - raise Exception(e) # TODO - - def _get_children_tasks_recursive(self, parent_task_id, result): - # Query for tasks with the current parent_task_id - tasks = list(self._tasks_collection.find({"parent_task_id": parent_task_id}, projection={"_id": 0})) + raise Exception(e) - # Add these tasks to the result list - result.extend(tasks) + def dump_tasks_to_file_recursive(self, workflow_id, output_file="tasks.parquet"): + try: + tasks = self.get_tasks_recursive(workflow_id) + + chunk_size = 100_000 + output_dir = "temp_chunks" + os.makedirs(output_dir, exist_ok=True) + # Write chunks to temporary Parquet files + chunk = [] + file_count = 0 + 
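+            # Flush every `chunk_size` tasks to a temporary Parquet file to bound memory
+            # use; the chunk files are merged into `output_file` and removed afterwards.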
for idx, record in enumerate(tasks): + chunk.append(record) + if (idx + 1) % chunk_size == 0: + df = pd.DataFrame(chunk) + table = pa.Table.from_pandas(df) + pq.write_table(table, f"{output_dir}/chunk_{file_count}.parquet") + file_count += 1 + chunk = [] # Clear the chunk + + # Write remaining rows + if chunk: + df = pd.DataFrame(chunk) + table = pa.Table.from_pandas(df) + pq.write_table(table, f"{output_dir}/chunk_{file_count}.parquet") + + # Merge all chunked files into a single Parquet file + chunk_files = [f"{output_dir}/chunk_{i}.parquet" for i in range(file_count + 1)] + tables = [pq.read_table(f) for f in chunk_files] + merged_table = pa.concat_tables(tables) + pq.write_table(merged_table, output_file) + + # Cleanup temporary files + for f in chunk_files: + os.remove(f) + os.rmdir(output_dir) - # Now, for each child task, recursively find its own children - for task in tasks: - task_id = task["task_id"] - self._get_children_tasks_recursive(task_id, result) + except Exception as e: + self.logger.exception(e) + raise e + def _get_children_tasks_iterative(self, parent_task_id, result, max_iter=9999): # todo revist + stack = [parent_task_id] # Use a stack to manage tasks to process + i = 0 + while stack and i < max_iter: + # Pop the next parent task id + current_parent_id = stack.pop() + + # Query for tasks with the current parent_task_id + tasks = list(self._tasks_collection.find({"parent_task_id": current_parent_id}, projection={"_id": 0})) + + # Add these tasks to the result list + result.extend(tasks) + + # Add the task ids of these tasks to the stack + stack.extend(task["task_id"] for task in tasks) + i += 1 \ No newline at end of file diff --git a/src/flowcept/commons/flowcept_dataclasses/workflow_object.py b/src/flowcept/commons/flowcept_dataclasses/workflow_object.py index 872e02fe..fe36d816 100644 --- a/src/flowcept/commons/flowcept_dataclasses/workflow_object.py +++ b/src/flowcept/commons/flowcept_dataclasses/workflow_object.py @@ -36,7 +36,6 @@ class WorkflowObject: environment_id: str = None sys_name: str = None extra_metadata: str = None - # parent_task_id: str = None used: Dict = None generated: Dict = None diff --git a/src/flowcept/flowcept_api/db_api.py b/src/flowcept/flowcept_api/db_api.py index a3514b47..d5e3209c 100644 --- a/src/flowcept/flowcept_api/db_api.py +++ b/src/flowcept/flowcept_api/db_api.py @@ -14,6 +14,9 @@ class DBAPI(object): """DB API class.""" + ASCENDING = 1 + DESCENDING = -1 + # TODO: consider making all methods static def __init__(self): self.logger = FlowceptLogger() @@ -83,15 +86,23 @@ def task_query( return None return results - def get_tasks_recursive(self, *args, **kwargs): + def get_tasks_recursive(self, workflow_id): try: - return DBAPI._dao.get_tasks_recursive(*args, **kwargs) + return DBAPI._dao.get_tasks_recursive(workflow_id) except Exception as e: - pass + self.logger.exception(e) + raise e + + def dump_tasks_to_file_recursive(self, workflow_id, output_file="tasks.parquet"): + try: + return DBAPI._dao.dump_tasks_to_file_recursive(workflow_id, output_file) + except Exception as e: + self.logger.exception(e) + raise e def dump_to_file( self, - collection_name="tasks", + collection="tasks", filter=None, output_file=None, export_format="json", @@ -105,7 +116,7 @@ def dump_to_file( return False try: DBAPI._dao.dump_to_file( - collection_name, + collection, filter, output_file, export_format, @@ -222,4 +233,4 @@ def load_torch_model(self, model, object_id: str): state_dict = torch.load(buffer, weights_only=True) 
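+        # load_state_dict below restores the weights into the caller's model in place,
+        # so only the stored metadata document needs to be returned.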
model.load_state_dict(state_dict) - return model, doc + return doc diff --git a/src/flowcept/flowceptor/adapters/dask/dask_plugins.py b/src/flowcept/flowceptor/adapters/dask/dask_plugins.py index 3a59204f..db69527b 100644 --- a/src/flowcept/flowceptor/adapters/dask/dask_plugins.py +++ b/src/flowcept/flowceptor/adapters/dask/dask_plugins.py @@ -18,6 +18,8 @@ def _set_workflow_on_scheduler( dask_scheduler=None, workflow_id=None, custom_metadata: dict = None, + campaign_id: str = None, + workflow_name: str = None, used: dict = None, ): custom_metadata = custom_metadata or {} @@ -35,12 +37,16 @@ def _set_workflow_on_scheduler( ) wf_obj.custom_metadata = custom_metadata wf_obj.used = used + wf_obj.campaign_id = campaign_id + wf_obj.name = workflow_name setattr(dask_scheduler, "current_workflow", wf_obj) def register_dask_workflow( dask_client: Client, workflow_id=None, + campaign_id=None, + workflow_name=None, custom_metadata: dict = None, used: dict = None, ): @@ -53,6 +59,8 @@ def register_dask_workflow( "workflow_id": workflow_id, "custom_metadata": custom_metadata, "used": used, + "workflow_name": workflow_name, + "campaign_id": campaign_id }, ) return workflow_id diff --git a/src/flowcept/instrumentation/decorators/flowcept_loop.py b/src/flowcept/instrumentation/decorators/flowcept_loop.py index d4c342b6..38c0fff3 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_loop.py +++ b/src/flowcept/instrumentation/decorators/flowcept_loop.py @@ -93,7 +93,7 @@ def __len__(self): return self._max def __next__(self): - self._next_func() + return self._next_func() def _begin_loop(self): self.logger.debug("Capturing loop init.") diff --git a/src/flowcept/instrumentation/decorators/flowcept_torch.py b/src/flowcept/instrumentation/decorators/flowcept_torch.py index 85930b63..c0cae46e 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_torch.py +++ b/src/flowcept/instrumentation/decorators/flowcept_torch.py @@ -6,7 +6,7 @@ import numpy as np from flowcept.commons.utils import replace_non_serializable -from typing import List, Dict +from typing import Dict import uuid import torch @@ -66,7 +66,7 @@ def __init__(self, *args, **kwargs): self._custom_metadata = kwargs.get("custom_metadata", None) self._parent_task_id = kwargs.get("parent_task_id", None) # to be used by forward layers self._parent_workflow_id = kwargs.get("parent_workflow_id", None) - + self._campaign_id = kwargs.get("campaign_id", None) if kwargs.get("save_workflow", True): self._workflow_id = self._register_as_workflow() @@ -128,7 +128,7 @@ def _register_as_workflow(self): if not REGISTER_WORKFLOW: return workflow_obj.workflow_id workflow_obj.name = cls.__name__ - + workflow_obj.campaign_id = self._campaign_id workflow_obj.parent_workflow_id = self._parent_workflow_id _custom_metadata = self._custom_metadata _custom_metadata["workflow_type"] = "TorchModule" diff --git a/tests/api/db_api_test.py b/tests/api/db_api_test.py index d19bcc24..b65bf859 100644 --- a/tests/api/db_api_test.py +++ b/tests/api/db_api_test.py @@ -1,6 +1,8 @@ import unittest from uuid import uuid4 +import pandas as pd + from flowcept.commons.flowcept_dataclasses.task_object import TaskObject from flowcept import Flowcept, WorkflowObject from flowcept.configs import MONGO_ENABLED @@ -94,3 +96,35 @@ def test_dump(self): Flowcept.db._dao.delete_tasks_with_filter(_filter) c1 = Flowcept.db._dao.count_tasks() assert c0 == c1 + + + def test_dump_recursive(self): + workflow_id = "06f7a51e-c700-4a7e-96ec-f16319f77599" + print("Retrieving the data") + 
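+        # get_tasks_recursive returns the workflow's tasks plus all child tasks reachable
+        # through parent_task_id links.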
tasks_data = Flowcept.db.get_tasks_recursive(workflow_id)
+        print("Writing parquet file")
+        # Define the chunk size
+        chunk_size = 100_000
+
+        # Output file path
+        output_file = "output2.parquet"
+
+        # Process and write in chunks. pandas.DataFrame.to_parquet cannot append to an
+        # existing file, so stream one row group per chunk through a ParquetWriter.
+        import pyarrow as pa
+        import pyarrow.parquet as pq
+        writer = None
+        chunk = []
+        for idx, record in enumerate(tasks_data):
+            chunk.append(record)
+            if (idx + 1) % chunk_size == 0:
+                print(f"writing at {idx}")
+                table = pa.Table.from_pandas(pd.DataFrame(chunk))
+                if writer is None:
+                    writer = pq.ParquetWriter(output_file, table.schema, compression="snappy")
+                writer.write_table(table)
+                chunk = []  # Clear the chunk
+
+        # Handle remaining rows
+        if chunk:
+            table = pa.Table.from_pandas(pd.DataFrame(chunk))
+            if writer is None:
+                writer = pq.ParquetWriter(output_file, table.schema, compression="snappy")
+            writer.write_table(table)
+
+        if writer is not None:
+            writer.close()
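As a usage note, the pieces added by this patch compose as follows. This is a minimal sketch, not part of the diff, assuming a running MongoDB backend (per the updated sample settings), a local Dask cluster, and an installed flowcept; the workflow name, campaign id, and the dummy task are illustrative placeholders standing in for a real task such as the removed example's model_train.

```python
from distributed import Client, LocalCluster

from flowcept import Flowcept
from flowcept.flowceptor.adapters.dask.dask_plugins import (
    FlowceptDaskSchedulerAdapter,
    FlowceptDaskWorkerAdapter,
    register_dask_workflow,
)


def dummy_task(x):
    # Stand-in for a real task such as the removed example's model_train.
    return x * 2


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1)
    client = Client(cluster.scheduler.address)

    # Register Flowcept's Dask adapters so scheduler/worker events are observed.
    cluster.scheduler.add_plugin(FlowceptDaskSchedulerAdapter(cluster.scheduler))
    client.register_plugin(FlowceptDaskWorkerAdapter())

    # New in this patch: the Dask workflow can carry a name and a campaign id.
    wf_id = register_dask_workflow(
        client,
        workflow_name="llm_wikitext2",  # illustrative value
        campaign_id="campaign_001",     # illustrative value
        used={"epochs": 1},
    )

    with Flowcept("dask"):
        print(client.submit(dummy_task, 3).result())

    client.close()
    cluster.close()

    # Also new: collect the workflow's tasks and their children (linked via
    # parent_task_id) and dump everything to a single Parquet file.
    tasks = Flowcept.db.get_tasks_recursive(wf_id)
    Flowcept.db.dump_tasks_to_file_recursive(wf_id, output_file="tasks.parquet")
```

On the MongoDB backend, get_tasks_recursive walks parent_task_id links iteratively (backed by the new parent_task_id and parent_workflow_id indices), and dump_tasks_to_file_recursive writes 100,000-row chunks before merging them into a single Parquet file.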