From 73ec85381652bb2a15c09732bf3f103b1f66e8c8 Mon Sep 17 00:00:00 2001 From: Stefan Krawczyk Date: Wed, 6 Mar 2024 21:31:52 -0800 Subject: [PATCH] Adds one possible way to run batch The experiment tracker assumes it's based on inputs, config and overrides. So if we want to batch submit a bunch of input permutations and we want each one tracked, we need to use a distinct driver for each one, hence the Hamilton within Hamilton. This means that we are not able to create a single DAG (driver chaining is a roadmap item) to view it all. Otherwise: - does anything need to be parallel within the inner DAG? A possible feature to help here would be to make @subdag take in distinct adapters... or we implement driver chaining... Anyway let me know what you think. If you want a single DAG we could use parameterized_subdag, but that would come at the cost of visibility with the experiment tracker, unless a whole permutation of inputs is a single experiment. There is always of course modifying more of hamilton to help here. --- example/run_batch1.py | 138 +++++++++++++++++++++++++++--------------- 1 file changed, 89 insertions(+), 49 deletions(-) diff --git a/example/run_batch1.py b/example/run_batch1.py index 8dcbd0f..99e442d 100644 --- a/example/run_batch1.py +++ b/example/run_batch1.py @@ -1,3 +1,7 @@ +""" +Option 1: Use hamilton within Hamilton. +""" + from pathlib import Path import build import eq @@ -7,58 +11,94 @@ from hamilton.io.materialization import to from hamilton.experimental.h_cache import CachingGraphAdapter from hamilton.plugins import h_experiments, matplotlib_extensions, pandas_extensions # noqa: F401 +from hamilton.function_modifiers import tag, value, parameterize from molexp.cmdline import CMDLineExecutionManager from hamilton.execution import executors -tracker_hook = h_experiments.ExperimentTracker( - experiment_name="exp", - base_directory="./experiments", -) - -execution_manager = CMDLineExecutionManager( - executors.SynchronousLocalTaskExecutor(), - executors.MultiThreadingExecutor(20), # max parallelism -) - -dr = ( - driver.Builder() - .with_modules(build, eq, tg) - # .with_config(config) - .with_adapters(tracker_hook, CachingGraphAdapter(".cache")) - .with_execution_manager(execution_manager) - .build() -) - -inputs = dict( - { - # 'work_dir': '' - "repeat_unit": ["N", "M"], - "repeat": 1, - "n_chains": 20, - "density": 0.005, - } -) - -materializers = [ - to.pickle( - id="after_build", - dependencies=["submit"], - path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle", + +# could use @resolve to dynamically create this via passed in configuration. +# this is statically declared here for now. +cross_product = { + f"{ru}x{r}": {"repeat_unit": value(ru), "repeat": value(r)} + for ru in ["NMNMP", "NMNMNMP"] + for r in [1, 4, 8] +} + + +@parameterize(**cross_product) +@tag(cmdline="slurm") +def experiment(repeat_unit: str, repeat: int) -> dict: + """Node to run an experiment.""" + tracker_hook = h_experiments.ExperimentTracker( + experiment_name="exp", + base_directory="./experiments", + ) + + execution_manager = CMDLineExecutionManager( + executors.SynchronousLocalTaskExecutor(), + executors.SynchronousLocalTaskExecutor(), ) -] - -for repeat_unit in ["NMNMP", "NMNMNMP"]: - for repeat in [1, 4, 8]: - inputs["repeat_unit"] = list(repeat_unit) - inputs["repeat"] = repeat - - inputs["work_dir"] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}" - Path(inputs["work_dir"]).mkdir(exist_ok=True) - dr.visualize_materialization( - *materializers, - inputs=inputs, - output_file_path=f"{tracker_hook.run_directory}/dag", - render_kwargs=dict(view=False, format="png"), + + dr = ( + driver.Builder() + .with_modules(build, eq, tg) + # .with_config(config) + .with_adapters(tracker_hook, CachingGraphAdapter(".cache")) + .enable_dynamic_execution(allow_experimental_mode=True) + .with_execution_manager(execution_manager) + .build() + ) + + inputs = dict( + { + "n_chains": 20, + "density": 0.005, + } + ) + + materializers = [ + to.pickle( + id="after_build", + dependencies=["submit"], + path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle", ) - dr.materialize(*materializers, inputs=inputs) + ] + + inputs["repeat_unit"] = list(repeat_unit) + inputs["repeat"] = repeat + + inputs["work_dir"] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}" + Path(inputs["work_dir"]).mkdir(exist_ok=True) + dr.visualize_materialization( + *materializers, + inputs=inputs, + output_file_path=f"{tracker_hook.run_directory}/dag", + render_kwargs=dict(view=False, format="png"), + ) + meta, _ = dr.materialize(*materializers, inputs=inputs) + return meta + + +if __name__ == "__main__": + import run_batch1 + + # tracker_hook = h_experiments.ExperimentTracker( + # experiment_name="exp", + # base_directory="./experiments", + # ) + + execution_manager = CMDLineExecutionManager( + executors.SynchronousLocalTaskExecutor(), + executors.MultiThreadingExecutor(20), + ) + + dr = ( + driver.Builder() + .with_modules(run_batch1) + # .with_config(config) + .enable_dynamic_execution(allow_experimental_mode=True) + .with_execution_manager(execution_manager) + .build() + ) + dr.display_all_functions("run_batch1.png")