-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(POC) feat: can aux #432
(POC) feat: can aux #432
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #432 +/- ##
==========================================
+ Coverage 98.11% 98.16% +0.04%
==========================================
Files 80 83 +3
Lines 6487 6637 +150
==========================================
+ Hits 6365 6515 +150
Misses 122 122 ☔ View full report in Codecov by Sentry. |
msg_to_send = self.parser.encode(message, signals) | ||
self.channel.cc_send(msg_to_send[0], remote_id=msg_to_send[1]) | ||
|
||
def run_command( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this API reimplemented? It's already part of the DTAuxiliaryInterface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, i will remove it
return None | ||
|
||
@staticmethod | ||
def _is_payload_valid(payload: Optional[bytes]) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't look like it belongs here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remove it
super().__init__( | ||
is_proxy_capable=True, tx_task_on=True, rx_task_on=True, **kwargs | ||
) | ||
self.tx_task_on = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be set in the line above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
self.channel = com | ||
self.queue_tx = queue.Queue() | ||
self.queueing_event = threading.Event() | ||
self.collect_messages = functools.partial(_collect_messages, com_aux=self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 3 attributes don't seem to be used, you could remove the _collect_messages
class above too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, i will remove it and test it
src/pykiso/can_message.py
Outdated
def __init__(self, name: str, signals: dict[str, Any], time_stamp: float) -> None: | ||
self.name = name | ||
self.signals = signals | ||
self.time_stamp = time_stamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.time_stamp = time_stamp | |
self.timestamp = timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i will change it
src/pykiso/can_message.py
Outdated
|
||
|
||
class CanMessage(): | ||
def __init__(self, name: str, signals: dict[str, Any], time_stamp: float) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def __init__(self, name: str, signals: dict[str, Any], time_stamp: float) -> None: | |
def __init__(self, name: str, signals: dict[str, Any], timestamp: float) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i will change it
src/pykiso/can_message.py
Outdated
|
||
|
||
class CanMessage(): | ||
def __init__(self, name: str, signals: dict[str, Any], time_stamp: float) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are supposed to be compatible with python 3.7+, so using builtin types as type hints isn't possible
Use the ones from typing instead:
from typing import Any, Dict
Dict[str, Any]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, i will fix it
self.can_messages[message_name].name = message_name | ||
self.can_messages[message_name].signals = message_signals | ||
self.can_messages[message_name].time_stamp = message_timestamp | ||
except (Exception, KeyError): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simply catching the Exception
will already catch the KeyError
, if you need a different error message you might want to handle them separately:
except KeyError:
log.exception("Some message")
except Exception:
log.exception("My very generic message")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, i will separate them
:return: list of last can messages or None if no messages for this component | ||
""" | ||
last_msg = self.can_messages.get(message_name, None) | ||
time.sleep(timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't precise at all, it anyone wants to know how long it took until the message arrived he can't
Instead of having the potential message as can_messages
values, you could also have a Queue with maxsize=1 which would allow you to use the non-polling APIs .get(timeout=...)
examples/test_can/test_can.py
Outdated
def setUp(self): | ||
pass | ||
|
||
def test_run(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally tests have an assert statement which you don't use here..
I would suggest to reduce the example to only one extra thread.
Just start tx thread and wait for incoming message in this test method.
Do an assert on the received message where you know the signal content
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, i agree, i will fix it
examples/test_can/test_can.py
Outdated
recv_t.join() | ||
|
||
send_t = threading.Thread( | ||
target=self.send_message, args=[can_aux1, messages_to_send, 0.2] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could simplify this. Just pass the can_aux1.
target=self.send_message, args=[can_aux1, messages_to_send, 0.2] | |
target=tx_thread, args=[can_aux1] |
You could also define the tx_thread function inside this mehtod.
Everything what you have passed can be defined in the new tx_thread function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, but method will be the same in two tests. Is it more appropriate to define it once in the class
examples/test_can/test_can.py
Outdated
send_t = threading.Thread( | ||
target=self.send_message, args=[can_aux1, messages_to_send, 0.2] | ||
) | ||
recv_t = threading.Thread( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't start an extra receive thread.
Just receive the message in this method an do an assert on the expected message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
examples/test_can/test_can.py
Outdated
target=self.get_last_msg_and_wait_for_new_msg, args=[can_aux2, 1] | ||
) | ||
|
||
send_t.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here is a good point to create another test.
def test_wait_for_expected_signals(self):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, i will separate them
examples/test_can/test_can.py
Outdated
def setUp(self): | ||
pass | ||
|
||
def test_run(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every method which starts with "test"_ is considered to be run as a test from the unittest framework
I think you could split the test_run into 2 tests
Name the first
def test_run(self): | |
def test_send_and_receive_message(self): |
and the second one
def test_wait_for_expected_signals(self):
Every test method has at least one assert statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, i will separate them
examples/test_can/test_can.py
Outdated
def tearDown(self): | ||
pass | ||
|
||
def send_message(self, can_aux, messages_to_send, time_between_messages): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def send_message(self, can_aux, messages_to_send, time_between_messages): | |
def tx_thread(self, can_aux): |
define messages_to_send, time_between_messages inside this method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i agree
examples/test_can/test_can.py
Outdated
can_aux.send_message(message_to_send.name, message_to_send.signals) | ||
time.sleep(time_between_messages) | ||
|
||
def get_last_msg_and_wait_for_new_msg(self, can_aux, timeout): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a better name would have rx_thread but as suggested just put the content in your test method which is currently "test_run"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
examples/test_can/test_can.py
Outdated
"New msg with wait Signal A = " + str(recv_msg.signals["signal_a"]) | ||
) | ||
|
||
def wait_for_match_message_with_signals(self, can_aux, expected_signals, timeout): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
method not necessary. Just use the code here and put it into the suggested second test method .
def test_wait_for_expected_signals(self):
No description provided.