-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EDF time slicing #15
EDF time slicing #15
Changes from 5 commits
1d5fdc4
a0533a7
d9efa22
ab8eafa
66e600e
eaa8e3f
50a326a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import datetime | ||
|
||
from copy import deepcopy | ||
from typing import Union | ||
from pathlib import Path | ||
from typing import Literal, Optional | ||
|
||
|
@@ -17,6 +18,101 @@ | |
from neuroconv.utils import DeepDict, dict_deep_update | ||
|
||
|
||
def get_session_start_time(folder_path: Union[str, Path]):
    """
    Retrieve the session start time from metadata in the specified folder.

    Parameters:
    -----------
    folder_path : Union[str, Path]
        Path to the main session folder, expected to contain a "metaData.json" file with recording start time details.

    Returns:
    --------
    datetime.datetime
        A datetime object representing the session start time, based on the metadata's year, month, day, hour, minute,
        second, and millisecond fields.

    Raises:
    -------
    AssertionError
        If the "metaData.json" file is not found in the specified folder path.
    KeyError
        If any of the required time fields ("year", "month", "day", "hour", "minute", "second", "msec") are missing
        from the metadata.

    Notes:
    ------
    - The function expects a "recordingStartTime" key in the metadata JSON, which contains start time details.
      If not present, the top-level JSON object is assumed to contain the time information.
    - The "msec" field in the metadata is converted from milliseconds to microseconds for compatibility with the datetime
      microsecond field.
    """
    # Bug fix: the signature accepts str or Path, but the original body used the
    # `/` operator directly, which raises TypeError for a plain string.
    folder_path = Path(folder_path)
    general_metadata_json = folder_path / "metaData.json"
    assert general_metadata_json.exists(), f"General metadata json not found in {folder_path}"

    # Read metadata
    with open(general_metadata_json) as f:
        general_metadata = json.load(f)

    # Newer Miniscope output nests the start time under "recordingStartTime";
    # older output stores the time fields at the top level.
    if "recordingStartTime" in general_metadata:
        start_time_info = general_metadata["recordingStartTime"]
    else:
        start_time_info = general_metadata

    required_keys = ["year", "month", "day", "hour", "minute", "second", "msec"]
    for key in required_keys:
        if key not in start_time_info:
            raise KeyError(f"Missing required key '{key}' in the metadata")

    session_start_time = datetime.datetime(
        year=start_time_info["year"],
        month=start_time_info["month"],
        day=start_time_info["day"],
        hour=start_time_info["hour"],
        minute=start_time_info["minute"],
        second=start_time_info["second"],
        microsecond=start_time_info["msec"] * 1000,  # Convert milliseconds to microseconds
    )

    return session_start_time
|
||
|
||
def get_miniscope_timestamps(miniscope_folder_path: Union[str, Path]):
    """
    Retrieve the Miniscope timestamps from a CSV file and convert them to seconds.

    Parameters:
    -----------
    miniscope_folder_path : Union[str, Path]
        Path to the folder containing the Miniscope "timeStamps.csv" file, which includes timestamps in milliseconds.

    Returns:
    --------
    np.ndarray
        A NumPy array containing the Miniscope timestamps in seconds, converted from the original milliseconds.

    Raises:
    -------
    AssertionError
        If the "timeStamps.csv" file is not found in the specified Miniscope folder path.

    Notes:
    ------
    - This function expects the timestamps CSV file to have a column named "Time Stamp (ms)" with values in milliseconds.
    - The timestamps are converted from milliseconds to seconds for compatibility with other functions that expect time
      values in seconds.
    """
    # Deferred import keeps pandas out of the module import cost (matches the original).
    import pandas as pd

    # Bug fix: the signature accepts str or Path, but the original body used the
    # `/` operator directly, which raises TypeError for a plain string.
    miniscope_folder_path = Path(miniscope_folder_path)
    timestamps_file_path = miniscope_folder_path / "timeStamps.csv"
    assert timestamps_file_path.exists(), f"Miniscope timestamps file not found in {miniscope_folder_path}"

    timestamps_df = pd.read_csv(timestamps_file_path)  # renamed from typo'd "timetsamps_df"
    timestamps_milliseconds = timestamps_df["Time Stamp (ms)"].values.astype(float)
    timestamps_seconds = timestamps_milliseconds / 1000.0

    return np.asarray(timestamps_seconds)
|
||
|
||
class MiniscopeImagingExtractor(MultiImagingExtractor): | ||
|
||
def __init__(self, folder_path: DirectoryPath): | ||
|
@@ -205,36 +301,6 @@ def __init__(self, folder_path: DirectoryPath): | |
|
||
self.photon_series_type = "OnePhotonSeries" | ||
|
||
def _get_session_start_time(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another, equally valid option, is to make these functions static or class methods of the class. This is more of taste and preference and depends on how do you want to organize your code. |
||
general_metadata_json = self.session_folder / "metaData.json" | ||
assert general_metadata_json.exists(), f"General metadata json not found in {self.session_folder}" | ||
|
||
## Read metadata | ||
with open(general_metadata_json) as f: | ||
general_metadata = json.load(f) | ||
|
||
if "recordingStartTime" in general_metadata: | ||
start_time_info = general_metadata["recordingStartTime"] | ||
else: | ||
start_time_info = general_metadata | ||
|
||
required_keys = ["year", "month", "day", "hour", "minute", "second", "msec"] | ||
for key in required_keys: | ||
if key not in start_time_info: | ||
raise KeyError(f"Missing required key '{key}' in the metadata") | ||
|
||
session_start_time = datetime.datetime( | ||
year=start_time_info["year"], | ||
month=start_time_info["month"], | ||
day=start_time_info["day"], | ||
hour=start_time_info["hour"], | ||
minute=start_time_info["minute"], | ||
second=start_time_info["second"], | ||
microsecond=start_time_info["msec"] * 1000, # Convert milliseconds to microseconds | ||
) | ||
|
||
return session_start_time | ||
|
||
def get_metadata(self) -> DeepDict: | ||
from neuroconv.tools.roiextractors import get_nwb_imaging_metadata | ||
|
||
|
@@ -243,7 +309,8 @@ def get_metadata(self) -> DeepDict: | |
metadata = dict_deep_update(metadata, default_metadata) | ||
metadata["Ophys"].pop("TwoPhotonSeries", None) | ||
|
||
session_start_time = self._get_session_start_time() | ||
session_start_time = get_session_start_time(folder_path=self.session_folder) | ||
|
||
metadata["NWBFile"].update(session_start_time=session_start_time) | ||
|
||
device_metadata = metadata["Ophys"]["Device"][0] | ||
|
@@ -267,22 +334,13 @@ def get_metadata_schema(self) -> dict: | |
return metadata_schema | ||
|
||
def get_original_timestamps(self) -> np.ndarray: | ||
|
||
timestamps_file_path = self.miniscope_folder / "timeStamps.csv" | ||
assert timestamps_file_path.exists(), f"Miniscope timestamps file not found in {self.miniscope_folder}" | ||
|
||
import pandas as pd | ||
|
||
timetsamps_df = pd.read_csv(timestamps_file_path) | ||
timestamps_milliseconds = timetsamps_df["Time Stamp (ms)"].values.astype(float) | ||
timestamps_seconds = timestamps_milliseconds / 1000.0 | ||
|
||
timestamps_seconds = get_miniscope_timestamps(miniscope_folder_path=self.miniscope_folder) | ||
# Shift when the first timestamp is negative | ||
# TODO: Figure why, I copied from miniscope | ||
# TODO: Figure why, I copied from Miniscope. Need to shift also session_start_time | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this an answer to my why? Or was it your why? The goal of this is to set the timestamps to start at 0, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It was part of the TODO. If we save the session start time from the metadata, the timestamps refer to that date; when we shift the timestamps to zero, we need to shift the session start time back as well. But I left it there as a comment because I am not sure how to handle this. I will open a follow-up PR only on this |
||
if timestamps_seconds[0] < 0.0: | ||
timestamps_seconds += abs(timestamps_seconds[0]) | ||
|
||
return np.asarray(timestamps_seconds) | ||
return timestamps_seconds | ||
|
||
def add_to_nwbfile( | ||
self, | ||
|
@@ -308,7 +366,7 @@ def add_to_nwbfile( | |
imaging_extractor.set_times(times=miniscope_timestamps) | ||
|
||
device_metadata = metadata["Ophys"]["Device"][0] | ||
# Cast to string because miniscope extension requires so | ||
# Cast to string because Miniscope extension requires so | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gratzie! |
||
device_metadata["gain"] = str(device_metadata["gain"]) | ||
device_metadata.pop("ewl") | ||
add_miniscope_device(nwbfile=nwbfile, device_metadata=device_metadata) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,33 +1,29 @@ | ||
from pydantic import FilePath | ||
from pathlib import Path | ||
|
||
from neuroconv.basedatainterface import BaseDataInterface | ||
from pynwb import NWBFile, TimeSeries | ||
from pynwb.device import Device | ||
|
||
from mne.io import read_raw_edf | ||
from datetime import datetime, timedelta | ||
import numpy as np | ||
|
||
|
||
class Zaki2024EDFInterface(BaseDataInterface): | ||
|
||
def __init__(self, file_path: FilePath, verbose: bool = False): | ||
|
||
def __init__( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these should be conversion options and not init arguments, as you might want to use run-time information (from other interfaces for example) to determine them. |
||
self, | ||
file_path: Path, | ||
start_datetime_timestamp: datetime = None, | ||
stop_datetime_timestamp: datetime = None, | ||
verbose: bool = False, | ||
): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, if you add them here or in the conversion options (add_to_nwbfile) I think a docstring would be highly beneficial. |
||
self.file_path = Path(file_path) | ||
self.start_datetime_timestamp = start_datetime_timestamp | ||
self.stop_datetime_timestamp = stop_datetime_timestamp | ||
self.verbose = verbose | ||
super().__init__(file_path=file_path) | ||
|
||
def get_timestamps_reference_time(self): | ||
""" | ||
Get datetime object of the first frame of the data in the .edf file. | ||
|
||
Returns | ||
---------- | ||
timestamps_reference_time : datetime.datetime | ||
datetime object of the first frame of the data in the .edf file. | ||
|
||
""" | ||
edf_reader = read_raw_edf(input_fname=self.file_path, verbose=self.verbose) | ||
return edf_reader.info["meas_date"] | ||
super().__init__( | ||
file_path=file_path, | ||
start_datetime_timestamp=start_datetime_timestamp, | ||
stop_datetime_timestamp=stop_datetime_timestamp, | ||
) | ||
|
||
def add_to_nwbfile( | ||
self, nwbfile: NWBFile, stub_test: bool = False, stub_frames: int = 100, **conversion_options | ||
|
@@ -61,13 +57,28 @@ def add_to_nwbfile( | |
edf_reader = read_raw_edf(input_fname=self.file_path, verbose=self.verbose) | ||
data, times = edf_reader.get_data(picks=list(channels_dict.keys()), return_times=True) | ||
data = data.astype("float32") | ||
# TODO select the correct time range | ||
if self.start_datetime_timestamp is not None: | ||
# Get edf start_time in datetime format | ||
edf_start_time = edf_reader.info["meas_date"] | ||
# Convert relative edf timestamps to datetime timestamps | ||
edf_start_time = edf_start_time.replace(tzinfo=None) | ||
edf_datetime_timestamps = [edf_start_time + timedelta(seconds=t) for t in times] | ||
# Find the indices of the timestamps within the time range | ||
start_idx = np.searchsorted(edf_datetime_timestamps, self.start_datetime_timestamp, side="left") | ||
end_idx = np.searchsorted(edf_datetime_timestamps, self.stop_datetime_timestamp, side="right") | ||
else: | ||
start_idx = 0 | ||
end_idx = -1 | ||
|
||
# Slice the data and timestamps within the time range | ||
if stub_test: | ||
data = data[:, :stub_frames] | ||
times = times[:stub_frames] | ||
data = data[:, start_idx : start_idx + stub_frames] | ||
else: | ||
data = data[:, start_idx:end_idx] | ||
|
||
for channel_index, channel_name in enumerate(channels_dict.keys()): | ||
time_series_kwargs = channels_dict[channel_name].copy() | ||
time_series_kwargs.update(data=data[channel_index], timestamps=times) | ||
time_series_kwargs.update(data=data[channel_index], starting_time=0.0, rate=edf_reader.info["sfreq"]) | ||
alessandratrapani marked this conversation as resolved.
Show resolved
Hide resolved
|
||
time_series = TimeSeries(**time_series_kwargs) | ||
nwbfile.add_acquisition(time_series) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,35 @@ | |
|
||
from pathlib import Path | ||
from typing import Union | ||
from datetime import datetime | ||
from datetime import datetime, timedelta | ||
import pandas as pd | ||
import json | ||
from neuroconv.utils import load_dict_from_file, dict_deep_update | ||
|
||
from zaki_2024_nwbconverter import Zaki2024NWBConverter | ||
from interfaces.miniscope_imaging_interface import get_miniscope_timestamps, get_session_start_time | ||
|
||
|
||
def get_miniscope_folder_path(folder_path: Union[str, Path]): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great |
||
Retrieve the path to the Miniscope folder within the given session folder based on metadata. | ||
|
||
Parameters: | ||
----------- | ||
folder_path : Union[str, Path] | ||
Path to the main session folder, which should contain a "metaData.json" file with information about the Miniscope. | ||
|
||
Returns: | ||
-------- | ||
Optional[Path] | ||
Path to the Miniscope folder, formatted to replace any spaces in the Miniscope name with underscores. Returns `None` if the | ||
specified folder is not a directory or if the metadata JSON is missing or misconfigured. | ||
|
||
Raises: | ||
------- | ||
AssertionError | ||
If the "metaData.json" file is not found in the given folder path. | ||
""" | ||
folder_path = Path(folder_path) | ||
if folder_path.is_dir(): | ||
general_metadata_json = folder_path / "metaData.json" | ||
|
@@ -26,6 +46,38 @@ def get_miniscope_folder_path(folder_path: Union[str, Path]): | |
return None | ||
|
||
|
||
def get_edf_slicing_time_range(folder_path: Union[str, Path], miniscope_folder_path: Union[str, Path]):
    """
    Calculate the time range for EDF slicing based on session start time and Miniscope timestamps.

    Parameters:
    -----------
    folder_path : Union[str, Path]
        Path to the session folder, which contains metadata.json file produced by Miniscope output.
    miniscope_folder_path : Union[str, Path]
        Path to the folder containing Miniscope timeStamps.csv file.

    Returns:
    --------
    Optional[Tuple[datetime, datetime]]
        A tuple containing the start and stop timestamps (as datetime objects) for the EDF slicing period. The start
        timestamp corresponds to the session's start time adjusted by the first Miniscope timestamp, and the stop
        timestamp is the session's start time adjusted by the last Miniscope timestamp.
        Returns ``None`` if either folder does not exist.
    """
    folder_path = Path(folder_path)
    # Bug fix: the original converted only folder_path, then called .is_dir() on
    # miniscope_folder_path, which raises AttributeError when a str is passed.
    miniscope_folder_path = Path(miniscope_folder_path)

    if folder_path.is_dir() and miniscope_folder_path.is_dir():
        session_start_time = get_session_start_time(folder_path=folder_path)
        miniscope_timestamps = get_miniscope_timestamps(miniscope_folder_path=miniscope_folder_path)

        # Miniscope timestamps are relative (seconds); anchor them to the absolute session start.
        start_datetime_timestamp = session_start_time + timedelta(seconds=miniscope_timestamps[0])
        stop_datetime_timestamp = session_start_time + timedelta(seconds=miniscope_timestamps[-1])

        return start_datetime_timestamp, stop_datetime_timestamp
    # NOTE(review): falls through to an implicit None when either folder is missing —
    # callers must check for None before unpacking the tuple.
|
||
|
||
def session_to_nwb( | ||
data_dir_path: Union[str, Path], | ||
output_dir_path: Union[str, Path], | ||
|
@@ -119,12 +171,24 @@ def session_to_nwb( | |
print(f"No freezing output csv file found at {freezing_output_file_path}") | ||
|
||
# Add EEG, EMG, Temperature and Activity signals | ||
# TODO discuss how to slice this data | ||
datetime_obj = datetime.strptime(date_str, "%Y_%m_%d") | ||
reformatted_date_str = datetime_obj.strftime("_%m%d%y") | ||
edf_file_path = data_dir_path / "Ca_EEG_EDF" / (subject_id + "_EDF") / (subject_id + reformatted_date_str + ".edf") | ||
|
||
if edf_file_path.is_file() and include_eeg_emg_signals: | ||
source_data.update(dict(EDFSignals=dict(file_path=edf_file_path))) | ||
|
||
start_datetime_timestamp, stop_datetime_timestamp = get_edf_slicing_time_range( | ||
folder_path=folder_path, miniscope_folder_path=miniscope_folder_path | ||
) | ||
source_data.update( | ||
dict( | ||
EDFSignals=dict( | ||
file_path=edf_file_path, | ||
start_datetime_timestamp=start_datetime_timestamp, | ||
stop_datetime_timestamp=stop_datetime_timestamp, | ||
) | ||
) | ||
) | ||
conversion_options.update(dict(EDFSignals=dict(stub_test=stub_test))) | ||
elif verbose and not include_eeg_emg_signals: | ||
print(f"The EEG, EMG, Temperature and Activity signals will not be included for session {session_id}") | ||
|
@@ -182,10 +246,10 @@ def session_to_nwb( | |
# Parameters for conversion | ||
data_dir_path = Path("D:/") | ||
subject_id = "Ca_EEG3-4" | ||
task = "NeutralExposure" | ||
task = "OfflineDay1Session1" | ||
session_id = subject_id + "_" + task | ||
output_dir_path = Path("D:/cai_lab_conversion_nwb/") | ||
stub_test = True | ||
stub_test = False | ||
session_times_file_path = data_dir_path / "Ca_EEG_Experiment" / subject_id / (subject_id + "_SessionTimes.csv") | ||
df = pd.read_csv(session_times_file_path) | ||
session_row = df[df["Session"] == task].iloc[0] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we just pass the file path of the json directly instead of passing the folder and then locating it within? What do you think?
I know you are moving these functions only but is something to consider if you think that a small breaking change is worth avoiding the problem of a different file structure in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My first design choice was actually to pass the file path. I don't remember why I changed my mind — probably to avoid repeating a piece of code. But if you also think passing the file_path is the correct way, I will change it back.