Skip to content

Commit

Permalink
add reduce-water xtc output
Browse files Browse the repository at this point in the history
  • Loading branch information
Chenggong committed Jul 11, 2023
1 parent 76bb54d commit 3bd2755
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 46 deletions.
106 changes: 74 additions & 32 deletions script/count_cylinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
import datetime
import os

from MDAnalysis.analysis import distances

class PermeationEvent:
def __init__(self, ind):
Expand Down Expand Up @@ -198,6 +198,49 @@ def prepare_state_str(sf, K_name, state_ts_dict):
state_string = sf.state_2_string(state_ts_dict, method="Everything")
return state_string

def update_event_count_dict(event_count_dict, ts, sf, atom_selection_dict):
state_ts_dict = {}
for at_name in event_count_dict:
at_selection = atom_selection_dict[at_name]
at_state_a = sf.state_detect(at_selection) # state array with the label for every atom
at_state_l = sf.state_2_list(at_state_a,
at_selection) # state list with the index of atoms in every binding site
state_ts_dict[at_name] = at_state_l
at_state_a = state_label_convert(at_state_a)
event_count_dict[at_name].update(at_state_a)

# if K/POT Water system print as K_priority, else print Everything
state_string = prepare_state_str(sf, args.K_name, state_ts_dict)
print("# S6l", ts.frame, state_string)
for i in range(6):
for at_name in state_ts_dict:
print(f" {at_name} : ", end="")
for index in state_ts_dict[at_name][i]:
print(index, end=" ")
print(end=",")
print()


def get_closest_water(center_selection, water_O_selection, n_water, distance_array):
"""
Input the center selection and water oxygen selection, return the closest n water molecules.
:param center_selection: MDAnalysis selection for the center.
We only measure the distance from the center of this selection.
:param water_O_selection: MDAnalysis selection for the water oxygen atoms.
Distance to every water oxygen atom will be measured.
:param n_water: number of water to be selected.
Only the closest n water will be returned.
:param distance_array: distance array for memory efficiency, shape should be (1, water_O_selection.n_atoms)
:return: A MDAnalysis selection for the closest n water molecules
"""
center = center_selection.center_of_geometry()
dist_matrix = distances.distance_array(center, water_O_selection.positions,
box=water_O_selection.dimensions, result=distance_array)
closest_indices = dist_matrix.argsort()[:, :n_water]
closest_indices = closest_indices.reshape(-1)
closest_indices.sort()
closest_water = water_O_selection[closest_indices].residues.atoms
return closest_water

if __name__ == "__main__":
parser = argparse.ArgumentParser(description=
Expand All @@ -210,7 +253,8 @@ def prepare_state_str(sf, K_name, state_ts_dict):
parser.add_argument("-pdb",
dest="top",
help="Ideally This file should be generated from the same trjconv command as xtc. gro and tpr "
"are also acceptable",
"are also acceptable. Water should have resname of SOL, and Water oxygen should have "
"atom name of OW.",
metavar="top.pdb",
type=argparse.FileType('r'),
required=True)
Expand Down Expand Up @@ -280,7 +324,6 @@ def prepare_state_str(sf, K_name, state_ts_dict):
parser.add_argument("-reduced_xtc",
dest="reduced_xtc",
metavar="file name",
type=argparse.FileType("w"),
help="file name for the water-reduced xtc file", )

args = parser.parse_args()
Expand All @@ -298,21 +341,20 @@ def prepare_state_str(sf, K_name, state_ts_dict):
print(f"Radius cutoff for S0 is : {args.s0_rad} Å")
print(f"Z cutoff for S5 is : {args.s5_cutoff} Å")
if args.reduced_xtc is None:
# Output xtc name is not provided, no xtc output
print("No water-reduced xtc output")
print("-reduced_xtc not provided, No water-reduced xtc output")
else:
print(f"Water-reduced xtc output : {args.reduced_xtc.name}")
print(f"Water-reduced xtc output : {args.reduced_xtc}")
print(f"The number of water to keep : {args.n_water}")
# if file exists, delete it
if os.path.exists(args.reduced_xtc.name):
user_input = input("The file exists, do you want to overwrite it? y/Y or Ctrl-C")
if os.path.exists(args.reduced_xtc):
user_input = input("The file exists, do you want to overwrite it? y/Y or Ctrl-C :")
if user_input.lower() == "y":
os.remove(args.reduced_xtc.name)
os.remove(args.reduced_xtc)
else:
sys.exit("User exit")
# check extension, only xtc is allowed
if os.path.splitext(args.reduced_xtc.name)[1][1:] not in ["xtc"]:
sys.exit("Only xtc is allowed for water-reduced trajectory")
if os.path.splitext(args.reduced_xtc)[1][1:] not in ["xtc"]:
sys.exit("Only xtc is allowed for water-reduced trajectory. Exit")
print("#################################################################################")

u = mda.Universe(args.top.name, args.traj.name)
Expand All @@ -324,7 +366,7 @@ def prepare_state_str(sf, K_name, state_ts_dict):
print(site)
for atom in atoms:
print(f'{atom.resname} {atom.name}, Index (0 base): {atom.index}')
event_count_dict = {}
event_count_dict = {} # key is atom name, value is PermeationEvent object
atom_selection_dict = {}
for atom in args.K_name:
selection = sf.u.select_atoms('name ' + atom)
Expand All @@ -337,27 +379,27 @@ def prepare_state_str(sf, K_name, state_ts_dict):
atom_selection_dict["Wat"] = wat_selection

print("# Loop Over Traj ################################################################################")
for ts in u.trajectory:
state_ts_dict = {}
for at_name in event_count_dict:
at_selection = atom_selection_dict[at_name]
at_state_a = sf.state_detect(at_selection) # state array with the label for every atom
at_state_l = sf.state_2_list(at_state_a,
at_selection) # state list with the index of atoms in every binding site
state_ts_dict[at_name] = at_state_l
at_state_a = state_label_convert(at_state_a)
event_count_dict[at_name].update(at_state_a)
if args.reduced_xtc is None:
for ts in u.trajectory:
update_event_count_dict(event_count_dict, ts, sf, atom_selection_dict)
else:
# prepare the selection
if args.n_water > len(wat_selection):
sys.exit("The number of water to keep is larger than the number of water in the trajectory. Exit")
elif args.n_water <= 0:
sys.exit("The number of water to keep is smaller than 0. Exit")

distance_array = np.zeros((1, wat_selection.n_atoms)) # prepare the distance matrix array on memory
non_water = u.select_atoms('not resname SOL')
with mda.Writer(args.reduced_xtc, n_atoms=non_water.n_atoms + args.n_water*3) as W:
for ts in u.trajectory:
# update permeation count
update_event_count_dict(event_count_dict, ts, sf, atom_selection_dict)

# write the reduced trajectory
waters = get_closest_water(sf.sf_oxygen[-1], wat_selection, args.n_water, distance_array)
W.write(non_water + waters)

# if K Water system print as K_priority
state_string = prepare_state_str(sf, args.K_name, state_ts_dict)
print("# S6l", ts.frame, state_string)
for i in range(6):
for at_name in state_ts_dict:
print(f" {at_name} : ", end="")
for index in state_ts_dict[at_name][i]:
print(index, end=" ")
print(end=",")
print()
print("#################################################################################")
knows_charge_table = {"POT": 1, "K": 1,
"SOD": 1, "Na": 1,
Expand Down
76 changes: 62 additions & 14 deletions script/test_PermeationEvent.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import unittest
import sys
from count_cylinder import PermeationEvent
from count_cylinder import get_closest_water
from Sfilter import Sfilter
import MDAnalysis as mda
import numpy as np
import subprocess
import os

class MyTestCase(unittest.TestCase):
def test_up(self):
print("\n# Test PermeationEvent with output writing")
seq = [np.array([5, 5]),
np.array([1, 1]),
np.array([3, 3]),
Expand Down Expand Up @@ -39,8 +43,10 @@ def test_up(self):
self.assertEqual(lines[5], " 5962 , 6, 120, 40\n")
self.assertEqual(lines[6], " 5962 , 9, 180, 40\n")
self.assertTrue("Permeation events up : 5\n" in lines)
os.remove("file")

def test_down(self):
print("\nTest PermeationEvent with down permeation event")
seq = [np.array([1, 5]),
np.array([5, 4]),
np.array([4, 3]),
Expand Down Expand Up @@ -68,20 +74,28 @@ def test_down(self):
def test_count_cylinder_01(self):
print("\n# Test count_cylinder.py and see if we can get the consistent 6 letter code")
args = "-pdb ../test/01-NaK2K/1-Charmm/em.pdb -xtc ../test/01-NaK2K/1-Charmm/with_water/fix_atom_c_100ps.xtc " \
"-K POT -SF_seq THR VAL GLY TYR GLY"
command = "count_cylinder.py " + args
results = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
res_text = results.stdout.decode('utf-8').split("\n")
letter_codes = []
for line in res_text:
if "# S6l" in line:
letter_codes.append(line.split()[-1])
self.assertListEqual(letter_codes, ["WKKKKW", "KK0KKW", "KK0KKW", "WKKKKW", "WKKKKW",
"WKKKKW", "KK0KKW", "WKK0KW", "0K0KKW", "WK0KKW",
"K0KKKW", ])
# remove the output file
os.remove("POT_perm_event.out")
os.remove("Wat_perm_event.out")
"-K POT -SF_seq THR VAL GLY TYR GLY "
xtc_out = "../test/01-NaK2K/1-Charmm/with_water/fix_10water.xtc"
commands = ["count_cylinder.py " + args,
"count_cylinder.py " + args + " -n_water 10 -reduced_xtc " + xtc_out,
]
for command in commands:
if os.path.isfile(xtc_out):
os.remove(xtc_out)
results = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
res_text = results.stdout.decode('utf-8').split("\n")
letter_codes = []
for line in res_text:
if "# S6l" in line:
letter_codes.append(line.split()[-1])
self.assertListEqual(letter_codes, ["WKKKKW", "KK0KKW", "KK0KKW", "WKKKKW", "WKKKKW",
"WKKKKW", "KK0KKW", "WKK0KW", "0K0KKW", "WK0KKW",
"K0KKKW", ])
# remove the output file
os.remove("POT_perm_event.out")
os.remove("Wat_perm_event.out")


def test_count_cylinder_02(self):
print("\n# Test count_cylinder.py and see if we can count the permeation event")
args = "-pdb ../test/01-NaK2K/1-Charmm/dry/em_dry.gro -xtc ../test/01-NaK2K/1-Charmm/dry/fix_atom_c_100ps_dry.xtc " \
Expand All @@ -106,6 +120,40 @@ def test_count_cylinder_02(self):
os.remove("K_perm_event.out")
os.remove("Wat_perm_event.out")

def test_get_closest_water(self):
print("\n# Test if we can iteratively find the closest water")
u = mda.Universe("../test/01-NaK2K/1-Charmm/em.pdb",
"../test/01-NaK2K/1-Charmm/with_water/fix_atom_c_100ps.xtc")
sf = Sfilter(u)
sf.detect_SF_sequence(SF_seq1="THR VAL GLY TYR GLY".split())
s45 = sf.sf_oxygen[-1]
water_O_selection = u.select_atoms('resname SOL and name OW')
distance_array = np.zeros((1, water_O_selection.n_atoms))
waters_0 = []
for ts in u.trajectory:
waters = get_closest_water(s45[[0]], water_O_selection, 2, distance_array)
# print(waters.ix) # only 0 and 3 are oxygen atoms
waters_0.append(waters.ix[[0, 3]])
self.assertTrue(np.all((waters.ix[[1, 4]] - waters.ix[[0, 3]]) == 1))
self.assertTrue(np.all((waters.ix[[2, 5]] - waters.ix[[0, 3]]) == 2))
waters_1 = []
for ts in u.trajectory:
waters = get_closest_water(s45, water_O_selection, 2, distance_array)
waters_1.append(waters.ix[[0, 3]])
self.assertTrue(np.all((waters.ix[[1, 4]] - waters.ix[[0, 3]]) == 1))
self.assertTrue(np.all((waters.ix[[2, 5]] - waters.ix[[0, 3]]) == 2))

self.assertListEqual(waters_0[0].tolist(), [40145, 44588])
self.assertListEqual(waters_0[1].tolist(), [36476, 61961])
self.assertListEqual(waters_0[2].tolist(), [49097, 55148])

self.assertTrue(40145 in waters_1[0])
self.assertTrue(61961 in waters_1[1])
self.assertTrue(51449 in waters_1[2])
self.assertTrue(43874 in waters_1[3] and 58355 in waters_1[3])
self.assertTrue(49463 in waters_1[4])





Expand Down

0 comments on commit 3bd2755

Please sign in to comment.