Commit
Showing 9 changed files with 309 additions and 6 deletions.
@@ -0,0 +1,7 @@
"""SNIRF module for conversion to FIF."""

# Author: Robert Luke <[email protected]>
#
# License: BSD (3-clause)

from ._snirf import read_raw_snirf
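For context, a minimal usage sketch (not part of this commit): once the reader is exported through mne.io, a recording could be loaded as below. The 'recording.snirf' path is hypothetical and used only for illustration.

import mne

# preload=True loads the signal data into memory immediately
raw = mne.io.read_raw_snirf('recording.snirf', preload=True)
print(raw.info['sfreq'], raw.ch_names[:4])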
@@ -0,0 +1,198 @@
# Authors: Robert Luke <[email protected]>
#
# License: BSD (3-clause)

import re
import numpy as np

from ..base import BaseRaw
from ..meas_info import create_info
from ...annotations import Annotations
from ...utils import logger, verbose, fill_doc, warn, requires_h5py


@requires_h5py
@fill_doc
def read_raw_snirf(fname, preload=False, verbose=None):
    """Reader for a SNIRF fNIRS recording.

    This reader supports the .snirf file type only, not the .jnirs version.
    Only support for continuous wave SNIRF data is currently implemented.

    Parameters
    ----------
    fname : str
        Path to the SNIRF data file.
    %(preload)s
    %(verbose)s

    Returns
    -------
    raw : instance of RawSNIRF
        A Raw object containing SNIRF data.

    See Also
    --------
    mne.io.Raw : Documentation of attributes and methods.
    """
    return RawSNIRF(fname, preload, verbose)


def _open(fname):
    return open(fname, 'r', encoding='latin-1')


@requires_h5py
@fill_doc
class RawSNIRF(BaseRaw):
    """Raw object from a SNIRF fNIRS file.

    Parameters
    ----------
    fname : str
        Path to the SNIRF data file.
    %(preload)s
    %(verbose)s

    See Also
    --------
    mne.io.Raw : Documentation of attributes and methods.
    """

    @verbose
    def __init__(self, fname, preload=False, verbose=None):
        from ...externals.pymatreader.utils import _import_h5py
        h5py = _import_h5py()
        logger.info('Loading %s' % fname)

        dat = h5py.File(fname, 'r')

        if 'data2' in dat['nirs'].keys():
            warn("File contains multiple recordings. "
                 "This is not supported.")

        data = np.array(dat.get('/nirs/data1/dataTimeSeries')).T
        last_samps = data.shape[1] - 1

        samplingrate_raw = np.array(dat.get('nirs/data1/time'))
        sampling_rate = 0
        if samplingrate_raw.shape == (2, 1):
            # specified as onset/samplerate
            warn("Onset/sample rate SNIRF encoding not yet supported")
        else:
            # specified as time points
            fs_diff = np.around(np.diff(samplingrate_raw), decimals=4)
            if len(np.unique(fs_diff)) == 1:
                # Uniformly sampled data
                sampling_rate = 1. / np.unique(fs_diff)
            else:
                warn("Non-uniformly sampled data is not supported")
        if sampling_rate == 0:
            warn("Unable to extract sample rate")

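        # Note: SNIRF stores /nirs/data1/time either as an (onset, sample
        # period) pair or as a full vector of time points; only the vector
        # form is handled above, with the sampling rate taken as one over
        # the constant spacing of that vector (e.g. a spacing of 0.08 s
        # gives 12.5 Hz, the value checked in the tests later in this commit).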
        sources = np.array(dat.get('nirs/probe/sourceLabels'))
        detectors = np.array(dat.get('nirs/probe/detectorLabels'))
        sources = [s.decode('UTF-8') for s in sources]
        detectors = [d.decode('UTF-8') for d in detectors]

        # Extract source and detector locations
        detPos3D = np.array(dat.get('nirs/probe/detectorPos3D'))
        srcPos3D = np.array(dat.get('nirs/probe/sourcePos3D'))

        assert len(sources) == srcPos3D.shape[0]
        assert len(detectors) == detPos3D.shape[0]

        # Pad 2D optode positions with a zero z coordinate
        if detPos3D.shape[1] == 2:
            detPos3D = np.hstack((detPos3D, np.zeros((len(detectors), 1))))
            srcPos3D = np.hstack((srcPos3D, np.zeros((len(sources), 1))))

        # Extract wavelengths
        fnirs_wavelengths = np.array(dat.get('nirs/probe/wavelengths'))
        fnirs_wavelengths = [int(w) for w in fnirs_wavelengths]

        # Extract channels
        def atoi(text):
            return int(text) if text.isdigit() else text

        def natural_keys(text):
            return [atoi(c) for c in re.split(r'(\d+)', text)]

        channels = np.array([name for name in dat['nirs']['data1'].keys()])
        channels_idx = np.array(['measurementList' in n for n in channels])
        channels = channels[channels_idx]
        channels = sorted(channels, key=natural_keys)

        chnames = []
        for chan in channels:
            src_idx = int(np.array(dat.get('nirs/data1/' +
                                           chan + '/sourceIndex'))[0])
            det_idx = int(np.array(dat.get('nirs/data1/' +
                                           chan + '/detectorIndex'))[0])
            wve_idx = int(np.array(dat.get('nirs/data1/' +
                                           chan + '/wavelengthIndex'))[0])
            ch_name = sources[src_idx - 1] + '_' + detectors[det_idx - 1] + \
                ' ' + str(fnirs_wavelengths[wve_idx - 1])
            chnames.append(ch_name)

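        # With NIRx-style labels ("S1", "D1", ...) the names built above take
        # the form "S<source>_D<detector> <wavelength>", e.g. "S1_D1 760",
        # matching the channel names asserted in the tests; natural_keys
        # sorts the measurementList groups numerically so that, for example,
        # measurementList10 follows measurementList9.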
        # Create mne structure
        info = create_info(chnames,
                           sampling_rate,
                           ch_types='fnirs_cw_amplitude')

        subject_info = {}
        names = np.array(dat.get('nirs/metaDataTags/SubjectID'))
        subject_info['first_name'] = names[0].decode('UTF-8')
        info.update(subject_info=subject_info)

        # Determine the scaling factor needed to convert optode
        # locations to metres
        LengthUnit = np.array(dat.get('/nirs/metaDataTags/LengthUnit'))
        LengthUnit = LengthUnit[0].decode('UTF-8')
        loc_scal = 1
        if "cm" in LengthUnit:
            loc_scal = 100
        elif "mm" in LengthUnit:
            loc_scal = 1000

        for idx, chan in enumerate(channels):
            src_idx = int(np.array(dat.get('nirs/data1/' +
                                           chan + '/sourceIndex'))[0])
            det_idx = int(np.array(dat.get('nirs/data1/' +
                                           chan + '/detectorIndex'))[0])
            wve_idx = int(np.array(dat.get('nirs/data1/' +
                                           chan + '/wavelengthIndex'))[0])
            info['chs'][idx]['loc'][0:3] = srcPos3D[src_idx - 1, :] / loc_scal
            info['chs'][idx]['loc'][3:6] = srcPos3D[src_idx - 1, :] / loc_scal
            info['chs'][idx]['loc'][6:9] = detPos3D[det_idx - 1, :] / loc_scal
            info['chs'][idx]['loc'][9] = fnirs_wavelengths[wve_idx - 1]

        super(RawSNIRF, self).__init__(info, preload, filenames=[fname],
                                       last_samps=[last_samps],
                                       verbose=verbose)

        # Extract annotations
        annot = Annotations([], [], [])
        for key in dat['nirs'].keys():
            if 'stim' in key:
                data = np.array(dat.get('/nirs/' + key + '/data'))
                if not data.shape == ():
                    annot.append(data[:, 0], 1.0, key[4:])
        self.set_annotations(annot)

        # Reorder channels to match expected ordering in MNE
        num_chans = len(self.ch_names)
        chans = []
        for idx in range(int(num_chans / 2)):
            chans.append(idx)
            chans.append(idx + int(num_chans / 2))
        self.pick(picks=chans)

    def _read_segment_file(self, data, idx, fi, start, stop, cals, mult):
        """Read a segment of data from a file."""
        from ...externals.pymatreader.utils import _import_h5py
        h5py = _import_h5py()
        dat = h5py.File(self._filenames[0], 'r')
        dat = np.array(dat.get('/nirs/data1/dataTimeSeries')).T

        data[:] = dat[idx, start:stop]

        return data
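As a reference for the datasets consumed above, here is a small standalone sketch of how the relevant HDF5 groups of a continuous wave SNIRF file could be inspected directly. It assumes h5py is installed and uses a hypothetical 'recording.snirf' path; it is illustrative only and not part of the commit.

import h5py
import numpy as np

with h5py.File('recording.snirf', 'r') as f:
    data = np.array(f['/nirs/data1/dataTimeSeries']).T    # channels x samples
    time = np.array(f['/nirs/data1/time'])                 # time points (or onset/period pair)
    wavelengths = np.array(f['/nirs/probe/wavelengths'])   # e.g. [760., 850.]
    sources = [s.decode('UTF-8') for s in f['/nirs/probe/sourceLabels']]
    print(data.shape, wavelengths, sources[:2])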
Empty file.
@@ -0,0 +1,80 @@ | ||
# -*- coding: utf-8 -*- | ||
# Authors: Robert Luke <[email protected]> | ||
# simplified BSD-3 license | ||
|
||
import os.path as op | ||
from numpy.testing import assert_allclose | ||
|
||
from mne.datasets.testing import data_path, requires_testing_data | ||
from mne.utils import run_tests_if_main, requires_h5py | ||
from mne.io import read_raw_snirf, read_raw_nirx | ||
from mne.io.tests.test_raw import _test_raw_reader | ||
|
||
fname_snirf_15_2_short = op.join(data_path(download=False), | ||
'SNIRF', | ||
'snirf_1_3_nirx_15_2_recording_w_short.snirf') | ||
|
||
fname_original = op.join(data_path(download=False), | ||
'NIRx', 'nirx_15_2_recording_w_short') | ||
|
||
|
||
@requires_testing_data | ||
@requires_h5py | ||
def test_snirf_basic(): | ||
"""Test reading NIRX files.""" | ||
raw = read_raw_snirf(fname_snirf_15_2_short, preload=True) | ||
|
||
# Test data import | ||
assert raw._data.shape == (26, 145) | ||
assert raw.info['sfreq'] == 12.5 | ||
|
||
# Test channel naming | ||
assert raw.info['ch_names'][:4] == ["S1_D1 760", "S1_D1 850", | ||
"S1_D9 760", "S1_D9 850"] | ||
assert raw.info['ch_names'][24:26] == ["S5_D13 760", "S5_D13 850"] | ||
|
||
# Test frequency encoding | ||
assert raw.info['chs'][0]['loc'][9] == 760 | ||
assert raw.info['chs'][1]['loc'][9] == 850 | ||
|
||
assert_allclose([-8.6765 * 1e-2, 0.0049 * 1e-2, -2.6167 * 1e-2], | ||
raw.info['chs'][1]['loc'][3:6], rtol=0.02) | ||
|
||
assert 'fnirs_cw_amplitude' in raw | ||
|
||
|
||
@requires_testing_data | ||
@requires_h5py | ||
def test_snirf_against_nirx(): | ||
"""Test against file snirf was created from.""" | ||
raw = read_raw_snirf(fname_snirf_15_2_short, preload=True) | ||
raw_orig = read_raw_nirx(fname_original, preload=True) | ||
|
||
# Check annotations are the same | ||
assert_allclose(raw.annotations.onset, raw_orig.annotations.onset) | ||
assert_allclose([float(d) for d in raw.annotations.description], | ||
[float(d) for d in raw_orig.annotations.description]) | ||
assert_allclose(raw.annotations.duration, raw_orig.annotations.duration) | ||
|
||
# Check names are the same | ||
assert raw.info['ch_names'] == raw_orig.info['ch_names'] | ||
|
||
# Check frequencies are the same | ||
num_chans = len(raw.ch_names) | ||
new_chs = raw.info['chs'] | ||
ori_chs = raw_orig.info['chs'] | ||
assert_allclose([new_chs[idx]['loc'][9] for idx in range(num_chans)], | ||
[ori_chs[idx]['loc'][9] for idx in range(num_chans)]) | ||
|
||
assert_allclose(raw.get_data(), raw_orig.get_data()) | ||
|
||
|
||
@requires_testing_data | ||
@requires_h5py | ||
def test_snirf_standard(): | ||
"""Test standard operations.""" | ||
_test_raw_reader(read_raw_snirf, fname=fname_snirf_15_2_short, | ||
boundary_decimal=0) # low fs | ||
|
||
|
||
run_tests_if_main() |
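These tests are skipped when the MNE testing dataset is not present locally (requires_testing_data) or when h5py is unavailable (requires_h5py). A small sketch, assuming data_path(download=False) simply returns the dataset location without downloading anything, to check whether the SNIRF test file is available before running:

import os.path as op
from mne.datasets.testing import data_path

path = data_path(download=False)  # does not download; may point to a missing directory
fname = op.join(path, 'SNIRF', 'snirf_1_3_nirx_15_2_recording_w_short.snirf')
print('SNIRF testing file available:', op.isfile(fname))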