Skip to content

Commit

Permalink
Finished BMS module, and added BMS node
Browse files Browse the repository at this point in the history
  • Loading branch information
PizzaAllTheWay committed Oct 18, 2023
1 parent 37594ec commit 3884342
Show file tree
Hide file tree
Showing 84 changed files with 3,646 additions and 65 deletions.
186 changes: 140 additions & 46 deletions sensors/bms/bms/freya_bms.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import subprocess

# TODO: Fix so that it automatically detects which USB port to use
# TODO: Check that data is received on initialization (find out if batteries are
# connected)

class BMS:
""" Class containing Freya's BMS system
""" Class containing Freya's BMS system.
Note:
-----------
If no USB directory is passed in the constructor, the program queries all active
ports to find out where battery pack is connected. Only pass in a USB directory
if you are completely sure that it will be correct.
Attributes:
-----------
Expand Down Expand Up @@ -38,19 +40,37 @@ class BMS:
changes the usb port for the BMS
"""

def __init__(self, usb_port: str) -> None:
def __init__(self, usb_port: str = None) -> None:
"""
Parameters:
usb_port (str): USB port to connect to, either ttyUSB0 or ttyUSB1
usb_port (str): USB port to connect to, E.G. 'ttyUSB0'. If none is
supplied, the program automatically tries to find the correct USB
port
Returns:
None
Note: Private members are denoted by _variable_name
"""
self.usb_port = usb_port
"""
if usb_port:
self._usb_port = usb_port
self._command = ["jbdtool", "-t", f"serial:/dev/{self._usb_port}"]
else:
print("Querying for USB devices...")
devices = subprocess.check_output(["ls", "/dev"], text=True).split("\n")
usb_devices = [device for device in devices if device[:6] == "ttyUSB"]

self.command = ["jbdtool", "-t", f"serial:/dev/{usb_port}"]
for i, device in enumerate(usb_devices):
self._usb_port = device
self._command = ["jbdtool", "-t", f"serial:/dev/{self._usb_port}"]
resp = self.get_bms_data()
if resp != "":
print(f"Found device {self._usb_port}")
break

if i == len(usb_devices) - 1:
raise Exception("No USB device was found. Ensure that battery pack is connected to Raspberry Pi")

self._voltage = 0
self._current = 0
self._design_capacity = 0
Expand Down Expand Up @@ -105,55 +125,54 @@ def get_bms_data(self) -> str | None:
"""

try:
response = subprocess.run(self.command,
response = subprocess.run(self._command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True)

return response
return response.stdout.decode()
except subprocess.CalledProcessError as e:
print("An error occured when getting BMS data")
print(f"Error: {e.stderr.decode()}")
print("Please check that USBs are connected to Raspberry PI")
print("Please check that USBs are connected to Raspberry Pi, and that the _usb_port variable is set correctly")

return None

def parse_bms_data(self, bms_data: subprocess.CompletedProcess) -> None:
def parse_bms_data(self, bms_data: str) -> None:
"""
Parses BMS data and updates class members accordingly
Parameters:
bms_data (subprocess.CompletedProcess): object containing result
of the jbdtool command
bms_data (str): string containing result of the jbdtool command
Returns: None
"""

data = bms_data.stdout.decode().split("\n")

for element in data:

element = element.split()
print(element)

self._voltage = float(data[0])
self._current = float(data[1])
self._design_capacity = float(data[2])
self._remaining_capacity = float(data[3])
self._percent_capacity = float(data[4]) / 100
self._cycle_count = int(data[5])
self._probes = int(data[6])
self._strings = int(data[7])
self._temps = int(data[8].split(","))
self._cells = int(data[9].split(","))
self._balance = data[10]
self._cell_total = float(data[11])
self._cell_min = float(data[12])
self._cell_max = float(data[13])
self._cell_avg = float(data[14])
self._device_name = data[15]
self._manufacture_date = data[16]
self._version = data[17]
self._FET = data[18]
if bms_data == "":
print("Warning: No data was found.")
return

data = [entry.split() for entry in bms_data.split("\n")][:-1] # [:-1] is only there because there is a empty list at the end for some reason

self._voltage = float(data[0][1])
self._current = float(data[1][1])
self._design_capacity = float(data[2][1])
self._remaining_capacity = float(data[3][1])
self._percent_capacity = float(data[4][1]) / 100
self._cycle_count = int(data[5][1])
self._probes = int(data[6][1])
self._strings = int(data[7][1])
self._temps = [float(temp) for temp in data[8][1].split(",")]
self._cells = [float(cell) for cell in data[9][1].split(",")]
self._balance = data[10][1]
self._cell_total = float(data[11][1])
self._cell_min = float(data[12][1])
self._cell_max = float(data[13][1])
self._cell_avg = float(data[14][1])
self._device_name = data[15][1]
self._manufacture_date = data[16][1]
self._version = data[17][1]
self._FET = data[18][1]


def change_usb_port(self, usb_port: str) -> None:
Expand All @@ -167,8 +186,83 @@ def change_usb_port(self, usb_port: str) -> None:
None
"""

self.usb_port = usb_port
self.command = ["jbdtool", "-t", f"serial:/dev/{usb_port}"]
self._usb_port = usb_port
self._command = ["jbdtool", "-t", f"serial:/dev/{usb_port}"]

#region getters
@property
def voltage(self):
return self._voltage

@property
def current(self):
return self._current

@property
def design_capacity(self):
return self._design_capacity

@property
def remaining_capacity(self):
return self._remaining_capacity

@property
def percent_capacity(self):
return self._percent_capacity

@property
def cycle_count(self):
return self._cycle_count

@property
def probes(self):
return self._probes

@property
def strings(self):
return self._strings

test = BMS
test.change_usb_port
@property
def temps(self):
return self._temps

@property
def cells(self):
return self._cells

@property
def balance(self):
return self._balance

@property
def cell_total(self):
return self._cell_total

@property
def cell_min(self):
return self._cell_min

@property
def cell_max(self):
return self._cell_max

@property
def cell_avg(self):
return self._cell_avg

@property
def device_name(self):
return self._device_name

@property
def manufacture_date(self):
return self._manufacture_date

@property
def version(self):
return self._version

@property
def FET(self):
return self._FET
#endregion
42 changes: 42 additions & 0 deletions sensors/bms/bms/freya_bms_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import BatteryState
from bms.freya_bms import BMS

class FreyaBMSNode(Node):
def __init__(self, usb_port=None) -> None:
super().__init__('freya_bms')
self._publisher = self.create_publisher(BatteryState, '/internal/status/bms', 10)
self._timer = self.create_timer(2, self.publish_bms_data)

if usb_port:
self._bms_system = BMS(usb_port=usb_port)
else:
self._bms_system = BMS()

def publish_bms_data(self):
# print("mjau")
battery_msg = BatteryState()

self._bms_system.parse_bms_data(self._bms_system.get_bms_data())

battery_msg.voltage = self._bms_system.voltage
battery_msg.current = self._bms_system.current
battery_msg.cell_temperature = self._bms_system.temps
battery_msg.percentage = self._bms_system.percent_capacity

self._publisher.publish(battery_msg)
# self.get_logger().info(f"Publishing voltage: {battery_msg.voltage}, current: {battery_msg.current}, cell_temperature: {battery_msg.cell_temperature}, percentage: {battery_msg.percentage}")

def main(args=None):
rclpy.init(args=args)

freya_bms_node = FreyaBMSNode()
rclpy.spin(freya_bms_node)

freya_bms_node.destroy_node()
rclpy.shutdown()

if __name__ == "__main__":
main()

38 changes: 19 additions & 19 deletions sensors/bms/bms/testfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,29 @@
import re
import subprocess

device_re = re.compile(b"Bus\s+(?P<bus>\d+)\s+Device\s+(?P<device>\d+).+ID\s(?P<id>\w+:\w+)\s(?P<tag>.+)$", re.I)
df = subprocess.check_output("lsusb")
devices = []
for i in df.split(b'\n'):
if i:
info = device_re.match(i)
if info:
dinfo = info.groupdict()
dinfo['device'] = '/dev/bus/usb/%s/%s' % (dinfo.pop('bus'), dinfo.pop('device'))
devices.append(dinfo)

for device in devices:
print(device)

# print(devices)

test = freya_bms.BMS("ttyUSB0")
test.parse_bms_data(test.get_bms_data())
# device_re = re.compile(b"Bus\s+(?P<bus>\d+)\s+Device\s+(?P<device>\d+).+ID\s(?P<id>\w+:\w+)\s(?P<tag>.+)$", re.I)
# df = subprocess.check_output("lsusb")
# devices = []
# for i in df.split(b'\n'):
# if i:
# info = device_re.match(i)
# if info:
# dinfo = info.groupdict()
# dinfo['device'] = '/dev/bus/usb/%s/%s' % (dinfo.pop('bus'), dinfo.pop('device'))
# devices.append(dinfo)

# for device in devices:
# print(device)

# print(devices)

print(test._cells)
test = freya_bms.BMS(usb_port="ksvhbdfhvbjdbhvjbh")
print(test.get_bms_data())

string = "hei på"
print(string.split())
# string = "hei på"
# print(string.split())



Expand Down
1 change: 1 addition & 0 deletions sensors/bms/install/.colcon_install_layout
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
isolated
Empty file.
Loading

0 comments on commit 3884342

Please sign in to comment.