Skip to content

Commit

Permalink
Merge pull request #978 from VorobiovM/master
Browse files Browse the repository at this point in the history
Parsing of nested CA structures
  • Loading branch information
danielhrisca authored Mar 6, 2024
2 parents 8c1c5ff + 74f8a9a commit b955a0b
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 17 deletions.
159 changes: 142 additions & 17 deletions src/asammdf/blocks/mdf_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import mmap
import os
from pathlib import Path
import re
import shutil
import sys
from tempfile import gettempdir, NamedTemporaryFile
Expand Down Expand Up @@ -993,12 +994,14 @@ def _read_channels(
f"Channel component address {component_addr:X} is outside the file size {self.file_limit}"
)
break

index = ch_cntr - 1
dependencies.append(None)

# check if it is a CABLOCK or CNBLOCK
stream.seek(component_addr)
blk_id = stream.read(4)
if blk_id == b"##CN":
index = ch_cntr - 1
dependencies.append(None)
(
ch_cntr,
ret_composition,
Expand All @@ -1019,10 +1022,16 @@ def _read_channels(
else:
# only channel arrays with storage=CN_TEMPLATE are
# supported so far
channel.dtype_fmt = dtype(
get_fmt_v4(
channel.data_type,
channel.bit_offset + channel.bit_count,
channel.channel_type,
)
)

first_dep = ca_block = ChannelArrayBlock(address=component_addr, stream=stream, mapped=mapped)
if ca_block.storage != v4c.CA_STORAGE_TYPE_CN_TEMPLATE:
logger.warning("Only CN template arrays are supported")
ca_list = [ca_block]
dependencies[index] = [first_dep]

while ca_block.composition_addr:
stream.seek(ca_block.composition_addr)
Expand All @@ -1033,20 +1042,130 @@ def _read_channels(
stream=stream,
mapped=mapped,
)
ca_list.append(ca_block)
else:
logger.warning("skipping CN block; CN block within CA block" " is not implemented yet")
break
dependencies[index].append(ca_block)

dependencies.append(ca_list)
elif channel.data_type == v4c.DATA_TYPE_BYTEARRAY:
# read CA-CN nested structure
(
ch_cntr,
ret_composition,
ret_composition_dtype,
) = self._read_channels(
ca_block.composition_addr,
grp,
stream,
dg_cntr,
ch_cntr,
True,
mapped=mapped,
)

channel.dtype_fmt = dtype(
get_fmt_v4(
channel.data_type,
channel.bit_offset + channel.bit_count,
channel.channel_type,
)
)
ca_cnt = len(dependencies[index])
if ret_composition:
dependencies[index].extend(ret_composition)

byte_offset_factors = []
bit_pos_inval_factors = []
dimensions = []
total_elem = 1

for ca_blck in dependencies[index][:ca_cnt]:
# only consider CN templates
if ca_blck.ca_type != v4c.CA_STORAGE_TYPE_CN_TEMPLATE:
logger.warning("Only CN template arrays are supported")
continue

# 1D array with dimensions
for i in range(ca_blck.dims):
dim_size = ca_blck[f"dim_size_{i}"]
dimensions.append(dim_size)
total_elem *= dim_size

# 1D arrays for byte offset and invalidation bit pos calculations
byte_offset_factors.extend(ca_blck.get_byte_offset_factors())
bit_pos_inval_factors.extend(ca_blck.get_bit_pos_inval_factors())

multipliers = [1] * len(dimensions)
for i in range(len(dimensions) - 2, -1, -1):
multipliers[i] = multipliers[i + 1] * dimensions[i + 1]

def _get_nd_coords(index, factors: list[int]) -> list[int]:
"""Convert 1D index to CA's nD coordinates"""
coords = [0] * len(factors)
for i, factor in enumerate(factors):
coords[i] = index // factor
index %= factor
return coords

def _get_name_with_indices(ch_name: str, ch_parent_name: str, indices: list[int]) -> str:
coords = "[" + "][".join(str(coord) for coord in indices) + "]"
m = re.match(ch_parent_name, ch_name)
n = re.search(r"\[\d+\]", ch_name)
if m:
name = ch_name[: m.end()] + coords + ch_name[m.end() :]
elif n:
name = ch_name[: n.start()] + coords + ch_name[n.start() :]
else:
name = ch_name + coords
return name

ch_len = len(channels)
for elem_id in range(1, total_elem):
for cn_id in range(index, ch_len):
nd_coords = _get_nd_coords(elem_id, multipliers)

# copy composition block
new_block = deepcopy(channels[cn_id])

# update byte offset & position of invalidation bit
byte_offset = bit_offset = 0
for coord, byte_factor, bit_factor in zip(
nd_coords, byte_offset_factors, bit_pos_inval_factors
):
byte_offset += coord * byte_factor
bit_offset += coord * bit_factor
new_block.byte_offset += byte_offset
new_block.pos_invalidation_bit += bit_offset

# update channel name
new_block.name = _get_name_with_indices(new_block.name, channel.name, nd_coords)

# append to channel list
channels.append(new_block)

# update channel dependencies
if dependencies[cn_id] is not None:
deps = []
for dep in dependencies[cn_id]:
if not isinstance(dep, ChannelArrayBlock):
dep_entry = (dep[0], dep[1] + (ch_len - index) * elem_id)
deps.append(dep_entry)
dependencies.append(deps)
else:
dependencies.append(None)

# update channels db
entry = (dg_cntr, ch_cntr)
self.channels_db.add(new_block.name, entry)
ch_cntr += 1

# modify channels' names found recursively in-place
orig_name = channel.name
for cn_id in range(index, ch_len):
nd_coords = _get_nd_coords(0, multipliers)
name = _get_name_with_indices(channels[cn_id].name, orig_name, nd_coords)
entry = self.channels_db.pop(channels[cn_id].name)
channels[cn_id].name = name
# original channel entry will only contain single source tuple
self.channels_db.add(name, entry[0])

break

else:
logger.warning(
"skipping CN block; Nested CA structure should be contained within BYTEARRAY data type"
)
break

else:
dependencies.append(None)
Expand Down Expand Up @@ -6942,6 +7061,9 @@ def _get_array(
cycles_nr = len(vals)

for ca_block in dependency_list[:1]:
if not isinstance(ca_block, ChannelArrayBlock):
break

dims_nr = ca_block.dims

if ca_block.ca_type == v4c.CA_TYPE_SCALE_AXIS:
Expand Down Expand Up @@ -7032,6 +7154,9 @@ def _get_array(
types.append(dtype_pair)

for ca_block in dependency_list[1:]:
if not isinstance(ca_block, ChannelArrayBlock):
break

dims_nr = ca_block.dims

if ca_block.flags & v4c.FLAG_CA_FIXED_AXIS:
Expand Down
25 changes: 25 additions & 0 deletions src/asammdf/blocks/v4_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,31 @@ def __bytes__(self) -> bytes:
result = pack(fmt, *[getattr(self, key) for key in keys])
return result

def get_byte_offset_factors(self) -> list[int]:
"""Returns list of factors f(d), used to calculate byte offset"""
return self._factors(self.byte_offset_base)

def get_bit_pos_inval_factors(self) -> list[int]:
"""Returns list of factors f(d), used to calculate invalidation bit position"""
return self._factors(self.invalidation_bit_base)

def _factors(self, base: int) -> list[int]:
factor = base
factors = [factor]
# column oriented layout
if self.flags & v4c.FLAG_CA_INVERSE_LAYOUT:
for i in range(1, self.dims):
factor *= self[f"dim_size_{i - 1}"]
factors.append(factor)

# row oriented layout
else:
for i in range(self.dims - 2, -1, -1):
factor *= self[f"dim_size_{i + 1}"]
factors.insert(0, factor)

return factors


class ChannelGroup:
"""*ChannelGroup* has the following attributes, that are also available as
Expand Down

0 comments on commit b955a0b

Please sign in to comment.