Commit: Typo
renan-souza committed Dec 17, 2024
1 parent 40f4952 commit 8128207
Showing 3 changed files with 57 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run_examples.sh
@@ -77,7 +77,7 @@ echo "Using examples directory: $EXAMPLES_DIR"
echo "With Mongo? ${WITH_MONGO}"

# Define the test cases
tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard" "llm_complex/llm_search")
tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard" "llm_complex/llm_main")

# Iterate over the tests and run them
for test_ in "${tests[@]}"; do
56 changes: 56 additions & 0 deletions examples/instrumented_loop_unmanaged_example.py
@@ -0,0 +1,56 @@
import multiprocessing
import random
from time import sleep

from flowcept import Flowcept, FlowceptLoop

if __name__ == '__main__':

interceptor_id = Flowcept.start_instrumentation_interceptor()

event = multiprocessing.Event()
process1 = multiprocessing.Process(target=Flowcept.start_persistence, args=(interceptor_id, event))
process1.start()
sleep(1)
# Run loop
loop = FlowceptLoop(range(max_iterations := 3), workflow_id=interceptor_id)
for item in loop:
loss = random.random()
sleep(0.05)
print(item, loss)
# The following is optional, in case you want to capture values generated inside the loop.
loop.end_iter({"item": item, "loss": loss})

Flowcept.stop_instrumentation_interceptor()

event.set()
process1.join()

docs = Flowcept.db.query(filter={"workflow_id": interceptor_id})
for d in docs:
print(d)
# assert len(docs) == max_iterations+1 # The whole loop itself is a task

#
#
# @staticmethod
# def start_instrumentation_interceptor():
# instance = InstrumentationInterceptor.get_instance()
# instance_id = id(instance)
# instance.start(bundle_exec_id=instance_id)
# return instance_id
#
# @staticmethod
# def stop_instrumentation_interceptor():
# instance = InstrumentationInterceptor.get_instance()
# instance.stop()
#
# @staticmethod
# def start_persistence(interceptor_id, event):
# from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
# inserter = DocumentInserter(
# check_safe_stops=True,
# bundle_exec_id=interceptor_id,
# ).start()
# event.wait()
# inserter.stop()
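
For comparison, the same loop can run in managed mode, where a single Flowcept context manager starts and stops both the interceptor and the persistence consumer in one process, replacing the manual multiprocessing wiring above. The following is a minimal sketch, not part of this commit, assuming the Flowcept(workflow_name=...) context-manager API and the Flowcept.current_workflow_id attribute shown in the project README:

import random
from time import sleep

from flowcept import Flowcept, FlowceptLoop

if __name__ == "__main__":
    # Managed mode: the context manager handles interceptor startup,
    # document persistence, and shutdown.
    with Flowcept(workflow_name="instrumented_loop_managed"):
        loop = FlowceptLoop(range(3))
        for item in loop:
            loss = random.random()
            sleep(0.05)
            # Optional: record values generated inside the loop body.
            loop.end_iter({"item": item, "loss": loss})

    docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})
    print(len(docs))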
26 changes: 0 additions & 26 deletions examples/llm_complex/llm_main.py
@@ -69,32 +69,6 @@ def generate_configs(params):
return result




# def dataprep_workflow(tokenizer_type="basic_english", subset_size=None, campaign_id=None):
# from flowcept import WorkflowObject
# config = {
# "subset_size": subset_size,
# "tokenizer": tokenizer_type
# }
# dataset_prep_wf = WorkflowObject()
# dataset_prep_wf.used = config
# dataset_prep_wf.campaign_id = campaign_id
# dataset_prep_wf.name = "generate_wikitext_dataset"
# ntokens, train_data, val_data, test_data = get_wiki_text_dataset(tokenizer_type=tokenizer_type,
# subset_size=subset_size)
# dataset_prep_wf.generated = {
# "ntokens": ntokens,
# "dataset_ref": get_dataset_ref(campaign_id, train_data, val_data, test_data),
# "train_data_shape": list(train_data.shape),
# "val_data_shape": list(val_data.shape),
# "test_data_shape": list(test_data.shape),
# }
# Flowcept.db.insert_or_update_workflow(dataset_prep_wf)
# print(dataset_prep_wf)
# return dataset_prep_wf.workflow_id, ntokens, train_data, val_data, test_data


def search_workflow(ntokens, input_data_dir, dataset_ref, exp_param_settings, max_runs, campaign_id=None):
cluster = LocalCluster(n_workers=1)
scheduler = cluster.scheduler
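
For context, the truncated search_workflow above sets up a single-worker Dask LocalCluster. A minimal sketch of how the generated configurations might be driven through such a cluster, assuming the standard dask.distributed Client API; run_one is a hypothetical stand-in for the per-configuration training function and is not part of this commit:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1)
client = Client(cluster)

configs = generate_configs(exp_param_settings)  # defined earlier in this file
# run_one is hypothetical: one training run per hyperparameter configuration.
futures = [client.submit(run_one, **cfg) for cfg in configs[:max_runs]]
results = client.gather(futures)  # block until all runs finish
client.close()
cluster.close()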
