diff --git a/dpgen2/entrypoint/submit.py b/dpgen2/entrypoint/submit.py index 76f944c6..70dd73ae 100644 --- a/dpgen2/entrypoint/submit.py +++ b/dpgen2/entrypoint/submit.py @@ -63,6 +63,7 @@ ) from dpgen2.exploration.render import ( TrajRenderLammps, + TrajRenderLammpsSpin, ) from dpgen2.exploration.report import ( ExplorationReportTrustLevelsRandom, @@ -81,6 +82,7 @@ CustomizedLmpTemplateTaskGroup, ExplorationStage, ExplorationTask, + LmpSpinTaskGroup, LmpTemplateTaskGroup, NPTTaskGroup, caly_normalize, @@ -377,7 +379,10 @@ def make_lmp_naive_exploration_scheduler(config): # report conv_style = convergence.pop("type") report = conv_styles[conv_style](**convergence) - render = TrajRenderLammps(nopbc=output_nopbc, use_ele_temp=use_ele_temp) + if "spin" in conv_style: + render = TrajRenderLammpsSpin(nopbc=output_nopbc, use_ele_temp=use_ele_temp) + else: + render = TrajRenderLammps(nopbc=output_nopbc, use_ele_temp=use_ele_temp) # selector selector = ConfSelectorFrames( render, diff --git a/dpgen2/exploration/deviation/__init__.py b/dpgen2/exploration/deviation/__init__.py index 537c2f94..3f41fab4 100644 --- a/dpgen2/exploration/deviation/__init__.py +++ b/dpgen2/exploration/deviation/__init__.py @@ -1,6 +1,9 @@ from .deviation_manager import ( DeviManager, ) +from .deviation_spin import ( + DeviManagerSpin, +) from .deviation_std import ( DeviManagerStd, ) diff --git a/dpgen2/exploration/deviation/deviation_spin.py b/dpgen2/exploration/deviation/deviation_spin.py new file mode 100644 index 00000000..dc6b832a --- /dev/null +++ b/dpgen2/exploration/deviation/deviation_spin.py @@ -0,0 +1,127 @@ +from collections import ( + defaultdict, +) +from typing import ( + Dict, + List, + Optional, +) + +import numpy as np + +from .deviation_manager import ( + DeviManager, +) + + +class DeviManagerSpin(DeviManager): + r"""The class which is responsible for DeepSPIN model deviation management. + + This is the implementation of DeviManager for DeepSPIN model. Each deviation + (e.g. max_devi_af, max_devi_mf in file `model_devi.out`) is stored + as a List[Optional[np.ndarray]], where np.array is a one-dimensional + array. + A List[np.ndarray][ii][jj] is the force model deviation of the jj-th + frame of the ii-th trajectory. + The model deviation can be List[None], where len(List[None]) is + the number of trajectory files. + + """ + + MAX_DEVI_AF = "max_devi_af" + MIN_DEVI_AF = "min_devi_af" + AVG_DEVI_AF = "avg_devi_af" + MAX_DEVI_MF = "max_devi_mf" + MIN_DEVI_MF = "min_devi_mf" + AVG_DEVI_MF = "avg_devi_mf" + + def __init__(self): + super().__init__() + self._data = defaultdict(list) + + def _check_name(self, name: str): + assert name in ( + DeviManager.MAX_DEVI_V, + DeviManager.MIN_DEVI_V, + DeviManager.AVG_DEVI_V, + self.MAX_DEVI_AF, + self.MIN_DEVI_AF, + self.AVG_DEVI_AF, + self.MAX_DEVI_MF, + self.MIN_DEVI_MF, + self.AVG_DEVI_MF, + ), f"Error: unknown deviation name {name}" + + def _add(self, name: str, deviation: np.ndarray) -> None: + assert isinstance( + deviation, np.ndarray + ), f"Error: deviation(type: {type(deviation)}) is not a np.ndarray" + assert len(deviation.shape) == 1, ( + f"Error: deviation(shape: {deviation.shape}) is not a " + + f"one-dimensional array" + ) + self._data[name].append(deviation) + self.ntraj = max(self.ntraj, len(self._data[name])) + + def _get(self, name: str) -> List[Optional[np.ndarray]]: + if self.ntraj == 0: + return [] + elif len(self._data[name]) == 0: + return [None for _ in range(self.ntraj)] + else: + return self._data[name] + + def clear(self) -> None: + self.__init__() + return None + + def _check_data(self) -> None: + r"""Check if data is valid""" + model_devi_names = ( + DeviManager.MAX_DEVI_V, + DeviManager.MIN_DEVI_V, + DeviManager.AVG_DEVI_V, + self.MAX_DEVI_AF, + self.MIN_DEVI_AF, + self.AVG_DEVI_AF, + self.MAX_DEVI_MF, + self.MIN_DEVI_MF, + self.AVG_DEVI_MF, + ) + # check the length of model deviations + frames = {} + for name in model_devi_names: + if len(self._data[name]) > 0: + assert len(self._data[name]) == self.ntraj, ( + f"Error: the number of model deviation {name} " + + f"({len(self._data[name])}) and trajectory files ({self.ntraj}) " + + f"are not equal." + ) + for idx, ndarray in enumerate(self._data[name]): + assert isinstance(ndarray, np.ndarray), ( + f"Error: model deviation in {name} is not ndarray, " + + f"index: {idx}, type: {type(ndarray)}" + ) + + frames[name] = [arr.shape[0] for arr in self._data[name]] + if len(frames[name]) == 0: + frames.pop(name) + + # check if "max_devi_af" and "max_devi_mf" exist + assert ( + len(self._data[self.MAX_DEVI_AF]) == self.ntraj + ), f"Error: cannot find model deviation {self.MAX_DEVI_AF}" + assert ( + len(self._data[self.MAX_DEVI_MF]) == self.ntraj + ), f"Error: cannot find model deviation {self.MAX_DEVI_MF}" + + # check if the length of the arrays corresponding to the same + # trajectory has the same number of frames + non_empty_deviations = list(frames.keys()) + for name in non_empty_deviations[1:]: + assert frames[name] == frames[non_empty_deviations[0]], ( + f"Error: the number of frames in {name} is different " + + f"with that in {non_empty_deviations[0]}.\n" + + f"{name}: {frames[name]}\n" + + f"{non_empty_deviations[0]}: {frames[non_empty_deviations[0]]}\n" + ) diff --git a/dpgen2/exploration/render/__init__.py b/dpgen2/exploration/render/__init__.py index e926c484..44347341 100644 --- a/dpgen2/exploration/render/__init__.py +++ b/dpgen2/exploration/render/__init__.py @@ -4,3 +4,6 @@ from .traj_render_lammps import ( TrajRenderLammps, ) +from .traj_render_lammps_spin import ( + TrajRenderLammpsSpin, +) diff --git a/dpgen2/exploration/render/traj_render_lammps_spin.py b/dpgen2/exploration/render/traj_render_lammps_spin.py new file mode 100644 index 00000000..621ca65c --- /dev/null +++ b/dpgen2/exploration/render/traj_render_lammps_spin.py @@ -0,0 +1,76 @@ +from pathlib import ( + Path, +) +from typing import ( + TYPE_CHECKING, + List, + Optional, + Tuple, + Union, +) + +import dpdata +import numpy as np + +from ..deviation import ( + DeviManager, + DeviManagerSpin, +) +from .traj_render import ( + TrajRender, +) + +if TYPE_CHECKING: + from dpgen2.exploration.selector import ( + ConfFilters, + ) + + +class TrajRenderLammpsSpin(TrajRender): + def __init__( + self, + nopbc: bool = False, + use_ele_temp: int = 0, + ): + self.nopbc = nopbc + + def get_model_devi( + self, + files: List[Path], + ) -> DeviManagerSpin: + ntraj = len(files) + + model_devi = DeviManagerSpin() + for ii in range(ntraj): + self._load_one_model_devi(files[ii], model_devi) + + return model_devi + + def _load_one_model_devi(self, fname, model_devi): + dd = np.loadtxt(fname) + model_devi.add(DeviManagerSpin.MAX_DEVI_AF, dd[:, 4]) + model_devi.add(DeviManagerSpin.MIN_DEVI_AF, dd[:, 5]) + model_devi.add(DeviManagerSpin.AVG_DEVI_AF, dd[:, 6]) + model_devi.add(DeviManagerSpin.MAX_DEVI_MF, dd[:, 7]) + model_devi.add(DeviManagerSpin.MIN_DEVI_MF, dd[:, 8]) + model_devi.add(DeviManagerSpin.AVG_DEVI_MF, dd[:, 9]) + + def get_confs( + self, + trajs: List[Path], + id_selected: List[List[int]], + type_map: Optional[List[str]] = None, + conf_filters: Optional["ConfFilters"] = None, + optional_outputs: Optional[List[Path]] = None, + ) -> dpdata.MultiSystems: + del conf_filters # by far does not support conf filters + ntraj = len(trajs) + traj_fmt = "lammps/dump" + ms = dpdata.MultiSystems(type_map=type_map) + for ii in range(ntraj): + if len(id_selected[ii]) > 0: + ss = dpdata.System(trajs[ii], fmt=traj_fmt, type_map=type_map) + ss.nopbc = self.nopbc + ss = ss.sub_system(id_selected[ii]) + ms.append(ss) + return ms diff --git a/dpgen2/exploration/report/__init__.py b/dpgen2/exploration/report/__init__.py index 282068c2..afebba12 100644 --- a/dpgen2/exploration/report/__init__.py +++ b/dpgen2/exploration/report/__init__.py @@ -10,9 +10,13 @@ from .report_trust_levels_random import ( ExplorationReportTrustLevelsRandom, ) +from .report_trust_levels_spin import ( + ExplorationReportTrustLevelsSpin, +) conv_styles = { "fixed-levels": ExplorationReportTrustLevelsRandom, "fixed-levels-max-select": ExplorationReportTrustLevelsMax, + "fixed-levels-max-select-spin": ExplorationReportTrustLevelsSpin, "adaptive-lower": ExplorationReportAdaptiveLower, } diff --git a/dpgen2/exploration/report/report_trust_levels_spin.py b/dpgen2/exploration/report/report_trust_levels_spin.py new file mode 100644 index 00000000..b532333d --- /dev/null +++ b/dpgen2/exploration/report/report_trust_levels_spin.py @@ -0,0 +1,300 @@ +import random +from abc import ( + abstractmethod, +) +from typing import ( + List, + Optional, + Tuple, +) + +import numpy as np +from dargs import ( + Argument, +) +from dflow.python import ( + FatalError, +) + +from ..deviation import ( + DeviManagerSpin, +) +from . import ( + ExplorationReport, +) + + +class ExplorationReportTrustLevelsSpin(ExplorationReport): + def __init__( + self, + level_af_lo, + level_af_hi, + level_mf_lo, + level_mf_hi, + conv_accuracy=0.9, + ): + self.level_af_lo = level_af_lo + self.level_af_hi = level_af_hi + self.level_mf_lo = level_mf_lo + self.level_mf_hi = level_mf_hi + self.conv_accuracy = conv_accuracy + self.clear() + self.model_devi = None + + print_tuple = ( + "stage", + "id_stg.", + "iter.", + "accu.", + "cand.", + "fail.", + "lvl_af_lo", + "lvl_af_hi", + "lvl_mf_lo", + "lvl_mf_hi", + "cvged", + ) + spaces = [8, 8, 8, 10, 10, 10, 10, 10, 10, 10, 8] + self.fmt_str = " ".join([f"%{ii}s" for ii in spaces]) + self.fmt_flt = "%.4f" + self.header_str = "#" + self.fmt_str % print_tuple + + @staticmethod + def args() -> List[Argument]: + doc_level_af_lo = "The lower trust level of atomic force model deviation" + doc_level_af_hi = "The higher trust level of atomic force model deviation" + doc_level_mf_lo = "The lower trust level of magnetic force model deviation" + doc_level_mf_hi = "The higher trust level of magnetic force model deviation" + doc_conv_accuracy = "If the ratio of accurate frames is larger than this value, the stage is converged" + return [ + Argument("level_af_lo", float, optional=False, doc=doc_level_af_lo), + Argument("level_af_hi", float, optional=False, doc=doc_level_af_hi), + Argument("level_mf_lo", float, optional=False, doc=doc_level_mf_lo), + Argument("level_mf_hi", float, optional=False, doc=doc_level_mf_hi), + Argument( + "conv_accuracy", + float, + optional=True, + default=0.9, + doc=doc_conv_accuracy, + ), + ] + + def clear(self): + self.traj_nframes = [] + self.traj_cand = [] + self.traj_accu = [] + self.traj_fail = [] + self.traj_cand_picked = [] + self.model_devi = None + + def record( + self, + model_devi: DeviManagerSpin, + ): + ntraj = model_devi.ntraj + md_af = model_devi.get(DeviManagerSpin.MAX_DEVI_AF) + md_mf = model_devi.get(DeviManagerSpin.MAX_DEVI_MF) + + for ii in range(ntraj): + id_af_cand, id_af_accu, id_af_fail = self._get_indexes( + md_af[ii], self.level_af_lo, self.level_af_hi + ) + id_mf_cand, id_mf_accu, id_mf_fail = self._get_indexes( + md_mf[ii], self.level_mf_lo, self.level_mf_hi + ) + nframes, set_accu, set_cand, set_fail = self._record_one_traj( + id_af_accu, + id_af_cand, + id_af_fail, + id_mf_accu, + id_mf_cand, + id_mf_fail, + ) + # record + self.traj_nframes.append(nframes) + self.traj_cand.append(set_cand) + self.traj_accu.append(set_accu) + self.traj_fail.append(set_fail) + assert len(self.traj_nframes) == ntraj + assert len(self.traj_cand) == ntraj + assert len(self.traj_accu) == ntraj + assert len(self.traj_fail) == ntraj + self.model_devi = model_devi + + def _get_indexes( + self, + md, + level_lo, + level_hi, + ): + if (md is not None) and (level_hi is not None) and (level_lo is not None): + id_cand = np.where(np.logical_and(md >= level_lo, md < level_hi))[0] + id_accu = np.where(md < level_lo)[0] + id_fail = np.where(md >= level_hi)[0] + else: + id_cand = id_accu = id_fail = None + return id_cand, id_accu, id_fail + + def _record_one_traj( + self, + id_af_accu, + id_af_cand, + id_af_fail, + id_mf_accu, + id_mf_cand, + id_mf_fail, + ): + """ + Record one trajctory. inputs are the indexes of candidate, accurate and failed frames. + + """ + # check consistency + nframes = np.size(np.concatenate((id_af_cand, id_af_accu, id_af_fail))) + if nframes != np.size(np.concatenate((id_mf_cand, id_mf_accu, id_mf_fail))): + raise FatalError( + "the number of frames by atomic force and magnetic force is not consistent" + ) + # nframes to sets + set_af_accu = set(id_af_accu) + set_af_cand = set(id_af_cand) + set_af_fail = set(id_af_fail) + set_mf_accu = set(id_mf_accu) + set_mf_cand = set(id_mf_cand) + set_mf_fail = set(id_mf_fail) + # accu, cand, fail + set_accu = set_af_accu & set_mf_accu + set_cand = ( + (set_af_cand & set_mf_accu) + | (set_af_cand & set_mf_cand) + | (set_af_accu & set_mf_cand) + ) + set_fail = set_af_fail | set_mf_fail + # check size + assert nframes == len(set_accu | set_cand | set_fail) + assert 0 == len(set_accu & set_cand) + assert 0 == len(set_accu & set_fail) + assert 0 == len(set_cand & set_fail) + return nframes, set_accu, set_cand, set_fail + + def converged( + self, + reports: Optional[List[ExplorationReport]] = None, + ) -> bool: + return self.accurate_ratio() >= self.conv_accuracy + + def failed_ratio( + self, + tag=None, + ): + traj_nf = [len(ii) for ii in self.traj_fail] + return float(sum(traj_nf)) / float(sum(self.traj_nframes)) + + def accurate_ratio( + self, + tag=None, + ): + traj_nf = [len(ii) for ii in self.traj_accu] + return float(sum(traj_nf)) / float(sum(self.traj_nframes)) + + def candidate_ratio( + self, + tag=None, + ): + traj_nf = [len(ii) for ii in self.traj_cand] + return float(sum(traj_nf)) / float(sum(self.traj_nframes)) + + def get_candidate_ids( + self, + max_nframes: Optional[int] = None, + ) -> List[List[int]]: + ntraj = len(self.traj_nframes) + id_cand = self._get_candidates(max_nframes) + id_cand_list = [[] for ii in range(ntraj)] + for ii in id_cand: + id_cand_list[ii[0]].append(ii[1]) + return id_cand_list + + def _get_candidates( + self, + max_nframes: Optional[int] = None, + ) -> List[Tuple[int, int]]: + """ + Get candidates. If number of candidates is larger than `max_nframes`, + then select `max_nframes` frames with the largest `max_devi_mf` + from the candidates. + + Parameters + ---------- + max_nframes + The maximal number of frames of candidates. + + Returns + ------- + cand_frames List[Tuple[int,int]] + Candidate frames. A list of tuples: [(traj_idx, frame_idx), ...] + """ + self.traj_cand_picked = [] + for tidx, tt in enumerate(self.traj_cand): + for ff in tt: + self.traj_cand_picked.append((tidx, ff)) + if max_nframes is not None and max_nframes < len(self.traj_cand_picked): + # select by max magnetic force + max_devi_af = self.model_devi.get(DeviManagerSpin.MAX_DEVI_AF) + max_devi_mf = self.model_devi.get(DeviManagerSpin.MAX_DEVI_MF) + ret = sorted( + self.traj_cand_picked, + key=lambda x: max_devi_mf[x[0]][x[1]], + reverse=True, + ) + ret = ret[:max_nframes] + else: + ret = self.traj_cand_picked + return ret + + def print_header(self) -> str: + r"""Print the header of report""" + return self.header_str + + def print( + self, + stage_idx: int, + idx_in_stage: int, + iter_idx: int, + ) -> str: + r"""Print the report""" + fmt_str = self.fmt_str + fmt_flt = self.fmt_flt + print_tuple = ( + str(stage_idx), + str(idx_in_stage), + str(iter_idx), + fmt_flt % (self.accurate_ratio()), + fmt_flt % (self.candidate_ratio()), + fmt_flt % (self.failed_ratio()), + fmt_flt % (self.level_af_lo), + fmt_flt % (self.level_af_hi), + fmt_flt % (self.level_mf_lo), + fmt_flt % (self.level_mf_hi), + str(self.converged()), + ) + ret = " " + fmt_str % print_tuple + return ret + + @staticmethod + def doc() -> str: + def make_class_doc_link(key): + from dpgen2.entrypoint.args import ( + make_link, + ) + + return make_link( + key, f"explore[lmp]/convergence[fixed-levels-max-select-spin]/{key}" + ) + + level_af_hi_link = make_class_doc_link("level_af_hi") + level_af_lo_link = make_class_doc_link("level_af_lo") + level_mf_hi_link = make_class_doc_link("level_mf_hi") + level_mf_lo_link = make_class_doc_link("level_mf_lo") + conv_accuracy_link = make_class_doc_link("conv_accuracy") + return f"The configurations with atomic force model deviation between {level_af_lo_link}, {level_af_hi_link} or magnetic force model deviation between {level_mf_lo_link} and {level_mf_hi_link} are treated as candidates. The configurations with maximal magnetic force model deviation in the candidates are sent for FP calculations. If the ratio of accurate (below {level_af_lo_link} and {level_mf_lo_link}) is higher then {conv_accuracy_link}, the stage is treated as converged." diff --git a/dpgen2/exploration/task/__init__.py b/dpgen2/exploration/task/__init__.py index 534a8828..b60b0bce 100644 --- a/dpgen2/exploration/task/__init__.py +++ b/dpgen2/exploration/task/__init__.py @@ -10,6 +10,9 @@ from .diffcsp_task_group import ( DiffCSPTaskGroup, ) +from .lmp_spin_task_group import ( + LmpSpinTaskGroup, +) from .lmp_template_task_group import ( LmpTemplateTaskGroup, ) diff --git a/dpgen2/exploration/task/lmp_spin_task_group.py b/dpgen2/exploration/task/lmp_spin_task_group.py new file mode 100644 index 00000000..79016042 --- /dev/null +++ b/dpgen2/exploration/task/lmp_spin_task_group.py @@ -0,0 +1,126 @@ +import itertools +import random +from pathlib import ( + Path, +) +from typing import ( + List, + Optional, +) + +from dpgen2.constants import ( + lmp_conf_name, + lmp_input_name, + lmp_traj_name, + model_name_pattern, + plm_input_name, + plm_output_name, +) + +from .conf_sampling_task_group import ( + ConfSamplingTaskGroup, +) +from .lmp import ( + make_lmp_input, +) +from .task import ( + ExplorationTask, +) + + +class LmpSpinTaskGroup(ConfSamplingTaskGroup): + def __init__( + self, + ): + super().__init__() + self.lmp_set = False + self.plm_set = False + + def set_lmp( + self, + numb_models: int, + lmp_template_fname: str, + plm_template_fname: Optional[str] = None, + revisions: dict = {}, + ) -> None: + self.lmp_template = Path(lmp_template_fname).read_text().split("\n") + self.revisions = revisions + self.lmp_set = True + self.model_list = sorted([model_name_pattern % ii for ii in range(numb_models)]) + if plm_template_fname is not None: + self.plm_template = Path(plm_template_fname).read_text().split("\n") + self.plm_set = True + + def make_task( + self, + ) -> "LmpSpinTaskGroup": + if not self.conf_set: + raise RuntimeError("confs are not set") + if not self.lmp_set: + raise RuntimeError("Lammps SPIN template and revisions are not set") + # clear all existing tasks + self.clear() + confs = self._sample_confs() + templates = [self.lmp_template] + conts = self.make_cont(templates, self.revisions) + nconts = len(conts[0]) + for cc, ii in itertools.product(confs, range(nconts)): # type: ignore + self.add_task(self._make_lmp_task(cc, conts[0][ii])) + return self + + def make_cont( + self, + templates: list, + revisions: dict, + ): + keys = revisions.keys() + prod_vv = [revisions[kk] for kk in keys] + ntemplate = len(templates) + ret = [[] for ii in range(ntemplate)] + for vv in itertools.product(*prod_vv): + for ii in range(ntemplate): + tt = templates[ii].copy() + ret[ii].append("\n".join(revise_by_keys(tt, keys, vv))) + return ret + + def _make_lmp_task( + self, + conf: str, + lmp_cont: str, + plm_cont: Optional[str] = None, + ) -> ExplorationTask: + task = ExplorationTask() + task.add_file( + lmp_conf_name, + conf, + ).add_file( + lmp_input_name, + lmp_cont, + ) + if plm_cont is not None: + task.add_file( + plm_input_name, + plm_cont, + ) + return task + + +def find_only_one_key(lmp_lines, key): + found = [] + for idx in range(len(lmp_lines)): + words = lmp_lines[idx].split() + nkey = len(key) + if len(words) >= nkey and words[:nkey] == key: + found.append(idx) + if len(found) > 1: + raise RuntimeError("found %d keywords %s" % (len(found), key)) + if len(found) == 0: + raise RuntimeError("failed to find keyword %s" % (key)) + return found[0] + + +def revise_by_keys(lmp_lines, keys, values): + for kk, vv in zip(keys, values): # type: ignore + for ii in range(len(lmp_lines)): + lmp_lines[ii] = lmp_lines[ii].replace(kk, str(vv)) + return lmp_lines diff --git a/dpgen2/exploration/task/make_task_group_from_config.py b/dpgen2/exploration/task/make_task_group_from_config.py index 3b793c58..bdc7bde1 100644 --- a/dpgen2/exploration/task/make_task_group_from_config.py +++ b/dpgen2/exploration/task/make_task_group_from_config.py @@ -19,6 +19,9 @@ from dpgen2.exploration.task.customized_lmp_template_task_group import ( CustomizedLmpTemplateTaskGroup, ) +from dpgen2.exploration.task.lmp_spin_task_group import ( + LmpSpinTaskGroup, +) from dpgen2.exploration.task.lmp_template_task_group import ( LmpTemplateTaskGroup, ) @@ -286,11 +289,51 @@ def customized_lmp_template_task_group_args(): ] +def lmp_spin_task_group_args(): + doc_lmp_template_fname = "The file name of lammps input template" + doc_plm_template_fname = "The file name of plumed input template" + doc_revisions = "The revisions. Should be a dict providing the key - list of desired values pair. Key is the word to be replaced in the templates, and it may appear in both the lammps and plumed input templates. All values in the value list will be enmerated." + + return [ + Argument("conf_idx", list, optional=False, doc=doc_conf_idx, alias=["sys_idx"]), + Argument( + "n_sample", + int, + optional=True, + default=None, + doc=doc_n_sample, + ), + Argument( + "lmp_template_fname", + str, + optional=False, + doc=doc_lmp_template_fname, + alias=["lmp_template", "lmp"], + ), + Argument( + "plm_template_fname", + str, + optional=True, + default=None, + doc=doc_plm_template_fname, + alias=["plm_template", "plm"], + ), + Argument( + "revisions", + dict, + optional=True, + default={}, + doc=doc_revisions, + ), + ] + + def variant_task_group(): doc = "the type of the task group" doc_lmp_md = "Lammps MD tasks. DPGEN will generate the lammps input script" doc_lmp_template = "Lammps MD tasks defined by templates. User provide lammps (and plumed) template for lammps tasks. The variables in templates are revised by the revisions key. Notice that the lines for pair style, dump and plumed are reserved for the revision of dpgen2, and the users should not write these lines by themselves. Rather, users notify dpgen2 the poistion of the line for `pair_style` by writting 'pair_style deepmd', the line for `dump` by writting 'dump dpgen_dump'. If plumed is used, the line for `fix plumed` shouldbe written exactly as 'fix dpgen_plm'. " doc_customized_lmp_template = "Lammps MD tasks defined by user customized shell commands and templates. User provided shell script generates a series of folders, and each folder contains a lammps template task group. " + doc_lmp_spin = "Lammps SPIN tasks defined by templates. User provides lammps template and revision keys." return Variant( "type", [ @@ -309,6 +352,12 @@ def variant_task_group(): customized_lmp_template_task_group_args(), doc=doc_customized_lmp_template, ), + Argument( + "lmp-spin", + dict, + lmp_spin_task_group_args(), + doc=doc_lmp_spin, + ), ], doc=doc, ) @@ -644,6 +693,15 @@ def make_lmp_task_group_from_config( sh_cmd, **config, ) + elif config["type"] == "lmp-spin": + tgroup = LmpSpinTaskGroup() + config.pop("type") + lmp_spin_template = config.pop("lmp_template_fname") + tgroup.set_lmp( + numb_models, + lmp_spin_template, + **config, + ) else: raise RuntimeError("unknown task group type: ", config["type"]) return tgroup diff --git a/dpgen2/fp/__init__.py b/dpgen2/fp/__init__.py index 8784ed5c..1e46b916 100644 --- a/dpgen2/fp/__init__.py +++ b/dpgen2/fp/__init__.py @@ -13,6 +13,10 @@ PrepDeepmd, RunDeepmd, ) +from .deltaspin import ( + PrepDeltaSpin, + RunDeltaSpin, +) from .gaussian import ( GaussianInputs, PrepGaussian, @@ -50,4 +54,9 @@ "prep": PrepFpOpCp2k, "run": RunFpOpCp2k, }, + "deltaspin": { + "inputs": VaspInputs, + "prep": PrepDeltaSpin, + "run": RunDeltaSpin, + }, } diff --git a/dpgen2/fp/deltaspin.py b/dpgen2/fp/deltaspin.py new file mode 100644 index 00000000..6ac61207 --- /dev/null +++ b/dpgen2/fp/deltaspin.py @@ -0,0 +1,190 @@ +import logging +from pathlib import ( + Path, +) +from typing import ( + Dict, + List, + Optional, + Set, + Tuple, + Union, +) + +import dpdata +import numpy as np +from dargs import ( + Argument, + ArgumentEncoder, + Variant, + dargs, +) +from dflow.python import ( + OP, + OPIO, + Artifact, + BigParameter, + FatalError, + OPIOSign, + TransientError, +) + +from dpgen2.constants import ( + fp_default_log_name, + fp_default_out_data_name, +) +from dpgen2.utils.run_command import ( + run_command, +) + +from .prep_fp import ( + PrepFp, +) +from .run_fp import ( + RunFp, +) +from .vasp_input import ( + VaspInputs, + make_kspacing_kpoints, +) + +# global static variables +vasp_conf_name = "POSCAR" +vasp_input_name = "INCAR" +vasp_pot_name = "POTCAR" +vasp_kp_name = "KPOINTS" + + +class PrepDeltaSpin(PrepFp): + def prep_task( + self, + conf_frame: dpdata.System, + vasp_inputs: VaspInputs, + ): + r"""Define how one DeltaSpin task is prepared. + + Parameters + ---------- + conf_frame : dpdata.System + One frame of configuration in the dpdata format. + vasp_inputs : VaspInputs + The VaspInputs object handels all other input files of the task. + """ + + Path(vasp_input_name).write_text(vasp_inputs.incar_template) + conf_frame.to("vasp_deltaspin/poscar", vasp_conf_name) + # fix the case when some element have 0 atom, e.g. H0O2 + tmp_frame = dpdata.System(vasp_conf_name, fmt="vasp/poscar") + Path(vasp_pot_name).write_text(vasp_inputs.make_potcar(tmp_frame["atom_names"])) + Path(vasp_kp_name).write_text(vasp_inputs.make_kpoints(conf_frame["cells"][0])) # type: ignore + + +class RunDeltaSpin(RunFp): + def input_files(self) -> List[str]: + r"""The mandatory input files to run a DeltaSpin task. + + Returns + ------- + files: List[str] + A list of madatory input files names. + + """ + return [vasp_conf_name, vasp_input_name, vasp_pot_name, vasp_kp_name] + + def optional_input_files(self) -> List[str]: + r"""The optional input files to run a DeltaSpin task. + + Returns + ------- + files: List[str] + A list of optional input files names. + + """ + return [] + + def run_task( + self, + command: str, + out: str, + log: str, + ) -> Tuple[str, str]: + r"""Defines how one FP task runs + + Parameters + ---------- + command : str + The command of running vasp task + out : str + The name of the output data file. + log : str + The name of the log file + + Returns + ------- + out_name: str + The file name of the output data in the dpdata.LabeledSystem format. + log_name: str + The file name of the log. + """ + + log_name = log + out_name = out + # run vasp + command = " ".join([command, ">", log_name]) + ret, out, err = run_command(command, shell=True) + if ret != 0: + logging.error( + "".join( + ( + "DeltaSpin failed\n", + "out msg: ", + out, + "\n", + "err msg: ", + err, + "\n", + ) + ) + ) + raise TransientError("DeltaSpin failed") + # convert the output to deepmd/npy format + sys = dpdata.LabeledSystem("OUTCAR", fmt="vasp_deltaspin/outcar") + sys.to("deepmd/npy", out_name) + return out_name, log_name + + @staticmethod + def args(): + r"""The argument definition of the `run_task` method. + + Returns + ------- + arguments: List[dargs.Argument] + List of dargs.Argument defines the arguments of `run_task` method. + """ + + doc_deltaspin_cmd = "The command of DeltaSpin" + doc_deltaspin_log = "The log file name of DeltaSpin" + doc_deltaspin_out = "The output dir name of labeled data. In `deepmd/spin/npy` format provided by `dpdata`." + return [ + Argument( + "command", + str, + optional=True, + default="vasp_deltaspin", + doc=doc_deltaspin_cmd, + ), + Argument( + "out", + str, + optional=True, + default=fp_default_out_data_name, + doc=doc_deltaspin_out, + ), + Argument( + "log", + str, + optional=True, + default=fp_default_log_name, + doc=doc_deltaspin_log, + ), + ] diff --git a/tests/exploration/test_devi_manager_spin.py b/tests/exploration/test_devi_manager_spin.py new file mode 100644 index 00000000..dcae5c03 --- /dev/null +++ b/tests/exploration/test_devi_manager_spin.py @@ -0,0 +1,121 @@ +import os +import unittest +from pathlib import ( + Path, +) + +import numpy as np + +# isort: off +from .context import ( + dpgen2, +) +from dpgen2.exploration.deviation import ( + DeviManager, + DeviManagerSpin, +) + +# isort: on + + +class TestDeviManagerSpin(unittest.TestCase): + def test_success(self): + model_devi = DeviManagerSpin() + model_devi.add(DeviManagerSpin.MAX_DEVI_AF, np.array([1, 2, 3])) + model_devi.add(DeviManagerSpin.MAX_DEVI_AF, np.array([4, 5, 6])) + model_devi.add(DeviManagerSpin.MAX_DEVI_MF, np.array([7, 8, 9])) + model_devi.add(DeviManagerSpin.MAX_DEVI_MF, np.array([10, 11, 12])) + + self.assertEqual(model_devi.ntraj, 2) + self.assertTrue( + np.allclose( + model_devi.get(DeviManagerSpin.MAX_DEVI_AF), + np.array([[1, 2, 3], [4, 5, 6]]), + ) + ) + self.assertTrue( + np.allclose( + model_devi.get(DeviManagerSpin.MAX_DEVI_MF), + np.array([[7, 8, 9], [10, 11, 12]]), + ) + ) + self.assertEqual(model_devi.get(DeviManager.MAX_DEVI_V), [None, None]) + + model_devi.clear() + self.assertEqual(model_devi.ntraj, 0) + self.assertEqual(model_devi.get(DeviManagerSpin.MAX_DEVI_AF), []) + self.assertEqual(model_devi.get(DeviManagerSpin.MAX_DEVI_MF), []) + self.assertEqual(model_devi.get(DeviManager.MAX_DEVI_V), []) + + def test_add_invalid_name(self): + model_devi = DeviManagerSpin() + + self.assertRaisesRegex( + AssertionError, + "Error: unknown deviation name foo", + model_devi.add, + "foo", + np.array([1, 2, 3]), + ) + + def test_add_invalid_deviation(self): + model_devi = DeviManagerSpin() + + self.assertRaisesRegex( + AssertionError, + "Error: deviation\(shape: ", + model_devi.add, + DeviManagerSpin.MAX_DEVI_AF, + np.array([[1], [2], [3]]), + ) + + self.assertRaisesRegex( + AssertionError, + "Error: deviation\(type: ", + model_devi.add, + DeviManagerSpin.MAX_DEVI_MF, + "foo", + ) + + def test_devi_manager_spin_check_data(self): + model_devi = DeviManagerSpin() + model_devi.add(DeviManagerSpin.MAX_DEVI_AF, np.array([1, 2, 3])) + model_devi.add(DeviManagerSpin.MAX_DEVI_AF, np.array([4, 5, 6])) + model_devi.add(DeviManagerSpin.MAX_DEVI_MF, np.array([7, 8, 9])) + + self.assertEqual(model_devi.ntraj, 2) + + self.assertRaisesRegex( + AssertionError, + "Error: the number of model deviation", + model_devi.get, + DeviManagerSpin.MAX_DEVI_MF, + ) + + model_devi = DeviManagerSpin() + model_devi.add(DeviManagerSpin.MAX_DEVI_MF, np.array([1, 2, 3])) + + self.assertRaisesRegex( + AssertionError, + f"Error: cannot find model deviation {DeviManagerSpin.MAX_DEVI_AF}", + model_devi.get, + DeviManagerSpin.MAX_DEVI_MF, + ) + + model_devi = DeviManagerSpin() + model_devi.add(DeviManagerSpin.MAX_DEVI_AF, np.array([1, 2, 3])) + model_devi.add(DeviManagerSpin.MAX_DEVI_AF, np.array([4, 5, 6])) + model_devi.add(DeviManagerSpin.MAX_DEVI_MF, np.array([1, 2, 3])) + model_devi.add(DeviManagerSpin.MAX_DEVI_MF, np.array([4, 5])) + self.assertRaisesRegex( + AssertionError, + f"Error: the number of frames in", + model_devi.get, + DeviManagerSpin.MAX_DEVI_AF, + ) + self.assertRaisesRegex( + AssertionError, + f"Error: the number of frames in", + model_devi.get, + DeviManagerSpin.MAX_DEVI_MF, + ) diff --git a/tests/exploration/test_report_trust_levels_spin.py b/tests/exploration/test_report_trust_levels_spin.py new file mode 100644 index 00000000..e91b5c9d --- /dev/null +++ b/tests/exploration/test_report_trust_levels_spin.py @@ -0,0 +1,103 @@ +import os +import textwrap +import unittest +from collections import ( + Counter, +) + +import numpy as np +from dargs import ( + Argument, +) + +# isort: off +from context import ( + dpgen2, +) +from dpgen2.exploration.deviation import ( + DeviManager, + DeviManagerSpin, +) +from dpgen2.exploration.report import ( + ExplorationReportTrustLevelsSpin, +) +from dpgen2.exploration.report.report_trust_levels_base import ( + ExplorationReportTrustLevels, +) + +# isort: on + + +class TestTrajsExplorationReportSpin(unittest.TestCase): + def test_exploration_report_trust_levels_spin(self): + self.selection_test(ExplorationReportTrustLevelsSpin) + self.args_test(ExplorationReportTrustLevelsSpin) + + def selection_test(self, exploration_report: ExplorationReportTrustLevelsSpin): + model_devi = DeviManagerSpin() + model_devi.add( + DeviManagerSpin.MAX_DEVI_AF, + np.array([0.90, 0.10, 0.91, 0.11, 0.50, 0.12, 0.51, 0.52, 0.92]), + ) + model_devi.add( + DeviManagerSpin.MAX_DEVI_AF, + np.array([0.40, 0.20, 0.80, 0.81, 0.82, 0.21, 0.41, 0.22, 0.42]), + ) + model_devi.add( + DeviManagerSpin.MAX_DEVI_MF, + np.array([0.40, 0.20, 0.21, 0.80, 0.81, 0.41, 0.22, 0.82, 0.42]), + ) + model_devi.add( + DeviManagerSpin.MAX_DEVI_MF, + np.array([0.50, 0.90, 0.91, 0.92, 0.51, 0.52, 0.10, 0.11, 0.12]), + ) + + # id_f_accu = [ [3, 5, 1], [1, 7, 5] ] + # id_f_cand = [ [4, 7, 6], [8, 6, 0] ] + # id_f_fail = [ [2, 0, 8], [4, 2, 3] ] + # id_v_accu = [ [1, 2, 6], [7, 8, 6] ] + # id_v_cand = [ [0, 5, 8], [0, 5, 4] ] + # id_v_fail = [ [4, 3, 7], [2, 3, 1] ] + expected_accu = [[1], [7]] + expected_cand = [[6, 5], [8, 6, 0, 5]] + expected_fail = [[0, 2, 3, 4, 7, 8], [1, 2, 3, 4]] + expected_accu = [set(ii) for ii in expected_accu] + expected_cand = [set(ii) for ii in expected_cand] + expected_fail = [set(ii) for ii in expected_fail] + all_cand_sel = [(0, 6), (0, 5), (1, 8), (1, 6), (1, 0), (1, 5)] + + ter = exploration_report(0.3, 0.6, 0.3, 0.6, conv_accuracy=0.9) + ter.record(model_devi) + self.assertEqual(ter.traj_cand, expected_cand) + self.assertEqual(ter.traj_accu, expected_accu) + self.assertEqual(ter.traj_fail, expected_fail) + + picked = ter.get_candidate_ids(2) + npicked = 0 + self.assertEqual(len(picked), 2) + for ii in range(2): + for jj in picked[ii]: + self.assertTrue(jj in expected_cand[ii]) + npicked += 1 + self.assertEqual(npicked, 2) + self.assertEqual(ter.candidate_ratio(), 6.0 / 18.0) + self.assertEqual(ter.accurate_ratio(), 2.0 / 18.0) + self.assertEqual(ter.failed_ratio(), 10.0 / 18.0) + + def args_test(self, exploration_report: ExplorationReportTrustLevelsSpin): + input_dict = { + "level_af_lo": 0.5, + "level_af_hi": 1.0, + "level_mf_lo": 0.05, + "level_mf_hi": 0.1, + "conv_accuracy": 0.9, + } + + base = Argument("base", dict, exploration_report.args()) + data = base.normalize_value(input_dict) + self.assertAlmostEqual(data["level_af_lo"], 0.5) + self.assertAlmostEqual(data["level_af_hi"], 1.0) + self.assertAlmostEqual(data["level_mf_lo"], 0.05) + self.assertAlmostEqual(data["level_mf_hi"], 0.1) + self.assertAlmostEqual(data["conv_accuracy"], 0.9) + exploration_report(*data)