Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

34 add setters validators #41

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
27 changes: 21 additions & 6 deletions EosLib/packet/data_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@


class DataHeader:

data_header_struct_format_string = "!" \
"B" \
"B" \
Expand Down Expand Up @@ -50,17 +49,19 @@ def validate_data_header(self):

:return: True if valid
"""
if not isinstance(self.sender, int) or not 0 <= self.sender <= 255 or self.sender == \
definitions.Device.NO_DEVICE:

if not isinstance(self.sender, int) or self.sender == definitions.Device.NO_DEVICE or\
not self.sender in definitions.Device:
David-Rey marked this conversation as resolved.
Show resolved Hide resolved
raise DataHeaderFormatError("Invalid Sender")

if not isinstance(self.data_type, int) or not 0 <= self.data_type <= 255:
if not isinstance(self.data_type, int) or not self.data_type in definitions.Type:
David-Rey marked this conversation as resolved.
Show resolved Hide resolved
raise DataHeaderFormatError("Invalid Type")

if not isinstance(self.priority, int) or not 0 <= self.priority <= 255:
if not isinstance(self.priority, int) or not self.priority in definitions.Priority:
raise DataHeaderFormatError("Invalid Priority")

if not isinstance(self.destination, int) or not 0 <= self.destination <= 255:

if not isinstance(self.destination, int) or not self.destination in definitions.Device:
raise DataHeaderFormatError("Invalid Destination")

if not isinstance(self.generate_time, datetime):
Expand Down Expand Up @@ -111,3 +112,17 @@ def decode(header_bytes: bytes):
decoded_header = DataHeader(unpacked[1], unpacked[2], unpacked[3], unpacked[4],
datetime.fromtimestamp(unpacked[5]))
return decoded_header


def check_data_header(data_header: DataHeader) -> bool:
""" Takes a packet data header and checks to see if it is valid

:return: boolean True if valid
"""
if data_header is None:
raise PacketFormatError("All packets must have a data header")
else:
data_header.validate_data_header()

return True

18 changes: 14 additions & 4 deletions EosLib/packet/definitions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
from enum import IntEnum, unique
from enum import IntEnum, unique, EnumMeta


class EnumMetaClass(EnumMeta):
def __contains__(cls, item):
try:
cls(item)
except ValueError:
return False
else:
return True


@unique
class Type(IntEnum):
class Type(IntEnum, metaclass=EnumMetaClass):
NO_TYPE = 0
TELEMETRY = 1
WARNING = 2
Expand All @@ -12,7 +22,7 @@ class Type(IntEnum):


@unique
class Priority(IntEnum):
class Priority(IntEnum, metaclass=EnumMetaClass):
NO_TRANSMIT = 0
URGENT = 11
TELEMETRY = 9
Expand All @@ -23,7 +33,7 @@ class Priority(IntEnum):


@unique
class Device(IntEnum):
class Device(IntEnum, metaclass=EnumMetaClass):
NO_DEVICE = 0
PRESSURE = 1
PARTICULATES = 2
Expand Down
50 changes: 50 additions & 0 deletions EosLib/packet/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, body: bytes, data_header: DataHeader, transmit_header: Transm
self.body = body # type: bytes
self.data_header = data_header # type: DataHeader
self.transmit_header = transmit_header # type: TransmitHeader
self.validate_packet() # checks if packet is valid
David-Rey marked this conversation as resolved.
Show resolved Hide resolved

def __eq__(self, other):
""" Compares two packets for value equality
Expand Down Expand Up @@ -129,6 +130,55 @@ def encode_to_string(self):
data_header=self.data_header.encode_to_string(),
body=self.body.decode())

def set_data_header(self, new_data_header: DataHeader) -> bool:
""" Setter that sets new data header

:return: boolean True set is successful
:raises: PacketFormatError if data header is none
"""
if new_data_header is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is very incorrect

if new_data_header.validate_data_header():
self.data_header = new_data_header
return True

raise PacketFormatError("data header can not be none")

def set_transmit_header(self, new_transmit_header: TransmitHeader) -> bool:
""" Setter that sets new transmit header

:return: boolean True set is successful
"""
if new_transmit_header is None:
self.transmit_header = new_transmit_header
return True

if new_transmit_header.validate_transmit_header():
self.transmit_header = new_transmit_header

return True

def set_body(self, new_body: bytes) -> bool:
""" Setter that sets new body

:return: boolean True set is successful
"""
if new_body is not None and len(new_body) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the body validator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to make a function or is there already a function for this? If so I can't find it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth making one, there's good logic in validate_packet.

Also this function's logic is wrong (only sets if length 0) and it doesn't return something even though the docstring specifies that it should

self.body = new_body
return True

return False

@staticmethod
def check_body(body: bytes) -> bool:
""" Takes a packet body and checks to see if it is valid

:return: boolean True if valid
"""
if body is None or len(body) == 0 or not isinstance(body, bytes):
raise PacketFormatError("All packets must have a body")

return True

@staticmethod
def decode(packet_bytes: bytes):
"""Takes a bytes object and decodes it into a Packet object.
Expand Down
12 changes: 12 additions & 0 deletions EosLib/packet/transmit_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,15 @@ def decode(header_bytes: bytes):
unpacked = struct.unpack(TransmitHeader.transmit_header_struct_format_string, header_bytes)
decoded_header = TransmitHeader(unpacked[1], datetime.fromtimestamp(unpacked[2]))
return decoded_header


def check_transmit_header(transmit_header: TransmitHeader) -> bool:
""" Takes a packet transmit header and checks to see if it is valid

:return: boolean True if valid
"""
if transmit_header is not None:
transmit_header.validate_transmit_header()

return True

David-Rey marked this conversation as resolved.
Show resolved Hide resolved
25 changes: 25 additions & 0 deletions tests/packet/test_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,28 @@ def test_packet_print_no_body():
"No body"

assert expected_string == test_packet.__str__()


def test_set_data_header():
test_packet = get_valid_packet()
data_header = DataHeader(definitions.Device.GPS,
definitions.Type.TELEMETRY,
definitions.Priority.TELEMETRY,
definitions.Device.GPS,
datetime.now())

test_packet.set_data_header(data_header)


def test_set_transmit_header():
test_packet = get_valid_packet()
transmit_header = TransmitHeader(0, datetime.now())

test_packet.set_transmit_header(transmit_header)

def test_set_body():
test_packet = get_valid_packet()
body = bytes("temp", 'utf-8')

test_packet.set_body(body)