Skip to content

Commit

Permalink
THRUSTERS: create thruster ports from unified config
Browse files Browse the repository at this point in the history
* make thruster_comm_factory take layout param in format of new unified config
* adapt both the fake and real thruster_port constructors to new config format
  (ports are defined independently of the thruster definitions)
  • Loading branch information
DSsoto committed Apr 16, 2017
1 parent cfe9c89 commit cb4a69c
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 77 deletions.
12 changes: 7 additions & 5 deletions drivers/sub8_videoray_m5_thruster/sub8_thruster_comm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
from .thruster_fake import FakeThrusterPort


def thruster_comm_factory(port_info, fake=False):
'''Return the appropriate thruster communication class
def thruster_comm_factory(port_info, thruster_definitions, fake=False):
'''
Return the appropriate thruster communication class
Purpose:
- Use the same code to run both simulated thrusters and real thrusters
- Use the same code to run both simulated thrusters and real thrusters
'''
if fake:
return FakeThrusterPort(port_info)
return FakeThrusterPort(port_info, thruster_definitions)
else:
return ThrusterPort(port_info)
return ThrusterPort(port_info, thruster_definitions)

106 changes: 58 additions & 48 deletions drivers/sub8_videoray_m5_thruster/sub8_thruster_comm/thruster_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,26 @@ class ThrusterPort(object):
_timeout = 0.01 # How long to wait for a serial response
_max_motor_id = 100 # Max motor ID for which to verify the existence

def __init__(self, port_info):
def __init__(self, port_info, thruster_definitions):
'''Communicate on a single port with some thrusters
port_info should be a dictionary, drawn from a .yaml of the form...
- port: /dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A403IMC9-if00-port0
thrusters:
FLV: {
node_id: 10,
}
FLL: {
node_id: 11,
}
port_info an associates a serial port name with a list of thruster names.
YAML form: (part of list)
- port: /dev/serial/by-id/usb-MIL_Data_Merge_Board_Ports_1_to_4_DMBP14-if00-port0
thruster_names: [FLH, FLV]
thruster_definitions should be a dictionary mapping thruster names to thruster properties.
YAML form:
thrusters:
FLH: {
motor_id: 0
position: [0.2678, 0.2795, 0.0],
direction: [-0.866, 0.5, 0.0],
bounds: [-90.0, 90.0]
}
<other_thruster_definitions_continue_here>
Note:
- The thrusters automatically shut down if they do not recieve a
command for ~2 seconds
Expand All @@ -41,7 +49,8 @@ def __init__(self, port_info):
'''
self.port_name = port_info['port']
self.port = self.connect_port(self.port_name)
self.thruster_dict = {}
self.thruster_dict = {} # holds the motor_ids of all the thrusters that were supposed
# to be here and were actually found

# Determine which thrusters are really on this port
self.motor_ids_on_port = []
Expand All @@ -52,9 +61,9 @@ def __init__(self, port_info):

# Load thruster configurations and check if the requested thruster exists on this port
self.missing_thrusters = []
for thruster_name, thruster_info in port_info['thrusters'].items():
for thruster_name in port_info['thruster_names']:
try:
self.load_thruster_config(thruster_name, thruster_info)
self.load_thruster_config(thruster_name, thruster_definitions)
except IOError:
self.missing_thrusters.append(thruster_name)
continue
Expand All @@ -68,15 +77,18 @@ def connect_port(self, port_name):
raise(e)
return serial_port

def load_thruster_config(self, thruster_name, thruster_info):
def load_thruster_config(self, thruster_name, thruster_definitions):
'''Loads the motor_id from the config file if the thruster was found'''
# Get our humungous error string ready
errstr = "Could not find motor_id {} (Called {}) on port {}; existing ids are {}".format(
thruster_info['node_id'], thruster_name, self.port_name, self.motor_ids_on_port)

# Check if we can actually find the thruster on this port
if not int(thruster_info['node_id']) in self.motor_ids_on_port:
motor_id = int(thruster_info[thruster_name][motor_id])
if not motor_id in self.motor_ids_on_port:
rospy.logerr(errstr)
raise(IOError(errstr))
self.thruster_dict[thruster_name] = int(thruster_info['node_id'])
self.thruster_dict[thruster_name] = motor_id

def checksum_struct(self, _struct):
'''Take a struct, convert it to a bytearray, and append its crc32 checksum'''
Expand Down Expand Up @@ -124,7 +136,8 @@ def make_poll_payload(self, motor_id):
return payload

def to_register(self, motor_id, register, value=None):
'''Get a register (and optionally try to set it)
'''
Get a register (and optionally try to set it)
Note: This fails randomly
'''
assert register in Const.csr_address.keys(), "Unknown register, {}".format(register)
Expand Down Expand Up @@ -152,14 +165,11 @@ def to_register(self, motor_id, register, value=None):
)

packet = header + payload
# prev = self.port.read(100)
self.port.write(bytes(packet))
response_bytearray = self.port.read(Const.response_normal_length + return_size)
if len(response_bytearray) == 0:
return None

# rest = self.port.read(100)
# if len(response_bytearray) ==
response = struct.unpack('<HBBBB I B I'.format(size_char), response_bytearray)
response_contents = [
'sync',
Expand Down Expand Up @@ -188,7 +198,8 @@ def send_thrust_msg(self, motor_id, thrust):
self.port.write(bytes(packet))

def make_hex(self, msg):
'''Return a bytearray formatted as a string of hexadecimal characters
'''
Return a bytearray formatted as a string of hexadecimal characters
Useful for packet debugging
'''
return ":".join("{:02x}".format(c) for c in msg)
Expand All @@ -204,9 +215,9 @@ def check_for_thruster(self, motor_id):

def read_status(self):
response_bytearray = self.port.read(Const.thrust_response_length)
# We should always get $Const.thrust_response_length bytes, if we don't, we *are* failing to communicate
# We should always get $Const.thrust_response_length bytes, if we don't
# we *are* failing to communicate
if len(response_bytearray) != Const.thrust_response_length:
# rospy.logwarn("Cannot communicate with thruster")
return None

response = struct.unpack('<HBBBB I BffffB I', response_bytearray)
Expand Down Expand Up @@ -241,29 +252,28 @@ def command_thruster(self, thruster_name, normalized_thrust):


if __name__ == '__main__':
'''Module test code - this requires hardware, and as such is not a unit test'''
# port = '/dev/serial/by-id/usb-FTDI_USB-RS485_Cable_FTX1O9GJ-if00-port0'
port = '/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A403IMC9-if00-port0'
node_id = 10
thruster_name = 'FLV'
print('Test thruster comm over port {}, node_id {}, thruster name {}'.format(
port,
node_id,
'''
Module test code - this requires hardware, and as such is not a unit test
'''
import rospkg
import rosparam
import numpy.random as npr # haha
sub8_thruster_mapper = rospkg.RosPack().get_path('sub8_thruster_mapper')
thruster_layout = rosparam.load_file(sub8_thruster_mapper + '/config/thruster_layout.yaml')[0][0]
print thruster_layout

port_info = npr.choice(thruster_layout['thruster_ports'])
print "port_info", port_info
thruster_definitions = thruster_layout['thrusters']

thruster_name = npr.choice(port_info['thruster_names'])
motor_id = thruster_definitions[thruster_name]['motor_id']

print'Test thruster comm over port {}, node_id {}, thruster name {}'.format(
port_info['port'],
motor_id,
thruster_name
))

port_info = {
'port': port,
'thrusters': {
# 'BRV': {
# 'node_id': 16,
# },
thruster_name: {
'node_id': node_id,
}
}
}

tp = ThrusterPort(port_info)
# tp.send_thrust_msg(node_id, 0.06)
)

tp = ThrusterPort(port_info, thruster_definitions)
print tp.command_thruster(thruster_name, 0.04)
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@


class FakeThrusterPort(object):
def __init__(self, port_info):
def __init__(self, port_info, thruster_definitions):
'''Fake the behavior of a thruster'''
self.port_name = port_info['port']
self.thruster_dict = {}
self.status_dict = {}
self.missing_thrusters = []
for thruster_name, thruster_info in port_info['thrusters'].items():
self.load_thruster_config(thruster_name, thruster_info)
for thruster_name in port_info['thruster_names']:
self.load_thruster_config(thruster_name, thruster_definitions[thruster_name])

def load_thruster_config(self, thruster_name, thruster_info):
self.thruster_dict[thruster_name] = thruster_info['node_id']
self.thruster_dict[thruster_name] = thruster_info['motor_id']
self.status_dict[thruster_name] = {
'fault': 0,
'rpm': 0,
Expand Down Expand Up @@ -45,8 +45,10 @@ def read_status(self, thruster_name):
return response_dict

def command_thruster(self, thruster_name, normalized_thrust):
'''Fake thruster command
normalized_thrust should be between 0 and 1'''
'''
Fake thruster command
normalized_thrust should be between 0 and 1
'''
assert thruster_name in self.thruster_dict.keys(), "{} must be associated with this port".format(thruster_name)
rospy.loginfo('Commanding {}: {}'.format(thruster_name, normalized_thrust))
motor_id = self.thruster_dict[thruster_name]
Expand All @@ -55,23 +57,30 @@ def command_thruster(self, thruster_name, normalized_thrust):


if __name__ == '__main__':
'''Module test code'''
port = '/dev/serial/by-id/usb-FTDI_USB-RS485_Cable_FTX1O9GJ-if00-port0'
node_id = 17
thruster_name = 'BRL'
print 'Testing fake communication over port {}, node_id {}, thruster name {}'.format(port, node_id, thruster_name)
'''
Module test code
TODO: unit-test this if not already done
'''
import rospkg
import rosparam
import numpy.random as npr # haha
sub8_thruster_mapper = rospkg.RosPack().get_path('sub8_thruster_mapper')
thruster_layout = rosparam.load_file(sub8_thruster_mapper + '/config/thruster_layout.yaml')[0][0]
print thruster_layout

port_info = {
'port': port,
'thrusters': {
'BRV': {
'node_id': 16,
},
thruster_name: {
'node_id': node_id,
}
}
}
port_info = npr.choice(thruster_layout['thruster_ports'])
print "port_info", port_info
thruster_definitions = thruster_layout['thrusters']

thruster_name = npr.choice(port_info['thruster_names'])
motor_id = thruster_definitions[thruster_name]['motor_id']

print'Test fake thruster comm over port {}, node_id {}, thruster name {}'.format(
port_info['port'],
motor_id,
thruster_name
)

tp = FakeThrusterPort(port_info, thruster_definitions)
print tp.command_thruster(thruster_name, 0.04)

ftp = FakeThrusterPort(port_info)
ftp.command_thruster('BRV', 0.2)

0 comments on commit cb4a69c

Please sign in to comment.