Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scanbox format updates #1311

Merged
merged 7 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ __pycache__/
*.pyc
*.npy
*.mat
!testdata/*.mat
*.hdf5
*.mmap
*.npz
Expand Down
255 changes: 10 additions & 245 deletions caiman/base/movies.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import caiman.base.traces
import caiman.mmapping
import caiman.summary_images
import caiman.utils.sbx_utils
import caiman.utils.visualization

try:
Expand Down Expand Up @@ -1310,7 +1311,7 @@ def load(file_name: Union[str, list[str]],
logging.error('movies.py:load(): channel parameter is not supported for single movie input')

if os.path.exists(file_name):
_, extension = os.path.splitext(file_name)[:2]
basename, extension = os.path.splitext(file_name)

extension = extension.lower()
if extension == '.mat':
Expand Down Expand Up @@ -1575,10 +1576,11 @@ def load(file_name: Union[str, list[str]],

elif extension == '.sbx':
logging.debug('sbx')
if subindices is not None:
return movie(sbxreadskip(file_name[:-4], subindices), fr=fr).astype(outtype)
else:
return movie(sbxread(file_name[:-4], k=0, n_frames=np.inf), fr=fr).astype(outtype)
meta_data = caiman.utils.sbx_utils.sbx_meta_data(basename)
input_arr = caiman.utils.sbx_utils.sbxread(basename, subindices)
return movie(input_arr, fr=fr,
file_name=os.path.split(file_name)[-1],
meta_data=meta_data).astype(outtype)

elif extension == '.sima':
raise Exception("movies.py:load(): FATAL: sima support was removed in 1.9.8")
Expand Down Expand Up @@ -1706,229 +1708,6 @@ def _load_behavior(file_name:str) -> Any:
file_name=os.path.split(file_name)[-1],
meta_data=meta_data)

####
# TODO: Consider pulling these functions that work with .mat files into a separate file


def loadmat_sbx(filename: str):
"""
this wrapper should be called instead of directly calling spio.loadmat

It solves the problem of not properly recovering python dictionaries
from mat files. It calls the function check keys to fix all entries
which are still mat-objects
"""
data_ = scipy.io.loadmat(filename, struct_as_record=False, squeeze_me=True)
_check_keys(data_)
return data_


def _check_keys(checkdict:dict) -> None:
"""
checks if entries in dictionary are mat-objects. If yes todict is called to change them to nested dictionaries.
Modifies its parameter in-place.
"""

for key in checkdict:
if isinstance(checkdict[key], scipy.io.matlab.mio5_params.mat_struct):
checkdict[key] = _todict(checkdict[key])


def _todict(matobj) -> dict:
"""
A recursive function which constructs from matobjects nested dictionaries
"""

ret = {}
for strg in matobj._fieldnames:
elem = matobj.__dict__[strg]
if isinstance(elem, scipy.io.matlab.mio5_params.mat_struct):
ret[strg] = _todict(elem)
else:
ret[strg] = elem
return ret


def sbxread(filename: str, k: int = 0, n_frames=np.inf) -> np.ndarray:
"""
Args:
filename: str
filename should be full path excluding .sbx
"""
# Check if contains .sbx and if so just truncate
if '.sbx' in filename:
filename = filename[:-4]

# Load info
info = loadmat_sbx(filename + '.mat')['info']

# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
max_idx = os.path.getsize(filename + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1

# Parameters
N = max_idx + 1 # Last frame
N = np.minimum(N, n_frames)

nSamples = info['sz'][1] * info['recordsPerBuffer'] * 2 * info['nChan']

# Open File
fo = open(filename + '.sbx')

# Note: SBX files store the values strangely, its necessary to subtract the values from the max int16 to get the correct ones
fo.seek(k * nSamples, 0)
ii16 = np.iinfo(np.uint16)
x = ii16.max - np.fromfile(fo, dtype='uint16', count=int(nSamples / 2 * N))
x = x.reshape((int(info['nChan']), int(info['sz'][1]), int(info['recordsPerBuffer']), int(N)), order='F')

x = x[0, :, :, :]

fo.close()

return x.transpose([2, 1, 0])


def sbxreadskip(filename: str, subindices: slice) -> np.ndarray:
"""
Args:
filename: str
filename should be full path excluding .sbx

slice: pass a slice to slice along the last dimension
"""
# Check if contains .sbx and if so just truncate
if '.sbx' in filename:
filename = filename[:-4]

# Load info
info = loadmat_sbx(filename + '.mat')['info']

# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
max_idx = int(os.path.getsize(filename + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1)

# Parameters
if isinstance(subindices, slice):
if subindices.start is None:
start = 0
else:
start = subindices.start

if subindices.stop is None:
N = max_idx + 1 # Last frame
else:
N = np.minimum(subindices.stop, max_idx + 1).astype(int)

if subindices.step is None:
skip = 1
else:
skip = subindices.step

iterable_elements = range(start, N, skip)

else:

N = len(subindices)
iterable_elements = subindices
skip = 0

N_time = len(list(iterable_elements))

nSamples = info['sz'][1] * info['recordsPerBuffer'] * 2 * info['nChan']
assert nSamples >= 0

# Open File
fo = open(filename + '.sbx')

# Note: SBX files store the values strangely, its necessary to subtract the values from the max int16 to get the correct ones

counter = 0

if skip == 1:
# Note: SBX files store the values strangely, its necessary to subtract the values from the max int16 to get the correct ones
assert start * nSamples > 0
fo.seek(start * nSamples, 0)
ii16 = np.iinfo(np.uint16)
x = ii16.max - np.fromfile(fo, dtype='uint16', count=int(nSamples / 2 * (N - start)))
x = x.reshape((int(info['nChan']), int(info['sz'][1]), int(info['recordsPerBuffer']), int(N - start)),
order='F')

x = x[0, :, :, :]

else:
for k in iterable_elements:
assert k >= 0
if counter % 100 == 0:
logging.debug(f'Reading Iteration: {k}')
fo.seek(k * nSamples, 0)
ii16 = np.iinfo(np.uint16)
tmp = ii16.max - \
np.fromfile(fo, dtype='uint16', count=int(nSamples / 2 * 1))

tmp = tmp.reshape((int(info['nChan']), int(info['sz'][1]), int(info['recordsPerBuffer'])), order='F')
if counter == 0:
x = np.zeros((tmp.shape[0], tmp.shape[1], tmp.shape[2], N_time))

x[:, :, :, counter] = tmp
counter += 1

x = x[0, :, :, :]
fo.close()

return x.transpose([2, 1, 0])


def sbxshape(filename: str) -> tuple[int, int, int]:
"""
Args:
filename should be full path excluding .sbx
"""
# TODO: Document meaning of return values

# Check if contains .sbx and if so just truncate
if '.sbx' in filename:
filename = filename[:-4]

# Load info
info = loadmat_sbx(filename + '.mat')['info']

# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
max_idx = os.path.getsize(filename + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1
N = max_idx + 1 # Last frame
x = (int(info['sz'][1]), int(info['recordsPerBuffer']), int(N))
return x


def to_3D(mov2D:np.ndarray, shape:tuple, order='F') -> np.ndarray:
"""
Expand Down Expand Up @@ -2285,23 +2064,9 @@ def get_file_size(file_name, var_name_hdf5:str='mov') -> tuple[tuple, Union[int,
raise Exception('Variable not found. Use one of the above')
T, dims = siz[0], siz[1:]
elif extension in ('.sbx'):
info = loadmat_sbx(file_name[:-4]+ '.mat')['info']
dims = tuple((info['sz']).astype(int))
# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
T = int(os.path.getsize(
file_name[:-4] + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1)

shape = caiman.utils.sbx_utils.sbx_shape(file_name[:-4])
T = shape[-1]
dims = (shape[2], shape[1])
else:
raise Exception('Unknown file type')
dims = tuple(dims)
Expand Down
Loading
Loading