Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable zarr backend testing in data tests [1] #1056

Merged
merged 12 commits into from
Sep 13, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Added automated EFS volume creation and mounting to the `submit_aws_job` helper function. [PR #1018](https://github.com/catalystneuro/neuroconv/pull/1018)

## Improvements
* Add writing to zarr test for to the test on data [PR #1056](https://github.com/catalystneuro/neuroconv/pull/1056)
* Modified the CI to avoid running doctests twice [PR #1077](https://github.com/catalystneuro/neuroconv/pull/#1077)

## v0.6.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def get_metadata(self):
metadata = super().get_metadata()
behavior_metadata = {
self.metadata_key_name: [
dict(name=f"Video: {Path(file_path).stem}", description="Video recorded by camera.", unit="Frames")
dict(name=f"Video {Path(file_path).stem}", description="Video recorded by camera.", unit="Frames")
for file_path in self.source_data["file_paths"]
]
}
Expand Down
93 changes: 70 additions & 23 deletions src/neuroconv/tools/testing/data_interface_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ class DataInterfaceTestMixin:

@pytest.fixture
def setup_interface(self, request):

"""Add this as a fixture when you want freshly created interface in the test."""
self.test_name: str = ""
self.conversion_options = self.conversion_options or dict()
self.interface = self.data_interface_cls(**self.interface_kwargs)

return self.interface, self.test_name
Expand All @@ -88,31 +87,26 @@ def check_conversion_options_schema_valid(self):
schema = self.interface.get_conversion_options_schema()
Draft7Validator.check_schema(schema=schema)

def check_metadata_schema_valid(self):
def test_metadata_schema_valid(self, setup_interface):
schema = self.interface.get_metadata_schema()
Draft7Validator.check_schema(schema=schema)

def check_metadata(self):
schema = self.interface.get_metadata_schema()
# Validate metadata now happens on the class itself
metadata = self.interface.get_metadata()
if "session_start_time" not in metadata["NWBFile"]:
metadata["NWBFile"].update(session_start_time=datetime.now().astimezone())
# handle json encoding of datetimes and other tricky types
metadata_for_validation = json.loads(json.dumps(metadata, cls=_NWBMetaDataEncoder))
validate(metadata_for_validation, schema)
self.check_extracted_metadata(metadata)

def check_no_metadata_mutation(self):
"""Ensure the metadata object was not altered by `add_to_nwbfile` method."""
metadata = self.interface.get_metadata()
metadata["NWBFile"].update(session_start_time=datetime.now().astimezone())

metadata_in = deepcopy(metadata)
def test_no_metadata_mutation(self, setup_interface):
"""Ensure the metadata object is not altered by `add_to_nwbfile` method."""

nwbfile = mock_NWBFile()

metadata = self.interface.get_metadata()
metadata_before_add_method = deepcopy(metadata)

self.interface.add_to_nwbfile(nwbfile=nwbfile, metadata=metadata, **self.conversion_options)

assert metadata == metadata_in
assert metadata == metadata_before_add_method

def check_run_conversion_with_backend(self, nwbfile_path: str, backend: Literal["hdf5", "zarr"] = "hdf5"):
metadata = self.interface.get_metadata()
Expand Down Expand Up @@ -223,6 +217,26 @@ def run_custom_checks(self):
"""Override this in child classes to inject additional custom checks."""
pass

@pytest.mark.parametrize("backend", ["hdf5", "zarr"])
def test_run_conversion_with_backend(self, setup_interface, tmp_path, backend):

nwbfile_path = str(tmp_path / f"conversion_with_backend{backend}-{self.test_name}.nwb")

metadata = self.interface.get_metadata()
if "session_start_time" not in metadata["NWBFile"]:
metadata["NWBFile"].update(session_start_time=datetime.now().astimezone())

self.interface.run_conversion(
nwbfile_path=nwbfile_path,
overwrite=True,
metadata=metadata,
backend=backend,
**self.conversion_options,
)

if backend == "zarr":
self.check_basic_zarr_read(nwbfile_path)

def test_all_conversion_checks(self, setup_interface, tmp_path):
interface, test_name = setup_interface

Expand All @@ -231,16 +245,13 @@ def test_all_conversion_checks(self, setup_interface, tmp_path):
self.nwbfile_path = nwbfile_path

# Now run the checks using the setup objects
self.check_metadata_schema_valid()
self.check_conversion_options_schema_valid()
self.check_metadata()
self.check_no_metadata_mutation()
self.check_configure_backend_for_equivalent_nwbfiles()

self.check_run_conversion_in_nwbconverter_with_backend(nwbfile_path=nwbfile_path, backend="hdf5")
self.check_run_conversion_in_nwbconverter_with_backend_configuration(nwbfile_path=nwbfile_path, backend="hdf5")

self.check_run_conversion_with_backend(nwbfile_path=nwbfile_path, backend="hdf5")
self.check_run_conversion_with_backend_configuration(nwbfile_path=nwbfile_path, backend="hdf5")

self.check_read_nwb(nwbfile_path=nwbfile_path)
Expand Down Expand Up @@ -733,16 +744,13 @@ def test_all_conversion_checks(self, setup_interface, tmp_path):
nwbfile_path = str(tmp_path / f"{self.__class__.__name__}_{self.test_name}.nwb")

# Now run the checks using the setup objects
self.check_metadata_schema_valid()
self.check_conversion_options_schema_valid()
self.check_metadata()
self.check_no_metadata_mutation()
self.check_configure_backend_for_equivalent_nwbfiles()

self.check_run_conversion_in_nwbconverter_with_backend(nwbfile_path=nwbfile_path, backend="hdf5")
self.check_run_conversion_in_nwbconverter_with_backend_configuration(nwbfile_path=nwbfile_path, backend="hdf5")

self.check_run_conversion_with_backend(nwbfile_path=nwbfile_path, backend="hdf5")
self.check_run_conversion_with_backend_configuration(nwbfile_path=nwbfile_path, backend="hdf5")

self.check_read_nwb(nwbfile_path=nwbfile_path)
Expand Down Expand Up @@ -813,7 +821,7 @@ def check_read_nwb(self, nwbfile_path: str):
with NWBHDF5IO(path=nwbfile_path, mode="r", load_namespaces=True) as io:
nwbfile = io.read()
video_type = Path(self.interface_kwargs["file_paths"][0]).suffix[1:]
assert f"Video: video_{video_type}" in nwbfile.acquisition
assert f"Video video_{video_type}" in nwbfile.acquisition

def check_interface_set_aligned_timestamps(self):
all_unaligned_timestamps = self.interface.get_original_timestamps()
Expand Down Expand Up @@ -883,6 +891,29 @@ class MedPCInterfaceMixin(DataInterfaceTestMixin, TemporalAlignmentMixin):
A mixin for testing MedPC interfaces.
"""

def test_metadata_schema_valid(self):
pass

def test_run_conversion_with_backend(self):
pass

def test_no_metadata_mutation(self):
pass

def check_metadata_schema_valid(self):
schema = self.interface.get_metadata_schema()
Draft7Validator.check_schema(schema=schema)

def check_metadata(self):
schema = self.interface.get_metadata_schema()
metadata = self.interface.get_metadata()
if "session_start_time" not in metadata["NWBFile"]:
metadata["NWBFile"].update(session_start_time=datetime.now().astimezone())
# handle json encoding of datetimes and other tricky types
metadata_for_validation = json.loads(json.dumps(metadata, cls=_NWBMetaDataEncoder))
validate(metadata_for_validation, schema)
self.check_extracted_metadata(metadata)

def check_no_metadata_mutation(self, metadata: dict):
"""Ensure the metadata object was not altered by `add_to_nwbfile` method."""

Expand Down Expand Up @@ -1220,6 +1251,22 @@ def check_read_nwb(self, nwbfile_path: str):
class TDTFiberPhotometryInterfaceMixin(DataInterfaceTestMixin, TemporalAlignmentMixin):
"""Mixin for testing TDT Fiber Photometry interfaces."""

def test_metadata_schema_valid(self):
pass

def test_no_metadata_mutation(self):
pass

def test_run_conversion_with_backend(self):
pass

def test_no_metadata_mutation(self):
pass

def check_metadata_schema_valid(self):
schema = self.interface.get_metadata_schema()
Draft7Validator.check_schema(schema=schema)

def check_no_metadata_mutation(self, metadata: dict):
"""Ensure the metadata object was not altered by `add_to_nwbfile` method."""

Expand Down
6 changes: 3 additions & 3 deletions tests/test_behavior/test_video_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ def test_video_external_mode(self):
nwbfile = io.read()
module = nwbfile.acquisition
metadata = self.nwb_converter.get_metadata()
self.assertListEqual(list1=list(module["Video: test1"].external_file[:]), list2=self.video_files[0:2])
self.assertListEqual(list1=list(module["Video: test3"].external_file[:]), list2=[self.video_files[2]])
self.assertListEqual(list1=list(module["Video test1"].external_file[:]), list2=self.video_files[0:2])
self.assertListEqual(list1=list(module["Video test3"].external_file[:]), list2=[self.video_files[2]])

def test_video_irregular_timestamps(self):
aligned_timestamps = [np.array([1.0, 2.0, 4.0]), np.array([5.0, 6.0, 7.0])]
Expand All @@ -157,7 +157,7 @@ def test_video_irregular_timestamps(self):
expected_timestamps = timestamps = np.array([1.0, 2.0, 4.0, 55.0, 56.0, 57.0])
with NWBHDF5IO(path=self.nwbfile_path, mode="r") as io:
nwbfile = io.read()
np.testing.assert_array_equal(expected_timestamps, nwbfile.acquisition["Video: test1"].timestamps[:])
np.testing.assert_array_equal(expected_timestamps, nwbfile.acquisition["Video test1"].timestamps[:])

def test_starting_frames_type_error(self):
timestamps = [np.array([2.2, 2.4, 2.6]), np.array([3.2, 3.4, 3.6])]
Expand Down
35 changes: 16 additions & 19 deletions tests/test_on_data/ecephys/test_recording_interfaces.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime
from platform import python_version
from sys import platform
from typing import Literal

import numpy as np
import pytest
Expand Down Expand Up @@ -180,32 +179,30 @@ class TestEDFRecordingInterface(RecordingExtractorInterfaceTestMixin):
def check_extracted_metadata(self, metadata: dict):
assert metadata["NWBFile"]["session_start_time"] == datetime(2022, 3, 2, 10, 42, 19)

def test_interface_alignment(self):
interface_kwargs = self.interface_kwargs
def test_all_conversion_checks(self, setup_interface, tmp_path):
# Create a unique test name and file path
nwbfile_path = str(tmp_path / f"{self.__class__.__name__}.nwb")
self.nwbfile_path = nwbfile_path

# Now run the checks using the setup objects
self.check_conversion_options_schema_valid()
self.check_metadata()

# TODO - debug hanging I/O from pyedflib
# self.check_interface_get_original_timestamps()
# self.check_interface_get_timestamps()
# self.check_align_starting_time_internal()
# self.check_align_starting_time_external()
# self.check_interface_align_timestamps()
# self.check_shift_timestamps_by_start_time()
# self.check_interface_original_timestamps_inmutability()
self.check_run_conversion_with_backend(nwbfile_path=nwbfile_path, backend="hdf5")

self.check_nwbfile_temporal_alignment()
self.check_read_nwb(nwbfile_path=nwbfile_path)

# EDF has simultaneous access issues; can't have multiple interfaces open on the same file at once...
def check_run_conversion_in_nwbconverter_with_backend(
self, nwbfile_path: str, backend: Literal["hdf5", "zarr"] = "hdf5"
):
def test_metadata_schema_valid(self):
pass

def check_run_conversion_in_nwbconverter_with_backend_configuration(
self, nwbfile_path: str, backend: Literal["hdf5", "zarr"] = "hdf5"
):
def test_no_metadata_mutation(self):
pass

def check_run_conversion_with_backend(self, nwbfile_path: str, backend: Literal["hdf5", "zarr"] = "hdf5"):
def test_run_conversion_with_backend(self):
pass

def test_interface_alignment(self):
pass


Expand Down
Loading