diff --git a/deepmd/dpmodel/atomic_model/dp_atomic_model.py b/deepmd/dpmodel/atomic_model/dp_atomic_model.py index cca46d3710..d39e236d07 100644 --- a/deepmd/dpmodel/atomic_model/dp_atomic_model.py +++ b/deepmd/dpmodel/atomic_model/dp_atomic_model.py @@ -79,27 +79,6 @@ def mixed_types(self) -> bool: """ return self.descriptor.mixed_types() - def set_out_bias(self, out_bias: np.ndarray, add=False) -> None: - """ - Modify the output bias for the atomic model. - - Parameters - ---------- - out_bias : np.ndarray - The new bias to be applied. - add : bool, optional - Whether to add the new bias to the existing one. - If False, the output bias will be directly replaced by the new bias. - If True, the new bias will be added to the existing one. - """ - self.fitting["bias_atom_e"] = ( - out_bias + self.fitting["bias_atom_e"] if add else out_bias - ) - - def get_out_bias(self) -> np.ndarray: - """Return the output bias of the atomic model.""" - return self.fitting["bias_atom_e"] - def forward_atomic( self, extended_coord: np.ndarray, diff --git a/deepmd/dpmodel/atomic_model/linear_atomic_model.py b/deepmd/dpmodel/atomic_model/linear_atomic_model.py index 71e4aa542a..e4a85d7bc2 100644 --- a/deepmd/dpmodel/atomic_model/linear_atomic_model.py +++ b/deepmd/dpmodel/atomic_model/linear_atomic_model.py @@ -162,7 +162,6 @@ def forward_atomic( ) ] ener_list = [] - for i, model in enumerate(self.models): mapping = self.mapping_list[i] ener_list.append( @@ -176,13 +175,10 @@ def forward_atomic( )["energy"] ) self.weights = self._compute_weight(extended_coord, extended_atype, nlists_) - self.atomic_bias = None - if self.atomic_bias is not None: - raise NotImplementedError("Need to add bias in a future PR.") - else: - fit_ret = { - "energy": np.sum(np.stack(ener_list) * np.stack(self.weights), axis=0), - } # (nframes, nloc, 1) + + fit_ret = { + "energy": np.sum(np.stack(ener_list) * np.stack(self.weights), axis=0), + } # (nframes, nloc, 1) return fit_ret @staticmethod @@ -252,7 +248,8 @@ def _compute_weight( ) -> List[np.ndarray]: """This should be a list of user defined weights that matches the number of models to be combined.""" nmodels = len(self.models) - return [np.ones(1) / nmodels for _ in range(nmodels)] + nframes, nloc, _ = nlists_[0].shape + return [np.ones((nframes, nloc, 1)) / nmodels for _ in range(nmodels)] def get_dim_fparam(self) -> int: """Get the number (dimension) of frame parameters of this atomic model.""" @@ -275,27 +272,6 @@ def get_sel_type(self) -> List[int]: # join all the selected types return list(set().union(*[model.get_sel_type() for model in self.models])) - def set_out_bias(self, out_bias: np.ndarray, add=False) -> None: - """ - Modify the output bias for all the models in the linear atomic model. - - Parameters - ---------- - out_bias : torch.Tensor - The new bias to be applied. - add : bool, optional - Whether to add the new bias to the existing one. - If False, the output bias will be directly replaced by the new bias. - If True, the new bias will be added to the existing one. - """ - for model in self.models: - model.set_out_bias(out_bias, add=add) - - def get_out_bias(self) -> np.ndarray: - """Return the weighted output bias of the linear atomic model.""" - # TODO add get_out_bias for linear atomic model - raise NotImplementedError - def is_aparam_nall(self) -> bool: """Check whether the shape of atomic parameters is (nframes, nall, ndim). diff --git a/deepmd/dpmodel/atomic_model/make_base_atomic_model.py b/deepmd/dpmodel/atomic_model/make_base_atomic_model.py index 3e02a5d076..936c2b0943 100644 --- a/deepmd/dpmodel/atomic_model/make_base_atomic_model.py +++ b/deepmd/dpmodel/atomic_model/make_base_atomic_model.py @@ -95,25 +95,6 @@ def get_sel_type(self) -> List[int]: If returning an empty list, all atom types are selected. """ - @abstractmethod - def set_out_bias(self, out_bias: t_tensor, add=False) -> None: - """ - Modify the output bias for the atomic model. - - Parameters - ---------- - out_bias : t_tensor - The new bias to be applied. - add : bool, optional - Whether to add the new bias to the existing one. - If False, the output bias will be directly replaced by the new bias. - If True, the new bias will be added to the existing one. - """ - - @abstractmethod - def get_out_bias(self) -> t_tensor: - """Return the output bias of the atomic model.""" - @abstractmethod def is_aparam_nall(self) -> bool: """Check whether the shape of atomic parameters is (nframes, nall, ndim). diff --git a/deepmd/dpmodel/atomic_model/pairtab_atomic_model.py b/deepmd/dpmodel/atomic_model/pairtab_atomic_model.py index c970278bcf..2d3bccb258 100644 --- a/deepmd/dpmodel/atomic_model/pairtab_atomic_model.py +++ b/deepmd/dpmodel/atomic_model/pairtab_atomic_model.py @@ -130,25 +130,6 @@ def mixed_types(self) -> bool: # to match DPA1 and DPA2. return True - def set_out_bias(self, out_bias: np.ndarray, add=False) -> None: - """ - Modify the output bias for the atomic model. - - Parameters - ---------- - out_bias : torch.Tensor - The new bias to be applied. - add : bool, optional - Whether to add the new bias to the existing one. - If False, the output bias will be directly replaced by the new bias. - If True, the new bias will be added to the existing one. - """ - self.bias_atom_e = out_bias + self.bias_atom_e if add else out_bias - - def get_out_bias(self) -> np.ndarray: - """Return the output bias of the atomic model.""" - return self.bias_atom_e - def serialize(self) -> dict: dd = BaseAtomicModel.serialize(self) dd.update( diff --git a/deepmd/pt/model/atomic_model/base_atomic_model.py b/deepmd/pt/model/atomic_model/base_atomic_model.py index 57ca21a826..e750b6a54e 100644 --- a/deepmd/pt/model/atomic_model/base_atomic_model.py +++ b/deepmd/pt/model/atomic_model/base_atomic_model.py @@ -366,7 +366,6 @@ def change_out_bias( rcond=self.rcond, preset_bias=self.preset_out_bias, ) - # self.set_out_bias(delta_bias, add=True) self._store_out_stat(delta_bias, out_std, add=True) elif bias_adjust_mode == "set-by-statistic": bias_out, std_out = compute_output_stats( @@ -377,7 +376,6 @@ def change_out_bias( rcond=self.rcond, preset_bias=self.preset_out_bias, ) - # self.set_out_bias(bias_out) self._store_out_stat(bias_out, std_out) else: raise RuntimeError("Unknown bias_adjust_mode mode: " + bias_adjust_mode) diff --git a/deepmd/pt/model/atomic_model/dp_atomic_model.py b/deepmd/pt/model/atomic_model/dp_atomic_model.py index c9c9e6ed47..6cbbf5ed7f 100644 --- a/deepmd/pt/model/atomic_model/dp_atomic_model.py +++ b/deepmd/pt/model/atomic_model/dp_atomic_model.py @@ -176,6 +176,9 @@ def forward_atomic( ) return fit_ret + def get_out_bias(self) -> torch.Tensor: + return self.out_bias + def compute_or_load_stat( self, sampled_func, @@ -217,27 +220,6 @@ def wrapped_sampler(): self.descriptor.compute_input_stats(wrapped_sampler, stat_file_path) self.compute_or_load_out_stat(wrapped_sampler, stat_file_path) - def set_out_bias(self, out_bias: torch.Tensor, add=False) -> None: - """ - Modify the output bias for the atomic model. - - Parameters - ---------- - out_bias : torch.Tensor - The new bias to be applied. - add : bool, optional - Whether to add the new bias to the existing one. - If False, the output bias will be directly replaced by the new bias. - If True, the new bias will be added to the existing one. - """ - self.fitting_net["bias_atom_e"] = ( - out_bias + self.fitting_net["bias_atom_e"] if add else out_bias - ) - - def get_out_bias(self) -> torch.Tensor: - """Return the output bias of the atomic model.""" - return self.fitting_net["bias_atom_e"] - def get_dim_fparam(self) -> int: """Get the number (dimension) of frame parameters of this atomic model.""" return self.fitting_net.get_dim_fparam() diff --git a/deepmd/pt/model/atomic_model/linear_atomic_model.py b/deepmd/pt/model/atomic_model/linear_atomic_model.py index f9fc97dea4..c5abc4575c 100644 --- a/deepmd/pt/model/atomic_model/linear_atomic_model.py +++ b/deepmd/pt/model/atomic_model/linear_atomic_model.py @@ -1,10 +1,12 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import copy from typing import ( + Callable, Dict, List, Optional, Tuple, + Union, ) import torch @@ -73,7 +75,6 @@ def __init__( self.mapping_list.append(self.remap_atype(tpmp, self.type_map)) assert len(err_msg) == 0, "\n".join(err_msg) - self.atomic_bias = None self.mixed_types_list = [model.mixed_types() for model in self.models] self.rcuts = torch.tensor( self.get_model_rcuts(), dtype=torch.float64, device=env.DEVICE @@ -92,6 +93,9 @@ def mixed_types(self) -> bool: """ return True + def get_out_bias(self) -> torch.Tensor: + return self.out_bias + def get_rcut(self) -> float: """Get the cut-off radius.""" return max(self.get_model_rcuts()) @@ -188,8 +192,9 @@ def forward_atomic( for i, model in enumerate(self.models): mapping = self.mapping_list[i] + # apply bias to each individual model ener_list.append( - model.forward_atomic( + model.forward_common_atomic( extended_coord, mapping[extended_atype], nlists_[i], @@ -198,26 +203,32 @@ def forward_atomic( aparam, )["energy"] ) - weights = self._compute_weight(extended_coord, extended_atype, nlists_) - atype = extended_atype[:, :nloc] - for idx, model in enumerate(self.models): - # TODO: provide interfaces for atomic models to access bias_atom_e - if isinstance(model, DPAtomicModel): - bias_atom_e = model.fitting_net.bias_atom_e - elif isinstance(model, PairTabAtomicModel): - bias_atom_e = model.bias_atom_e - else: - bias_atom_e = None - if bias_atom_e is not None: - ener_list[idx] += bias_atom_e[atype] - fit_ret = { "energy": torch.sum(torch.stack(ener_list) * torch.stack(weights), dim=0), } # (nframes, nloc, 1) return fit_ret + def apply_out_stat( + self, + ret: Dict[str, torch.Tensor], + atype: torch.Tensor, + ): + """Apply the stat to each atomic output. + The developer may override the method to define how the bias is applied + to the atomic output of the model. + + Parameters + ---------- + ret + The returned dict by the forward_atomic method + atype + The atom types. nf x nloc + + """ + return ret + @staticmethod def remap_atype(ori_map: List[str], new_map: List[str]) -> torch.Tensor: """ @@ -284,32 +295,13 @@ def _compute_weight( ) -> List[torch.Tensor]: """This should be a list of user defined weights that matches the number of models to be combined.""" nmodels = len(self.models) + nframes, nloc, _ = nlists_[0].shape return [ - torch.ones(1, dtype=torch.float64, device=env.DEVICE) / nmodels + torch.ones((nframes, nloc, 1), dtype=torch.float64, device=env.DEVICE) + / nmodels for _ in range(nmodels) ] - def set_out_bias(self, out_bias: torch.Tensor, add=False) -> None: - """ - Modify the output bias for all the models in the linear atomic model. - - Parameters - ---------- - out_bias : torch.Tensor - The new bias to be applied. - add : bool, optional - Whether to add the new bias to the existing one. - If False, the output bias will be directly replaced by the new bias. - If True, the new bias will be added to the existing one. - """ - for model in self.models: - model.set_out_bias(out_bias, add=add) - - def get_out_bias(self) -> torch.Tensor: - """Return the weighted output bias of the linear atomic model.""" - # TODO add get_out_bias for linear atomic model - raise NotImplementedError - def get_dim_fparam(self) -> int: """Get the number (dimension) of frame parameters of this atomic model.""" # tricky... @@ -346,6 +338,53 @@ def is_aparam_nall(self) -> bool: """ return False + def compute_or_load_out_stat( + self, + merged: Union[Callable[[], List[dict]], List[dict]], + stat_file_path: Optional[DPPath] = None, + ): + """ + Compute the output statistics (e.g. energy bias) for the fitting net from packed data. + + Parameters + ---------- + merged : Union[Callable[[], List[dict]], List[dict]] + - List[dict]: A list of data samples from various data systems. + Each element, `merged[i]`, is a data dictionary containing `keys`: `torch.Tensor` + originating from the `i`-th data system. + - Callable[[], List[dict]]: A lazy function that returns data samples in the above format + only when needed. Since the sampling process can be slow and memory-intensive, + the lazy function helps by only sampling once. + stat_file_path : Optional[DPPath] + The path to the stat file. + + """ + for md in self.models: + md.compute_or_load_out_stat(merged, stat_file_path) + + def compute_or_load_stat( + self, + sampled_func, + stat_file_path: Optional[DPPath] = None, + ): + """ + Compute or load the statistics parameters of the model, + such as mean and standard deviation of descriptors or the energy bias of the fitting net. + When `sampled` is provided, all the statistics parameters will be calculated (or re-calculated for update), + and saved in the `stat_file_path`(s). + When `sampled` is not provided, it will check the existence of `stat_file_path`(s) + and load the calculated statistics parameters. + + Parameters + ---------- + sampled_func + The lazy sampled function to get data frames from different data systems. + stat_file_path + The dictionary of paths to the statistics files. + """ + for md in self.models: + md.compute_or_load_stat(sampled_func, stat_file_path) + class DPZBLLinearEnergyAtomicModel(LinearEnergyAtomicModel): """Model linearly combine a list of AtomicModels. @@ -388,29 +427,6 @@ def __init__( # this is a placeholder being updated in _compute_weight, to handle Jit attribute init error. self.zbl_weight = torch.empty(0, dtype=torch.float64, device=env.DEVICE) - def compute_or_load_stat( - self, - sampled_func, - stat_file_path: Optional[DPPath] = None, - ): - """ - Compute or load the statistics parameters of the model, - such as mean and standard deviation of descriptors or the energy bias of the fitting net. - When `sampled` is provided, all the statistics parameters will be calculated (or re-calculated for update), - and saved in the `stat_file_path`(s). - When `sampled` is not provided, it will check the existence of `stat_file_path`(s) - and load the calculated statistics parameters. - - Parameters - ---------- - sampled_func - The lazy sampled function to get data frames from different data systems. - stat_file_path - The dictionary of paths to the statistics files. - """ - self.models[0].compute_or_load_stat(sampled_func, stat_file_path) - self.models[1].compute_or_load_stat(sampled_func, stat_file_path) - def serialize(self) -> dict: dd = BaseAtomicModel.serialize(self) dd.update( diff --git a/deepmd/pt/model/atomic_model/pairtab_atomic_model.py b/deepmd/pt/model/atomic_model/pairtab_atomic_model.py index 627dffd620..b4639fcbb4 100644 --- a/deepmd/pt/model/atomic_model/pairtab_atomic_model.py +++ b/deepmd/pt/model/atomic_model/pairtab_atomic_model.py @@ -136,6 +136,9 @@ def fitting_output_def(self) -> FittingOutputDef: ] ) + def get_out_bias(self) -> torch.Tensor: + return self.out_bias + def get_rcut(self) -> float: return self.rcut @@ -226,25 +229,6 @@ def compute_or_load_stat( """ self.compute_or_load_out_stat(merged, stat_file_path) - def set_out_bias(self, out_bias: torch.Tensor, add=False) -> None: - """ - Modify the output bias for the atomic model. - - Parameters - ---------- - out_bias : torch.Tensor - The new bias to be applied. - add : bool, optional - Whether to add the new bias to the existing one. - If False, the output bias will be directly replaced by the new bias. - If True, the new bias will be added to the existing one. - """ - self.bias_atom_e = out_bias + self.bias_atom_e if add else out_bias - - def get_out_bias(self) -> torch.Tensor: - """Return the output bias of the atomic model.""" - return self.bias_atom_e - def forward_atomic( self, extended_coord: torch.Tensor, diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 73404b0c83..f6fbfedf15 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -1168,7 +1168,7 @@ def _model_change_out_bias( idx_type_map = sorter[np.searchsorted(model_type_map, new_type_map, sorter=sorter)] log.info( f"Change output bias of {new_type_map!s} " - f"from {to_numpy_array(old_bias[idx_type_map]).reshape(-1)!s} " - f"to {to_numpy_array(new_bias[idx_type_map]).reshape(-1)!s}." + f"from {to_numpy_array(old_bias[:,idx_type_map]).reshape(-1)!s} " + f"to {to_numpy_array(new_bias[:,idx_type_map]).reshape(-1)!s}." ) return _model diff --git a/source/tests/pt/model/test_linear_atomic_model_stat.py b/source/tests/pt/model/test_linear_atomic_model_stat.py new file mode 100644 index 0000000000..010cecf9f8 --- /dev/null +++ b/source/tests/pt/model/test_linear_atomic_model_stat.py @@ -0,0 +1,231 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import tempfile +import unittest +from pathlib import ( + Path, +) +from typing import ( + Optional, +) + +import h5py +import numpy as np +import torch + +from deepmd.dpmodel.output_def import ( + FittingOutputDef, + OutputVariableDef, +) +from deepmd.pt.model.atomic_model import ( + DPAtomicModel, + LinearEnergyAtomicModel, +) +from deepmd.pt.model.descriptor.dpa1 import ( + DescrptDPA1, +) +from deepmd.pt.model.task.base_fitting import ( + BaseFitting, +) +from deepmd.pt.utils import ( + env, +) +from deepmd.pt.utils.utils import ( + to_numpy_array, + to_torch_tensor, +) +from deepmd.utils.path import ( + DPPath, +) + +from .test_env_mat import ( + TestCaseSingleFrameWithNlist, +) + +dtype = env.GLOBAL_PT_FLOAT_PRECISION + + +class FooFittingA(torch.nn.Module, BaseFitting): + def output_def(self): + return FittingOutputDef( + [ + OutputVariableDef( + "energy", + [1], + reduciable=True, + r_differentiable=True, + c_differentiable=True, + ), + ] + ) + + def serialize(self) -> dict: + raise NotImplementedError + + def forward( + self, + descriptor: torch.Tensor, + atype: torch.Tensor, + gr: Optional[torch.Tensor] = None, + g2: Optional[torch.Tensor] = None, + h2: Optional[torch.Tensor] = None, + fparam: Optional[torch.Tensor] = None, + aparam: Optional[torch.Tensor] = None, + ): + nf, nloc, _ = descriptor.shape + ret = {} + ret["energy"] = ( + torch.Tensor( + [ + [1.0, 2.0, 3.0], + [4.0, 5.0, 6.0], + ] + ) + .view([nf, nloc, *self.output_def()["energy"].shape]) + .to(env.GLOBAL_PT_FLOAT_PRECISION) + .to(env.DEVICE) + ) + + return ret + + +class FooFittingB(torch.nn.Module, BaseFitting): + def output_def(self): + return FittingOutputDef( + [ + OutputVariableDef( + "energy", + [1], + reduciable=True, + r_differentiable=True, + c_differentiable=True, + ), + ] + ) + + def serialize(self) -> dict: + raise NotImplementedError + + def forward( + self, + descriptor: torch.Tensor, + atype: torch.Tensor, + gr: Optional[torch.Tensor] = None, + g2: Optional[torch.Tensor] = None, + h2: Optional[torch.Tensor] = None, + fparam: Optional[torch.Tensor] = None, + aparam: Optional[torch.Tensor] = None, + ): + nf, nloc, _ = descriptor.shape + ret = {} + ret["energy"] = ( + torch.Tensor( + [ + [7.0, 8.0, 9.0], + [10.0, 11.0, 12.0], + ] + ) + .view([nf, nloc, *self.output_def()["energy"].shape]) + .to(env.GLOBAL_PT_FLOAT_PRECISION) + .to(env.DEVICE) + ) + + return ret + + +class TestAtomicModelStat(unittest.TestCase, TestCaseSingleFrameWithNlist): + def tearDown(self): + self.tempdir.cleanup() + + def setUp(self): + TestCaseSingleFrameWithNlist.setUp(self) + nf, nloc, nnei = self.nlist.shape + self.merged_output_stat = [ + { + "coord": to_torch_tensor(np.zeros([2, 3, 3])), + "atype": to_torch_tensor( + np.array([[0, 0, 1], [0, 1, 1]], dtype=np.int32) + ), + "atype_ext": to_torch_tensor( + np.array([[0, 0, 1, 0], [0, 1, 1, 0]], dtype=np.int32) + ), + "box": to_torch_tensor(np.zeros([2, 3, 3])), + "natoms": to_torch_tensor( + np.array([[3, 3, 2, 1], [3, 3, 1, 2]], dtype=np.int32) + ), + # bias of foo: 1, 3 + "energy": to_torch_tensor(np.array([5.0, 7.0]).reshape(2, 1)), + } + ] + self.tempdir = tempfile.TemporaryDirectory() + h5file = str((Path(self.tempdir.name) / "testcase.h5").resolve()) + with h5py.File(h5file, "w") as f: + pass + self.stat_file_path = DPPath(h5file, "a") + + def test_linear_atomic_model_stat_with_bias(self): + nf, nloc, nnei = self.nlist.shape + ds = DescrptDPA1( + self.rcut, + self.rcut_smth, + sum(self.sel), + self.nt, + ).to(env.DEVICE) + ft_a = FooFittingA().to(env.DEVICE) + ft_b = FooFittingB().to(env.DEVICE) + type_map = ["foo", "bar"] + md0 = DPAtomicModel( + ds, + ft_a, + type_map=type_map, + ).to(env.DEVICE) + md1 = DPAtomicModel( + ds, + ft_b, + type_map=type_map, + ).to(env.DEVICE) + linear_model = LinearEnergyAtomicModel([md0, md1], type_map=type_map).to( + env.DEVICE + ) + + args = [ + to_torch_tensor(ii) for ii in [self.coord_ext, self.atype_ext, self.nlist] + ] + # nf x nloc + at = self.atype_ext[:, :nloc] + + # 1. test run without bias + # nf x na x odim + ret0 = linear_model.forward_common_atomic(*args) + + ret0 = to_numpy_array(ret0["energy"]) + ret_no_bias = [] + for md in linear_model.models: + ret_no_bias.append( + to_numpy_array(md.forward_common_atomic(*args)["energy"]) + ) + expected_ret0 = np.array( + [ + [4.0, 5.0, 6.0], + [7.0, 8.0, 9.0], + ] + ).reshape(nf, nloc, *linear_model.fitting_output_def()["energy"].shape) + + np.testing.assert_almost_equal(ret0, expected_ret0) + + # 2. test bias is applied + linear_model.compute_or_load_out_stat( + self.merged_output_stat, stat_file_path=self.stat_file_path + ) + # bias applied to sub atomic models. + ener_bias = np.array([1.0, 3.0]).reshape(2, 1) + linear_ret = [] + for idx, md in enumerate(linear_model.models): + ret = md.forward_common_atomic(*args) + ret = to_numpy_array(ret["energy"]) + linear_ret.append(ret_no_bias[idx] + ener_bias[at]) + np.testing.assert_almost_equal((ret_no_bias[idx] + ener_bias[at]), ret) + + # linear model not adding bias again + ret1 = linear_model.forward_common_atomic(*args) + ret1 = to_numpy_array(ret1["energy"]) + np.testing.assert_almost_equal(np.mean(np.stack(linear_ret), axis=0), ret1)