From 8128207b0471f19ae2e616253f0f512edfa1c0ad Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Tue, 17 Dec 2024 18:48:26 -0500
Subject: [PATCH] Typo

---
 .github/workflows/run_examples.sh                  |  2 +-
 .../instrumented_loop_unmanaged_example.py         | 56 +++++++++++++++++++
 examples/llm_complex/llm_main.py                   | 26 ---------
 3 files changed, 57 insertions(+), 27 deletions(-)
 create mode 100644 examples/instrumented_loop_unmanaged_example.py

diff --git a/.github/workflows/run_examples.sh b/.github/workflows/run_examples.sh
index 739ed0ca..8b1e2845 100644
--- a/.github/workflows/run_examples.sh
+++ b/.github/workflows/run_examples.sh
@@ -77,7 +77,7 @@ echo "Using examples directory: $EXAMPLES_DIR"
 echo "With Mongo? ${WITH_MONGO}"
 
 # Define the test cases
-tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard" "llm_complex/llm_search")
+tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard" "llm_complex/llm_main")
 
 # Iterate over the tests and run them
 for test_ in "${tests[@]}"; do
diff --git a/examples/instrumented_loop_unmanaged_example.py b/examples/instrumented_loop_unmanaged_example.py
new file mode 100644
index 00000000..550311e5
--- /dev/null
+++ b/examples/instrumented_loop_unmanaged_example.py
@@ -0,0 +1,56 @@
+import multiprocessing
+import random
+from time import sleep
+
+from flowcept import Flowcept, FlowceptLoop
+
+if __name__ == '__main__': #
+
+    interceptor_id = Flowcept.start_instrumentation_interceptor()
+
+    event = multiprocessing.Event()
+    process1 = multiprocessing.Process(target=Flowcept.start_persistence, args=(interceptor_id, event))
+    process1.start()
+    sleep(1)
+    # Run loop
+    loop = FlowceptLoop(range(max_iterations := 3), workflow_id=interceptor_id)
+    for item in loop:
+        loss = random.random()
+        sleep(0.05)
+        print(item, loss)
+        # The following is optional, in case you want to capture values generated inside the loop.
+        loop.end_iter({"item": item, "loss": loss})
+
+    Flowcept.stop_instrumentation_interceptor()
+
+    event.set()
+    process1.join()
+
+    docs = Flowcept.db.query(filter={"workflow_id": interceptor_id})
+    for d in docs:
+        print(d)
+    # assert len(docs) == max_iterations+1  # The whole loop itself is a task
+
+    #
+    #
+    # @staticmethod
+    # def start_instrumentation_interceptor():
+    #     instance = InstrumentationInterceptor.get_instance()
+    #     instance_id = id(instance)
+    #     instance.start(bundle_exec_id=instance_id)
+    #     return instance_id
+    #
+    # @staticmethod
+    # def stop_instrumentation_interceptor():
+    #     instance = InstrumentationInterceptor.get_instance()
+    #     instance.stop()
+    #
+    # @staticmethod
+    # def start_persistence(interceptor_id, event):
+    #     from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
+    #     inserter = DocumentInserter(
+    #         check_safe_stops=True,
+    #         bundle_exec_id=interceptor_id,
+    #     ).start()
+    #     event.wait()
+    #     inserter.stop()
diff --git a/examples/llm_complex/llm_main.py b/examples/llm_complex/llm_main.py
index 38cfe04d..674e58e6 100644
--- a/examples/llm_complex/llm_main.py
+++ b/examples/llm_complex/llm_main.py
@@ -69,32 +69,6 @@ def generate_configs(params):
     return result
 
 
-
-
-# def dataprep_workflow(tokenizer_type="basic_english", subset_size=None, campaign_id=None):
-#     from flowcept import WorkflowObject
-#     config = {
-#         "subset_size": subset_size,
-#         "tokenizer": tokenizer_type
-#     }
-#     dataset_prep_wf = WorkflowObject()
-#     dataset_prep_wf.used = config
-#     dataset_prep_wf.campaign_id = campaign_id
-#     dataset_prep_wf.name = "generate_wikitext_dataset"
-#     ntokens, train_data, val_data, test_data = get_wiki_text_dataset(tokenizer_type=tokenizer_type,
-#                                                                      subset_size=subset_size)
-#     dataset_prep_wf.generated = {
-#         "ntokens": ntokens,
-#         "dataset_ref": get_dataset_ref(campaign_id, train_data, val_data, test_data),
-#         "train_data_shape": list(train_data.shape),
-#         "val_data_shape": list(val_data.shape),
-#         "test_data_shape": list(test_data.shape),
-#     }
-#     Flowcept.db.insert_or_update_workflow(dataset_prep_wf)
-#     print(dataset_prep_wf)
-#     return dataset_prep_wf.workflow_id, ntokens, train_data, val_data, test_data
-
-
 def search_workflow(ntokens, input_data_dir, dataset_ref, exp_param_settings, max_runs, campaign_id=None):
     cluster = LocalCluster(n_workers=1)
     scheduler = cluster.scheduler