ES-295: Save Lab metadata in IDE files #149

Merged · 4 commits · Apr 11, 2024
6 changes: 6 additions & 0 deletions idelib/dataset.py
@@ -49,6 +49,7 @@
from collections.abc import Iterable, Sequence
from datetime import datetime
from threading import Lock
from typing import Any, Dict, Optional
import warnings

import os.path
@@ -260,6 +261,11 @@ def __init__(self, stream, name=None, quiet=True, attributes=None):
self.loading = True
self.filename = getattr(stream, "name", None)

# Cached user-defined metadata and file info (see `idelib/userdata.py`)
self._userdata: Optional[Dict[str, Any]] = None
self._userdataOffset: Optional[int] = None
self._filesize: Optional[int] = None

self._channelDataLock = Lock()

# Subsets: used when importing multiple files into the same dataset.
25 changes: 25 additions & 0 deletions idelib/schemata/mide_ide.xml
@@ -404,4 +404,29 @@
<BinaryElement name="ChannelDataMinMeanMax" id="0xBC" multiple="0" minver="1" precache="1">Statistical data for this block's payload consisting of 3 datapoints (min, mean, max) per subchannel. They are organized as [[sc0min] [sc1min] [sc2min] ...] [[sc0mean] [sc1mean] [sc2mean] ...] [[sc0max] [sc1max] [sc2max] ...]. The format and representation of the stat data exactly matches that of the input samples; that is, if the input samples are uint16_t, each stat entry is also a uint16_t.</BinaryElement>
<UIntegerElement name="MediaWriteLatency" id="0xBE" multiple="0" minver="2" precache="0">Super-optional diagnostic element indicating the latency between data acquisition and transfer to the output media. The exact meaning of this value is device-dependent, but may serve as a general indicator of excess activity load, retransmission or congestion (for transmission media) or media wear (for recording media).</UIntegerElement>
</MasterElement>

<!-- User-supplied metadata -->
<MasterElement name="UserData" id="0x10200000" multiple="0" mandatory="0">
<IntegerElement name="TimebaseOffset" id="0x10200010" multiple="0" mandatory="0"> An offset (in microseconds) for all sample times. </IntegerElement>
<BinaryElement name="WindowLayout" id="0x10200011" multiple="0" mandatory="0"> Application-specific data describing GUI settings, etc. </BinaryElement>
<MasterElement name="AnnotationList" id="0x10200020" multiple="0" mandatory="0"> User-created highlights, marking particular spans and points in time
<MasterElement name="Annotation" id="0x10200021" multiple="1" mandatory="0">
<UIntegerElement name="AnnotationID" id="0x10200022" multiple="0" mandatory="1" /> The annotation's ID, arbitrary but unique to the file.
<UnicodeElement name="AnnotationText" id="0x10200023" multiple="0" mandatory="0" /> A name and/or notes about the annotation.
<IntegerElement name="AnnotationStartTime" id="0x10200024" multiple="0" mandatory="1" /> Annotation start time.
<IntegerElement name="AnnotationEndTime" id="0x10200025" multiple="0" mandatory="0" /> Annotation end time. If present, the annotation will cover a time span.
<BinaryElement name="AnnotationStyle" id="0x10200026" multiple="0" mandatory="0" /> Application-specific data describing the visual representation of the annotation.
</MasterElement>
</MasterElement>

<!-- Reused existing elements, which override those in the RecordingProperties, etc. -->
<UIntegerElement name="TimeBaseUTC" id="0x5462" />
<MasterElement name="CalibrationList" id="0x4B00" />
<MasterElement name="WarningList" id="0x5360" />
<MasterElement name="ChannelList" id="0x5270" />

<!-- Additional user data can be supplied with Attribute elements. -->
</MasterElement>

<UIntegerElement name="UserDataOffset" id="0x10200001" multiple="0" mandatory="0">Position of the beginning of the user-defined metadata. This should be the last element in the file.</UIntegerElement>
</Schema>
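
For reference, a sketch of a user-data payload matching the schema additions above, in the dictionary form that `ebmlite` produces and consumes (element names come from the schema; the values are illustrative only, borrowed from the tests below):

    userdata = {
        'TimebaseOffset': 12345,              # microseconds, applied to all sample times
        'WindowLayout': b'opaque GUI state',  # application-specific blob
        'AnnotationList': {
            'Annotation': [
                {'AnnotationID': 1,               # required; arbitrary but unique to the file
                 'AnnotationText': 'Event of interest',
                 'AnnotationStartTime': 1000000,  # required
                 'AnnotationEndTime': 2000000},   # optional; if present, the annotation is a span
            ],
        },
        'TimeBaseUTC': [1712769739],          # overrides the value in RecordingProperties
    }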
190 changes: 190 additions & 0 deletions idelib/userdata.py
@@ -0,0 +1,190 @@
"""
Functions for reading and writing application-specific data from/to the end
of IDE files. This data is intended primarily to retain user preferences for
the display of the `Dataset`.
"""

import errno
import os.path
import logging
from typing import Any, Dict, Optional, Tuple, Union

from .dataset import Dataset

#===============================================================================
#
#===============================================================================

MIN_VOID_SIZE = 9  # Minimum encoded size of a `Void` element: 1 ID byte + 8 size bytes

logger = logging.getLogger('idelib')

#===============================================================================
#
#===============================================================================

def getUserDataPos(dataset: Dataset,
refresh: bool = False) -> Tuple[bool, int, int]:
""" Get the offset of the start of the user data.

:param dataset: The `Dataset` in which to locate the user data.
:param refresh: If `True`, ignore any cached values and re-read
from the file.
:return: A tuple containing a bool (whether or not user data exists),
the offset of the user data, and the total length of the file.
Offset and filesize will typically be the same if there is no
user data.
"""
if not refresh and dataset._userdataOffset and dataset._filesize:
return bool(dataset._userdata), dataset._userdataOffset, dataset._filesize

doc = dataset.ebmldoc
fs = doc.stream
hasdata = False

oldpos = fs.tell()
filesize = fs.seek(0, os.SEEK_END)
offset = filesize

# The encoded `UserDataOffset` element has a known, fixed size
example = doc.schema['UserDataOffset'].encode(1, length=8, lengthSize=8)
header = example[:-8]

try:
# UserDataOffset *should* be right at the end of the file, but
# don't assume so. Start some bytes back and find the header.
pos = offset - int(len(example) * 1.5)
fs.seek(pos, os.SEEK_SET)
chunk = fs.read()
if header in chunk:
fs.seek(pos + chunk.index(header), os.SEEK_SET)
el, _next = doc.parseElement(fs)
offset = el.value
hasdata = True

except IndexError:
# Problem with parsed chunk; shouldn't happen.
pass

finally:
fs.seek(oldpos, os.SEEK_SET)

dataset._userdataOffset = offset
dataset._filesize = filesize
return hasdata, offset, filesize
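
The scan above works because an encoded `UserDataOffset` has a known, fixed size. A minimal sketch of that assumption (given an open `Dataset`, and assuming the fixed-length `ebmlite` encoding used in this module):

    # `doc` is a Dataset, e.g. from idelib.importer.importFile()
    schema = doc.ebmldoc.schema
    tail = schema['UserDataOffset'].encode(12345, length=8, lengthSize=8)
    # 4-byte EBML ID (0x10200001) + 8-byte size field + 8-byte payload:
    assert len(tail) == 20

`getUserDataPos()` therefore starts its search 30 bytes (1.5 × 20) before the end of the file, leaving a few bytes of slack in case the element is not flush with the end of the file.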


#===============================================================================
#
#===============================================================================

def readUserData(dataset: Dataset,
refresh: bool = False) -> Union[Dict[str, Any], None]:
""" Read application-specific user data from the end of an IDE file.

:param dataset: The `Dataset` from which to read the user data.
:param refresh: If `True`, ignore any cached values and re-read
from the file.
:return: A dictionary of user data, or `None` if no user data
could be read from the file (e.g., none exists).
"""
if not refresh and dataset._userdataOffset and dataset._filesize:
return dataset._userdata

doc = dataset.ebmldoc
fs = doc.stream
oldpos = fs.tell()

hasdata, offset, filesize = getUserDataPos(dataset, refresh=refresh)

if not hasdata:
logger.debug('No user data found')
dataset._userdata = None
return None

try:
fs.seek(offset, os.SEEK_SET)
data, _next = doc.parseElement(fs)
dump = data.dump()
dataset._userdata = dump
return dump

finally:
fs.seek(oldpos, os.SEEK_SET)


#===============================================================================
#
#===============================================================================

def writeUserData(dataset: Dataset,
userdata: Dict[str, Any],
refresh: bool = False):
""" Write user data to the end of an IDE file.

:param dataset: The `Dataset` to which to write the user data.
:param userdata: A dictionary of user data, or `None` to remove
existing user data. Note that the file will not get smaller if
the user data is removed (or the new set of user data is smaller
than the existing user data); it is just overwritten with null
data (an EBML `Void` element).
:param refresh: If `True`, ignore any cached values and find the
position in the file to which to write.
"""
schema = dataset.ebmldoc.schema
fs = dataset.ebmldoc.stream
oldpos = fs.tell()

try:
hasdata, offset, filesize = getUserDataPos(dataset, refresh=refresh)

if userdata:
# User data consists of a `UserData` element, a `Void`, and `UserDataOffset`
dataBin = schema.encodes({'UserData': userdata or {}})
offsetBin = schema['UserDataOffset'].encode(offset, length=8, lengthSize=8)
newsize = (len(offsetBin) + len(dataBin) + offset + MIN_VOID_SIZE)
voidBin = schema['Void'].encode(None, length=max(0, filesize - newsize),
lengthSize=8)
else:
# No new userdata, just write 'Void' over any existing userdata
# (or do nothing if there is no existing userdata)
dataset._userdata = userdata
if not hasdata:
return
newsize = filesize
dataBin = offsetBin = b''
voidBin = schema['Void'].encode(None, length=max(0, filesize - offset - MIN_VOID_SIZE), lengthSize=8)

userblob = dataBin + voidBin + offsetBin

try:
writable = fs.writable()
except AttributeError:
# In case file-like stream doesn't implement `writable()`
# (e.g., older `ebmlite.threaded_file.ThreadAwareFile`)
mode = getattr(fs, 'mode', '')
writable = '+' in mode or 'w' in mode

if not writable:
# File/stream is read-only; attempt to create a new file stream.
if not getattr(fs, 'name', None):
raise IOError(errno.EACCES,
'Could not write user data: Dataset stream is not '
'writable and has no filename')

with open(fs.name, 'br+') as newfs:
logger.debug(f'(userdata) Dataset stream read only (mode {fs.mode!r}), '
'using new stream')
newfs.seek(offset, os.SEEK_SET)
newfs.write(userblob)

else:
fs.seek(offset, os.SEEK_SET)
fs.write(userblob)

dataset._userdata = userdata
logger.debug(f'(userdata) Wrote {len(userblob)} bytes to {dataset} '
f'(file was {filesize}, now {newsize})')

finally:
fs.seek(oldpos, os.SEEK_SET)
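
Taken together, a typical read-modify-write round trip with this module might look like the following sketch (the file name is hypothetical; the pattern mirrors the tests below):

    from idelib import importer, userdata

    with importer.importFile('recording.IDE') as doc:
        data = userdata.readUserData(doc) or {}  # None if no user data exists yet
        data['TimebaseOffset'] = 250000
        userdata.writeUserData(doc, data)

    with importer.importFile('recording.IDE') as doc:
        assert userdata.readUserData(doc)['TimebaseOffset'] == 250000

Note that `writeUserData()` falls back to reopening the file by name when the `Dataset`'s stream is read-only, so the read-only stream produced by `importFile()` works here.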
4 changes: 3 additions & 1 deletion testing/file_streams.py
@@ -2,7 +2,9 @@

FILES = [('./testing/SSX70065.IDE', 'rb'),
('./testing/SSX66115.IDE', 'rb'),
('./test.ide', 'rb')]
('./test.ide', 'rb'),
('./testing/SSX_Data.IDE', 'rb'),
('./testing/with_userdata.IDE', 'rb')]
FILE_DICT = {}

for fName, mode in FILES:
133 changes: 133 additions & 0 deletions testing/test_userdata.py
@@ -0,0 +1,133 @@
"""
Test reading/writing user data to/from IDE files and streams (files and
file-like).
"""

import pytest # type: ignore

from io import BytesIO
import os.path
import shutil

from idelib import importer
from idelib import userdata

from testing import file_streams


# ==============================================================================
#
# ==============================================================================

USERDATA = {
'TimebaseOffset': 12345,
'WindowLayout': bytearray(b'bogus binary blob'),
'TimeBaseUTC': [1712769739]
}

SMALLER_USERDATA = {
'TimebaseOffset': 54321,
}

LARGER_USERDATA = {
'TimebaseOffset': 56789,
'WindowLayout': bytearray(b'bogus binary blob'),
'AnnotationList': {
'Annotation': [{'AnnotationID': 42, 'AnnotationStartTime': 101},],
},
'TimeBaseUTC': [35096400]
}

FILE_WITHOUT_USERDATA = './testing/SSX_Data.IDE'
FILE_WITH_USERDATA = './testing/with_userdata.IDE'


# ==============================================================================
#
# ==============================================================================

def test_read_userdata():
""" Test reading user data.
"""
doc = importer.openFile(file_streams.makeStreamLike(FILE_WITH_USERDATA))
data = userdata.readUserData(doc)
assert data == USERDATA


def test_read_userdata_no_userdata():
""" Test reading user data from a file without user data.
"""
doc = importer.openFile(file_streams.makeStreamLike(FILE_WITHOUT_USERDATA))
data = userdata.readUserData(doc)
assert data is None


def test_write_userdata(tmp_path):
""" Test writing (and re-reading) user data to a file without existing
user data.
"""
sourceFile = FILE_WITHOUT_USERDATA
filename = tmp_path / os.path.basename(sourceFile)

shutil.copyfile(sourceFile, filename)

with importer.importFile(filename) as doc:
userdata.writeUserData(doc, USERDATA)

with importer.importFile(filename) as doc:
data = userdata.readUserData(doc)
assert data == USERDATA


def test_write_userdata_BytesIO():
""" Test writing (and re-reading) user data from a non-file stream
without existing user data.
"""
sourceFile = FILE_WITHOUT_USERDATA

with open(sourceFile, 'rb') as f:
stream = BytesIO(f.read())

with importer.openFile(stream) as doc:
userdata.writeUserData(doc, USERDATA)

data = userdata.readUserData(doc)
assert data == USERDATA


def test_larger_userdata(tmp_path):
""" Test overwriting an existing set of user data with a larger one.
"""
sourceFile = FILE_WITH_USERDATA
filename = tmp_path / os.path.basename(sourceFile)
shutil.copyfile(sourceFile, filename)

originalSize = os.path.getsize(filename)

with importer.importFile(filename) as doc:
userdata.writeUserData(doc, LARGER_USERDATA)

with importer.importFile(filename) as doc:
data = userdata.readUserData(doc)
assert data == LARGER_USERDATA

assert originalSize < os.path.getsize(filename)


def test_smaller_userdata(tmp_path):
""" Test overwriting an existing set of user data with a smaller one.
"""
sourceFile = FILE_WITH_USERDATA
filename = tmp_path / os.path.basename(sourceFile)
shutil.copyfile(sourceFile, filename)

originalSize = os.path.getsize(filename)

with importer.importFile(filename) as doc:
userdata.writeUserData(doc, SMALLER_USERDATA)

with importer.importFile(filename) as doc:
data = userdata.readUserData(doc)
assert data == SMALLER_USERDATA

assert originalSize == os.path.getsize(filename)
Binary file added testing/with_userdata.IDE
Binary file not shown.