add Prob sampler #129

Merged
merged 14 commits on Oct 31, 2023
10 changes: 9 additions & 1 deletion deepmd_pt/entrypoints/main.py
@@ -15,6 +15,8 @@
from deepmd_pt.utils.stat import make_stat_input
from deepmd_pt.utils.multi_task import preprocess_shared_params

+from deepmd_pt import __version__


def get_trainer(config, init_model=None, restart_model=None, finetune_model=None, model_branch='', force_load=False):
    # Initialize DDP
@@ -158,13 +160,19 @@ def freeze(FLAGS):
        # TODO: _extra_files
    })


+# avoid logger conflicts with the TF version
+def clean_loggers():
+    logger = logging.getLogger()
+    while logger.hasHandlers():
+        logger.removeHandler(logger.handlers[0])
@record
def main(args=None):
+    clean_loggers()
    logging.basicConfig(
        level=logging.WARNING if env.LOCAL_RANK else logging.INFO,
        format=f"%(asctime)-15s {os.environ.get('RANK') or ''} [%(filename)s:%(lineno)d] %(levelname)s %(message)s"
    )
+    logging.info('DeepMD version: %s', __version__)
    parser = argparse.ArgumentParser(description='A tool to manage deep models of potential energy surface.')
    subparsers = parser.add_subparsers(dest='command')
    train_parser = subparsers.add_parser('train', help='Train a model.')
5 changes: 2 additions & 3 deletions deepmd_pt/model/task/ener.py
@@ -36,7 +36,7 @@ def __init__(self, ntypes, embedding_width, neuron, bias_atom_e, resnet_dt=True,

        filter_layers = []
        for type_i in range(self.ntypes):
-            bias_type = 0.0 if self.use_tebd else bias_atom_e[type_i]
+            bias_type = 0.0
            one = ResidualDeep(type_i, embedding_width, neuron, bias_type, resnet_dt=resnet_dt)
            filter_layers.append(one)
        self.filter_layers = torch.nn.ModuleList(filter_layers)
@@ -69,8 +69,7 @@ def forward(self,
        for type_i, filter_layer in enumerate(self.filter_layers):
            mask = atype == type_i
            atom_energy = filter_layer(inputs)
-            if not env.ENERGY_BIAS_TRAINABLE:
-                atom_energy = atom_energy + self.bias_atom_e[type_i]
+            atom_energy = atom_energy + self.bias_atom_e[type_i]
            atom_energy = atom_energy * mask.unsqueeze(-1)
            outs = outs + atom_energy  # Shape is [nframes, natoms[0], 1]
        return outs.to(env.GLOBAL_PT_FLOAT_PRECISION), None
20 changes: 17 additions & 3 deletions deepmd_pt/train/training.py
@@ -14,7 +14,7 @@
from deepmd_pt.loss import EnergyStdLoss, DenoiseLoss
from deepmd_pt.model.model import get_model
from deepmd_pt.train.wrapper import ModelWrapper
-from deepmd_pt.utils.dataloader import BufferedIterator
+from deepmd_pt.utils.dataloader import BufferedIterator, get_weighted_sampler
from pathlib import Path
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
@@ -99,9 +99,23 @@ def get_opt_param(params):
    return opt_type, opt_param

def get_data_loader(_training_data, _validation_data, _training_params):
+    if 'auto_prob' in _training_params['training_data']:
+        train_sampler = get_weighted_sampler(_training_data, _training_params['training_data']['auto_prob'])
+    elif 'sys_probs' in _training_params['training_data']:
+        train_sampler = get_weighted_sampler(_training_data, _training_params['training_data']['sys_probs'], sys_prob=True)
+    else:
+        train_sampler = get_weighted_sampler(_training_data, 'prob_sys_size')
+
+    if 'auto_prob' in _training_params['validation_data']:
+        valid_sampler = get_weighted_sampler(_validation_data, _training_params['validation_data']['auto_prob'])
+    elif 'sys_probs' in _training_params['validation_data']:
+        valid_sampler = get_weighted_sampler(_validation_data, _training_params['validation_data']['sys_probs'], sys_prob=True)
+    else:
+        valid_sampler = get_weighted_sampler(_validation_data, 'prob_sys_size')
    training_dataloader = DataLoader(
        _training_data,
-        sampler=torch.utils.data.RandomSampler(_training_data),
+        sampler=train_sampler,
        batch_size=None,
        num_workers=8,  # setting to 0 diverges the behavior of its iterator; should be >= 1
        drop_last=False,
@@ -110,7 +124,7 @@ def get_data_loader(_training_data, _validation_data, _training_params):
    training_data_buffered = BufferedIterator(iter(training_dataloader))
    validation_dataloader = DataLoader(
        _validation_data,
-        sampler=torch.utils.data.RandomSampler(_validation_data),
+        sampler=valid_sampler,
        batch_size=None,
        num_workers=1,
        drop_last=False,
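For reference, a minimal sketch of the training-params fragment this branch consumes. Only the 'auto_prob' / 'sys_probs' keys and the 'prob_sys_size' fallback come from the code above; the system paths and values are made up for illustration (the style string and probability list reuse the ones exercised in tests/test_sampler.py):

# Hypothetical _training_params fragment; get_data_loader only inspects the
# 'auto_prob' / 'sys_probs' keys shown here, everything else is illustrative.
_training_params = {
    'training_data': {
        'systems': ['data/sys_A', 'data/sys_B', 'data/sys_C'],  # made-up paths
        'batch_size': 1,
        'auto_prob': 'prob_sys_size;0:1:0.2;1:3:0.8',  # -> weighted sampler from a style string
    },
    'validation_data': {
        'systems': ['data/sys_A', 'data/sys_B', 'data/sys_C'],
        'batch_size': 1,
        'sys_probs': [0.1, 0.4, 0.5],  # -> weighted sampler from explicit per-system probabilities
    },
}
# Omitting both keys falls back to the default 'prob_sys_size' sampler.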
37 changes: 30 additions & 7 deletions deepmd_pt/utils/dataloader.py
@@ -2,6 +2,7 @@
import os
import queue
import time
+import numpy as np
from threading import Thread
from typing import Callable, Dict, List, Tuple, Type, Union
from multiprocessing.dummy import Pool
@@ -11,11 +12,15 @@
import torch.distributed as dist
from deepmd_pt.utils import env
from deepmd_pt.utils.dataset import DeepmdDataSetForLoader
-from torch.utils.data import DataLoader, Dataset
+from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
import torch.multiprocessing

+from deepmd.utils.data_system import (
+    prob_sys_size_ext,
+    process_sys_probs
+)
torch.multiprocessing.set_sharing_strategy("file_system")


@@ -75,6 +80,7 @@ def construct_dataset(system):

        self.sampler_list: List[DistributedSampler] = []
        self.index = []
+        self.total_batch = 0

        self.dataloaders = []
        for system in self.systems:
@@ -105,9 +111,8 @@ def construct_dataset(system):
                shuffle=(not dist.is_initialized()) and shuffle,
            )
            self.dataloaders.append(system_dataloader)
-            for _ in range(len(system_dataloader)):
-                self.index.append(len(self.dataloaders) - 1)
-
+            self.index.append(len(system_dataloader))
+            self.total_batch += len(system_dataloader)
        # Initialize iterator instances for DataLoader
        self.iters = []
        for item in self.dataloaders:
@@ -124,11 +129,11 @@ def set_noise(self, noise_settings):
            system.set_noise(noise_settings)

    def __len__(self):
-        return len(self.index)
+        return len(self.dataloaders)

    def __getitem__(self, idx):
-        # logging.warning(str(torch.distributed.get_rank())+" idx: "+str(idx)+" index: "+str(self.index[idx]))
-        return next(self.iters[self.index[idx]])
+        # logging.warning(str(torch.distributed.get_rank())+" idx: "+str(idx)+" index: "+str(self.index[idx]))
+        return next(self.iters[idx])


_sentinel = object()
@@ -257,3 +262,21 @@ def collate_batch(batch):
        else:
            result[key] = collate_tensor_fn([d[key] for d in batch])
    return result

+def get_weighted_sampler(training_data, prob_style, sys_prob=False):
+    if not sys_prob:
+        if prob_style == "prob_uniform":
+            prob_v = 1.0 / float(len(training_data))
+            probs = [prob_v for _ in range(len(training_data))]
+        else:  # "prob_sys_size" or "prob_sys_size;A:B:p1;C:D:p2;..."
+            if prob_style == "prob_sys_size":
+                style = "prob_sys_size;0:{}:1.0".format(len(training_data))
+            else:
+                style = prob_style
+            probs = prob_sys_size_ext(style, len(training_data), training_data.index)
+    else:
+        probs = process_sys_probs(prob_style, training_data.index)
+    logging.info("Generated weighted sampler with prob array: " + str(probs))
+    # training_data.total_batch is the number of batches in one epoch;
+    # increase it to rebuild the iterators less often
+    sampler = WeightedRandomSampler(probs, training_data.total_batch, replacement=True)
+    return sampler
1 change: 1 addition & 0 deletions pyproject.toml
@@ -19,6 +19,7 @@ dependencies = [
    'tqdm',
    'h5py',
    'wandb',
+    'deepmd-kit >= 2.2.7',
]
requires-python = ">=3.8"
readme = "README.md"
4 changes: 2 additions & 2 deletions tests/requirements.txt
@@ -1,4 +1,4 @@
-tensorflow==2.10.0
-deepmd-kit==2.1.5
+tensorflow>=2.14.0
+deepmd-kit>=2.2.7
coverage
pytest
23 changes: 12 additions & 11 deletions tests/test_fitting_net.py
@@ -31,12 +31,13 @@ def gen_key(type_id, layer_id, w_or_b):
    return (type_id, layer_id, w_or_b)


-def base_fitting_net(dp_fn, embedding, natoms):
+def base_fitting_net(dp_fn, embedding, natoms, atype):
    g = tf.Graph()
    with g.as_default():
        t_embedding = tf.placeholder(GLOBAL_NP_FLOAT_PRECISION, [None, None])
        t_natoms = tf.placeholder(tf.int32, [None])
-        t_energy = dp_fn.build(t_embedding, t_natoms, {})
+        t_atype = tf.placeholder(tf.int32, [None, None])
+        t_energy = dp_fn.build(t_embedding, t_natoms, {'atype': t_atype})
        init_op = tf.global_variables_initializer()
        t_vars = {}
        for var in tf.global_variables():
@@ -55,7 +56,8 @@ def base_fitting_net(dp_fn, embedding, natoms):
        sess.run(init_op)
        energy, values = sess.run([t_energy, t_vars], feed_dict={
            t_embedding: embedding,
-            t_natoms: natoms
+            t_natoms: natoms,
+            t_atype: atype,
        })
    return energy, values

@@ -69,14 +71,18 @@ def setUp(self):
        self.embedding = np.random.uniform(size=[4, nloc * self.embedding_width])
        self.ntypes = self.natoms.size - 2
        self.n_neuron = [32, 32, 32]
+        self.atype = np.zeros([4, nloc], dtype=np.int32)
+        cnt = 0
+        for i in range(self.ntypes):
+            self.atype[:, cnt:cnt + self.natoms[i + 2]] = i
+            cnt += self.natoms[i + 2]

        fake_d = FakeDescriptor(2, 30)
        self.dp_fn = EnerFitting(fake_d, self.n_neuron)
-        self.dp_fn.bias_atom_e = np.random.uniform(size=[self.ntypes])
+        self.dp_fn.bias_atom_e = [1e8, 0]

    def test_consistency(self):
-        dp_energy, values = base_fitting_net(self.dp_fn, self.embedding, self.natoms)
+        dp_energy, values = base_fitting_net(self.dp_fn, self.embedding, self.natoms, self.atype)
        my_fn = EnergyFittingNet(self.ntypes, self.embedding_width, self.n_neuron, self.dp_fn.bias_atom_e, use_tebd=False)
        for name, param in my_fn.named_parameters():
            matched = re.match(r'filter_layers\.(\d).deep_layers\.(\d)\.([a-z]+)', name)
@@ -94,12 +100,7 @@ def test_consistency(self):
            param.data.copy_(torch.from_numpy(var))
        embedding = torch.from_numpy(self.embedding)
        embedding = embedding.view(4, -1, self.embedding_width)
-        natoms = torch.from_numpy(self.natoms)
-        atype = torch.zeros(1, natoms[0], dtype=torch.long)
-        cnt = 0
-        for i in range(natoms.shape[0] - 2):
-            atype[:, cnt:cnt + natoms[i + 2]] = i
-            cnt += natoms[i + 2]
+        atype = torch.from_numpy(self.atype)
        my_energy, _ = my_fn(embedding, atype)
        my_energy = my_energy.detach()
        self.assertTrue(np.allclose(dp_energy, my_energy.numpy().reshape([-1])))
77 changes: 77 additions & 0 deletions tests/test_sampler.py
@@ -0,0 +1,77 @@
import numpy as np
import os
import unittest
import json

from deepmd.utils.data_system import DeepmdDataSystem
from deepmd.utils import random as tf_random
from deepmd.common import expand_sys_str

from deepmd_pt.utils.dataloader import DpLoaderSet, get_weighted_sampler
from deepmd_pt.utils import env

CUR_DIR = os.path.dirname(__file__)



class TestSampler(unittest.TestCase):

    def setUp(self):
        with open(env.TEST_CONFIG, 'r') as fin:
            content = fin.read()
        config = json.loads(content)
        model_config = config['model']
        self.rcut = model_config['descriptor']['rcut']
        self.rcut_smth = model_config['descriptor']['rcut_smth']
        self.sel = model_config['descriptor']['sel']
        self.batch_size = config['training']['training_data']['batch_size']
        self.systems = config['training']['validation_data']['systems']
        if isinstance(self.systems, str):
            self.systems = expand_sys_str(self.systems)
        self.my_dataset = DpLoaderSet(self.systems, self.batch_size,
                                      model_params={
                                          'descriptor': {
                                              'sel': self.sel,
                                              'rcut': self.rcut,
                                          },
                                          'type_map': model_config['type_map']
                                      }, seed=10)

        tf_random.seed(10)
        self.dp_dataset = DeepmdDataSystem(self.systems, self.batch_size, 1, self.rcut)

    def test_auto_prob_uniform(self):
        auto_prob_style = 'prob_uniform'
        sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))

    def test_auto_prob_sys_size(self):
        auto_prob_style = 'prob_sys_size'
        sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))

    def test_auto_prob_sys_size_ext(self):
        auto_prob_style = 'prob_sys_size;0:1:0.2;1:3:0.8'
        sampler = get_weighted_sampler(self.my_dataset, prob_style=auto_prob_style)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(auto_prob_style=auto_prob_style)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))

    def test_sys_probs(self):
        sys_probs = [0.1, 0.4, 0.5]
        sampler = get_weighted_sampler(self.my_dataset, prob_style=sys_probs, sys_prob=True)
        my_probs = np.array(sampler.weights)
        self.dp_dataset.set_sys_probs(sys_probs=sys_probs)
        dp_probs = np.array(self.dp_dataset.sys_probs)
        self.assertTrue(np.allclose(my_probs, dp_probs))

if __name__ == '__main__':
    unittest.main()
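Assuming a valid test config is available at the location env.TEST_CONFIG resolves to, the new tests can also be run on their own with pytest (already listed in tests/requirements.txt):

pytest tests/test_sampler.py -v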