Skip to content

Commit

Permalink
Adds one possible way to run batch
Browse files Browse the repository at this point in the history
The experiment tracker assumes an experiment is defined by its inputs, config,
and overrides. So if we want to
batch-submit a bunch of input permutations and we want
each one tracked, we need to use a distinct driver for each one —
hence the Hamilton within Hamilton.

This means that we are not able to create a single
DAG (driver chaining is a roadmap item) to view it all.

One open question:

 - does anything need to be parallel within the inner DAG?

A possible feature to help here would be to make
@subdag take in distinct adapters... or to implement
driver chaining...

Anyway let me know what you think. If you want a single
DAG we could use parameterized_subdag, but that would
come at the cost of visibility with the experiment tracker,
unless a whole permutation of inputs is a single experiment.

There is always of course modifying more of hamilton to
help here.
  • Loading branch information
skrawcz committed Mar 7, 2024
1 parent 52cb8d3 commit 73ec853
Showing 1 changed file with 89 additions and 49 deletions.
138 changes: 89 additions & 49 deletions example/run_batch1.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
Option 1: Use hamilton within Hamilton.
"""

from pathlib import Path
import build
import eq
Expand All @@ -7,58 +11,94 @@
from hamilton.io.materialization import to
from hamilton.experimental.h_cache import CachingGraphAdapter
from hamilton.plugins import h_experiments, matplotlib_extensions, pandas_extensions # noqa: F401
from hamilton.function_modifiers import tag, value, parameterize

from molexp.cmdline import CMDLineExecutionManager
from hamilton.execution import executors

tracker_hook = h_experiments.ExperimentTracker(
experiment_name="exp",
base_directory="./experiments",
)

execution_manager = CMDLineExecutionManager(
executors.SynchronousLocalTaskExecutor(),
executors.MultiThreadingExecutor(20), # max parallelism
)

dr = (
driver.Builder()
.with_modules(build, eq, tg)
# .with_config(config)
.with_adapters(tracker_hook, CachingGraphAdapter(".cache"))
.with_execution_manager(execution_manager)
.build()
)

inputs = dict(
{
# 'work_dir': ''
"repeat_unit": ["N", "M"],
"repeat": 1,
"n_chains": 20,
"density": 0.005,
}
)

materializers = [
to.pickle(
id="after_build",
dependencies=["submit"],
path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle",

# Statically declared permutation grid; @resolve could build this
# dynamically from passed-in configuration instead.
cross_product = {}
for _unit in ["NMNMP", "NMNMNMP"]:
    for _rep in [1, 4, 8]:
        cross_product[f"{_unit}x{_rep}"] = {
            "repeat_unit": value(_unit),
            "repeat": value(_rep),
        }


@parameterize(**cross_product)
@tag(cmdline="slurm")
def experiment(repeat_unit: str, repeat: int) -> dict:
    """Run one experiment permutation as its own inner Hamilton driver.

    Each parameterization (repeat_unit x repeat) gets a distinct driver and
    its own ExperimentTracker hook, so the tracker records one experiment
    per input permutation (Hamilton within Hamilton).

    :param repeat_unit: monomer sequence for this permutation, e.g. "NMNMP".
    :param repeat: number of times the unit repeats.
    :return: metadata returned by materializing the inner DAG.
    """
    tracker_hook = h_experiments.ExperimentTracker(
        experiment_name="exp",
        base_directory="./experiments",
    )

    # The inner DAG runs serially; parallelism happens across the outer
    # parameterized `experiment` nodes instead.
    execution_manager = CMDLineExecutionManager(
        executors.SynchronousLocalTaskExecutor(),
        executors.SynchronousLocalTaskExecutor(),
    )

    dr = (
        driver.Builder()
        .with_modules(build, eq, tg)
        # .with_config(config)
        .with_adapters(tracker_hook, CachingGraphAdapter(".cache"))
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_execution_manager(execution_manager)
        .build()
    )

    inputs = {
        "n_chains": 20,
        "density": 0.005,
    }

    materializers = [
        to.pickle(
            id="after_build",
            dependencies=["submit"],
            path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle",
        )
    ]

    inputs["repeat_unit"] = list(repeat_unit)
    inputs["repeat"] = repeat

    # work_dir doubles as the permutation label, e.g. "NMNMPx4".
    inputs["work_dir"] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}"
    Path(inputs["work_dir"]).mkdir(exist_ok=True)
    dr.visualize_materialization(
        *materializers,
        inputs=inputs,
        output_file_path=f"{tracker_hook.run_directory}/dag",
        render_kwargs=dict(view=False, format="png"),
    )
    meta, _ = dr.materialize(*materializers, inputs=inputs)
    return meta


if __name__ == "__main__":
import run_batch1

# tracker_hook = h_experiments.ExperimentTracker(
# experiment_name="exp",
# base_directory="./experiments",
# )

execution_manager = CMDLineExecutionManager(
executors.SynchronousLocalTaskExecutor(),
executors.MultiThreadingExecutor(20),
)

dr = (
driver.Builder()
.with_modules(run_batch1)
# .with_config(config)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_execution_manager(execution_manager)
.build()
)
dr.display_all_functions("run_batch1.png")

0 comments on commit 73ec853

Please sign in to comment.