Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Isotp #996

Merged
merged 2 commits into from
Mar 25, 2024
Merged

Isotp #996

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion src/asammdf/blocks/bus_logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def extract_signal(
payload: NDArray[Any],
raw: bool = False,
ignore_value2text_conversion: bool = True,
is_ISOTP: bool = False,
) -> NDArray[Any]:
vals = payload

Expand Down Expand Up @@ -142,6 +143,8 @@ def extract_signal(

# prepend or append extra bytes columns
# to get a standard size number of bytes
if is_ISOTP: # Don't muck around with size of ISO-TP signals
return vals

if extra_bytes:
if big_endian:
Expand Down Expand Up @@ -271,6 +274,27 @@ class ExtractedSignal(TypedDict):
invalidation_bits: NDArray[Any]


def merge_cantp(payload, ts):
"""Merge sequences of ISO-TP coded CAN payloads, enabling > 8 byte frames"""
INITIAL = 0x10
CONSECUTIVE = 0x20
merged = []
t_out = []
merging = np.array([], "uint8")
for frame, t in zip(payload, ts):
if frame[0] & 0xF0 == INITIAL:
expected_size = 256 * (frame[0] & 0x0F) + frame[1]
merging = np.array(frame[2:8], "uint8")
if frame[0] & 0xF0 == CONSECUTIVE:
merging = np.hstack((merging, frame[1:]))
if len(merging) >= expected_size:
merging = merging[:expected_size]
merged.append(merging)
t_out.append(t) # Using t from final received part (as does Canoe, apparently)
frames = np.vstack(merged) if len(merged) > 0 else np.array([], "uint8")
return frames, np.array(t_out)


def extract_mux(
payload: NDArray[Any],
message: Frame,
Expand Down Expand Up @@ -334,7 +358,20 @@ def extract_mux(

extracted_signals = {}

if message.size > payload.shape[1] or message.size == 0:
# (Too?) simple check for ISO-TP CAN data - if it has flow control, we believe its ISO-TP
is_ISOTP = "CanTpFcFrameId" in message.attributes
if is_ISOTP:
# print(f" ISO-TP frame, for message {message_id}, merging CAN frames...")
payload, t = merge_cantp(payload, t)
# assert(len(payload) == len(t))
# if len(payload) > 0:
# print(f" message size post-merge: {payload.shape[1]}")
# else:
# print(f" no payload found to merge")

if payload.shape[0] == 0 or message.size > payload.shape[1] or message.size == 0:
return extracted_signals

return extracted_signals

pairs = {}
Expand Down Expand Up @@ -367,6 +404,7 @@ def extract_mux(
payload_,
ignore_value2text_conversion=ignore_value2text_conversion,
raw=True,
is_ISOTP=is_ISOTP,
)
if len(samples) == 0 and len(t_):
continue
Expand Down
26 changes: 26 additions & 0 deletions test/test_cantp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import unittest
import numpy as np
from asammdf.blocks import bus_logging_utils as blu


class TestCANTP(unittest.TestCase):
tempdir = None

payload = np.vstack(
[
np.frombuffer(b"\x10\x0B\x52\x49\x47\x20\x20\x39", dtype="uint8"), # Initisl part
np.frombuffer(b"\x30\xff\x00\x4c\x40\x00\xd5\x54", dtype="uint8"), # Flow control
np.frombuffer(b"\x21\x30\x30\x30\x38\x33\x00\x00", dtype="uint8"), # Final (second) part
np.frombuffer(b"\x10\x0B\x52\x49\x47\x20\x20\x39", dtype="uint8"), # Initial part of next frame...
]
)
ts = np.array([0.112, 0.113, 0.116, 0.201])

def test_merge_cantp(self):

merged, t = blu.merge_cantp(TestCANTP.payload, TestCANTP.ts)

assert merged.shape == (1, 11)
assert merged[0, -1] == 0x33
assert t.shape == (1,)
assert t[0] == 0.116
Loading