EDF time slicing #15
Changes from all commits: 1d5fdc4, a0533a7, d9efa22, ab8eafa, 66e600e, eaa8e3f, 50a326a
First changed file: the Miniscope imaging extractor/interface module.
@@ -6,6 +6,7 @@
import datetime

from copy import deepcopy
from typing import Union
from pathlib import Path
from typing import Literal, Optional
@@ -17,6 +18,99 @@
from neuroconv.utils import DeepDict, dict_deep_update
def get_recording_start_time(file_path: Union[str, Path]):
    """
    Retrieve the recording start time from the specified "metaData.json" file.

    Parameters
    ----------
    file_path : Union[str, Path]
        Path to the "metaData.json" file with recording start time details.

    Returns
    -------
    datetime.datetime
        A datetime object representing the session start time, based on the metadata's year, month, day, hour,
        minute, second, and millisecond fields.

    Raises
    ------
    FileNotFoundError
        If no "metaData.json" file exists at the specified path.
    KeyError
        If any of the required time fields ("year", "month", "day", "hour", "minute", "second", "msec") is missing
        from the metadata.

    Notes
    -----
    - The function expects a "recordingStartTime" key in the metadata JSON containing the start time details.
      If it is not present, the top-level JSON object is assumed to contain the time information.
    - The "msec" field in the metadata is converted from milliseconds to microseconds for compatibility with the
      datetime microsecond field.
    """

    ## Read metadata
    with open(file_path) as f:
        general_metadata = json.load(f)

    if "recordingStartTime" in general_metadata:
        start_time_info = general_metadata["recordingStartTime"]
    else:
        start_time_info = general_metadata

    required_keys = ["year", "month", "day", "hour", "minute", "second", "msec"]
    for key in required_keys:
        if key not in start_time_info:
            raise KeyError(f"Missing required key '{key}' in the metadata")

    session_start_time = datetime.datetime(
        year=start_time_info["year"],
        month=start_time_info["month"],
        day=start_time_info["day"],
        hour=start_time_info["hour"],
        minute=start_time_info["minute"],
        second=start_time_info["second"],
        microsecond=start_time_info["msec"] * 1000,  # Convert milliseconds to microseconds
    )

    return session_start_time


def get_miniscope_timestamps(file_path: Union[str, Path]):
    """
    Retrieve the Miniscope timestamps from a CSV file and convert them to seconds.

    Parameters
    ----------
    file_path : Union[str, Path]
        Path to the Miniscope "timeStamps.csv" file, which includes timestamps in milliseconds.

    Returns
    -------
    np.ndarray
        A NumPy array containing the Miniscope timestamps in seconds, converted from the original milliseconds.

    Raises
    ------
    FileNotFoundError
        If no "timeStamps.csv" file exists at the specified path.

    Notes
    -----
    - This function expects the timestamps CSV file to have a column named "Time Stamp (ms)" with values in
      milliseconds.
    - The timestamps are converted from milliseconds to seconds for compatibility with other functions that expect
      time values in seconds.
    """

    import pandas as pd

    timestamps_df = pd.read_csv(file_path)
    timestamps_milliseconds = timestamps_df["Time Stamp (ms)"].values.astype(float)
    timestamps_seconds = timestamps_milliseconds / 1000.0

    return np.asarray(timestamps_seconds)


class MiniscopeImagingExtractor(MultiImagingExtractor):

    def __init__(self, folder_path: DirectoryPath):
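As a quick illustration of the helper above, here is how a Miniscope "timeStamps.csv" could be read with it; the file name and values below are placeholders, not data from the repository:

```python
import pandas as pd

# Made-up timeStamps.csv with the column name the helper expects.
pd.DataFrame({"Time Stamp (ms)": [0, 33, 66, 100]}).to_csv("timeStamps.csv", index=False)

timestamps_seconds = get_miniscope_timestamps(file_path="timeStamps.csv")
print(timestamps_seconds)  # [0.    0.033 0.066 0.1  ]
```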
@@ -205,36 +299,6 @@ def __init__(self, folder_path: DirectoryPath):
        self.photon_series_type = "OnePhotonSeries"

    def _get_session_start_time(self):
Review comment: Another, equally valid option is to make these functions static methods or class methods of the class. This is more a matter of taste and preference, and depends on how you want to organize your code.
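If the static-method route suggested above were taken, a minimal sketch could look like the following; the class name is only a stand-in for illustration, and the validation of required keys is omitted for brevity:

```python
import json
import datetime


class MiniscopeMetadataMixin:  # stand-in class name for illustration only
    @staticmethod
    def get_recording_start_time(file_path):
        """Condensed version of the module-level helper's logic, exposed as a static method."""
        with open(file_path) as f:
            metadata = json.load(f)
        info = metadata.get("recordingStartTime", metadata)
        return datetime.datetime(
            year=info["year"], month=info["month"], day=info["day"],
            hour=info["hour"], minute=info["minute"], second=info["second"],
            microsecond=info["msec"] * 1000,  # milliseconds -> microseconds
        )
```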
        general_metadata_json = self.session_folder / "metaData.json"
        assert general_metadata_json.exists(), f"General metadata json not found in {self.session_folder}"

        ## Read metadata
        with open(general_metadata_json) as f:
            general_metadata = json.load(f)

        if "recordingStartTime" in general_metadata:
            start_time_info = general_metadata["recordingStartTime"]
        else:
            start_time_info = general_metadata

        required_keys = ["year", "month", "day", "hour", "minute", "second", "msec"]
        for key in required_keys:
            if key not in start_time_info:
                raise KeyError(f"Missing required key '{key}' in the metadata")

        session_start_time = datetime.datetime(
            year=start_time_info["year"],
            month=start_time_info["month"],
            day=start_time_info["day"],
            hour=start_time_info["hour"],
            minute=start_time_info["minute"],
            second=start_time_info["second"],
            microsecond=start_time_info["msec"] * 1000,  # Convert milliseconds to microseconds
        )

        return session_start_time

    def get_metadata(self) -> DeepDict:
        from neuroconv.tools.roiextractors import get_nwb_imaging_metadata
@@ -243,7 +307,10 @@ def get_metadata(self) -> DeepDict:
        metadata = dict_deep_update(metadata, default_metadata)
        metadata["Ophys"].pop("TwoPhotonSeries", None)

        session_start_time = self._get_session_start_time()
        general_metadata_json = self.session_folder / "metaData.json"
        assert general_metadata_json.exists(), f"General metadata json not found in {self.session_folder}"
        session_start_time = get_recording_start_time(file_path=general_metadata_json)

        metadata["NWBFile"].update(session_start_time=session_start_time)

        device_metadata = metadata["Ophys"]["Device"][0]
@@ -267,22 +334,15 @@ def get_metadata_schema(self) -> dict:
        return metadata_schema

    def get_original_timestamps(self) -> np.ndarray:

        timestamps_file_path = self.miniscope_folder / "timeStamps.csv"
        assert timestamps_file_path.exists(), f"Miniscope timestamps file not found in {self.miniscope_folder}"

        import pandas as pd

        timetsamps_df = pd.read_csv(timestamps_file_path)
        timestamps_milliseconds = timetsamps_df["Time Stamp (ms)"].values.astype(float)
        timestamps_seconds = timestamps_milliseconds / 1000.0

        timestamps_seconds = get_miniscope_timestamps(file_path=timestamps_file_path)
        # Shift when the first timestamp is negative
        # TODO: Figure why, I copied from miniscope
        # TODO: Figure out why; copied from Miniscope. The session_start_time also needs to be shifted.
Review comment: Is this an answer to my "why", or was it your "why"? The goal of this is to set the timestamps to start at 0, right?

Reply: It was part of the TODO. The session start time is saved from the metadata, and the timestamps refer to that date, so when we shift the timestamps to zero we need to shift the session start time back as well. I left it there as a comment because I am not sure how to handle this; I will open a follow-up PR just for this.
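A minimal sketch of the shift described in the reply, assuming timestamps in seconds relative to a session_start_time datetime; the helper name is hypothetical and this is not the PR's final handling:

```python
import datetime
import numpy as np


def align_timestamps_to_zero(timestamps_seconds: np.ndarray, session_start_time: datetime.datetime):
    """Shift timestamps so they start at 0 and move session_start_time by the same offset."""
    offset = timestamps_seconds[0]
    shifted_timestamps = timestamps_seconds - offset
    shifted_start_time = session_start_time + datetime.timedelta(seconds=offset)
    return shifted_timestamps, shifted_start_time
```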
        if timestamps_seconds[0] < 0.0:
            timestamps_seconds += abs(timestamps_seconds[0])

        return np.asarray(timestamps_seconds)
        return timestamps_seconds

    def add_to_nwbfile(
        self,
@@ -308,7 +368,7 @@ def add_to_nwbfile(
        imaging_extractor.set_times(times=miniscope_timestamps)

        device_metadata = metadata["Ophys"]["Device"][0]
        # Cast to string because miniscope extension requires so
        # Cast to string because the Miniscope extension requires it

Review comment: Grazie!
||
device_metadata["gain"] = str(device_metadata["gain"]) | ||
device_metadata.pop("ewl") | ||
add_miniscope_device(nwbfile=nwbfile, device_metadata=device_metadata) | ||
|
Second changed file: the Zaki2024EDFInterface module.
@@ -1,38 +1,57 @@
from pydantic import FilePath
from pathlib import Path

from neuroconv.basedatainterface import BaseDataInterface
from pynwb import NWBFile, TimeSeries
from pynwb.device import Device

from mne.io import read_raw_edf
from datetime import datetime, timedelta
import numpy as np
class Zaki2024EDFInterface(BaseDataInterface):

    def __init__(self, file_path: FilePath, verbose: bool = False):

    def __init__(
Review comment: I think these should be conversion options rather than init arguments, as you might want to use run-time information (for example, from other interfaces) to determine them. See the sketch after the constructor below.
        self,
        file_path: Path,
        verbose: bool = False,
    ):
Review comment: Also, if you add them here or in the conversion options (add_to_nwbfile), I think a docstring would be highly beneficial.
        self.file_path = Path(file_path)
        self.verbose = verbose
        super().__init__(file_path=file_path)
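A rough sketch of the conversion-options idea from the review comment above: the time window is decided at run time and supplied when adding to the NWB file, not at construction. The EDF file name, NWBFile identifiers, and datetimes below are placeholders:

```python
from datetime import datetime
from pynwb import NWBFile

# The interface only needs the file path (and verbosity) at construction time.
interface = Zaki2024EDFInterface(file_path="session.edf")

nwbfile = NWBFile(
    session_description="example session",
    identifier="example-id",
    session_start_time=datetime(2024, 1, 15, 10, 30, 0),
)

# The slicing window is determined at run time (e.g. from the Miniscope recording)
# and passed to add_to_nwbfile instead of __init__.
interface.add_to_nwbfile(
    nwbfile=nwbfile,
    start_datetime_timestamp=datetime(2024, 1, 15, 10, 30, 0),
    stop_datetime_timestamp=datetime(2024, 1, 15, 11, 0, 0),
)
```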
    def get_timestamps_reference_time(self):
    def add_to_nwbfile(
        self,
        nwbfile: NWBFile,
        stub_test: bool = False,
        stub_frames: int = 100,
        start_datetime_timestamp: datetime = None,
        stop_datetime_timestamp: datetime = None,
        **conversion_options,
    ) -> NWBFile:
""" | ||
Get datetime object of the first frame of the data in the .edf file. | ||
Adds data from EEG, EMG, temperature, and activity channels to an NWBFile. | ||
|
||
Returns | ||
Parameters | ||
---------- | ||
timestamps_reference_time : datetime.datetime | ||
datetime object of the first frame of the data in the .edf file. | ||
nwbfile : NWBFile | ||
The NWBFile object to which data will be added. | ||
stub_test : bool, optional | ||
If True, loads only a subset of frames (controlled by `stub_frames` parameter) | ||
to facilitate testing and faster execution. Default is False. | ||
stub_frames : int, optional | ||
The number of frames to load if `stub_test` is True. Default is 100. | ||
start_datetime_timestamp : datetime, optional | ||
The starting timestamp for slicing the data. If specified, data will be included | ||
only from this time onward. Default is None, which includes data from the start. | ||
stop_datetime_timestamp : datetime, optional | ||
The ending timestamp for slicing the data. If specified, data will be included | ||
only up to this time. Default is None, which includes data until the end. | ||
**conversion_options | ||
Additional options for data conversion (not currently used directly in this function). | ||
|
||
Returns | ||
------- | ||
NWBFile | ||
The NWBFile object with added data and metadata from the specified channels. | ||
""" | ||
        edf_reader = read_raw_edf(input_fname=self.file_path, verbose=self.verbose)
        return edf_reader.info["meas_date"]

    def add_to_nwbfile(
        self, nwbfile: NWBFile, stub_test: bool = False, stub_frames: int = 100, **conversion_options
    ) -> NWBFile:
        channels_dict = {
            "Temp": {
                "name": "TemperatureSignal",
@@ -61,13 +80,33 @@ def add_to_nwbfile(
        edf_reader = read_raw_edf(input_fname=self.file_path, verbose=self.verbose)
        data, times = edf_reader.get_data(picks=list(channels_dict.keys()), return_times=True)
        data = data.astype("float32")
        # TODO select the correct time range
        if start_datetime_timestamp is not None:
            # Get the EDF start time in datetime format
            edf_start_time = edf_reader.info["meas_date"]
            # Convert relative EDF timestamps to datetime timestamps
            edf_start_time = edf_start_time.replace(tzinfo=None)
            edf_datetime_timestamps = [edf_start_time + timedelta(seconds=t) for t in times]
            # Find the indices of the timestamps within the time range
            start_idx = np.searchsorted(edf_datetime_timestamps, start_datetime_timestamp, side="left")
            end_idx = np.searchsorted(edf_datetime_timestamps, stop_datetime_timestamp, side="right")
            starting_time = edf_datetime_timestamps[start_idx] - start_datetime_timestamp
            starting_time = starting_time.total_seconds()
        else:
            start_idx = 0
            end_idx = -1
            starting_time = times[start_idx]
        # Slice the data and timestamps within the time range
        if stub_test:
            data = data[:, :stub_frames]
            times = times[:stub_frames]
            data = data[:, start_idx : start_idx + stub_frames]
        else:
            data = data[:, start_idx:end_idx]
||
for channel_index, channel_name in enumerate(channels_dict.keys()): | ||
time_series_kwargs = channels_dict[channel_name].copy() | ||
time_series_kwargs.update(data=data[channel_index], timestamps=times) | ||
time_series_kwargs.update( | ||
data=data[channel_index], starting_time=starting_time, rate=edf_reader.info["sfreq"] | ||
) | ||
time_series = TimeSeries(**time_series_kwargs) | ||
nwbfile.add_acquisition(time_series) | ||
|
||
|
Review comment: I also advise naming this get_{something}_start_time now that it is not a method of the class, or modifying the docstring to make this clear.

Reply: Changed it to get_recording_start_time.
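For reference, a minimal usage sketch of the renamed helper; the metaData.json contents below are made-up values for illustration only:

```python
import json
from pathlib import Path

# Hypothetical metaData.json contents written to a file for the example.
example_metadata = {
    "recordingStartTime": {
        "year": 2024, "month": 1, "day": 15,
        "hour": 10, "minute": 30, "second": 5, "msec": 250,
    }
}
metadata_path = Path("metaData.json")
metadata_path.write_text(json.dumps(example_metadata))

start_time = get_recording_start_time(file_path=metadata_path)
print(start_time)  # 2024-01-15 10:30:05.250000
```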