Skip to content
This repository has been archived by the owner on Oct 9, 2024. It is now read-only.

Commit

Permalink
building out test class
Browse files Browse the repository at this point in the history
  • Loading branch information
ljwoods2 committed May 10, 2024
1 parent 00cb9aa commit 6c49a8e
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions imdreader/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ def __init__(

self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind(("localhost", self.port))
# GROMACS waits for a connection for an unlimited time here
self.socket.listen(1)
self.conn, self.address = self.socket.accept()

self._send_handshake()
self._simulation_loop()

def _send_handshake(self):
Expand All @@ -46,15 +42,26 @@ def disconnect(self):

def _simulation_loop(self):
if self.imdwait and self.imdstep == -1:
self._await_go()
pass
self._handle_connect()

# First, read all incoming packets from the client
while self._peek_header():
command = self._expect_header()
if command.type == IMDType.IMD_KILL:
pass
if command.type == IMDType.IMD_DISCONNECT:
pass
if command.type == IMDType.IMD_MDCOMM:
pass
if command.type == IMDType.IMD_PAUSE:
pass
if command.type == IMDType.IMD_TRATE:
pass

# Then, send energy + position data

def _await_go(self):
# must recieve go within 1 second of sending handshake
# if not, terminate connection
self.conn.settimeout(1)
header = self._expect_header(expected_type=IMDType.IMD_GO)
self.conn.settimeout(None)
self._expect_header(expected_type=IMDType.IMD_GO)

def _expect_header(self, expected_type=None, expected_value=None):
"""
Expand All @@ -63,14 +70,25 @@ def _expect_header(self, expected_type=None, expected_value=None):
header = parse_header_bytes(self._recv_n_bytes(IMDHEADERSIZE))
if expected_type is not None and header.type != expected_type:
raise ValueError(
f"Expected packet type {expected_type}, got {header.type}"
f"DummyIMDServer: Expected packet type {expected_type}, got {header.type}"
)
elif expected_value is not None and header.length != expected_value:
raise ValueError(
f"Expected packet length {expected_value}, got {header.length}"
f"DummyIMDServer: Expected packet length {expected_value}, got {header.length}"
)
return header

def _peek_header(self):
"""
Peek at a header packet from the socket without consuming it
"""

data = self.socket.recv(IMDHEADERSIZE, socket.MSG_PEEK)
if data:
header = parse_header_bytes(data)
return header
return None

def _recv_n_bytes(self, num_bytes):
"""Receive an arbitrary number of bytes from the socket."""
data = bytearray(num_bytes)
Expand All @@ -83,3 +101,15 @@ def _recv_n_bytes(self, num_bytes):
view[total_received : total_received + len(chunk)] = chunk
total_received += len(chunk)
return data

def _handle_connect(self):
# GROMACS waits for a connection for an unlimited time here
self.socket.listen(1)
self.conn, self.address = self.socket.accept()

self._send_handshake()
# must recieve go within 1 second of sending handshake
# if not, terminate connection
self.conn.settimeout(1)
self._await_go()
self.conn.settimeout(None)

0 comments on commit 6c49a8e

Please sign in to comment.