Skip to content

Commit

Permalink
Local state
Browse files Browse the repository at this point in the history
  • Loading branch information
codingchipmunk committed Feb 1, 2023
1 parent 4b9adc2 commit f059da1
Show file tree
Hide file tree
Showing 19 changed files with 1,250 additions and 0 deletions.
140 changes: 140 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# PyDapsys - Read DAPSYS recordings with Python

PyDapsys is a package to read neurography recordings made with [DAPSYS](http://dapsys.net/) (Data Acquisition Processor System). It is based on a reverse-engineered specification of the binary data format used by the latest DAPSYS version.

Optionally, the library provides functionality to store loaded data into [Neo](https://github.com/NeuralEnsemble/python-neo) datastrucutres, from where they can be exported into various other formats.

## Installation

Download the latest release from the Github releases page.

### Basic functionalities

Will only offer the data representation of PyDapsys, without ability to convert to Neo. Has only numpy as sole dependency.

`pip install {name_of_downloaded_wheel}.whl`

### With Neo converters

Install base library with additional dependencies required to load data into Neo datastructures. Writing Neo datastructures to some formats may require additional dependencies. Please see the Neo documentation for further information.

`pip install {name_of_downloaded_wheel}.whl[neo]`

## Usage

### Basics

A Dapsys file is made up of two parts: A sequential list of blocks or **pages**, which store either a text with a timestamp or a waveform with associated timestamps, and a table of contents (toc). The toc consists of **folders** and **streams**. Each page has an id unique in the context of the file. Streams in the toc have an array of ids of the pages belonging to the stream. A stream is either a text stream (referring only to text pages) or a data stream (referring only to recording pages).

#### Load a file

Use `read_file` to get the root of the table of contents and a dictionary which maps from the page ids to the object representing the page itself.

```python
from pydapsys.read import read_file
from pathlib import Path
MY_DAPSYS_FILE = Path(".")/"to"/"my"/"dapsys_file.dps"
toc_root, pages = read_file(MY_DAPSYS_FILE)
```

The `toc_root` object will have children, either folders (which, in turn, can have additional children) or streams. You can access the childrens by using the index-operator. Access to children is case-insensitive. This is done for conveniance and does not inlfuence the correctness, as DAPSYS itself does not allow two objects of the same (case insensitive) name to exist on the same hierachy level. For typed access you can use either `.f` to get folders or `.s` to only get streams:

```python
comment_stream = toc_root["comments"] # Will return the stream Comments, but is typed as generic stream
comment_stream = toc_root.s["coMMents"] # Will return the stream Comments, typed as Stream
top_folder = toc_root.f["Folder"] # will return the folder Folder
top_folder = toc_root.f["comments"] # will fail (exception), because comments is not a folder

# iterate over all folders:
for folder in toc_root.f.values():
...

# iterate over all streams:
for stream in toc_root.s.values():
...
```

#### Access data from a file

To get text data from a file, get the datastream object from the toc and access its `page_ids` property. For conveniance, the `__getitem__`, `__iter__` and `__contains__` methods of stream objects have been overloaded to return the result of the same operation on `page_ids`. From there, you can get the corresponding pages from the `pages` dict:

```python
from pydapsys.toc.entry import StreamType

def get_pages(stream, expected_stream_type: StreamType):
if stream.entry_type != expected_stream_type:
raise Exception(f"{stream.name} is not a {expected_stream_type.name} stream, but {stream.stream_type.name}")
return [pages[page_id] for page_id in stream] # or [pages[page_id] for page_id in stream.page_ids]

text_stream = ...
text_pages = get_pages(text_stream, StreamType.Text)

waveform_stream = ...
waveform_pages = get_pages(waveform_stream, StreamType.Waveform)
```

##### Text pages

A text page consists of three fields:

* `text`: The text stored in the page, string

* `timestamp_a`: The first timestamp of the page, float64 (seconds)

* `timestamp_b`: The second timestamp of the page (float64, seconds), which sometimes is not presented and is thus set to None

##### Waveform pages

Waveform pages consist of three fields:

* `values`: Values of the waveform, float32 (volt)

* `timestamps`: Timestamps corresponding to `values`, float64 (seconds)

* `interval`: Interval between values, float64 (seconds)

In **continuously sampled waveforms**, only the timestamp of the first value will be present, in addition to the sampling `interval`. The timestamps of the other values can be calculated by this two values.

**Irregularly sampled waveforms** will have one timestamp for each value, but no `interval`.

## Neo converters

The module `pydapsys.neo_convert` contains classes to convert a Dapsys recording to the Neo format. **IMPORTANT: importing the module without installing neo first will raise an exception**

As Dapsys files may have different structures, depending on how it was configured and what hardware is used, different converters are required for each file structure.

Currently there is only one converter available, for recordings made using a NI Pulse stimulator.

### NI Pulse stimulator

Converter class for Dapsys recording created using an NI Pulse stimulator. Puts everything into one neo sequence.
Waveform pages of the continuous recording are merged if the difference between a pair of consecutive pages is less than a specified threshold (`grouping_tolerance`).

```python
from pydapsys.neo_convert.ni_pulse_stim import NIPulseStimulatorToNeo

# convert a recording to a neo block
neo_block = NIPulseStimulatorToNeo(toc_root, pages, grouping_tolerance=1e-9).to_neo()
```

#### Expected file structure

{stim_folder} must be one of "NI Puls Stimulator", "pulse stimulator", "NI Pulse stimulator", but can be changed by adding entries to `NIPulseStimulatorToNeo.stim_foler_names`

* Root

* [Text] Comments -> Converted into a single event called "comments"

* {stim_folder}

* [Text] Pulses -> Converted into one neo event streams, one per unique text

* [Waveform] Continuous recording -> Converted into multiple AnalogSignals

* Responses

* Tracks for All Responses -> Optional. Will silently ignore spike trains if this folder does not exist

* ... [Text] tracks... -> Converted into spike trains


Empty file added pydapsys/__init__.py
Empty file.
Empty file.
156 changes: 156 additions & 0 deletions pydapsys/neo_convert/abstract_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from abc import ABC, abstractmethod
from typing import Mapping, Sequence, Union, Optional, Iterable, List, Dict

import neo
import numpy as np
import numpy.typing as npt
import quantities as pq
from pydapsys.neoconverter import recording_segment_end

from pydapsys.page import DataPage, WaveformPage
from pydapsys.toc.entry import Root, Stream, StreamType
from pydapsys.util.floats import float_comp


class DapsysToNeoConverter(ABC):
""" Converter to put Dapsys recordings into the neo structure
This abstract base class provides common functionalities to transform Dapsys streams into common neo structures
:param toc: Root of the table of contents
:type toc: class:`pydapsys.toc.entry.Root`
:param pages: Mapping between the id of the data page and itself
:type toc: class:`typing.Mapping[int, pydapsys.page.DataPage]`
"""

def __init__(self, toc: Root, pages: Mapping[int, DataPage]):
self.toc = toc
self.pages = pages

@abstractmethod
def to_neo(self) -> neo.Block:
"""
Create a neo structure based on the given recording
:return: A neo block containing the data from the recording
"""
...

def _pageids_to_event(self, page_ids: Union[Sequence[int], npt.NDArray[np.uint32]], name: str = "") -> neo.Event:
"""Converts data from a sequence (or numpy array) of page ids to a neo event.
The labels will be taken from the page text and the event times from the first timestamp (timestamp_a)
:param page_ids: Page ids of the comment pages
:param name: name of the returned neo event
:return: A neo event containing the text of the comment pages as labels and their first timestamps as times
"""
times = np.empty(len(page_ids), dtype=np.float64)
comments = np.empty(len(page_ids), dtype=str)
for i in range(len(page_ids)):
times[i] = self.pages[page_ids[i]].timestamp_a
comments[i] = self.pages[page_ids[i]].comment
return neo.Event(times=times, labels=comments, units=pq.second, name=name, copy=False)

def textstream_to_event(self, stream: Stream, name: Optional[str] = None) -> neo.Event:
"""Converts data from a text stream to a neo event.
Labels of the event will be the text from the pages and the event times the first timestamp (timestamp_a) from them.
:param stream: Stream to convert
:param name: name of the returned neo event, defaults to the name of the passed stream
:return: A neo event containing the text of the streams comment pages as labels and their first timestamps as times
"""
if stream.stream_type != StreamType.Text:
raise ValueError(f"StreamType.Text required for this operation, not {stream.stream_type.name}")
return self._pageids_to_event(stream.page_ids, name=stream.name if name is None else name)

def _pageids_to_spiketrain(self, page_ids: Union[Sequence[int], npt.NDArray[np.uint32]], t_stop: float,
name: str = "") -> neo.SpikeTrain:
"""Puts data from comment pages into a spike train. Requires an additional parameter t_stop for the equally named,
required parameter on :class:`neo.SpikeTrain`. t_stop must be greater than the last timestamp of the train.
The times of the spike train will be taken from the timestamp_a of the given comment pages.
:param page_ids: Page ids of the comment pages
:param t_stop: t_stop parameter to set on :class:`neo.SpikeTrain`
:param name: Name of the spike train, optional.
:return: A spike train build from the comment pages
"""
return neo.SpikeTrain(
times=np.fromiter((comment.timestamp_a for comment in (self.pages[pid] for pid in page_ids)),
dtype=np.float64, count=len(page_ids)), name=name, units=pq.second, t_stop=t_stop,
copy=False)

def textstream_to_spiketrain(self, stream: Stream, t_stop: float, name: Optional[str] = None) -> neo.SpikeTrain:
"""Puts data from a text stream into a spike train. Requires an additional parameter t_stop for the equally named,
required parameter on :class:`neo.SpikeTrain`. t_stop must be greater than the last timestamp of the train.
The times of the spike train will be taken from the timestamp_a of the streams comment pages.
:param stream: The stream to convert
:param t_stop: t_stop parameter to set on :class:`neo.SpikeTrain`
:param name: Name of the spike train. Will default to the name of the stream
:return: A spike train build from the given text stream
"""
if stream.stream_type != StreamType.Text:
raise ValueError(f"StreamType.Text required for this operation, not {stream.stream_type.name}")
return self._pageids_to_spiketrain(stream.page_ids, t_stop, name=stream.name if name is None else name)

def _pageids_to_events_by_comment_text(self, page_ids: Union[Sequence[int], npt.NDArray[np.uint32]]) -> Iterable[
neo.Event]:
"""Orders a number of comment pages by their text and emits one event for each unique text.
The times are loaded from the comment pages timestamp_a, will have no labels and the name of the events will be
the unique text.
:param page_ids: Ids of the comment pages
:return: An iterable of neo events
"""
comment_string_to_timestamps: Dict[str, List[float]] = dict()
for comment in (self.pages[pid] for pid in page_ids):
comment_string_to_timestamps.setdefault(comment.comment, list()).append(comment.timestamp_a)
for comment_string, comment_timestamps in comment_string_to_timestamps.items():
yield neo.Event(times=np.array(comment_timestamps, dtype=np.float64), units=pq.second, name=comment_string,
copy=False)

def textstream_to_events_by_comment_text(self, stream: Stream) -> Iterable[neo.Event]:
"""Orders the comment pages of a text stream by their text and emits one event for each unique text.
The times are loaded from the comment pages timestamp_a, will have no labels and the name of the events will be
the unique text.
:param stream: A text stream to convert
:return: An iterable of neo events
"""
if stream.stream_type != StreamType.Text:
raise ValueError(f"StreamType.Text required for this operation, not {stream.stream_type.name}")
return self._pageids_to_events_by_comment_text(stream.page_ids)

def _group_recordingsegments(self, rec_pages: Iterable[WaveformPage], tolerance: float = 1e-5) -> Iterable[
List[WaveformPage]]:
"""Groups consecutive recording pages into lists, if the difference between the end of the last page and the start
of the next one is less than the threshold and they have the same sampling interval
:param rec_pages: Recording pages to group. Must be in orderly sequence.
:param tolerance: Tolerance for grouping, defaults to 1e-5
:return: An iterable of lists containing grouped recording pages
"""
page_iter = iter(rec_pages)
current_set: List[WaveformPage] = [next(page_iter)]
for page in page_iter:
if not (float_comp(current_set[-1].interval, page.interval) and \
not float_comp(recording_segment_end(current_set[-1]) + current_set[-1].interval,
page.timestamps[0], epsilon=tolerance)):
current_set.append(page)
else:
yield current_set
current_set = [page]
yield current_set

def waveformstream_to_analogsignals(self, stream: Stream, tolerance: float = 1e-5) -> Iterable[neo.AnalogSignal]:
""" Groups consecutive pages of a waveform stream together, based on the given tolerance and creates one
AnalogSignal from each group.
:param stream: Data stream to convert
:param tolerance: Tolerance for grouping
:return: Analog signals created from grouped recording pages
"""
if stream.stream_type != StreamType.Waveform:
raise ValueError(f"StreamType.Waveform required for this operation, not {stream.stream_type.name}")
for segment_group in self._group_recordingsegments((self.pages[pid] for pid in stream.page_ids),
tolerance=tolerance):
continuous = np.concatenate(list(segment.values for segment in segment_group)).ravel()
yield neo.AnalogSignal(continuous, pq.volt,
t_start=segment_group[0].timestamps[0] * pq.second,
sampling_period=segment_group[0].interval * pq.second, copy=False)
Loading

0 comments on commit f059da1

Please sign in to comment.