From f059da1f160698738cb9081c71d6d9afb821a5b1 Mon Sep 17 00:00:00 2001
From: Peter Konradi
Date: Wed, 1 Feb 2023 15:14:53 +0100
Subject: [PATCH] Local state

---
 README.md | 140 ++++++++++++
 pydapsys/__init__.py | 0
 pydapsys/neo_convert/__init__.py | 0
 pydapsys/neo_convert/abstract_converter.py | 156 +++++++++++++
 pydapsys/neo_convert/ni_pulse_stim.py | 108 +++++++++
 pydapsys/page.py | 54 +++++
 pydapsys/rawio/__init__.py | 6 +
 pydapsys/rawio/basic.py | 250 +++++++++++++++++++++
 pydapsys/rawio/embedded.py | 71 ++++++
 pydapsys/rawio/np_embedded.py | 50 +++++
 pydapsys/read.py | 109 +++++++++
 pydapsys/toc/__init__.py | 0
 pydapsys/toc/entry.py | 128 +++++++++++
 pydapsys/toc/plot.py | 54 +++++
 pydapsys/util/__init__.py | 0
 pydapsys/util/floats.py | 14 ++
 pydapsys/util/structs.py | 92 ++++++++
 pyproject.toml | 18 ++
 tests/__init__.py | 0
 19 files changed, 1250 insertions(+)
 create mode 100644 README.md
 create mode 100644 pydapsys/__init__.py
 create mode 100644 pydapsys/neo_convert/__init__.py
 create mode 100644 pydapsys/neo_convert/abstract_converter.py
 create mode 100644 pydapsys/neo_convert/ni_pulse_stim.py
 create mode 100644 pydapsys/page.py
 create mode 100644 pydapsys/rawio/__init__.py
 create mode 100644 pydapsys/rawio/basic.py
 create mode 100644 pydapsys/rawio/embedded.py
 create mode 100644 pydapsys/rawio/np_embedded.py
 create mode 100644 pydapsys/read.py
 create mode 100644 pydapsys/toc/__init__.py
 create mode 100644 pydapsys/toc/entry.py
 create mode 100644 pydapsys/toc/plot.py
 create mode 100644 pydapsys/util/__init__.py
 create mode 100644 pydapsys/util/floats.py
 create mode 100644 pydapsys/util/structs.py
 create mode 100644 pyproject.toml
 create mode 100644 tests/__init__.py

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..dd4ee87
--- /dev/null
+++ b/README.md
@@ -0,0 +1,140 @@
+# PyDapsys - Read DAPSYS recordings with Python
+
+PyDapsys is a package to read neurography recordings made with [DAPSYS](http://dapsys.net/) (Data Acquisition Processor System). It is based on a reverse-engineered specification of the binary data format used by the latest DAPSYS version.
+
+Optionally, the library provides functionality to store loaded data in [Neo](https://github.com/NeuralEnsemble/python-neo) data structures, from where it can be exported into various other formats.
+
+## Installation
+
+Download the latest release from the GitHub releases page.
+
+### Basic functionality
+
+Installs only the data representation of PyDapsys, without the ability to convert to Neo. Numpy is the sole dependency.
+
+`pip install {name_of_downloaded_wheel}.whl`
+
+### With Neo converters
+
+Installs the base library together with the additional dependencies required to load data into Neo data structures. Writing Neo data structures to some formats may require further dependencies; please see the Neo documentation for details.
+
+`pip install {name_of_downloaded_wheel}.whl[neo]`
+
+## Usage
+
+### Basics
+
+A Dapsys file is made up of two parts: a sequential list of blocks, or **pages**, which store either a text with a timestamp or a waveform with associated timestamps, and a table of contents (toc). The toc consists of **folders** and **streams**. Each page has an id that is unique within the file. Each stream in the toc holds an array with the ids of the pages belonging to it. A stream is either a text stream (referring only to text pages) or a data stream (referring only to recording pages).
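+
+For orientation, the following minimal sketch (not part of the package) walks the table of contents and prints it as a tree. It assumes `read_file`, which is introduced below:
+
+```python
+from pydapsys.read import read_file
+
+def print_toc(entry, indent: int = 0):
+    """Recursively print a ToC entry; folders and the root expose a `children` dict."""
+    print(" " * indent + entry.name)
+    for child in getattr(entry, "children", {}).values():
+        print_toc(child, indent + 2)
+
+toc_root, pages = read_file("my_recording.dps")
+print_toc(toc_root)
+```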
+
+#### Load a file
+
+Use `read_file` to get the root of the table of contents and a dictionary mapping each page id to the object representing that page.
+
+```python
+from pydapsys.read import read_file
+from pathlib import Path
+MY_DAPSYS_FILE = Path(".")/"to"/"my"/"dapsys_file.dps"
+toc_root, pages = read_file(MY_DAPSYS_FILE)
+```
+
+The `toc_root` object will have children, either folders (which, in turn, can have additional children) or streams. You can access the children by using the index operator. Access to children is case-insensitive; this is done for convenience and does not affect correctness, as DAPSYS itself does not allow two objects whose names differ only in case to exist on the same hierarchy level. For typed access you can use either `.f` to get only folders or `.s` to get only streams:
+
+```python
+comment_stream = toc_root["comments"] # Will return the stream Comments, but is typed as generic stream
+comment_stream = toc_root.s["coMMents"] # Will return the stream Comments, typed as Stream
+top_folder = toc_root.f["Folder"] # will return the folder Folder
+top_folder = toc_root.f["comments"] # will fail (exception), because comments is not a folder
+
+# iterate over all folders:
+for folder in toc_root.f.values():
+    ...
+
+# iterate over all streams:
+for stream in toc_root.s.values():
+    ...
+```
+
+#### Access data from a file
+
+To get data from a file, get the stream object from the toc and access its `page_ids` property. For convenience, the `__getitem__`, `__iter__` and `__contains__` methods of stream objects have been overloaded to return the result of the same operation on `page_ids`. From there, you can get the corresponding pages from the `pages` dict:
+
+```python
+from pydapsys.toc.entry import StreamType
+
+def get_pages(stream, expected_stream_type: StreamType):
+    if stream.stream_type != expected_stream_type:
+        raise Exception(f"{stream.name} is not a {expected_stream_type.name} stream, but {stream.stream_type.name}")
+    return [pages[page_id] for page_id in stream] # or [pages[page_id] for page_id in stream.page_ids]
+
+text_stream = ...
+text_pages = get_pages(text_stream, StreamType.Text)
+
+waveform_stream = ...
+waveform_pages = get_pages(waveform_stream, StreamType.Waveform)
+```
+
+##### Text pages
+
+A text page consists of three fields:
+
+* `text`: The text stored in the page, string
+
+* `timestamp_a`: The first timestamp of the page, float64 (seconds)
+
+* `timestamp_b`: The second timestamp of the page (float64, seconds), which is sometimes not present and is then set to None
+
+##### Waveform pages
+
+Waveform pages consist of three fields:
+
+* `values`: Values of the waveform, float32 (volt)
+
+* `timestamps`: Timestamps corresponding to `values`, float64 (seconds)
+
+* `interval`: Interval between values, float64 (seconds)
+
+In **continuously sampled waveforms**, only the timestamp of the first value will be present, in addition to the sampling `interval`. The timestamps of the other values can be calculated from these two values.
+
+**Irregularly sampled waveforms** will have one timestamp for each value, but no `interval`.
+
+## Neo converters
+
+The module `pydapsys.neo_convert` contains classes to convert a Dapsys recording to the Neo format. **IMPORTANT: importing the module without installing neo first will raise an exception.**
+
+As Dapsys files may be structured differently, depending on how the software was configured and what hardware was used, different converters are required for different file structures.
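+
+Since Neo is only an optional dependency, you may want to guard the import. A minimal sketch (the error message here is our own suggestion, not part of the package):
+
+```python
+try:
+    from pydapsys.neo_convert.ni_pulse_stim import NIPulseStimulatorToNeo
+except ImportError as err:
+    # pydapsys.neo_convert requires the optional 'neo' dependency
+    raise ImportError("Install PyDapsys with the [neo] extra to use the converters") from err
+```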
+
+Currently there is only one converter available, for recordings made with an NI Pulse stimulator.
+
+### NI Pulse stimulator
+
+Converter class for Dapsys recordings created with an NI Pulse stimulator. Puts everything into one neo segment.
+Waveform pages of the continuous recording are merged if the difference between a pair of consecutive pages is less than a specified threshold (`grouping_tolerance`).
+
+```python
+from pydapsys.neo_convert.ni_pulse_stim import NIPulseStimulatorToNeo
+
+# convert a recording to a neo block
+neo_block = NIPulseStimulatorToNeo(toc_root, pages, grouping_tolerance=1e-9).to_neo()
+```
+
+#### Expected file structure
+
+{stim_folder} must be one of "NI Puls Stimulator", "pulse stimulator", "NI Pulse stimulator", but further names can be added to `NIPulseStimulatorToNeo.stim_folder_names`
+
+* Root
+
+  * [Text] Comments -> Converted into a single event called "comments"
+
+  * {stim_folder}
+
+    * [Text] Pulses -> Converted into neo events, one per unique text
+
+    * [Waveform] Continuous recording -> Converted into multiple AnalogSignals
+
+    * Responses
+
+      * Tracks for All Responses -> Optional. If this folder does not exist, no spike trains will be created
+
+        * ... [Text] tracks... -> Converted into spike trains
+
diff --git a/pydapsys/__init__.py b/pydapsys/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pydapsys/neo_convert/__init__.py b/pydapsys/neo_convert/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pydapsys/neo_convert/abstract_converter.py b/pydapsys/neo_convert/abstract_converter.py
new file mode 100644
index 0000000..9dadcd3
--- /dev/null
+++ b/pydapsys/neo_convert/abstract_converter.py
@@ -0,0 +1,156 @@
+from abc import ABC, abstractmethod
+from typing import Mapping, Sequence, Union, Optional, Iterable, List, Dict
+
+import neo
+import numpy as np
+import numpy.typing as npt
+import quantities as pq
+
+from pydapsys.page import DataPage, WaveformPage
+from pydapsys.toc.entry import Root, Stream, StreamType
+from pydapsys.util.floats import float_comp
+
+
+def recording_segment_end(page: WaveformPage) -> float:
+    """Returns the timestamp of the last value of a waveform page."""
+    if page.interval is None:
+        # irregularly sampled: one timestamp per value
+        return float(page.timestamps[-1])
+    # continuously sampled: only the first timestamp is stored
+    return float(page.timestamps[0] + page.interval * (len(page.values) - 1))
+
+
+class DapsysToNeoConverter(ABC):
+    """ Converter to put Dapsys recordings into the neo structure
+
+    This abstract base class provides common functionality to transform Dapsys streams into common neo structures
+    :param toc: Root of the table of contents
+    :type toc: class:`pydapsys.toc.entry.Root`
+    :param pages: Mapping between the id of a data page and the page itself
+    :type pages: class:`typing.Mapping[int, pydapsys.page.DataPage]`
+    """
+
+    def __init__(self, toc: Root, pages: Mapping[int, DataPage]):
+        self.toc = toc
+        self.pages = pages
+
+    @abstractmethod
+    def to_neo(self) -> neo.Block:
+        """
+        Create a neo structure based on the given recording
+        :return: A neo block containing the data from the recording
+        """
+        ...
+
+    def _pageids_to_event(self, page_ids: Union[Sequence[int], npt.NDArray[np.uint32]], name: str = "") -> neo.Event:
+        """Converts data from a sequence (or numpy array) of page ids to a neo event.
+        The labels will be taken from the page text and the event times from the first timestamp (timestamp_a).
+        :param page_ids: Page ids of the text pages
+        :param name: name of the returned neo event
+        :return: A neo event containing the text of the pages as labels and their first timestamps as times
+        """
+        times = np.empty(len(page_ids), dtype=np.float64)
+        # collect labels in a list so np.asarray can infer a unicode dtype wide enough for the longest text
+        labels = []
+        for i, page_id in enumerate(page_ids):
+            page = self.pages[page_id]
+            times[i] = page.timestamp_a
+            labels.append(page.text)
+        return neo.Event(times=times, labels=np.asarray(labels), units=pq.second, name=name, copy=False)
+
+    def textstream_to_event(self, stream: Stream, name: Optional[str] = None) -> neo.Event:
+        """Converts data from a text stream to a neo event.
+
+        The labels of the event will be the text of the pages and the event times their first timestamps (timestamp_a).
+
+        :param stream: Stream to convert
+        :param name: name of the returned neo event, defaults to the name of the passed stream
+        :return: A neo event containing the text of the stream's text pages as labels and their first timestamps as times
+        """
+        if stream.stream_type != StreamType.Text:
+            raise ValueError(f"StreamType.Text required for this operation, not {stream.stream_type.name}")
+        return self._pageids_to_event(stream.page_ids, name=stream.name if name is None else name)
+
+    def _pageids_to_spiketrain(self, page_ids: Union[Sequence[int], npt.NDArray[np.uint32]], t_stop: float,
+                               name: str = "") -> neo.SpikeTrain:
+        """Puts data from text pages into a spike train. Requires an additional parameter t_stop for the parameter of
+        the same name on :class:`neo.SpikeTrain`. t_stop must be greater than the last timestamp of the train.
+
+        The times of the spike train will be taken from the timestamp_a of the given text pages.
+
+        :param page_ids: Page ids of the text pages
+        :param t_stop: t_stop parameter to set on :class:`neo.SpikeTrain`
+        :param name: Name of the spike train, optional.
+        :return: A spike train built from the text pages
+        """
+        return neo.SpikeTrain(
+            times=np.fromiter((page.timestamp_a for page in (self.pages[pid] for pid in page_ids)),
+                              dtype=np.float64, count=len(page_ids)), name=name, units=pq.second, t_stop=t_stop,
+            copy=False)
+
+    def textstream_to_spiketrain(self, stream: Stream, t_stop: float, name: Optional[str] = None) -> neo.SpikeTrain:
+        """Puts data from a text stream into a spike train. Requires an additional parameter t_stop for the parameter
+        of the same name on :class:`neo.SpikeTrain`. t_stop must be greater than the last timestamp of the train.
+
+        The times of the spike train will be taken from the timestamp_a of the stream's text pages.
+        :param stream: The stream to convert
+        :param t_stop: t_stop parameter to set on :class:`neo.SpikeTrain`
+        :param name: Name of the spike train. Will default to the name of the stream
+        :return: A spike train built from the given text stream
+        """
+        if stream.stream_type != StreamType.Text:
+            raise ValueError(f"StreamType.Text required for this operation, not {stream.stream_type.name}")
+        return self._pageids_to_spiketrain(stream.page_ids, t_stop, name=stream.name if name is None else name)
+
+    def _pageids_to_events_by_comment_text(self, page_ids: Union[Sequence[int], npt.NDArray[np.uint32]]) -> Iterable[
+        neo.Event]:
+        """Groups a number of text pages by their text and emits one event for each unique text.
+        The times are taken from the pages' timestamp_a, the events carry no labels and the name of each event is its
+        unique text.
+        :param page_ids: Ids of the text pages
+        :return: An iterable of neo events
+        """
+        comment_string_to_timestamps: Dict[str, List[float]] = dict()
+        for page in (self.pages[pid] for pid in page_ids):
+            comment_string_to_timestamps.setdefault(page.text, list()).append(page.timestamp_a)
+        for comment_string, comment_timestamps in comment_string_to_timestamps.items():
+            yield neo.Event(times=np.array(comment_timestamps, dtype=np.float64), units=pq.second, name=comment_string,
+                            copy=False)
+
+    def textstream_to_events_by_comment_text(self, stream: Stream) -> Iterable[neo.Event]:
+        """Groups the text pages of a text stream by their text and emits one event for each unique text.
+        The times are taken from the pages' timestamp_a, the events carry no labels and the name of each event is its
+        unique text.
+        :param stream: A text stream to convert
+        :return: An iterable of neo events
+        """
+        if stream.stream_type != StreamType.Text:
+            raise ValueError(f"StreamType.Text required for this operation, not {stream.stream_type.name}")
+        return self._pageids_to_events_by_comment_text(stream.page_ids)
+
+    def _group_recordingsegments(self, rec_pages: Iterable[WaveformPage], tolerance: float = 1e-5) -> Iterable[
+        List[WaveformPage]]:
+        """Groups consecutive recording pages into lists, if the difference between the end of the last page and the start
+        of the next one is less than the threshold and they have the same sampling interval
+        :param rec_pages: Recording pages to group. Must be in temporal order.
+        :param tolerance: Tolerance for grouping, defaults to 1e-5
+        :return: An iterable of lists containing grouped recording pages
+        """
+        page_iter = iter(rec_pages)
+        current_set: List[WaveformPage] = [next(page_iter)]
+        for page in page_iter:
+            # A page joins the current group if it has the same sampling interval and starts
+            # within the tolerance of where the previous page ends; otherwise a new group begins.
+            if float_comp(current_set[-1].interval, page.interval) and \
+                    float_comp(recording_segment_end(current_set[-1]) + current_set[-1].interval,
+                               page.timestamps[0], epsilon=tolerance):
+                current_set.append(page)
+            else:
+                yield current_set
+                current_set = [page]
+        yield current_set
+
+    def waveformstream_to_analogsignals(self, stream: Stream, tolerance: float = 1e-5) -> Iterable[neo.AnalogSignal]:
+        """ Groups consecutive pages of a waveform stream together based on the given tolerance and creates one
+        AnalogSignal from each group.
+
+        :param stream: Data stream to convert
+        :param tolerance: Tolerance for grouping
+        :return: Analog signals created from the grouped recording pages
+        """
+        if stream.stream_type != StreamType.Waveform:
+            raise ValueError(f"StreamType.Waveform required for this operation, not {stream.stream_type.name}")
+        for segment_group in self._group_recordingsegments((self.pages[pid] for pid in stream.page_ids),
+                                                           tolerance=tolerance):
+            continuous = np.concatenate([segment.values for segment in segment_group])
+            yield neo.AnalogSignal(continuous, pq.volt,
+                                   t_start=segment_group[0].timestamps[0] * pq.second,
+                                   sampling_period=segment_group[0].interval * pq.second, copy=False)
diff --git a/pydapsys/neo_convert/ni_pulse_stim.py b/pydapsys/neo_convert/ni_pulse_stim.py
new file mode 100644
index 0000000..12bdaaa
--- /dev/null
+++ b/pydapsys/neo_convert/ni_pulse_stim.py
@@ -0,0 +1,108 @@
+from datetime import datetime
+from typing import Mapping, Optional, Iterable
+
+import neo
+
+from pydapsys.neo_convert.abstract_converter import DapsysToNeoConverter
+from pydapsys.page import DataPage
+from pydapsys.toc.entry import Root, Folder, Stream, StreamType
+
+
+class NIPulseStimulatorToNeo(DapsysToNeoConverter):
+    """Converter class for Dapsys recordings created with an NI Pulse stimulator. Puts everything into one neo segment.
+    Waveform pages of the continuous recording are merged if the difference between a pair of consecutive pages is less than a specified threshold.
+
+    Expected structure is:
+
+    - Root
+        - Comments -> Converted into an Event
+        - [stim_folder_name]
+            - Pulses -> Converted into neo events, one per unique text
+            - Continuous recording -> Converted into multiple AnalogSignals
+            - Responses
+                - Tracks for All Responses -> Optional. If it doesn't exist, there simply will be no spike trains
+                    - ...Track text streams... -> Will be converted into one spike train each
+
+    :param toc: Root of the table of contents
+    :type toc: class:`pydapsys.toc.entry.Root`
+    :param pages: Mapping between the id of a data page and the page itself
+    :type pages: class:`typing.Mapping[int, pydapsys.page.DataPage]`
+    :param grouping_tolerance: Maximum delta (in seconds) between two consecutive pages to group them together, defaults to 1e-9
+    :type grouping_tolerance: float
+    """
+    stim_folder_names = ["NI Puls Stimulator", "pulse stimulator", "NI Pulse stimulator"]
+    """valid stimulator folder names for this converter"""
+
+    def __init__(self, toc: Root, pages: Mapping[int, DataPage], grouping_tolerance: float = 1e-9):
+        """constructor method"""
+        super().__init__(toc, pages)
+        self.grouping_tolerance = grouping_tolerance
+
+    @property
+    def stimulator_folder(self) -> Folder:
+        """
+        Returns the folder of the stimulator.
+
+        Looks in :attr:`self.toc` for a folder with one of the names in :attr:`self.stim_folder_names` and returns the first match
+        :return: The folder object of the stimulator
+        """
+        candidates = self.toc.folders
+        for stim_name in self.stim_folder_names:
+            if stim_name in candidates:
+                return candidates[stim_name]
+        raise Exception(f"Could not find a folder with a known stimulator name among: {self.toc.children.keys()}")
+
+    @property
+    def comment_stream(self) -> Stream:
+        """
+        Returns the stream containing the comments of the recording (root/comments)
+        :return: Comment stream
+        """
+        return self.toc.s["comments"]
+
+    @property
+    def track_textstreams(self) -> Iterable[Stream]:
+        """
+        Yields the streams containing sorted tracks
+
+        Looks for streams in root/[stimulator]/responses/tracks for all responses and yields the text streams among them.
+        root/[stimulator]/responses must exist; a missing "tracks for all responses" folder is silently ignored.
+        :return: Streams containing sorted tracks
+        """
+        if "tracks for all responses" in self.stimulator_folder.f["responses"].folders:
+            for pulse_stream in self.stimulator_folder.f["responses"].f["tracks for all responses"].streams.values():
+                if pulse_stream.stream_type == StreamType.Text:
+                    yield pulse_stream
+
+    def to_neo(self, block_name="DAPSYS recording", segment_name="Main segment",
+               file_datetime: Optional[datetime] = None, rec_datetime: Optional[datetime] = None) -> neo.Block:
+        """
+        Attempts to read the data of the recording into a neo structure.
+        :param block_name: Name of the neo Block that will be returned
+        :param segment_name: Name of the sole segment contained in the block
+        :param file_datetime: File datetime to set on the neo block and segment. If None, will be set to Unix epoch 0
+        :param rec_datetime: Recording datetime to set on the neo block and segment. If None, will be set to Unix epoch 0
+        :return: A neo block structured as described in the class docstring.
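+
+        Example (a sketch; assumes ``toc_root`` and ``pages`` were obtained from :func:`pydapsys.read.read_file`)::
+
+            converter = NIPulseStimulatorToNeo(toc_root, pages, grouping_tolerance=1e-9)
+            block = converter.to_neo(block_name="my recording")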
+        """
+        file_datetime = datetime.fromtimestamp(0) if file_datetime is None else file_datetime
+        rec_datetime = datetime.fromtimestamp(0) if rec_datetime is None else rec_datetime
+        neo_block = neo.Block(name=block_name, file_datetime=file_datetime, rec_datetime=rec_datetime)
+        neo_segment = neo.Segment(name=segment_name, file_datetime=file_datetime, rec_datetime=rec_datetime)
+        neo_block.segments.append(neo_segment)
+        stim_folder = self.stimulator_folder
+        for analogsignal in self.waveformstream_to_analogsignals(stim_folder.s["continuous recording"],
+                                                                 tolerance=self.grouping_tolerance):
+            analogsignal.set_parent(neo_segment)
+            neo_segment.analogsignals.append(analogsignal)
+        for track_stream in self.track_textstreams:
+            spike_train = self.textstream_to_spiketrain(track_stream, neo_segment.analogsignals[-1].t_stop)
+            spike_train.set_parent(neo_segment)
+            neo_segment.spiketrains.append(spike_train)
+        for pulse in self.textstream_to_events_by_comment_text(stim_folder.s["Pulses"]):
+            pulse.set_parent(neo_segment)
+            neo_segment.events.append(pulse)
+        comments = self.textstream_to_event(self.comment_stream)
+        comments.set_parent(neo_segment)
+        neo_segment.events.append(comments)
+        return neo_block
diff --git a/pydapsys/page.py b/pydapsys/page.py
new file mode 100644
index 0000000..ad88d20
--- /dev/null
+++ b/pydapsys/page.py
@@ -0,0 +1,54 @@
+from abc import ABC
+from dataclasses import dataclass
+from enum import IntEnum
+from typing import Optional
+
+import numpy as np
+import numpy.typing as npt
+
+
+class PageType(IntEnum):
+    """
+    Type of the page
+    """
+    Waveform = 2
+    Text = 3
+
+
+@dataclass
+class DataPage(ABC):
+    """
+    Shared attributes for the various data pages of a Dapsys recording
+    """
+    type: PageType
+    id: int
+    reference_id: Optional[int]
+
+
+@dataclass
+class TextPage(DataPage):
+    """
+    Page containing some text and at least one timestamp.
+    """
+    text: str
+    """Text contained in the page"""
+    timestamp_a: float
+    """First timestamp"""
+    timestamp_b: Optional[float]
+    """Second timestamp, not always present"""
+
+
+@dataclass
+class WaveformPage(DataPage):
+    """
+    Page containing data points from a recording. A continuous recording has only one timestamp, for the first value,
+    plus an interval giving the time between values. An irregular recording has a timestamp for each value, but no
+    interval.
+    """
+    values: npt.NDArray[np.float32]
+    timestamps: npt.NDArray[np.float64]
+    interval: Optional[float]
+
+    @property
+    def irregular_recording(self) -> bool:
+        # irregular recordings store one timestamp per value
+        return len(self.timestamps) == len(self.values)
diff --git a/pydapsys/rawio/__init__.py b/pydapsys/rawio/__init__.py
new file mode 100644
index 0000000..49b30c3
--- /dev/null
+++ b/pydapsys/rawio/__init__.py
@@ -0,0 +1,6 @@
+from __future__ import annotations
+
+from typing import Literal
+
+INT_STRUCTS = Literal['b', 'B', 'h', 'H', 'i', 'I', 'l', 'L', 'n', 'N', 'P']
+FLOAT_STRUCTS = Literal['e', 'f', 'd']
diff --git a/pydapsys/rawio/basic.py b/pydapsys/rawio/basic.py
new file mode 100644
index 0000000..4eb682f
--- /dev/null
+++ b/pydapsys/rawio/basic.py
@@ -0,0 +1,250 @@
+from __future__ import annotations
+
+from io import SEEK_CUR
+from struct import unpack, calcsize
+from typing import BinaryIO, Union, Tuple, Literal, overload, Optional
+
+from pydapsys.rawio import INT_STRUCTS, FLOAT_STRUCTS
+
+
+def __read_nullaware(reader: BinaryIO, type_fmt: str, count: int, byte_order='<') -> Tuple:
+    """
+    Reads a number of binary values "null aware": the function checks whether the read bytes of each value are uninitialized.
+
+    Reads x values and compares the bytes of each with 'CD' * length of the block. If a block is uninitialized, the
+    value is set to None; otherwise the bytes are unpacked.
+    Example: To read 8 32-bit ints, the function will read 4 bytes 8 times. Each 4-byte block is compared with 'CDCDCDCD'.
+    If they are equal, the value is set to None; otherwise the bytes are unpacked to an int.
+    :param reader: Open binary reader to read from
+    :param type_fmt: Type fmt string
+    :param count: Number of values to read
+    :param byte_order: byte order
+    :return: Tuple containing the read objects
+    """
+    struct_str = byte_order + type_fmt
+    data_size = calcsize(struct_str)
+    null_bytes = bytes.fromhex('CD' * data_size)
+    return tuple(unpack(struct_str, read_bytes)[0] if read_bytes != null_bytes else None for read_bytes in
+                 (reader.read(data_size) for _ in range(count)))
+
+
+def __read_direct(reader: BinaryIO, type_fmt: str, count: int, byte_order='<') -> Tuple:
+    """
+    Reads a number of values specified by the type fmt from a binary reader
+    :param reader: Open binary reader
+    :param type_fmt: Type fmt string of the data to read
+    :param count: number of values to read
+    :param byte_order: byte order
+    :return: Tuple containing the read values
+    """
+    struct_str = byte_order + type_fmt * count
+    return unpack(struct_str, reader.read(calcsize(struct_str)))
+
+
+@overload
+def read_tuple(reader: BinaryIO, type_fmt: INT_STRUCTS, count: int, byte_order: str = ...,
+               check_null: Literal[False] = ...) -> Tuple[int, ...]:
+    ...
+
+
+@overload
+def read_tuple(reader: BinaryIO, type_fmt: INT_STRUCTS, count: int, byte_order: str = ...,
+               check_null: Literal[True] = ...) -> Tuple[Optional[int], ...]:
+    ...
+
+
+@overload
+def read_tuple(reader: BinaryIO, type_fmt: FLOAT_STRUCTS, count: int, byte_order: str = ...,
+               check_null: Literal[False] = ...) -> Tuple[float, ...]:
+    ...
+
+
+@overload
+def read_tuple(reader: BinaryIO, type_fmt: FLOAT_STRUCTS, count: int, byte_order: str = ...,
+               check_null: Literal[True] = ...) -> Tuple[Optional[float], ...]:
+    ...
+
+
+# We need this last overload as a fallback for mypy when the function is called with a generic string and/or bool as parameter
+@overload
+def read_tuple(reader: BinaryIO, type_fmt: str, count: int, byte_order: str = ..., check_null: bool = ...) -> Tuple:
+    ...
+
+
+def read_tuple(reader: BinaryIO, type_fmt: str, count: int, byte_order: str = '<',
+               check_null: bool = False) -> Union[
+    Tuple[float, ...], Tuple[int, ...], Tuple[Optional[float], ...], Tuple[Optional[int], ...]]:
+    """
+    Reads a tuple of values according to type_fmt.
+    :param reader: Open binary reader
+    :param type_fmt: Type fmt string
+    :param count: Number of values to read
+    :param byte_order: byte order
+    :param check_null: Whether to check if each value is uninitialized memory according to Visual C++ (0xCDCDCDCD)
+    :return: Tuple containing the read data
+    """
+    read_func = __read_nullaware if check_null else __read_direct
+    unpacked_values = read_func(reader, type_fmt, count, byte_order=byte_order)
+    return unpacked_values
+
+
+@overload
+def read_single(reader: BinaryIO, type_fmt: INT_STRUCTS, byte_order: str = ...,
+                check_null: Literal[False] = ...) -> int:
+    ...
+
+
+@overload
+def read_single(reader: BinaryIO, type_fmt: INT_STRUCTS, byte_order: str = ..., check_null: Literal[True] = ...) -> \
+        Optional[int]:
+    ...
+
+
+@overload
+def read_single(reader: BinaryIO, type_fmt: FLOAT_STRUCTS, byte_order: str = ...,
+                check_null: Literal[False] = ...) -> float:
+    ...
+
+
+@overload
+def read_single(reader: BinaryIO, type_fmt: FLOAT_STRUCTS, byte_order: str = ..., check_null: Literal[True] = ...) -> \
+        Optional[float]:
+    ...
+
+
+# We need this last overload as a fallback for mypy when the function is called with a generic string and/or bool as parameter
+@overload
+def read_single(reader: BinaryIO, type_fmt: str, byte_order: str = ..., check_null: bool = ...) -> Optional[
+    Union[float, int]]:
+    ...
+
+
+def read_single(reader: BinaryIO, type_fmt: str, byte_order: str = '<',
+                check_null: bool = False) -> \
+        Optional[Union[float, int]]:
+    """
+    Reads a single value according to type_fmt.
+    :param reader: Open binary reader
+    :param type_fmt: Type fmt string
+    :param byte_order: byte order
+    :param check_null: Whether to check if the value is uninitialized memory according to Visual C++ (0xCDCDCDCD)
+    :return: The read value
+    """
+    return read_tuple(reader, type_fmt, 1, byte_order=byte_order, check_null=check_null)[0]
+
+
+@overload
+def read_u32(reader: BinaryIO, check_null: Literal[False] = ...,
+             byte_order: str = ...) -> int:
+    ...
+
+
+@overload
+def read_u32(reader: BinaryIO, check_null: Literal[True] = ..., byte_order: str = ...) -> Optional[int]:
+    ...
+
+
+def read_u32(reader: BinaryIO, check_null=False, byte_order='<') -> Optional[int]:
+    """
+    Reads a single u32 value
+    :param reader: Open binary reader
+    :param check_null: Whether to check for null
+    :param byte_order: byte order
+    :return: An int or None.
+    """
+    return read_single(reader, 'I', check_null=check_null, byte_order=byte_order)
+
+
+@overload
+def read_f32(reader: BinaryIO, check_null: Literal[False] = ...,
+             byte_order: str = ...) -> float:
+    ...
+
+
+@overload
+def read_f32(reader: BinaryIO, check_null: Literal[True] = ..., byte_order: str = ...) -> Optional[float]:
+    ...
+
+
+def read_f32(reader: BinaryIO, check_null=False, byte_order='<') -> Optional[float]:
+    """
+    Reads a single f32 value
+    :param reader: Open binary reader
+    :param check_null: Whether to check for null
+    :param byte_order: byte order
+    :return: A float or None.
+    """
+    return read_single(reader, 'f', check_null=check_null, byte_order=byte_order)
+
+
+@overload
+def read_f64(reader: BinaryIO, check_null: Literal[False] = ...,
+             byte_order: str = ...) -> float:
+    ...
+
+
+@overload
+def read_f64(reader: BinaryIO, check_null: Literal[True] = ..., byte_order: str = ...) -> \
+        Optional[float]:
+    ...
+
+
+def read_f64(reader: BinaryIO, check_null=False, byte_order='<') -> Optional[float]:
+    """
+    Reads a single f64 value
+    :param reader: Open binary reader
+    :param check_null: Whether to check for null
+    :param byte_order: byte order
+    :return: A float or None.
+    """
+    return read_single(reader, 'd', check_null=check_null, byte_order=byte_order)
+
+
+def read_ubyte(reader: BinaryIO, byte_order: str = '<') -> int:
+    """
+    Reads the value of a single byte as an unsigned value
+    :param reader: Open binary reader
+    :param byte_order: byte order
+    :return: An integer representing the value of the read byte
+    """
+    return read_single(reader, 'B', check_null=False, byte_order=byte_order)
+
+
+def read_ubytes(reader: BinaryIO, count: int, byte_order: str = '<') -> Tuple[int, ...]:
+    """
+    Reads multiple bytes as unsigned values
+    :param reader: Open binary reader
+    :param count: Number of bytes to read
+    :param byte_order: byte order to use when reading
+    :return: A tuple of integers representing the individual unsigned values of the read bytes
+    """
+    return read_tuple(reader, 'B', count, check_null=False, byte_order=byte_order)
+
+
+def skip_32(reader: BinaryIO, count=1):
+    """
+    Advances the reader in 32-bit steps
+    :param reader: Open binary reader
+    :param count: Number of 32-bit blocks to skip, defaults to 1
+    """
+    reader.seek(4 * count, SEEK_CUR)
+
+
+def skip_64(reader: BinaryIO, count=1):
+    """
+    Advances the reader in 64-bit steps
+    :param reader: Open binary reader
+    :param count: Number of 64-bit blocks to skip, defaults to 1
+    """
+    reader.seek(8 * count, SEEK_CUR)
+
+
+def read_bool(reader: BinaryIO) -> bool:
+    """
+    Reads a dapsys bool (reads 1 byte, then skips 3 additional bytes)
+    :param reader: Open binary reader
+    :return: Value of the read bool
+    """
+    v = reader.read(1)
+    reader.seek(3, SEEK_CUR)
+    # reader.read returns bytes, so compare against the zero byte
+    return v != b'\x00'
diff --git a/pydapsys/rawio/embedded.py b/pydapsys/rawio/embedded.py
new file mode 100644
index 0000000..99d25b5
--- /dev/null
+++ b/pydapsys/rawio/embedded.py
@@ -0,0 +1,71 @@
+from __future__ import annotations
+
+from typing import BinaryIO, Tuple, overload, Union
+
+from pydapsys.rawio import INT_STRUCTS, FLOAT_STRUCTS
+from pydapsys.rawio.basic import read_u32, read_tuple
+
+
+@overload
+def read_array(reader: BinaryIO, type_fmt: INT_STRUCTS, byte_order: str = ...) -> Tuple[int, ...]:
+    ...
+
+
+@overload
+def read_array(reader: BinaryIO, type_fmt: FLOAT_STRUCTS, byte_order: str = ...) -> Tuple[float, ...]:
+    ...
+
+
+def read_array(reader: BinaryIO, type_fmt: str, byte_order: str = '<') -> Union[Tuple[int, ...], Tuple[float, ...]]:
+    """
+    Reads a u32 value as x and then the x following values according to type fmt
+    :param reader: Open binary reader
+    :param type_fmt: Type fmt of the values
+    :param byte_order: byte order
+    :return: Tuple containing x values
+    """
+    value_count = read_u32(reader, byte_order=byte_order)
+    return read_tuple(reader, type_fmt, count=value_count, check_null=False, byte_order=byte_order)
+
+
+def read_u32_array(reader: BinaryIO, byte_order: str = '<') -> Tuple[int, ...]:
+    """
+    Reads a single u32 value as x and then the x following u32 values as an array
+    :param reader: Open binary reader
+    :param byte_order: byte order
+    :return: Tuple containing x int values
+    """
+    return read_array(reader, 'I', byte_order=byte_order)
+
+
+def read_f32_array(reader: BinaryIO, byte_order: str = '<') -> Tuple[float, ...]:
+    """
+    Reads a single u32 value as x and then the x following f32 values as an array
+    :param reader: Open binary reader
+    :param byte_order: byte order
+    :return: Tuple containing x float values
+    """
+    return read_array(reader, 'f', byte_order=byte_order)
+
+
+def read_f64_array(reader: BinaryIO, byte_order: str = '<') -> Tuple[float, ...]:
+    """
+    Reads a single u32 value as x and then the x following f64 values as an array
+    :param reader: Open binary reader
+    :param byte_order: byte order
+    :return: Tuple containing x float values
+    """
+    return read_array(reader, 'd', byte_order=byte_order)
+
+
+def read_str(reader: BinaryIO, byte_order='<', encoding='latin_1') -> str:
+    """
+    Reads a single u32 value as x and then the x following bytes and decodes them as a string
+    :param reader: Open binary reader
+    :param byte_order: byte order
+    :param encoding: Encoding to use when decoding the bytes, defaults to 'latin_1'
+    :return: The decoded string
+    """
+    length = read_u32(reader, byte_order=byte_order)
+    str_bytes = reader.read(length)
+    return str_bytes.decode(encoding=encoding)
diff --git a/pydapsys/rawio/np_embedded.py b/pydapsys/rawio/np_embedded.py
new file mode 100644
index 0000000..634c902
--- /dev/null
+++ b/pydapsys/rawio/np_embedded.py
@@ -0,0 +1,50 @@
+from __future__ import annotations
+
+from typing import BinaryIO
+
+import numpy as np
+from numpy import typing as npt
+
+from pydapsys.rawio.basic import read_u32
+
+
+def _read_nparray(reader: BinaryIO, dtype: np.dtype, byte_order='<') -> npt.NDArray:
+    """
+    Reads a u32 value as x and then uses numpy fromfile to read the x following values of the specified dtype
+    :param reader: Open binary reader
+    :param dtype: Dtype of the values to read
+    :param byte_order: byte order to use when reading
+    :return: Numpy array containing the read values
+    """
+    value_count = read_u32(reader, byte_order=byte_order)
+    return np.fromfile(reader, dtype=dtype.newbyteorder(byte_order), count=value_count)
+
+
+def read_f32_nparray(reader: BinaryIO, byte_order='<') -> npt.NDArray[np.float32]:
+    """
+    Reads a u32 value as x and then uses numpy fromfile to read the x following f32 values
+    :param reader: Open binary reader
+    :param byte_order: byte order to use when reading
+    :return: Numpy array containing the read np.float32 values
+    """
+    return _read_nparray(reader, np.dtype(np.float32), byte_order=byte_order)
+
+
+def read_f64_nparray(reader: BinaryIO, byte_order='<') -> npt.NDArray[np.float64]:
+    """
+    Reads a u32 value as x and then uses numpy fromfile to read the x following f64 values
+    :param reader: Open binary reader
+    :param byte_order: byte order to use when reading
+    :return: Numpy array containing the read np.float64 values
+    """
+    return _read_nparray(reader, np.dtype(np.float64), byte_order=byte_order)
+
+
+def read_u32_nparray(reader: BinaryIO, byte_order='<') -> npt.NDArray[np.uint32]:
+    """
+    Reads a u32 value as x and then uses numpy fromfile to read the x following u32 values
+    :param reader: Open binary reader
+    :param byte_order: byte order to use when reading
+    :return: Numpy array containing the read np.uint32 values
+    """
+    return _read_nparray(reader, np.dtype(np.uint32), byte_order=byte_order)
diff --git a/pydapsys/read.py b/pydapsys/read.py
new file mode 100644
index 0000000..df0e088
--- /dev/null
+++ b/pydapsys/read.py
@@ -0,0 +1,109 @@
+from typing import BinaryIO, Tuple, Dict
+
+from pydapsys.page import DataPage, PageType, TextPage, WaveformPage
+from pydapsys.rawio.basic import read_u32, read_f64, read_bool, skip_64, skip_32, read_ubytes
+from pydapsys.rawio.embedded import read_str
+from pydapsys.rawio.np_embedded import read_f32_nparray, read_f64_nparray, read_u32_nparray
+from pydapsys.toc.entry import Entry, EntryType, Folder, Root, StreamType, Stream
+from pydapsys.toc.plot import PlotConfig, PlotType, LatencyPlotUnit, PointStyle, RGBA8
+from pydapsys.util.structs import CaseInsensitiveDict
+
+
+def _read_plot_config(file: BinaryIO) -> PlotConfig:
+    """
+    Reads a plot configuration from a binary file
+    :param file: Opened binary file to read from
+    :return: The read plot config object
+    """
+    plot_type = PlotType(read_u32(file))
+    hist_interval = read_f64(file)
+    latency_unit = LatencyPlotUnit(read_u32(file))
+    latency_reference = read_u32(file)
+    recording_unit = read_str(file)
+    point_style = PointStyle(read_u32(file))
+    r, g, b, a = read_ubytes(file, 4)
+    hist_begin = read_f64(file)
+    return PlotConfig(plot_type, hist_interval, latency_unit, latency_reference, recording_unit, point_style,
+                      RGBA8(r=r, g=g, b=b, a=a),
+                      hist_begin)
+
+
+def _read_toc_entry(file: BinaryIO) -> Entry:
+    """
+    Reads an entry from the table of contents. Children will be read recursively.
+    :param file: Opened binary file to read from
+    :return: The entry, populated with its children (if any)
+    """
+    entry_type = EntryType(read_u32(file))
+    name = read_str(file)
+    skip_32(file)
+    entry_id = read_u32(file)
+    if entry_type == EntryType.Folder:
+        child_count = read_u32(file)
+        children = {entry.name: entry for entry in
+                    (_read_toc_entry(file) for _ in range(child_count))}
+        return Folder(id=entry_id, name=name, children=CaseInsensitiveDict.from_dict(children))
+    elif entry_type == EntryType.Stream:
+        stream_type = StreamType(read_u32(file))
+        plot_config = _read_plot_config(file)
+        open_at_start = read_bool(file)
+        page_ids = read_u32_nparray(file)
+        return Stream(id=entry_id, name=name, stream_type=stream_type, open_at_start=open_at_start,
+                      plot_config=plot_config, page_ids=page_ids)
+    else:
+        raise Exception(f"Unhandled entry type {entry_type}")
+
+
+def _read_toc(file: BinaryIO) -> Root:
+    """
+    Reads the root of the table of contents and, recursively, all further elements of it.
+    :param file: Opened binary file to read from
+    :return: The root of the ToC
+    """
+    root_name = read_str(file)
+    skip_64(file)
+    element_count = read_u32(file)
+    children = {entry.name: entry for entry in
+                (_read_toc_entry(file) for _ in range(element_count))}
+    footer = read_str(file)
+    return Root(name=root_name, footer=footer, children=CaseInsensitiveDict.from_dict(children))
+
+
+def _read_page(file: BinaryIO) -> DataPage:
+    """
+    Reads a page. Dynamically creates either a text page or a waveform page, depending on the read page type.
+    :param file: Opened binary file to read from
+    :return: A DataPage, either a TextPage or a WaveformPage, depending on the read page type
+    """
+    page_type = PageType(read_u32(file))
+    page_id = read_u32(file)
+    ref = read_u32(file, check_null=True)
+    if page_type == PageType.Text:
+        text = read_str(file)
+        ts_a = read_f64(file)
+        ts_b = read_f64(file, check_null=True)
+        return TextPage(type=page_type, id=page_id, reference_id=ref, text=text, timestamp_a=ts_a, timestamp_b=ts_b)
+    elif page_type == PageType.Waveform:
+        values = read_f32_nparray(file)
+        timestamps = read_f64_nparray(file)
+        interval = read_f64(file, check_null=True)
+        skip_64(file, count=3)
+        return WaveformPage(type=page_type, id=page_id, reference_id=ref, values=values, timestamps=timestamps,
+                            interval=interval)
+    else:
+        raise Exception(f"Unhandled page type {page_type}")
+
+
+def read_file(file) -> Tuple[Root, Dict[int, DataPage]]:
+    """
+    Reads a Dapsys recording file and returns the root of the table of contents and a dictionary mapping from a datapage
+    id to the respective datapage object
+    :param file: File to read from. Must be openable.
+    :return: Root of the ToC, DataPage mapping
+    """
+    with open(file, "rb") as reader:
+        reader.seek(0x30)
+        page_count = read_u32(reader)
+        pages = {page.id: page for page in (_read_page(reader) for _ in range(page_count))}
+        root = _read_toc(reader)
+        return root, pages
diff --git a/pydapsys/toc/__init__.py b/pydapsys/toc/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pydapsys/toc/entry.py b/pydapsys/toc/entry.py
new file mode 100644
index 0000000..2bcfaea
--- /dev/null
+++ b/pydapsys/toc/entry.py
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from enum import IntEnum
+
+import numpy as np
+import numpy.typing as npt
+
+from pydapsys.toc.plot import PlotConfig
+from pydapsys.util.structs import CaseInsensitiveDict, CaseInsensitiveDictView
+
+
+class EntryType(IntEnum):
+    """
+    Type of an entry in the table of contents
+    """
+    Folder = 1
+    Stream = 2
+
+
+@dataclass
+class Entry(ABC):
+    """
+    Represents an entry of the table of contents
+    """
+    name: str
+    """name of the entry in the ToC"""
+    id: int
+    """ToC id of the entry"""
+
+    @property
+    @abstractmethod
+    def entry_type(self) -> EntryType:
+        """Type of the entry"""
+        ...
+
+
+@dataclass
+class ChildContainer:
+    """
+    Base class for ToC entries that have children (folders and the root)
+    """
+    children: CaseInsensitiveDict[Entry]
+    """Children of this entry"""
+
+    @property
+    def f(self) -> CaseInsensitiveDictView[Folder]:
+        """View containing only the subfolders of this entry"""
+        return self.folders
+
+    @property
+    def folders(self) -> CaseInsensitiveDictView[Folder]:
+        """View containing only the subfolders of this entry"""
+        return self.children.select(lambda _, v: v.entry_type == EntryType.Folder)
+
+    @property
+    def s(self) -> CaseInsensitiveDictView[Stream]:
+        """View containing only the streams of this entry"""
+        return self.streams
+
+    @property
+    def streams(self) -> CaseInsensitiveDictView[Stream]:
+        """View containing only the streams of this entry"""
+        return self.children.select(lambda _, v: v.entry_type == EntryType.Stream)
+
+    def __getitem__(self, item: str) -> Entry:
+        return self.children[item]
+
+    def __contains__(self, item: str) -> bool:
+        return item in self.children
+
+
+@dataclass
+class Folder(ChildContainer, Entry):
+    """
+    Represents a folder in the ToC
+    """
+
+    @property
+    def entry_type(self) -> EntryType:
+        return EntryType.Folder
+
+
+@dataclass
+class Root(ChildContainer):
+    """The root of the table of contents. It differs from a :class:`pydapsys.toc.entry.Folder` in that it has no ToC id
+    and contains the footer string of the file."""
+    name: str
+    """name of the root"""
+    footer: str
+    """footer string. Usually contains the version and serial number of the Dapsys program the recording was created with"""
+
+
+class StreamType(IntEnum):
+    """
+    Type of a stream
+    """
+    Waveform = 2
+    Text = 3
+
+
+@dataclass
+class Stream(Entry):
+    """
+    Stream entry in the ToC.
+    """
+    stream_type: StreamType
+    """Type of the stream"""
+    open_at_start: bool
+    """Indicates if Dapsys should open this stream at start"""
+    page_ids: npt.NDArray[np.uint32]
+    """Pages belonging to this stream"""
+    plot_config: PlotConfig
+    """Plot configuration of this stream"""
+
+    @property
+    def entry_type(self) -> EntryType:
+        return EntryType.Stream
+
+    def __getitem__(self, item) -> int:
+        return self.page_ids[item]
+
+    def __iter__(self):
+        return iter(self.page_ids)
+
+    def __contains__(self, item):
+        return item in self.page_ids
diff --git a/pydapsys/toc/plot.py b/pydapsys/toc/plot.py
new file mode 100644
index 0000000..9511c31
--- /dev/null
+++ b/pydapsys/toc/plot.py
@@ -0,0 +1,54 @@
+from dataclasses import dataclass
+from enum import IntEnum
+
+
+class PointStyle(IntEnum):
+    """
+    Plot point styles
+    """
+    Simple_Dot = 0x15
+    Solid_Circle = 0x0A
+    Empty_Circle = 0x09
+    Solid_Square = 0x02
+    Empty_Square = 0x01
+    NA = 0xFFFFFFFF
+
+
+class LatencyPlotUnit(IntEnum):
+    """
+    Units used in the latency plot (if applicable)
+    """
+    MilliSec = 0
+    Seconds = 1
+    Hertz = 2
+
+
+class PlotType(IntEnum):
+    """
+    Plot type
+    """
+    Normal = 0
+    Instantaneous = 1
+    Histogram = 2
+    Relative_Latency = 3
+
+
+@dataclass
+class RGBA8(object):
+    r: int
+    g: int
+    b: int
+    a: int = 0
+
+
+@dataclass
+class PlotConfig(object):
+    """Configuration of a stream plot"""
+    plot_type: PlotType
+    histogram_interval: float
+    latency_unit: LatencyPlotUnit
+    latency_reference: int
+    unit: str
+    point_style: PointStyle
+    waveform_color: RGBA8
+    histogram_begin: float
diff --git a/pydapsys/util/__init__.py b/pydapsys/util/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pydapsys/util/floats.py b/pydapsys/util/floats.py
new file mode 100644
index 0000000..8223cce
--- /dev/null
+++ b/pydapsys/util/floats.py
@@ -0,0 +1,14 @@
+from __future__ import annotations
+
+import sys
+
+
+def float_comp(f1: float, f2: float, epsilon=sys.float_info.epsilon) -> bool:
+    """
+    Checks whether two floats are approximately equal
+    :param f1: Value 1
+    :param f2: Value 2
+    :param epsilon: Maximum allowed absolute difference, defaults to the machine epsilon
+    :return: abs(f1-f2) <= epsilon
+    """
+    return abs(f1 - f2) <= epsilon
diff --git a/pydapsys/util/structs.py b/pydapsys/util/structs.py
new file mode 100644
index 0000000..5512061
--- /dev/null
+++ b/pydapsys/util/structs.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+from typing import MutableMapping, Iterator, TypeVar, Generic, Dict, Optional, Callable, Set
+
+_VT = TypeVar("_VT")
+_ST = TypeVar("_ST")
+
+
+class CaseInsensitiveDict(MutableMapping[str, _VT], Generic[_VT]):
+    """
+    A class wrapping a dict with string keys in a read-only fashion. Provides case-insensitive access to the items.
+    """
+
+    def __init__(self, wrap_dict: Optional[Dict[str, _VT]] = None):
+        self._dict = wrap_dict if wrap_dict is not None else dict()
+
+    @staticmethod
+    def from_dict(data_dict: Dict[str, _VT]) -> CaseInsensitiveDict[_VT]:
+        return CaseInsensitiveDict[_VT](wrap_dict={k.lower(): v for k, v in data_dict.items()})
+
+    @property
+    def backing_dict(self) -> Dict[str, _VT]:
+        return self._dict
+
+    def __contains__(self, __k: object) -> bool:
+        if isinstance(__k, str):
+            return self.transform_key(__k) in self._dict
+        return False
+
+    def __setitem__(self, __k: str, __v: _VT) -> None:
+        """
+        Will always raise an exception
+        """
+        raise NotImplementedError(
+            "__setitem__ is not supported on CaseInsensitiveDict to prevent inconsistent state between views.")
+
+    def __delitem__(self, __k: str) -> None:
+        """
+        Will always raise an exception
+        """
+        raise NotImplementedError(
+            "__delitem__ is not supported on CaseInsensitiveDict to prevent inconsistent state between views.")
+
+    def __getitem__(self, __k: str) -> _VT:
+        return self._dict[self.transform_key(__k)]
+
+    def __len__(self) -> int:
+        return len(self._dict)
+
+    def __iter__(self) -> Iterator[str]:
+        return iter(self._dict)
+
+    def transform_key(self, __k: str) -> str:
+        return __k.lower()
+
+    def select(self, selector: Callable[[str, _VT], bool]) -> CaseInsensitiveDictView[_VT]:
+        """
+        Returns a view of the dictionary only containing the items for which the selector returned true
+        :param selector: Function selecting the items that should be present in the dict view
+        :return: A dict view
+        """
+        return CaseInsensitiveDictView(self, {k for k, v in self.items() if selector(k, v)})
+
+
+class CaseInsensitiveDictView(MutableMapping[str, _VT], Generic[_VT]):
+    """
+    Provides view capabilities for :class:`pydapsys.util.structs.CaseInsensitiveDict`
+    """
+
+    def __init__(self, source: CaseInsensitiveDict[_VT], elements: Set[str]):
+        self._source = source
+        self._elements = elements
+
+    def __setitem__(self, __k: str, __v: _VT) -> None:
+        raise NotImplementedError(
+            "__setitem__ is not supported on CaseInsensitiveDictView to prevent inconsistent state between views.")
+
+    def __delitem__(self, __k: str) -> None:
+        raise NotImplementedError(
+            "__delitem__ is not supported on CaseInsensitiveDictView to prevent inconsistent state between views.")
+
+    def __getitem__(self, __k: str) -> _VT:
+        if self._source.transform_key(__k) not in self._elements:
+            raise KeyError(f"Key {__k} is not contained in this view")
+        return self._source[__k]
+
+    def __len__(self) -> int:
+        return len(self._elements)
+
+    def __iter__(self) -> Iterator[str]:
+        return iter(self._elements)
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..4ea46c1
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,18 @@
+[tool.poetry]
+name = "pydapsys"
+version = "0.1.0"
+description = "Read recordings made with DAPSYS"
+authors = ["Peter Konradi "]
+readme = "README.md"
+
+[tool.poetry.dependencies]
+python = "^3.8"
+numpy = "^1.21"
+# marked optional so that the [neo] extra below actually works
+neo = { version = "^0.11.1", optional = true }
+
+[tool.poetry.extras]
+neo = ["neo"]
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29