implement run train. Notice changed interface (1 add config; 2 use list instead of set for init_data). add UTs.
Han Wang committed Feb 5, 2022
1 parent 84b3431 commit 1ca9aaa
Showing 9 changed files with 761 additions and 13 deletions.
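The two interface changes called out in the commit message, condensed from the run_dp_train.py diff below (a sketch for orientation, not part of the commit):

from pathlib import Path
from typing import List
from dflow.python import OPIOSign, Artifact

# Condensed view of RunDPTrain.get_input_sign after this commit:
# (1) a new "config" dict carries the training configuration;
# (2) "init_data" is a List[Path] artifact instead of a Set[Path].
input_sign = OPIOSign({
    "config": dict,                      # new in this commit
    "task_name": str,
    "task_path": Artifact(Path),
    "init_model": Artifact(Path),
    "init_data": Artifact(List[Path]),   # was Artifact(Set[Path])
    "iter_data": Artifact(List[Path]),
})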
2 changes: 2 additions & 0 deletions dpgen2/flow/block.py
@@ -42,6 +42,7 @@ def block_cl(
"type_map" : InputParameter(),
"numb_models": InputParameter(type=int),
"template_script" : InputParameter(),
"train_config" : InputParameter(),
"lmp_task_grp" : InputParameter(),
"conf_selector" : InputParameter(),
"fp_inputs" : InputParameter(),
@@ -68,6 +69,7 @@ def block_cl(
name + '-prep-run-dp-train',
template = prep_run_dp_train_op,
parameters={
"train_config" : block_steps.inputs.parameters['train_config'],
"numb_models": block_steps.inputs.parameters['numb_models'],
"template_script": block_steps.inputs.parameters['template_script'],
},
5 changes: 5 additions & 0 deletions dpgen2/flow/loop.py
@@ -113,6 +113,7 @@ def loop (
"type_map" : InputParameter(),
"numb_models": InputParameter(type=int),
"template_script" : InputParameter(),
"train_config" : InputParameter(),
"lmp_task_grp" : InputParameter(),
"conf_selector" : InputParameter(),
"fp_inputs" : InputParameter(),
@@ -146,6 +147,7 @@ def loop (
"type_map" : steps.inputs.parameters["type_map"],
"numb_models" : steps.inputs.parameters["numb_models"],
"template_script" : steps.inputs.parameters["template_script"],
"train_config" : steps.inputs.parameters["train_config"],
"lmp_task_grp" : steps.inputs.parameters["lmp_task_grp"],
"conf_selector" : steps.inputs.parameters["conf_selector"],
"fp_inputs" : steps.inputs.parameters["fp_inputs"],
@@ -198,6 +200,7 @@ def loop (
"type_map" : steps.inputs.parameters["type_map"],
"numb_models" : steps.inputs.parameters["numb_models"],
"template_script" : steps.inputs.parameters["template_script"],
"train_config" : steps.inputs.parameters["train_config"],
"lmp_task_grp" : scheduler_step.outputs.parameters["lmp_task_grp"],
"conf_selector" : scheduler_step.outputs.parameters["conf_selector"],
"fp_inputs" : steps.inputs.parameters["fp_inputs"],
@@ -245,6 +248,7 @@ def dpgen(
"type_map" : InputParameter(),
"numb_models": InputParameter(type=int),
"template_script" : InputParameter(),
"train_config" : InputParameter(),
"fp_inputs" : InputParameter(),
"exploration_scheduler" : InputParameter(),
},
@@ -305,6 +309,7 @@ def dpgen(
"type_map" : steps.inputs.parameters['type_map'],
"numb_models" : steps.inputs.parameters['numb_models'],
"template_script" : steps.inputs.parameters['template_script'],
"train_config" : steps.inputs.parameters['train_config'],
"lmp_task_grp" : scheduler_step.outputs.parameters['lmp_task_grp'],
"conf_selector" : scheduler_step.outputs.parameters['conf_selector'],
"fp_inputs" : steps.inputs.parameters['fp_inputs'],
8 changes: 6 additions & 2 deletions dpgen2/flow/prep_run_dp_train.py
@@ -29,13 +29,16 @@ def prep_run_dp_train(
name : str,
prep_train_op : OP,
run_train_op : OP,
prep_train_image : str = "dflow:v1.0",
run_train_image : str = "dflow:v1.0",
):
train_steps = Steps(
name=name,
inputs=Inputs(
parameters={
"numb_models": InputParameter(type=int),
"template_script" : InputParameter(),
"train_config" : InputParameter(),
},
artifacts={
"init_models" : InputArtifact(),
@@ -56,7 +59,7 @@
'prep-train',
template=PythonOPTemplate(
prep_train_op,
image="dflow:v1.0",
image=prep_train_image,
output_artifact_archive={
"task_paths": None
},
@@ -75,7 +78,7 @@
'run-train',
template=PythonOPTemplate(
run_train_op,
image="dflow:v1.0",
image=run_train_image,
slices = Slices(
"{{item}}",
input_parameter = ["task_name"],
@@ -85,6 +88,7 @@
python_packages = "..//dpgen2",
),
parameters={
"config" : train_steps.inputs.parameters["train_config"],
"task_name" : prep_train.outputs.parameters["task_names"],
},
artifacts={
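The new keyword arguments let the prep and run steps use different container images. A minimal usage sketch, assuming PrepDPTrain lives at dpgen2.op.prep_dp_train (the import path and the run image tag are assumptions, not from this diff):

from dpgen2.flow.prep_run_dp_train import prep_run_dp_train
from dpgen2.op.prep_dp_train import PrepDPTrain  # assumed import path
from dpgen2.op.run_dp_train import RunDPTrain

# Override only the run-step image so training can run in a DeePMD-kit
# container while the lightweight prep step keeps the default dflow image.
train_templ = prep_run_dp_train(
    "prep-run-dp-train",
    PrepDPTrain,
    RunDPTrain,
    prep_train_image="dflow:v1.0",
    run_train_image="deepmd-kit:latest",  # hypothetical image tag
)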
236 changes: 230 additions & 6 deletions dpgen2/op/run_dp_train.py
@@ -2,11 +2,28 @@
OP,
OPIO,
OPIOSign,
Artifact
Artifact,
TransientError,
FatalError,
)
import os, json, dpdata
from typing import (
Tuple,
List,
)
import os, json
from typing import Tuple, List, Set
from pathlib import Path
from dpgen2.constants import (
train_task_pattern,
train_script_name,
)
from dpgen2.utils.run_command import run_command
from dargs import (
dargs,
Argument,
Variant,
ArgumentEncoder,
)


class RunDPTrain(OP):
r"""Execute a DP training task. Train and freeze a DP model.
@@ -21,10 +38,11 @@ class RunDPTrain(OP):
@classmethod
def get_input_sign(cls):
return OPIOSign({
"config" : dict,
"task_name" : str,
"task_path" : Artifact(Path),
"init_model" : Artifact(Path),
"init_data" : Artifact(Set[Path]),
"init_data" : Artifact(List[Path]),
"iter_data" : Artifact(List[Path]),
})

@@ -49,10 +67,11 @@ def execute(
ip : dict
Input dict with components:
- `config`: (`dict`) The config of the training task. Check `RunDPTrain._training_args` for definitions.
- `task_name`: (`str`) The name of training task.
- `task_path`: (`Artifact(Path)`) The path that contains all input files prepared by `PrepDPTrain`.
- `init_model`: (`Artifact(Path)`) A frozen model to initialize the training.
- `init_data`: (`Artifact(set[Path])`) Initial training data.
- `init_data`: (`Artifact(List[Path])`) Initial training data.
- `iter_data`: (`Artifact(List[Path])`) Training data generated in the DPGEN iterations.
Returns
@@ -69,5 +88,210 @@ def execute(
FatalError
On the failure of training or freezing. Human intervention needed.
"""
raise NotImplementedError
config = ip['config'] if ip['config'] is not None else {}
config = RunDPTrain.normalize_config(config)
task_name = ip['task_name']
task_path = ip['task_path']
init_model = ip['init_model']
init_data = ip['init_data']
iter_data = ip['iter_data']

# update the input script
input_script = Path(task_path)/train_script_name
with open(input_script) as fp:
train_dict = json.load(fp)
if "systems" in train_dict['training']:
major_version = "1"
else:
major_version = "2"

# auto prob style
do_init_model = RunDPTrain.decide_init_model(config, init_model, init_data, iter_data)
auto_prob_str = "prob_sys_size"
if do_init_model:
old_ratio = config['init_model_old_ratio']
numb_old = len(init_data) + len(iter_data[:-1])
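# the weighting below assigns probability old_ratio to the old systems
# (init_data plus all but the newest iteration, indices 0..numb_old) and
# the remaining 1 - old_ratio to the newest iteration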
auto_prob_str = f"prob_sys_size; 0:{numb_old}:{old_ratio}; {numb_old}:{numb_old+1}:{1.-old_ratio:g}"

# update the input dict
train_dict = RunDPTrain.write_data_to_input_script(
train_dict, init_data, iter_data, auto_prob_str, major_version)
train_dict = RunDPTrain.write_other_to_input_script(
train_dict, config, do_init_model, major_version)

# mkdir output dir
work_dir = Path(task_name)
cwd = os.getcwd()
work_dir.mkdir(exist_ok=True, parents=True)
os.chdir(work_dir)
# open log
fplog = open('train.log', 'w')
def clean_before_quit():
fplog.close()
os.chdir(cwd)

# dump train script
with open(train_script_name, 'w') as fp:
json.dump(train_dict, fp, indent=4)

# train model
if do_init_model:
command = ['dp', 'train', '--init-frz-model', str(init_model), train_script_name]
else:
command = ['dp', 'train', train_script_name]
ret, out, err = run_command(command)
if ret != 0:
clean_before_quit()
raise FatalError('dp train failed')
fplog.write(out)

# freeze model
ret, out, err = run_command(['dp', 'freeze', '-o', 'frozen_model.pb'])
if ret != 0:
clean_before_quit()
raise FatalError('dp freeze failed')
fplog.write(out)

clean_before_quit()

return OPIO({
"script" : work_dir / train_script_name,
"model" : work_dir / "frozen_model.pb",
"lcurve" : work_dir / "lcurve.out",
"log" : work_dir / "train.log",
})


@staticmethod
def write_data_to_input_script(
idict : dict,
init_data : List[Path],
iter_data : List[Path],
auto_prob_str : str = "prob_sys_size",
major_version : str = "1",
):
odict = idict.copy()
data_list = [str(ii) for ii in init_data] + [str(ii) for ii in iter_data]
if major_version == "1":
# v1 behavior
odict['training']['systems'] = data_list
odict['training']['batch_size'] = "auto"
odict['training']['auto_prob_style'] = auto_prob_str
elif major_version == "2":
# v2 behavior
odict['training']['training_data']['systems'] = data_list
odict['training']['training_data']['batch_size'] = "auto"
odict['training']['training_data']['auto_prob'] = auto_prob_str
odict['training'].pop('validation_data', None)
else:
raise RuntimeError('unsupported DeePMD-kit major version', major_version)
return odict

@staticmethod
def write_other_to_input_script(
idict,
config,
do_init_model,
major_version : str = "1",
):
odict = idict.copy()
odict['training']['disp_file'] = "lcurve.out"
if do_init_model:
odict['learning_rate']['start_lr'] = config['init_model_start_lr']
odict['loss']['start_pref_e'] = config['init_model_start_pref_e']
odict['loss']['start_pref_f'] = config['init_model_start_pref_f']
odict['loss']['start_pref_v'] = config['init_model_start_pref_v']
if major_version == "1":
odict['training']['stop_batch'] = config['init_model_numb_steps']
elif major_version == "2":
odict['training']['numb_steps'] = config['init_model_numb_steps']
else:
raise RuntimeError('unsupported DeePMD-kit major version', major_version)
return odict

@staticmethod
def decide_init_model(
config,
init_model,
init_data,
iter_data,
):
do_init_model = False
# decide if we do init-model
## cases we do definitely not
if init_model is None or \
iter_data is None or \
len(iter_data) == 0 :
do_init_model = False
## cases controlled by the policy
else:
if config['init_model_policy'] == 'no':
do_init_model = False
elif config['init_model_policy'] == 'yes':
do_init_model = True
elif 'old_data_larger_than' in config['init_model_policy']:
old_data_size_level = int(config['init_model_policy'].split(':')[-1])
init_data_size = _get_data_size_of_all_systems(init_data)
iter_data_old_size = _get_data_size_of_all_mult_sys(iter_data[:-1])
old_data_size = init_data_size + iter_data_old_size
if old_data_size > old_data_size_level:
do_init_model = True
return do_init_model


@staticmethod
def _training_args():
doc_init_model_policy = "The policy of init-model training. It can be\n\n\
- 'no': No init-model training. Training from scratch.\n\n\
- 'yes': Do init-model training.\n\n\
- 'old_data_larger_than:XXX': Do init-model if the training data size of the previous model is larger than XXX. XXX is an int number."
doc_init_model_old_ratio = "The frequency ratio of old data over new data"
doc_init_model_numb_steps = "The number of training steps when init-model"
doc_init_model_start_lr = "The start learning rate when init-model"
doc_init_model_start_pref_e = "The start energy prefactor in loss when init-model"
doc_init_model_start_pref_f = "The start force prefactor in loss when init-model"
doc_init_model_start_pref_v = "The start virial prefactor in loss when init-model"

return [
Argument("init_model_policy", str, optional=True, default='no', doc=doc_init_model_prolicy),
Argument("init_model_old_ratio", float, optional=True, default=0.9, doc=doc_init_model_old_ratio),
Argument("init_model_numb_steps", int, optional=True, default=400000, doc=doc_init_model_numb_steps, alias = ['init_model_stop_batch']),
Argument("init_model_start_lr", float, optional=True, default=1e-4, doc=doc_init_model_start_lr),
Argument("init_model_start_pref_e", float, optional=True, default=0.1, doc=doc_init_model_start_pref_e),
Argument("init_model_start_pref_f", float, optional=True, default=100, doc=doc_init_model_start_pref_f),
Argument("init_model_start_pref_v", float, optional=True, default=0.0, doc=doc_init_model_start_pref_v),
]


@staticmethod
def normalize_config(data = {}):
ta = RunDPTrain._training_args()

base = Argument("base", dict, ta)
data = base.normalize_value(data, trim_pattern="_*")
base.check_value(data, strict=True)

return data


def _get_data_size_of_system(data_dir):
ss = dpdata.System(data_dir, fmt='deepmd/npy')
return ss.get_nframes()

def _get_data_size_of_all_systems(data_dirs):
count = 0
for ii in data_dirs:
count += _get_data_size_of_system(ii)
return count

def _get_data_size_of_mult_sys(data_dir):
ms = dpdata.MultiSystems()
ms.from_deepmd_npy(data_dir)
return ms.get_nframes()

def _get_data_size_of_all_mult_sys(data_dirs):
count = 0
for ii in data_dirs:
count += _get_data_size_of_mult_sys(ii)
return count
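
A short, self-contained sketch of the new static helpers (the data paths are placeholders): normalize_config fills unset keys with the dargs defaults from _training_args, and write_data_to_input_script injects the concatenated init/iter data into a DeePMD-kit v2 training script.

from pathlib import Path
from dpgen2.op.run_dp_train import RunDPTrain

# Unset keys fall back to the defaults declared in _training_args.
config = RunDPTrain.normalize_config({"init_model_policy": "yes"})
assert config["init_model_old_ratio"] == 0.9
assert config["init_model_numb_steps"] == 400000

# Without a frozen model to start from, init-model training is never chosen.
assert not RunDPTrain.decide_init_model(config, None, [], [])

# Inject the data lists into a minimal v2 input script.
idict = {"training": {"training_data": {}}}
odict = RunDPTrain.write_data_to_input_script(
    idict,
    init_data=[Path("init/data.000")],
    iter_data=[Path("iter.000000/data"), Path("iter.000001/data")],
    auto_prob_str="prob_sys_size; 0:2:0.9; 2:3:0.1",
    major_version="2",
)
assert odict["training"]["training_data"]["systems"] == [
    "init/data.000", "iter.000000/data", "iter.000001/data"]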
