From 8d28fddb69124b82fec5697c3317d1654860db09 Mon Sep 17 00:00:00 2001 From: danielhrisca Date: Mon, 18 Mar 2024 10:06:46 +0200 Subject: [PATCH] load FlexRay and LIN trace windows even if the file does not contain bus logging data --- src/asammdf/gui/widgets/mdi_area.py | 590 +++++++++++++++------------- 1 file changed, 310 insertions(+), 280 deletions(-) diff --git a/src/asammdf/gui/widgets/mdi_area.py b/src/asammdf/gui/widgets/mdi_area.py index d50cc882c..b6c5c10e8 100644 --- a/src/asammdf/gui/widgets/mdi_area.py +++ b/src/asammdf/gui/widgets/mdi_area.py @@ -1423,169 +1423,183 @@ def set_title(mdi): return trace def _add_flexray_bus_trace_window(self, ranges=None): - if self.mdf.version < "4.00": - return - items = [] - groups_count = len(self.mdf.groups) + if self.mdf.version >= "4.00": - for index in range(groups_count): - group = self.mdf.groups[index] - if group.channel_group.flags & v4c.FLAG_CG_BUS_EVENT: - source = group.channel_group.acq_source + groups_count = len(self.mdf.groups) - names = [ch.name for ch in group.channels] + for index in range(groups_count): + group = self.mdf.groups[index] + if group.channel_group.flags & v4c.FLAG_CG_BUS_EVENT: + source = group.channel_group.acq_source - if source and source.bus_type == v4c.BUS_TYPE_FLEXRAY: - if "FLX_Frame" in names: - data = self.mdf.get("FLX_Frame", index, raw=True) - items.append((data, names)) + names = [ch.name for ch in group.channels] - elif "FLX_NullFrame" in names: - data = self.mdf.get("FLX_NullFrame", index, raw=True) - items.append((data, names)) + if source and source.bus_type == v4c.BUS_TYPE_FLEXRAY: + if "FLX_Frame" in names: + data = self.mdf.get("FLX_Frame", index, raw=True) + items.append((data, names)) - elif "FLX_StartCycle" in names: - data = self.mdf.get("FLX_StartCycle", index, raw=True) - items.append((data, names)) + elif "FLX_NullFrame" in names: + data = self.mdf.get("FLX_NullFrame", index, raw=True) + items.append((data, names)) - elif "FLX_Status" in names: - data = self.mdf.get("FLX_Status", index, raw=True) - items.append((data, names)) + elif "FLX_StartCycle" in names: + data = self.mdf.get("FLX_StartCycle", index, raw=True) + items.append((data, names)) - if not len(items): - return + elif "FLX_Status" in names: + data = self.mdf.get("FLX_Status", index, raw=True) + items.append((data, names)) - df_index = np.sort(np.concatenate([item.timestamps for (item, names) in items])) - count = len(df_index) - - columns = { - "timestamps": df_index, - "Bus": np.full(count, "Unknown", dtype="O"), - "ID": np.full(count, 0xFFFF, dtype="u2"), - "Direction": np.full(count, "", dtype="O"), - "Cycle": np.full(count, 0xFF, dtype="u1"), - "Name": np.full(count, "", dtype="O"), - "Event Type": np.full(count, "FlexRay Frame", dtype="O"), - "Details": np.full(count, "", dtype="O"), - "Data Length": np.zeros(count, dtype="u1"), - "Data Bytes": np.full(count, "", dtype="O"), - "Header CRC": np.full(count, 0xFFFF, dtype="u2"), - } + df_index = np.sort(np.concatenate([item.timestamps for (item, names) in items])) + count = len(df_index) - count = len(items) + columns = { + "timestamps": df_index, + "Bus": np.full(count, "Unknown", dtype="O"), + "ID": np.full(count, 0xFFFF, dtype="u2"), + "Direction": np.full(count, "", dtype="O"), + "Cycle": np.full(count, 0xFF, dtype="u1"), + "Name": np.full(count, "", dtype="O"), + "Event Type": np.full(count, "FlexRay Frame", dtype="O"), + "Details": np.full(count, "", dtype="O"), + "Data Length": np.zeros(count, dtype="u1"), + "Data Bytes": np.full(count, "", dtype="O"), + "Header CRC": np.full(count, 0xFFFF, dtype="u2"), + } - # TO DO: add flexray error types - # for string in v4c.CAN_ERROR_TYPES.values(): - # sys.intern(string) + count = len(items) - for _ in range(count): - data, names = items.pop() + # TO DO: add flexray error types + # for string in v4c.CAN_ERROR_TYPES.values(): + # sys.intern(string) - frame_map = {} + for _ in range(count): + data, names = items.pop() - # TO DO : add flexray fibex support - # if item.attachment and item.attachment[0]: - # dbc = load_can_database(item.attachment[1], item.attachment[0]) - # if dbc: - # frame_map = { - # frame.arbitration_id.id: frame.name for frame in dbc - # } - # - # for name in frame_map.values(): - # sys.intern(name) + frame_map = {} - if data.name == "FLX_Frame": - index = np.searchsorted(df_index, data.timestamps) + # TO DO : add flexray fibex support + # if item.attachment and item.attachment[0]: + # dbc = load_can_database(item.attachment[1], item.attachment[0]) + # if dbc: + # frame_map = { + # frame.arbitration_id.id: frame.name for frame in dbc + # } + # + # for name in frame_map.values(): + # sys.intern(name) - vals = data["FLX_Frame.FlxChannel"].astype("u1") + if data.name == "FLX_Frame": + index = np.searchsorted(df_index, data.timestamps) - vals = [f"FlexRay {chn}" for chn in vals.tolist()] - columns["Bus"][index] = vals + vals = data["FLX_Frame.FlxChannel"].astype("u1") - vals = data["FLX_Frame.ID"].astype("u2") - columns["ID"][index] = vals - if frame_map: - columns["Name"][index] = [frame_map.get(_id, "") for _id in vals.tolist()] + vals = [f"FlexRay {chn}" for chn in vals.tolist()] + columns["Bus"][index] = vals - vals = data["FLX_Frame.Cycle"].astype("u1") - columns["Cycle"][index] = vals + vals = data["FLX_Frame.ID"].astype("u2") + columns["ID"][index] = vals + if frame_map: + columns["Name"][index] = [frame_map.get(_id, "") for _id in vals.tolist()] - data_length = data["FLX_Frame.DataLength"].astype("u1") - columns["Data Length"][index] = data_length + vals = data["FLX_Frame.Cycle"].astype("u1") + columns["Cycle"][index] = vals - vals = csv_bytearray2hex( - pd.Series(list(data["FLX_Frame.DataBytes"])), - data_length, - ) - columns["Data Bytes"][index] = vals + data_length = data["FLX_Frame.DataLength"].astype("u1") + columns["Data Length"][index] = data_length - vals = data["FLX_Frame.HeaderCRC"].astype("u2") - columns["Header CRC"][index] = vals + vals = csv_bytearray2hex( + pd.Series(list(data["FLX_Frame.DataBytes"])), + data_length, + ) + columns["Data Bytes"][index] = vals - if "FLX_Frame.Dir" in names: - if data["FLX_Frame.Dir"].dtype.kind == "S": - columns["Direction"][index] = [v.decode("utf-8") for v in data["FLX_Frame.Dir"].tolist()] - else: - columns["Direction"][index] = [ - "TX" if dir else "RX" for dir in data["FLX_Frame.Dir"].astype("u1").tolist() - ] + vals = data["FLX_Frame.HeaderCRC"].astype("u2") + columns["Header CRC"][index] = vals + + if "FLX_Frame.Dir" in names: + if data["FLX_Frame.Dir"].dtype.kind == "S": + columns["Direction"][index] = [v.decode("utf-8") for v in data["FLX_Frame.Dir"].tolist()] + else: + columns["Direction"][index] = [ + "TX" if dir else "RX" for dir in data["FLX_Frame.Dir"].astype("u1").tolist() + ] - vals = None - data_length = None + vals = None + data_length = None - elif data.name == "FLX_NullFrame": - index = np.searchsorted(df_index, data.timestamps) + elif data.name == "FLX_NullFrame": + index = np.searchsorted(df_index, data.timestamps) - vals = data["FLX_NullFrame.FlxChannel"].astype("u1") - vals = [f"FlexRay {chn}" for chn in vals.tolist()] - columns["Bus"][index] = vals + vals = data["FLX_NullFrame.FlxChannel"].astype("u1") + vals = [f"FlexRay {chn}" for chn in vals.tolist()] + columns["Bus"][index] = vals - vals = data["FLX_NullFrame.ID"].astype("u2") - columns["ID"][index] = vals - if frame_map: - columns["Name"][index] = [frame_map.get(_id, "") for _id in vals.tolist()] + vals = data["FLX_NullFrame.ID"].astype("u2") + columns["ID"][index] = vals + if frame_map: + columns["Name"][index] = [frame_map.get(_id, "") for _id in vals.tolist()] - vals = data["FLX_NullFrame.Cycle"].astype("u1") - columns["Cycle"][index] = vals + vals = data["FLX_NullFrame.Cycle"].astype("u1") + columns["Cycle"][index] = vals - columns["Event Type"][index] = "FlexRay NullFrame" + columns["Event Type"][index] = "FlexRay NullFrame" - vals = data["FLX_NullFrame.HeaderCRC"].astype("u2") - columns["Header CRC"][index] = vals + vals = data["FLX_NullFrame.HeaderCRC"].astype("u2") + columns["Header CRC"][index] = vals - if "FLX_NullFrame.Dir" in names: - if data["FLX_NullFrame.Dir"].dtype.kind == "S": - columns["Direction"][index] = [v.decode("utf-8") for v in data["FLX_NullFrame.Dir"].tolist()] - else: - columns["Direction"][index] = [ - "TX" if dir else "RX" for dir in data["FLX_NullFrame.Dir"].astype("u1").tolist() - ] + if "FLX_NullFrame.Dir" in names: + if data["FLX_NullFrame.Dir"].dtype.kind == "S": + columns["Direction"][index] = [v.decode("utf-8") for v in data["FLX_NullFrame.Dir"].tolist()] + else: + columns["Direction"][index] = [ + "TX" if dir else "RX" for dir in data["FLX_NullFrame.Dir"].astype("u1").tolist() + ] - vals = None - data_length = None + vals = None + data_length = None - elif data.name == "FLX_StartCycle": - index = np.searchsorted(df_index, data.timestamps) + elif data.name == "FLX_StartCycle": + index = np.searchsorted(df_index, data.timestamps) - vals = data["FLX_StartCycle.Cycle"].astype("u1") - columns["Cycle"][index] = vals + vals = data["FLX_StartCycle.Cycle"].astype("u1") + columns["Cycle"][index] = vals - columns["Event Type"][index] = "FlexRay StartCycle" + columns["Event Type"][index] = "FlexRay StartCycle" - vals = None - data_length = None + vals = None + data_length = None - elif data.name == "FLX_Status": - index = np.searchsorted(df_index, data.timestamps) + elif data.name == "FLX_Status": + index = np.searchsorted(df_index, data.timestamps) - vals = data["FLX_Status.StatusType"].astype("u1") - columns["Details"][index] = vals.astype("U").astype("O") + vals = data["FLX_Status.StatusType"].astype("u1") + columns["Details"][index] = vals.astype("U").astype("O") - columns["Event Type"][index] = "FlexRay Status" + columns["Event Type"][index] = "FlexRay Status" - vals = None - data_length = None + vals = None + data_length = None + + else: + df_index = [] + count = 0 + + columns = { + "timestamps": df_index, + "Bus": np.full(count, "Unknown", dtype="O"), + "ID": np.full(count, 0xFFFF, dtype="u2"), + "Direction": np.full(count, "", dtype="O"), + "Cycle": np.full(count, 0xFF, dtype="u1"), + "Name": np.full(count, "", dtype="O"), + "Event Type": np.full(count, "FlexRay Frame", dtype="O"), + "Details": np.full(count, "", dtype="O"), + "Data Length": np.zeros(count, dtype="u1"), + "Data Bytes": np.full(count, "", dtype="O"), + "Header CRC": np.full(count, 0xFFFF, dtype="u2"), + } signals = pd.DataFrame(columns) @@ -1693,211 +1707,227 @@ def set_title(mdi): self.windows_modified.emit() def _add_lin_bus_trace_window(self, ranges=None): - if self.mdf.version < "4.00": - return - dfs = [] - groups_count = len(self.mdf.groups) + if self.mdf.version >= "4.00": - for index in range(groups_count): - group = self.mdf.groups[index] - if group.channel_group.flags & v4c.FLAG_CG_BUS_EVENT: - source = group.channel_group.acq_source + groups_count = len(self.mdf.groups) - names = [ch.name for ch in group.channels] + for index in range(groups_count): + group = self.mdf.groups[index] + if group.channel_group.flags & v4c.FLAG_CG_BUS_EVENT: + source = group.channel_group.acq_source - if source and source.bus_type == v4c.BUS_TYPE_LIN: - if "LIN_Frame" in names: - data = self.mdf.get("LIN_Frame", index, raw=True) + names = [ch.name for ch in group.channels] - elif "LIN_SyncError" in names: - data = self.mdf.get("LIN_SyncError", index, raw=True) + if source and source.bus_type == v4c.BUS_TYPE_LIN: + if "LIN_Frame" in names: + data = self.mdf.get("LIN_Frame", index, raw=True) - elif "LIN_TransmissionError" in names: - data = self.mdf.get("LIN_TransmissionError", index, raw=True) + elif "LIN_SyncError" in names: + data = self.mdf.get("LIN_SyncError", index, raw=True) - elif "LIN_ChecksumError" in names: - data = self.mdf.get("LIN_ChecksumError", index, raw=True) + elif "LIN_TransmissionError" in names: + data = self.mdf.get("LIN_TransmissionError", index, raw=True) - elif "LIN_ReceiveError" in names: - data = self.mdf.get("LIN_ReceiveError", index, raw=True) + elif "LIN_ChecksumError" in names: + data = self.mdf.get("LIN_ChecksumError", index, raw=True) - df_index = data.timestamps - count = len(df_index) + elif "LIN_ReceiveError" in names: + data = self.mdf.get("LIN_ReceiveError", index, raw=True) - columns = { - "timestamps": df_index, - "Bus": np.full(count, "Unknown", dtype="O"), - "ID": np.full(count, 0xFFFFFFFF, dtype="u4"), - "Direction": np.full(count, "", dtype="O"), - "Name": np.full(count, "", dtype="O"), - "Event Type": np.full(count, "LIN Frame", dtype="O"), - "Details": np.full(count, "", dtype="O"), - "Received Byte Count": np.zeros(count, dtype="u1"), - "Data Length": np.zeros(count, dtype="u1"), - "Data Bytes": np.full(count, "", dtype="O"), - } + df_index = data.timestamps + count = len(df_index) - frame_map = None - if data.attachment and data.attachment[0]: - dbc = load_can_database(data.attachment[1], data.attachment[0]) - if dbc: - frame_map = {frame.arbitration_id.id: frame.name for frame in dbc} + columns = { + "timestamps": df_index, + "Bus": np.full(count, "Unknown", dtype="O"), + "ID": np.full(count, 0xFFFFFFFF, dtype="u4"), + "Direction": np.full(count, "", dtype="O"), + "Name": np.full(count, "", dtype="O"), + "Event Type": np.full(count, "LIN Frame", dtype="O"), + "Details": np.full(count, "", dtype="O"), + "Received Byte Count": np.zeros(count, dtype="u1"), + "Data Length": np.zeros(count, dtype="u1"), + "Data Bytes": np.full(count, "", dtype="O"), + } - for name in frame_map.values(): - sys.intern(name) + frame_map = None + if data.attachment and data.attachment[0]: + dbc = load_can_database(data.attachment[1], data.attachment[0]) + if dbc: + frame_map = {frame.arbitration_id.id: frame.name for frame in dbc} - if data.name == "LIN_Frame": - vals = data["LIN_Frame.BusChannel"].astype("u1") - vals = [f"LIN {chn}" for chn in vals.tolist()] - columns["Bus"] = vals + for name in frame_map.values(): + sys.intern(name) - vals = data["LIN_Frame.ID"].astype("u1") & 0x3F - columns["ID"] = vals - if frame_map: - columns["Name"] = [frame_map.get(_id, "") for _id in vals.tolist()] + if data.name == "LIN_Frame": + vals = data["LIN_Frame.BusChannel"].astype("u1") + vals = [f"LIN {chn}" for chn in vals.tolist()] + columns["Bus"] = vals + + vals = data["LIN_Frame.ID"].astype("u1") & 0x3F + columns["ID"] = vals + if frame_map: + columns["Name"] = [frame_map.get(_id, "") for _id in vals.tolist()] - columns["Received Byte Count"] = data["LIN_Frame.ReceivedDataByteCount"].astype("u1") - data_length = data["LIN_Frame.DataLength"].astype("u1").tolist() - columns["Data Length"] = data_length + columns["Received Byte Count"] = data["LIN_Frame.ReceivedDataByteCount"].astype("u1") + data_length = data["LIN_Frame.DataLength"].astype("u1").tolist() + columns["Data Length"] = data_length - vals = csv_bytearray2hex( - pd.Series(list(data["LIN_Frame.DataBytes"])), - data_length, - ) - columns["Data Bytes"] = vals + vals = csv_bytearray2hex( + pd.Series(list(data["LIN_Frame.DataBytes"])), + data_length, + ) + columns["Data Bytes"] = vals - if "LIN_Frame.Dir" in names: - if data["LIN_Frame.Dir"].dtype.kind == "S": - columns["Direction"] = [v.decode("utf-8") for v in data["LIN_Frame.Dir"].tolist()] - else: - columns["Direction"] = [ - "TX" if dir else "RX" for dir in data["LIN_Frame.Dir"].astype("u1").tolist() - ] + if "LIN_Frame.Dir" in names: + if data["LIN_Frame.Dir"].dtype.kind == "S": + columns["Direction"] = [v.decode("utf-8") for v in data["LIN_Frame.Dir"].tolist()] + else: + columns["Direction"] = [ + "TX" if dir else "RX" for dir in data["LIN_Frame.Dir"].astype("u1").tolist() + ] - vals = None - data_length = None + vals = None + data_length = None - elif data.name == "LIN_SyncError": - names = set(data.samples.dtype.names) + elif data.name == "LIN_SyncError": + names = set(data.samples.dtype.names) - if "LIN_SyncError.BusChannel" in names: - vals = data["LIN_SyncError.BusChannel"].astype("u1") - vals = [f"LIN {chn}" for chn in vals.tolist()] - columns["Bus"] = vals + if "LIN_SyncError.BusChannel" in names: + vals = data["LIN_SyncError.BusChannel"].astype("u1") + vals = [f"LIN {chn}" for chn in vals.tolist()] + columns["Bus"] = vals - if "LIN_SyncError.BaudRate" in names: - vals = data["LIN_SyncError.BaudRate"] - unique = np.unique(vals).tolist() - for val in unique: - sys.intern(f"Baudrate {val}") - vals = [f"Baudrate {val}" for val in vals.tolist()] - columns["Details"] = vals + if "LIN_SyncError.BaudRate" in names: + vals = data["LIN_SyncError.BaudRate"] + unique = np.unique(vals).tolist() + for val in unique: + sys.intern(f"Baudrate {val}") + vals = [f"Baudrate {val}" for val in vals.tolist()] + columns["Details"] = vals - columns["Event Type"] = "Sync Error Frame" + columns["Event Type"] = "Sync Error Frame" - vals = None - data_length = None + vals = None + data_length = None - elif data.name == "LIN_TransmissionError": - names = set(data.samples.dtype.names) + elif data.name == "LIN_TransmissionError": + names = set(data.samples.dtype.names) - if "LIN_TransmissionError.BusChannel" in names: - vals = data["LIN_TransmissionError.BusChannel"].astype("u1") - vals = [f"LIN {chn}" for chn in vals.tolist()] - columns["Bus"] = vals + if "LIN_TransmissionError.BusChannel" in names: + vals = data["LIN_TransmissionError.BusChannel"].astype("u1") + vals = [f"LIN {chn}" for chn in vals.tolist()] + columns["Bus"] = vals - if "LIN_TransmissionError.BaudRate" in names: - vals = data["LIN_TransmissionError.BaudRate"] - unique = np.unique(vals).tolist() - for val in unique: - sys.intern(f"Baudrate {val}") - vals = [f"Baudrate {val}" for val in vals.tolist()] - columns["Details"] = vals + if "LIN_TransmissionError.BaudRate" in names: + vals = data["LIN_TransmissionError.BaudRate"] + unique = np.unique(vals).tolist() + for val in unique: + sys.intern(f"Baudrate {val}") + vals = [f"Baudrate {val}" for val in vals.tolist()] + columns["Details"] = vals - vals = data["LIN_TransmissionError.ID"].astype("u1") & 0x3F - columns["ID"] = vals - if frame_map: - columns["Name"] = [frame_map.get(_id, "") for _id in vals.tolist()] + vals = data["LIN_TransmissionError.ID"].astype("u1") & 0x3F + columns["ID"] = vals + if frame_map: + columns["Name"] = [frame_map.get(_id, "") for _id in vals.tolist()] - columns["Event Type"] = "Transmission Error Frame" - columns["Direction"] = ["TX"] * count + columns["Event Type"] = "Transmission Error Frame" + columns["Direction"] = ["TX"] * count - vals = None + vals = None - elif data.name == "LIN_ReceiveError": - names = set(data.samples.dtype.names) + elif data.name == "LIN_ReceiveError": + names = set(data.samples.dtype.names) - if "LIN_ReceiveError.BusChannel" in names: - vals = data["LIN_ReceiveError.BusChannel"].astype("u1") - vals = [f"LIN {chn}" for chn in vals.tolist()] - columns["Bus"] = vals + if "LIN_ReceiveError.BusChannel" in names: + vals = data["LIN_ReceiveError.BusChannel"].astype("u1") + vals = [f"LIN {chn}" for chn in vals.tolist()] + columns["Bus"] = vals - if "LIN_ReceiveError.BaudRate" in names: - vals = data["LIN_ReceiveError.BaudRate"] - unique = np.unique(vals).tolist() - for val in unique: - sys.intern(f"Baudrate {val}") - vals = [f"Baudrate {val}" for val in vals.tolist()] - columns["Details"] = vals + if "LIN_ReceiveError.BaudRate" in names: + vals = data["LIN_ReceiveError.BaudRate"] + unique = np.unique(vals).tolist() + for val in unique: + sys.intern(f"Baudrate {val}") + vals = [f"Baudrate {val}" for val in vals.tolist()] + columns["Details"] = vals - if "LIN_ReceiveError.ID" in names: - vals = data["LIN_ReceiveError.ID"].astype("u1") & 0x3F - columns["ID"] = vals - if frame_map: - columns["Name"] = [frame_map[_id] for _id in vals] + if "LIN_ReceiveError.ID" in names: + vals = data["LIN_ReceiveError.ID"].astype("u1") & 0x3F + columns["ID"] = vals + if frame_map: + columns["Name"] = [frame_map[_id] for _id in vals] - columns["Event Type"] = "Receive Error Frame" + columns["Event Type"] = "Receive Error Frame" - columns["Direction"] = ["RX"] * count + columns["Direction"] = ["RX"] * count - vals = None + vals = None - elif data.name == "LIN_ChecksumError": - names = set(data.samples.dtype.names) + elif data.name == "LIN_ChecksumError": + names = set(data.samples.dtype.names) - if "LIN_ChecksumError.BusChannel" in names: - vals = data["LIN_ChecksumError.BusChannel"].astype("u1") - vals = [f"LIN {chn}" for chn in vals.tolist()] - columns["Bus"] = vals + if "LIN_ChecksumError.BusChannel" in names: + vals = data["LIN_ChecksumError.BusChannel"].astype("u1") + vals = [f"LIN {chn}" for chn in vals.tolist()] + columns["Bus"] = vals - if "LIN_ChecksumError.Checksum" in names: - vals = data["LIN_ChecksumError.Checksum"] - unique = np.unique(vals).tolist() - for val in unique: - sys.intern(f"Baudrate {val}") - vals = [f"Checksum 0x{val:02X}" for val in vals.tolist()] - columns["Details"] = vals + if "LIN_ChecksumError.Checksum" in names: + vals = data["LIN_ChecksumError.Checksum"] + unique = np.unique(vals).tolist() + for val in unique: + sys.intern(f"Baudrate {val}") + vals = [f"Checksum 0x{val:02X}" for val in vals.tolist()] + columns["Details"] = vals - if "LIN_ChecksumError.ID" in names: - vals = data["LIN_ChecksumError.ID"].astype("u1") & 0x3F - columns["ID"] = vals - if frame_map: - columns["Name"] = [frame_map[_id] for _id in vals] + if "LIN_ChecksumError.ID" in names: + vals = data["LIN_ChecksumError.ID"].astype("u1") & 0x3F + columns["ID"] = vals + if frame_map: + columns["Name"] = [frame_map[_id] for _id in vals] - if "LIN_ChecksumError.DataBytes" in names: - data_length = data["LIN_ChecksumError.DataLength"].astype("u1").tolist() - columns["Data Length"] = data_length + if "LIN_ChecksumError.DataBytes" in names: + data_length = data["LIN_ChecksumError.DataLength"].astype("u1").tolist() + columns["Data Length"] = data_length - vals = csv_bytearray2hex( - pd.Series(list(data["LIN_ChecksumError.DataBytes"])), - data_length, - ) - columns["Data Bytes"] = vals + vals = csv_bytearray2hex( + pd.Series(list(data["LIN_ChecksumError.DataBytes"])), + data_length, + ) + columns["Data Bytes"] = vals - columns["Event Type"] = "Checksum Error Frame" + columns["Event Type"] = "Checksum Error Frame" - if "LIN_ChecksumError.Dir" in names: - columns["Direction"] = [ - "TX" if dir else "RX" for dir in data["LIN_ChecksumError.Dir"].astype("u1").tolist() - ] + if "LIN_ChecksumError.Dir" in names: + columns["Direction"] = [ + "TX" if dir else "RX" for dir in data["LIN_ChecksumError.Dir"].astype("u1").tolist() + ] - vals = None + vals = None - dfs.append(pd.DataFrame(columns, index=df_index)) + dfs.append(pd.DataFrame(columns, index=df_index)) if not dfs: - return + df_index = [] + count = 0 + + columns = { + "timestamps": df_index, + "Bus": np.full(count, "Unknown", dtype="O"), + "ID": np.full(count, 0xFFFFFFFF, dtype="u4"), + "Direction": np.full(count, "", dtype="O"), + "Name": np.full(count, "", dtype="O"), + "Event Type": np.full(count, "LIN Frame", dtype="O"), + "Details": np.full(count, "", dtype="O"), + "Received Byte Count": np.zeros(count, dtype="u1"), + "Data Length": np.zeros(count, dtype="u1"), + "Data Bytes": np.full(count, "", dtype="O"), + } + + signals = pd.DataFrame(columns, index=df_index) + else: signals = pd.concat(dfs).sort_index() index = pd.Index(range(len(signals)))