Changes in DBDAO and modularizing llm wf
renan-souza committed Dec 17, 2024
1 parent 524bde1 commit 40f4952
Showing 7 changed files with 260 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -21,4 +21,4 @@ time.txt
tmp/
deployment/data
**/*output_data*

examples/llm_complex/input_data
149 changes: 149 additions & 0 deletions examples/llm_complex/llm_dataprep.py
@@ -0,0 +1,149 @@
from time import time

import torch
import os

from torch.utils.data import Subset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from datasets import load_dataset


# Define a function to yield tokens from the dataset
def yield_tokens(tokenizer, data_iter):
    for item in data_iter:
        if len(item["text"]):
            yield tokenizer(item["text"])


# Define a function to process the raw text and convert it to tensors
def data_process(tokenizer, vocab, raw_text_iter):
    data = [
        torch.tensor(
            [vocab[token] for token in tokenizer(item["text"])],
            dtype=torch.long,
        )
        for item in raw_text_iter
    ]
    return torch.cat(tuple(filter(lambda t: t.numel() > 0, data)))


def get_dataset_ref(campaign_id, train_data, val_data, test_data):
dataset_ref = f"{campaign_id}_train_shape_{train_data.shape}_val_shape_{val_data.shape}_test_shape_{test_data.shape}"
return dataset_ref

def get_wiki_text_dataset(data_dir):
    # Load the preprocessed WikiText-2 tensors from disk
    t0 = time()
    train_data = torch.load(os.path.join(data_dir, "train_data.tensor"))
    val_data = torch.load(os.path.join(data_dir, "val_data.tensor"))
    test_data = torch.load(os.path.join(data_dir, "test_data.tensor"))
    t1 = time()
    t_disk_load = t1 - t0

    try:
        if torch.cuda.is_available():
            device = torch.device("cuda")
        elif torch.backends.mps.is_available():
            device = torch.device("mps")
        else:
            device = torch.device("cpu")

        t2 = time()
        t_device_available = t2 - t1
        train_data = train_data.to(device)
        val_data = val_data.to(device)
        test_data = test_data.to(device)
        t_gpu_load = time() - t2
    except Exception as e:
        raise RuntimeError("Couldn't send data to device") from e

    print("Train data", train_data.shape)
    print("Validation data", val_data.shape)
    print("Test data", test_data.shape)
    return (
        train_data,
        val_data,
        test_data,
        t_disk_load,
        t_device_available,
        t_gpu_load,
    )

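# Register the data-preparation workflow, its configuration, and the generated dataset metadata in Flowcept's database.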
def save_workflow(ntokens, train_data, val_data, test_data, dataset_ref, subset_size=None, tokenizer_type=None, campaign_id=None):
    from flowcept import WorkflowObject, Flowcept
    config = {
        "subset_size": subset_size,
        "tokenizer_type": tokenizer_type
    }
    dataset_prep_wf = WorkflowObject()
    dataset_prep_wf.used = config
    dataset_prep_wf.campaign_id = campaign_id
    dataset_prep_wf.name = "generate_wikitext_dataset"

    dataset_prep_wf.generated = {
        "ntokens": ntokens,
        "dataset_ref": dataset_ref,
        "train_data_shape": list(train_data.shape),
        "val_data_shape": list(val_data.shape),
        "test_data_shape": list(test_data.shape),
    }
    Flowcept.db.insert_or_update_workflow(dataset_prep_wf)
    print(dataset_prep_wf)
    return dataset_prep_wf.workflow_id, dataset_ref


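# End-to-end data preparation: download WikiText-2, build the vocabulary, tensorize the splits, save them under data_dir, and record the workflow in Flowcept.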
def dataprep_workflow(data_dir="input_data",
                      tokenizer_type="basic_english",  # spacy, moses, toktok, revtok, subword
                      subset_size=None,
                      campaign_id=None):

    os.makedirs(data_dir, exist_ok=True)

    print("Downloading dataset")
    dataset = load_dataset("wikitext", "wikitext-2-v1")
    print("Ok, now saving it into the current directory")
    dataset.save_to_disk(os.path.join(data_dir, "wikitext-2-v1.data"))

    test_dataset = dataset["test"]
    train_dataset = dataset["train"]
    validation_dataset = dataset["validation"]

    if subset_size is not None and subset_size > 0:
        test_dataset = Subset(test_dataset, range(subset_size))
        train_dataset = Subset(train_dataset, range(subset_size))
        validation_dataset = Subset(validation_dataset, range(subset_size))

    # Build the vocabulary from the training dataset
    tokenizer = get_tokenizer(tokenizer_type)
    vocab = build_vocab_from_iterator(yield_tokens(tokenizer, train_dataset))
    vocab.set_default_index(vocab["<unk>"])
    ntokens = len(vocab)

    # Process the train, validation, and test datasets
    train_data = data_process(tokenizer, vocab, train_dataset)
    val_data = data_process(tokenizer, vocab, validation_dataset)
    test_data = data_process(tokenizer, vocab, test_dataset)

    train_data_path = os.path.join(data_dir, "train_data.tensor")
    val_data_path = os.path.join(data_dir, "val_data.tensor")
    test_data_path = os.path.join(data_dir, "test_data.tensor")

    torch.save(train_data, train_data_path)
    torch.save(val_data, val_data_path)
    torch.save(test_data, test_data_path)

    print(f"Saved files in {data_dir}. Now running some asserts.")

    train_data_loaded = torch.load(train_data_path)
    val_data_loaded = torch.load(val_data_path)
    test_data_loaded = torch.load(test_data_path)

    assert (train_data == train_data_loaded).all()
    assert (val_data == val_data_loaded).all()
    assert (test_data == test_data_loaded).all()

    dataset_ref = get_dataset_ref(campaign_id, train_data, val_data, test_data)
    wf_id, dataset_ref = save_workflow(ntokens, train_data, val_data, test_data, dataset_ref,
                                       subset_size=subset_size, tokenizer_type=tokenizer_type,
                                       campaign_id=campaign_id)
    return wf_id, dataset_ref, ntokens
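
A minimal sketch of how this new module might be used, assuming it is importable as examples.llm_complex.llm_dataprep and that a Flowcept database backend is configured:

import uuid
from examples.llm_complex.llm_dataprep import dataprep_workflow, get_wiki_text_dataset

campaign_id = str(uuid.uuid4())
# Prepare WikiText-2, persist the tensors, and register the prep workflow in Flowcept
wf_id, dataset_ref, ntokens = dataprep_workflow(
    data_dir="input_data",
    tokenizer_type="basic_english",
    subset_size=10,  # small subset for a quick smoke test
    campaign_id=campaign_id,
)
# Later (e.g., inside a training task), reload the tensors and move them to the available device
train_data, val_data, test_data, t_disk_load, t_device_available, t_gpu_load = get_wiki_text_dataset("input_data")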

@@ -9,7 +9,8 @@
import torch
from distributed import LocalCluster, Client

from examples.llm_complex.llm_model import model_train, get_wiki_text, TransformerModel
from examples.llm_complex.llm_dataprep import dataprep_workflow
from examples.llm_complex.llm_model import model_train, TransformerModel

from flowcept.configs import MONGO_ENABLED, INSTRUMENTATION
from flowcept import Flowcept
@@ -68,47 +69,43 @@ def generate_configs(params):
    return result


def get_dataset_ref(campaign_id, train_data, val_data, test_data):
dataset_ref = f"{campaign_id}_train_shape_{train_data.shape}_val_shape_{val_data.shape}_test_shape_{test_data.shape}"
return dataset_ref


def dataprep_workflow(tokenizer_type="basic_english", subset_size=None, campaign_id=None):
    from flowcept import WorkflowObject
    config = {
        "subset_size": subset_size,
        "tokenizer": tokenizer_type
    }
    dataset_prep_wf = WorkflowObject()
    dataset_prep_wf.used = config
    dataset_prep_wf.campaign_id = campaign_id
    dataset_prep_wf.name = "generate_wikitext_dataset"
    ntokens, train_data, val_data, test_data = get_wiki_text(tokenizer_type=tokenizer_type,
                                                             subset_size=subset_size)
    dataset_prep_wf.generated = {
        "ntokens": ntokens,
        "dataset_ref": get_dataset_ref(campaign_id, train_data, val_data, test_data),
        "train_data_shape": list(train_data.shape),
        "val_data_shape": list(val_data.shape),
        "test_data_shape": list(test_data.shape),
    }
    Flowcept.db.insert_or_update_workflow(dataset_prep_wf)
    print(dataset_prep_wf)
    return dataset_prep_wf.workflow_id, ntokens, train_data, val_data, test_data


def search_workflow(ntokens, train_data, val_data, test_data, exp_param_settings, max_runs, campaign_id=None):
    get_dataset_ref(campaign_id, train_data, val_data, test_data)
# def dataprep_workflow(tokenizer_type="basic_english", subset_size=None, campaign_id=None):
#     from flowcept import WorkflowObject
#     config = {
#         "subset_size": subset_size,
#         "tokenizer": tokenizer_type
#     }
#     dataset_prep_wf = WorkflowObject()
#     dataset_prep_wf.used = config
#     dataset_prep_wf.campaign_id = campaign_id
#     dataset_prep_wf.name = "generate_wikitext_dataset"
#     ntokens, train_data, val_data, test_data = get_wiki_text_dataset(tokenizer_type=tokenizer_type,
#                                                                      subset_size=subset_size)
#     dataset_prep_wf.generated = {
#         "ntokens": ntokens,
#         "dataset_ref": get_dataset_ref(campaign_id, train_data, val_data, test_data),
#         "train_data_shape": list(train_data.shape),
#         "val_data_shape": list(val_data.shape),
#         "test_data_shape": list(test_data.shape),
#     }
#     Flowcept.db.insert_or_update_workflow(dataset_prep_wf)
#     print(dataset_prep_wf)
#     return dataset_prep_wf.workflow_id, ntokens, train_data, val_data, test_data


def search_workflow(ntokens, input_data_dir, dataset_ref, exp_param_settings, max_runs, campaign_id=None):
    cluster = LocalCluster(n_workers=1)
    scheduler = cluster.scheduler
    client = Client(scheduler.address)
    client.forward_logging()
    # Registering Flowcept's worker and scheduler adapters
    scheduler.add_plugin(FlowceptDaskSchedulerAdapter(scheduler))
    client.register_plugin(FlowceptDaskWorkerAdapter())
    dataset_ref = get_dataset_ref(campaign_id, train_data, val_data, test_data)
    exp_param_settings["dataset_ref"] = dataset_ref
    exp_param_settings["max_runs"] = max_runs
    exp_param_settings["input_data_dir"] = input_data_dir
    # Registering a Dask workflow in Flowcept's database
    search_wf_id = register_dask_workflow(client, used=exp_param_settings,
                                          workflow_name="model_search",
@@ -117,8 +114,7 @@ def search_workflow(ntokens, train_data, val_data, test_data, exp_param_settings

    configs = generate_configs(exp_param_settings)
    configs = [
        {**c, "ntokens": ntokens, "train_data": train_data, "val_data": val_data,
         "test_data": test_data, "workflow_id": search_wf_id, "campaign_id": campaign_id}
        {**c, "ntokens": ntokens, "input_data_dir": input_data_dir, "workflow_id": search_wf_id, "campaign_id": campaign_id}
        for c in configs
    ]
    # Start Flowcept's Dask observer
@@ -139,7 +135,7 @@ def main():

    _campaign_id = str(uuid.uuid4())
    print(f"Campaign id={_campaign_id}")

input_data_dir = "input_data"
tokenizer_type = "basic_english"
subset_size = 10
max_runs = 1
@@ -156,12 +152,13 @@
"pos_encoding_max_len": [5000],
}

    _dataprep_wf_id, ntokens, train_data, val_data, test_data = dataprep_workflow(
    _dataprep_wf_id, dataset_ref, ntokens = dataprep_workflow(
        data_dir="input_data",
        campaign_id=_campaign_id,
        tokenizer_type=tokenizer_type,
        subset_size=subset_size)

    _search_wf_id = search_workflow(ntokens, train_data, val_data, test_data, exp_param_settings, max_runs, campaign_id=_campaign_id)
    _search_wf_id = search_workflow(ntokens, input_data_dir, dataset_ref, exp_param_settings, max_runs, campaign_id=_campaign_id)

    return _campaign_id, _dataprep_wf_id, _search_wf_id

@@ -271,9 +268,8 @@ def run_asserts_and_exports(campaign_id, output_dir="output_data"):
model_args.pop("eval_batch_size", None)
model_args.pop("epochs", None)
model_args.pop("lr", None)
model_args.pop("train_data", None)
model_args.pop("test_data", None)
model_args.pop("val_data", None)
model_args.pop("input_data_dir", None)

loaded_model = TransformerModel(**model_args, save_workflow=False)
doc = Flowcept.db.load_torch_model(loaded_model, best_model_obj_id)
print(doc)