Skip to content

Commit

Permalink
Merge pull request #5 from BEL-CO/fix_4-version
Browse files Browse the repository at this point in the history
Fix 4 version
  • Loading branch information
ViridianForge authored Dec 6, 2018
2 parents 2070077 + 71ddc74 commit 9fff187
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 74 deletions.
50 changes: 48 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,48 @@
# mffpy
Reader for EGI's MFF file format.
# Introduction

`mffpy` is a lean reader for EGI's MFF file format. These files are
directories containing several files of mostly xml files, but also binary
files.

The main entry point into the library is class `Reader` that accesses a
selection of functions in the .mff directory to return signal data and its meta
information.

## View the Docs

All documentation and API guidance are generated from the python doc-strings
and this README file using pydoc-markdown. To view the docs:

* install pydoc-markdown: `pip install pydoc-markdown`
* build and run: `pydocmd build; pydocmd serve`
* Navigate to the [docs](http://localhost:8000)

## Example Code

### Example 1: Basic Information

```python
import mffpy
f = mffpy.Reader("./examples/example_1.mff")
print("time and date of the start of recording:", fo.startdatetime)
print("number of channels:", fo.num_channels)
print("sampling rates:", fo.sampling_rates, "(in Hz)")
print("durations:", fo.durations, "(in sec.)")
print("Here's the epoch information")
for i, e in enumerate(fo.epochs):
print("Epoch number", i)
print(e)
```

### Example 2: Reading Samples

```python
import mffpy
fo = mffpy.Reader("./examples/example_1.mff")
fo.set_unit('EEG', 'uV')
eeg_in_mV, t0_EEG = fo.get_physical_samples_from_epoch(fo.epochs[0], dt=0.1)['EEG']
fo.set_unit('EEG', 'V')
eeg_in_V, t0_EEG = fo.get_physical_samples_from_epoch(fo.epochs[0], dt=0.1)['EEG']
print('data in mV:', eeg_in_mV[0])
print('data in V :', eeg_in_V[0])
```
179 changes: 130 additions & 49 deletions mffpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,96 @@

class Reader(MFFDirectory):
"""
A class to read signal data in .mff format.
Attributes
----------
sampling_rates : `dict` of `float`
sampling rate in Hz by channel type
durations : `dict` of `float`
duration in seconds by channel type
startdatetime : `datetime.datetime`
UTC start date and time of the recording
Create an .mff reader
class `Reader` is the main entry point to `mffpy`'s functionality.
Methods
-------
set_unit(channel_type, unit)
set the physical unit of channel type
set_calibration(channel_type, cal)
set the calibration of channel type
get_physical_samples(t0=None, dt=None, channels=None, block_slice=None)
return physical samples fo specified channels in the range t0 ->
t0+dt (in seconds).
get_physical_samples_from_epoch(epoch, t0=None, dt=None, channels=None)
return physical samples fo specified channels in the range t0 ->
t0+dt (in seconds). t0 is offset by the epoch start.
Example use:
```python
import mffpy
fo = mffpy.Reader('./examples/example_1.mff')
fo.set_unit('EEG', 'uV')
X = fo.read_physical_samples_from_epoch(
fo.epochs[0], channels=['EEG'])
```
"""

@cached_property
def sampling_rates(self):
"""sampling rates by channel type"""
"""
```python
Reader.sampling_rates
```
sampling rates by channel type
Return dictionary of sampling rate by channel type. Each
sampling rate is returned in Hz as a float.
"""
return {
fn: bin_file.sampling_rate
for fn, bin_file in self._blobs.items()
}

@cached_property
def durations(self):
"""recorded durations by channel type"""
"""
```python
Reader.durations
```
recorded durations by channel type
Return dictionary of duration by channel type. Each
duration is returned in seconds as a float.
"""
return {
fn: bin_file.duration
for fn, bin_file in self._blobs.items()
}

@cached_property
def startdatetime(self):
"""UTC start time of the recording"""
"""
```python
Reader.startdatetime
```
UTC start date and time of the recording
Return UTC start date and time of the recording. The
returned object is of type `datetime.datetime`.
"""
info = xml_files.open(self.filename('info'))
return info.recordTime

@property
def units(self):
"""
```python
Reader.units
```
Return dictionary of units by channel type. Each unit is returned as a
`str` of SI units (micro: `'u'`).
"""
return {
fn: bin_file.unit
for fn, bin_file in self._blobs.items()
}

@cached_property
def num_channels(self):
"""
```python
Reader.num_channels
```
Return dictionary of number of channels by channel type. Each
number is returned as an `int`.
"""
return {
fn: bin_file.num_channels
for fn, bin_file in self._blobs.items()
}

@cached_property
def _blobs(self):
"""return dictionary of `BinFile` data readers by signal type"""
Expand All @@ -63,16 +106,34 @@ def _blobs(self):
return __blobs

def set_unit(self, channel_type, unit):
"""set physical unit of a channel type"""
"""set output units for a type of channels
Set physical unit of a channel type. The allowed conversion
values for `unit` depend on the original unit. We allow all
combinations of conversions of 'V', 'mV', 'uV'.
**Arguments**
* **`channel_type`**: `str` with the channel type.
* **`unit`**: `str` with the unit you would like to convert to.
**Example use**
```python
import mffpy
fo = mffpy.Reader('./examples/example_1.mff')
fo.set_unit('EEG', 'uV')
```
"""
self._blobs[channel_type].unit = unit

def set_calibration(self, channel_type, cal):
"""set calibration of a channel type"""
self._blobs[channel_type].calibration = cal

def get_physical_samples(self, t0=None, dt=None, channels=None, block_slice=None):
"""return signal data in the range `(t0, t0+dt)` in seconds from
`channels`"""
def get_physical_samples(self, t0=0.0, dt=None, channels=None, block_slice=None):
"""return signal data in the range `(t0, t0+dt)` in seconds from `channels`
"""
if channels is None:
channels = list(self._blobs.keys())

Expand All @@ -82,30 +143,50 @@ def get_physical_samples(self, t0=None, dt=None, channels=None, block_slice=None
if typ in channels
}

def get_physical_samples_from_epoch(self, epoch, t0=None, dt=None, channels=None):
"""return `dict` of signals by channels in the range `(t0, t0+dt)` relative to `epoch.t0`.
Parameters
----------
epoch : `xml_files.Epoch` (default is `None`)
The epoch from which you would like to get data.
`t0` : `float` or `None`
relative offset into the data from epoch start
`dt` : `0<float<epoch.dt-t0` or `None` (default is `None`)
requested signal duration. Value `None` defaults to maximum from start
channels : `list` of `str` or `None` (default is `None`)
Select your channel types. Each channel type will be a key in the
returned `dict`. `None` defaults to all available channels.
Note
----
def get_physical_samples_from_epoch(self, epoch, t0=0.0, dt=None, channels=None):
"""
return samples and start time by channels of an epoch
Returns a `dict` of tuples of [0] signal samples by channel names given
and [1] the start time in seconds, with keys from the list `channels`.
The samples will be in the range `(t0, t0+dt)` taken relative to
`epoch.t0`.
**Arguments**
* **`epoch`**: `xml_files.Epoch` from which you would like to get data.
* **`t0`**: `float` with relative offset in seconds into the data from
epoch start.
* **`dt`**: `float` with requested signal duration in seconds. Value
`None` defaults to maximum starting at `t0`.
* **`channels`**: `list` of channel-type `str` each of which will be a
key in the returned `dict`. `None` defaults to all available channels.
**Note**
* The start time of the returned data is `epoch.t0` seconds from
recording start.
* Only the epoch data can be requested. If you want to pad them, do it
yourself.
* Only the epoch data can be requested. If you want to pad these, do
it yourself.
* No interpolation will be performed to correct for the fact that `t0`
falls in between samples.
**Example use**
```python
import mffpy
fo = mffpy.Reader('./examples/example_1.mff')
X = fo.read_physical_samples_from_epoch(fo.epochs[0], t0, dt)
eeg, t0_eeg = X['EEG']
```
"""
assert isinstance(epoch, xml_files.Epoch), "argument epoch of type %s [requires %s]"%(type(epoch), xml_files.Epoch)
assert t0 is None or t0 >= 0.0, "Only positve `t0` allowed [%s]"%t0
assert t0 >= 0.0, "Only positve `t0` allowed [%s]"%t0
dt = dt if dt is None or 0.0 < dt < epoch.dt-t0 else None
return self.get_physical_samples(
t0, dt, channels, block_slice=epoch.block_slice)
3 changes: 2 additions & 1 deletion mffpy/bin_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ def scale(self):
return self._scale

def get_physical_samples(self, t0=0.0, dt=None, block_slice=None, dtype=np.float32):
return (self.calibration*self.scale*self.read_raw_samples(t0, dt, block_slice=block_slice)).astype(dtype)
samples, start_time = self.read_raw_samples(t0, dt, block_slice=block_slice)
return (self.calibration*self.scale*samples).astype(dtype), start_time
1 change: 1 addition & 0 deletions mffpy/cached_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ def _cached_property(self):
ans = fn(self)
setattr(self, cached_prop, ans)
return ans
_cached_property.__doc__ = fn.__doc__
return _cached_property
32 changes: 22 additions & 10 deletions mffpy/mffdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re
from os import listdir
from collections import defaultdict, namedtuple
from os.path import join, isfile, exists, splitext, basename
from os.path import join, exists, splitext, basename
from .cached_property import cached_property

from . import xml_files
Expand All @@ -11,23 +11,27 @@


class MFFDirectory(str):
"""MFF-type directory name.
Methods:
__new__(cls, s) : `s` is the path to an .mff directory.
""".mff directory path
An `MFFDirectory` string contains the path to a valid .mff directory.
"""

_extensions = ('.mff',)
_ext_err = 'Unknown file type ["%s"]'
_re_nu = re.compile(r'\d+')

def __init__(self, filename):
"""initialize new .mff directory instance
**Parameters:**
`filename` (str) - the full path to the .mff directory.
"""
self._find_files_by_type()
self._check()

def _find_files_by_type(self):
"""Reads the directory and sorts filenames by
extensions in property `files_by_type`"""
"""Reads the directory and sorts filenames by extensions in property `files_by_type`
"""
self.files_by_type = defaultdict(list)

for fbase, ext in map(splitext, listdir(self)):
Expand All @@ -41,8 +45,8 @@ def filename(self, basename):
raise ValueError('No file with basename "%s" in directory "%s".'%(basename, super().__str__()))

def info(self, i=None):
"""returns filename `<self>/file.xml` if `i is None`, otherwise
`<self>/file<i>.xml`."""
"""returns filename `<self>/file.xml` if `i is None`, otherwise `<self>/file<i>.xml`
"""
return self.filename('info'+(str(i) if i else ''))

def signals_with_info(self):
Expand All @@ -57,10 +61,18 @@ def signals_with_info(self):

@cached_property
def epochs(self):
"""
```python
Epochs.epochs
```
Return list of epochs. Each epoch is of class `Epoch`.
"""
return xml_files.open(self.filename('epochs'))

def _check(self):
"""Checks the .mff directory for completeness."""
"""Checks the .mff directory for completeness
"""
# MFF directory should have the right extension
assert splitext(self)[1] in self._extensions, self._ext_err%super().__str__()
# For each `signal%i.bin`, there should be an `info%i.xml`
Expand Down
Loading

0 comments on commit 9fff187

Please sign in to comment.