Skip to content

Commit

Permalink
Merge pull request #1311 from proektlab/sbxload-update
Browse files Browse the repository at this point in the history
Scanbox format updates
  • Loading branch information
pgunn authored Mar 29, 2024
2 parents 54915d3 + ec9ca6e commit 16cb274
Show file tree
Hide file tree
Showing 10 changed files with 686 additions and 245 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ __pycache__/
*.pyc
*.npy
*.mat
!testdata/*.mat
*.hdf5
*.mmap
*.npz
Expand Down
255 changes: 10 additions & 245 deletions caiman/base/movies.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import caiman.base.traces
import caiman.mmapping
import caiman.summary_images
import caiman.utils.sbx_utils
import caiman.utils.visualization

try:
Expand Down Expand Up @@ -1310,7 +1311,7 @@ def load(file_name: Union[str, list[str]],
logging.error('movies.py:load(): channel parameter is not supported for single movie input')

if os.path.exists(file_name):
_, extension = os.path.splitext(file_name)[:2]
basename, extension = os.path.splitext(file_name)

extension = extension.lower()
if extension == '.mat':
Expand Down Expand Up @@ -1575,10 +1576,11 @@ def load(file_name: Union[str, list[str]],

elif extension == '.sbx':
logging.debug('sbx')
if subindices is not None:
return movie(sbxreadskip(file_name[:-4], subindices), fr=fr).astype(outtype)
else:
return movie(sbxread(file_name[:-4], k=0, n_frames=np.inf), fr=fr).astype(outtype)
meta_data = caiman.utils.sbx_utils.sbx_meta_data(basename)
input_arr = caiman.utils.sbx_utils.sbxread(basename, subindices)
return movie(input_arr, fr=fr,
file_name=os.path.split(file_name)[-1],
meta_data=meta_data).astype(outtype)

elif extension == '.sima':
raise Exception("movies.py:load(): FATAL: sima support was removed in 1.9.8")
Expand Down Expand Up @@ -1706,229 +1708,6 @@ def _load_behavior(file_name:str) -> Any:
file_name=os.path.split(file_name)[-1],
meta_data=meta_data)

####
# TODO: Consider pulling these functions that work with .mat files into a separate file


def loadmat_sbx(filename: str):
"""
this wrapper should be called instead of directly calling spio.loadmat
It solves the problem of not properly recovering python dictionaries
from mat files. It calls the function check keys to fix all entries
which are still mat-objects
"""
data_ = scipy.io.loadmat(filename, struct_as_record=False, squeeze_me=True)
_check_keys(data_)
return data_


def _check_keys(checkdict:dict) -> None:
"""
checks if entries in dictionary are mat-objects. If yes todict is called to change them to nested dictionaries.
Modifies its parameter in-place.
"""

for key in checkdict:
if isinstance(checkdict[key], scipy.io.matlab.mio5_params.mat_struct):
checkdict[key] = _todict(checkdict[key])


def _todict(matobj) -> dict:
"""
A recursive function which constructs from matobjects nested dictionaries
"""

ret = {}
for strg in matobj._fieldnames:
elem = matobj.__dict__[strg]
if isinstance(elem, scipy.io.matlab.mio5_params.mat_struct):
ret[strg] = _todict(elem)
else:
ret[strg] = elem
return ret


def sbxread(filename: str, k: int = 0, n_frames=np.inf) -> np.ndarray:
"""
Args:
filename: str
filename should be full path excluding .sbx
"""
# Check if contains .sbx and if so just truncate
if '.sbx' in filename:
filename = filename[:-4]

# Load info
info = loadmat_sbx(filename + '.mat')['info']

# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
max_idx = os.path.getsize(filename + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1

# Parameters
N = max_idx + 1 # Last frame
N = np.minimum(N, n_frames)

nSamples = info['sz'][1] * info['recordsPerBuffer'] * 2 * info['nChan']

# Open File
fo = open(filename + '.sbx')

# Note: SBX files store the values strangely, its necessary to subtract the values from the max int16 to get the correct ones
fo.seek(k * nSamples, 0)
ii16 = np.iinfo(np.uint16)
x = ii16.max - np.fromfile(fo, dtype='uint16', count=int(nSamples / 2 * N))
x = x.reshape((int(info['nChan']), int(info['sz'][1]), int(info['recordsPerBuffer']), int(N)), order='F')

x = x[0, :, :, :]

fo.close()

return x.transpose([2, 1, 0])


def sbxreadskip(filename: str, subindices: slice) -> np.ndarray:
"""
Args:
filename: str
filename should be full path excluding .sbx
slice: pass a slice to slice along the last dimension
"""
# Check if contains .sbx and if so just truncate
if '.sbx' in filename:
filename = filename[:-4]

# Load info
info = loadmat_sbx(filename + '.mat')['info']

# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
max_idx = int(os.path.getsize(filename + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1)

# Parameters
if isinstance(subindices, slice):
if subindices.start is None:
start = 0
else:
start = subindices.start

if subindices.stop is None:
N = max_idx + 1 # Last frame
else:
N = np.minimum(subindices.stop, max_idx + 1).astype(int)

if subindices.step is None:
skip = 1
else:
skip = subindices.step

iterable_elements = range(start, N, skip)

else:

N = len(subindices)
iterable_elements = subindices
skip = 0

N_time = len(list(iterable_elements))

nSamples = info['sz'][1] * info['recordsPerBuffer'] * 2 * info['nChan']
assert nSamples >= 0

# Open File
fo = open(filename + '.sbx')

# Note: SBX files store the values strangely, its necessary to subtract the values from the max int16 to get the correct ones

counter = 0

if skip == 1:
# Note: SBX files store the values strangely, its necessary to subtract the values from the max int16 to get the correct ones
assert start * nSamples > 0
fo.seek(start * nSamples, 0)
ii16 = np.iinfo(np.uint16)
x = ii16.max - np.fromfile(fo, dtype='uint16', count=int(nSamples / 2 * (N - start)))
x = x.reshape((int(info['nChan']), int(info['sz'][1]), int(info['recordsPerBuffer']), int(N - start)),
order='F')

x = x[0, :, :, :]

else:
for k in iterable_elements:
assert k >= 0
if counter % 100 == 0:
logging.debug(f'Reading Iteration: {k}')
fo.seek(k * nSamples, 0)
ii16 = np.iinfo(np.uint16)
tmp = ii16.max - \
np.fromfile(fo, dtype='uint16', count=int(nSamples / 2 * 1))

tmp = tmp.reshape((int(info['nChan']), int(info['sz'][1]), int(info['recordsPerBuffer'])), order='F')
if counter == 0:
x = np.zeros((tmp.shape[0], tmp.shape[1], tmp.shape[2], N_time))

x[:, :, :, counter] = tmp
counter += 1

x = x[0, :, :, :]
fo.close()

return x.transpose([2, 1, 0])


def sbxshape(filename: str) -> tuple[int, int, int]:
"""
Args:
filename should be full path excluding .sbx
"""
# TODO: Document meaning of return values

# Check if contains .sbx and if so just truncate
if '.sbx' in filename:
filename = filename[:-4]

# Load info
info = loadmat_sbx(filename + '.mat')['info']

# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
max_idx = os.path.getsize(filename + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1
N = max_idx + 1 # Last frame
x = (int(info['sz'][1]), int(info['recordsPerBuffer']), int(N))
return x


def to_3D(mov2D:np.ndarray, shape:tuple, order='F') -> np.ndarray:
"""
Expand Down Expand Up @@ -2285,23 +2064,9 @@ def get_file_size(file_name, var_name_hdf5:str='mov') -> tuple[tuple, Union[int,
raise Exception('Variable not found. Use one of the above')
T, dims = siz[0], siz[1:]
elif extension in ('.sbx'):
info = loadmat_sbx(file_name[:-4]+ '.mat')['info']
dims = tuple((info['sz']).astype(int))
# Defining number of channels/size factor
if info['channels'] == 1:
info['nChan'] = 2
factor = 1
elif info['channels'] == 2:
info['nChan'] = 1
factor = 2
elif info['channels'] == 3:
info['nChan'] = 1
factor = 2

# Determine number of frames in whole file
T = int(os.path.getsize(
file_name[:-4] + '.sbx') / info['recordsPerBuffer'] / info['sz'][1] * factor / 4 - 1)

shape = caiman.utils.sbx_utils.sbx_shape(file_name[:-4])
T = shape[-1]
dims = (shape[2], shape[1])
else:
raise Exception('Unknown file type')
dims = tuple(dims)
Expand Down
Loading

0 comments on commit 16cb274

Please sign in to comment.