This repository has been archived by the owner on Oct 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
134 additions
and
143 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,77 +1,85 @@ | ||
from typing import Tuple | ||
|
||
from imdreader.IMDProtocol import * | ||
import MDAnalysis as mda | ||
from MDAnalysis.coordinates.memory import MemoryReader | ||
import numpy as np | ||
import socket | ||
|
||
|
||
def make_Universe( | ||
extras: Tuple[str] = tuple(), | ||
size: Tuple[int, int, int] = (125, 25, 5), | ||
n_frames: int = 0, | ||
velocities: bool = False, | ||
forces: bool = False | ||
) -> mda.Universe: | ||
"""Make a dummy reference Universe | ||
class DummyIMDServer: | ||
def __init__( | ||
self, | ||
imdwait=True, | ||
imdpull=False, | ||
imdterm=True, | ||
port=8888, | ||
endianness="<", | ||
version=2, | ||
): | ||
self.port = port | ||
self.imdstep = -1 | ||
self.imdwait = imdwait | ||
self.imdpull = imdpull | ||
self.imdterm = imdterm | ||
self.endian = endianness | ||
self.version = version | ||
|
||
Allows the construction of arbitrary-sized Universes. Suitable for | ||
the generation of structures for output. | ||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
self.socket.bind(("localhost", self.port)) | ||
# GROMACS waits for a connection for an unlimited time here | ||
self.socket.listen(1) | ||
self.conn, self.address = self.socket.accept() | ||
|
||
Preferable for testing core components because: | ||
* minimises dependencies within the package | ||
* very fast compared to a "real" Universe | ||
self._send_handshake() | ||
self._simulation_loop() | ||
|
||
Parameters | ||
---------- | ||
extras : tuple of strings, optional | ||
extra attributes to add to Universe: | ||
u = make_Universe(('masses', 'charges')) | ||
Creates a lightweight Universe with only masses and charges. | ||
size : tuple of int, optional | ||
number of elements of the Universe (n_atoms, n_residues, n_segments) | ||
n_frames : int | ||
If positive, create a fake Reader object attached to Universe | ||
velocities : bool, optional | ||
if the fake Reader provides velocities | ||
force : bool, optional | ||
if the fake Reader provides forces | ||
def _send_handshake(self): | ||
type = struct.pack("!i", IMDType.IMD_HANDSHAKE.value) | ||
length = struct.pack(f"{self.endian}i", self.version) | ||
header = type + length | ||
self.conn.send(header) | ||
|
||
Returns | ||
------- | ||
MDAnalysis.core.universe.Universe object | ||
def disconnect(self): | ||
header = create_header_bytes(IMDType.IMD_DISCONNECT, 0) | ||
self.connection.close() | ||
self.socket.close() | ||
|
||
""" | ||
def _simulation_loop(self): | ||
if self.imdwait and self.imdstep == -1: | ||
self._await_go() | ||
pass | ||
|
||
n_atoms, n_residues, n_segments = size | ||
trajectory = n_frames > 0 | ||
u = mda.Universe.empty( | ||
# topology things | ||
n_atoms=n_atoms, | ||
n_residues=n_residues, | ||
n_segments=n_segments, | ||
atom_resindex=np.repeat( | ||
np.arange(n_residues), n_atoms // n_residues), | ||
residue_segindex=np.repeat( | ||
np.arange(n_segments), n_residues // n_segments), | ||
# trajectory things | ||
trajectory=trajectory, | ||
velocities=velocities, | ||
forces=forces, | ||
) | ||
if extras is None: | ||
extras = [] | ||
for ex in extras: | ||
u.add_TopologyAttr(ex) | ||
def _await_go(self): | ||
# must recieve go within 1 second of sending handshake | ||
# if not, terminate connection | ||
self.conn.settimeout(1) | ||
header = self._expect_header(expected_type=IMDType.IMD_GO) | ||
self.conn.settimeout(None) | ||
|
||
if trajectory: | ||
pos = np.arange(3 * n_atoms * n_frames).reshape(n_frames, n_atoms, 3) | ||
vel = pos + 100 if velocities else None | ||
fcs = pos + 10000 if forces else None | ||
reader = MemoryReader( | ||
pos, | ||
velocities=vel, | ||
forces=fcs, | ||
) | ||
u.trajectory = reader | ||
def _expect_header(self, expected_type=None, expected_value=None): | ||
""" | ||
Read a header packet from the socket. | ||
""" | ||
header = parse_header_bytes(self._recv_n_bytes(IMDHEADERSIZE)) | ||
if expected_type is not None and header.type != expected_type: | ||
raise ValueError( | ||
f"Expected packet type {expected_type}, got {header.type}" | ||
) | ||
elif expected_value is not None and header.length != expected_value: | ||
raise ValueError( | ||
f"Expected packet length {expected_value}, got {header.length}" | ||
) | ||
return header | ||
|
||
return u | ||
def _recv_n_bytes(self, num_bytes): | ||
"""Receive an arbitrary number of bytes from the socket.""" | ||
data = bytearray(num_bytes) | ||
view = memoryview(data) | ||
total_received = 0 | ||
while total_received < num_bytes: | ||
chunk = self.conn.recv(num_bytes - total_received) | ||
if not chunk: | ||
raise ConnectionError("Socket connection was closed") | ||
view[total_received : total_received + len(chunk)] = chunk | ||
total_received += len(chunk) | ||
return data |