diff --git a/idelib/dataset.py b/idelib/dataset.py
index 521e2a1..176f13e 100644
--- a/idelib/dataset.py
+++ b/idelib/dataset.py
@@ -49,6 +49,7 @@
 from collections.abc import Iterable, Sequence
 from datetime import datetime
 from threading import Lock
+from typing import Any, Dict, Optional
 import warnings
 
 import os.path
@@ -260,6 +261,11 @@ def __init__(self, stream, name=None, quiet=True, attributes=None):
         self.loading = True
         self.filename = getattr(stream, "name", None)
 
+        # For keeping user-defined data
+        self._userdata: Optional[Dict[str, Any]] = None
+        self._userdataOffset: Optional[int] = None
+        self._filesize: Optional[int] = None
+
         self._channelDataLock = Lock()
 
         # Subsets: used when importing multiple files into the same dataset.
diff --git a/idelib/schemata/mide_ide.xml b/idelib/schemata/mide_ide.xml
index d975dcb..637cdc8 100644
--- a/idelib/schemata/mide_ide.xml
+++ b/idelib/schemata/mide_ide.xml
@@ -404,4 +404,29 @@
 		Statistical data for this block's payload consisting of 3 datapoints (min, mean, max) per subchannel.
 		They are organized as [[sc0min] [sc1min] [sc2min] ...] [[sc0mean] [sc1mean] [sc2mean] ...] [[sc0max] [sc1max] [sc2max] ...].
 		The format and representation of the stat data exactly matches that of the input samples; that is, if the input samples are uint16_t, each stat entry is also a uint16_t.
 		Super-optional diagnostic element indicating the latency between data acquisition and transfer to the output media. The exact meaning of this value is device-dependent, but may serve as a general indicator of excess activity load, retransmission or congestion (for transmission media) or media wear (for recording media).
+
+
+
+		An offset (in microseconds) for all sample times.
+		Application-specific data describing GUI settings, etc.
+		User-created highlights, marking particular spans and points in time
+
+			The annotation's ID, arbitrary but unique to the file.
+			A name and/or notes about the annotation.
+			Annotation start time.
+			Annotation end time. If present, the annotation will cover a time span.
+			Application-specific data describing the visual representation of the annotation.
+
+
+
+
+
+
+
+
+
+
+
+	Position of the beginning of the user-defined metadata. This should be the last element in the file.
diff --git a/idelib/userdata.py b/idelib/userdata.py
new file mode 100644
index 0000000..738494d
--- /dev/null
+++ b/idelib/userdata.py
@@ -0,0 +1,190 @@
+"""
+Functions for reading and writing application-specific data from/to the end
+of IDE files. This data is intended primarily to retain user preferences for
+the display of the `Dataset`.
+"""
+
+import errno
+import os.path
+import logging
+from typing import Any, Dict, Optional, Tuple, Union
+
+from .dataset import Dataset
+
+#===============================================================================
+#
+#===============================================================================
+
+# Minimum encoded size of an EBML `Void` element (1 ID byte + 8 size bytes)
+MIN_VOID_SIZE = 9
+
+logger = logging.getLogger('idelib')
+
+#===============================================================================
+#
+#===============================================================================
+
+def getUserDataPos(dataset: Dataset,
+                   refresh: bool = False) -> Tuple[bool, int, int]:
+    """ Get the offset of the start of the user data.
+
+        :param dataset: The `Dataset` in which to locate the user data.
+        :param refresh: If `True`, ignore any cached values and re-read
+            from the file.
+        :return: A tuple containing a bool (whether or not user data exists),
+            the offset of the user data, and the total length of the file.
+            Offset and filesize will typically be the same if there is no
+            user data.
+    """
+    if not refresh and dataset._userdataOffset and dataset._filesize:
+        return bool(dataset._userdata), dataset._userdataOffset, dataset._filesize
+
+    doc = dataset.ebmldoc
+    fs = doc.stream
+    hasdata = False
+
+    oldpos = fs.tell()
+    filesize = fs.seek(0, os.SEEK_END)
+    offset = filesize
+
+    # The `UserDataOffset` element is a known, fixed size
+    example = doc.schema['UserDataOffset'].encode(1, length=8, lengthSize=8)
+    header = example[:-8]
+
+    try:
+        # UserDataOffset *should* be right at the end of the file, but
+        # don't assume so. Start some bytes back and find the header.
+        pos = offset - int(len(example) * 1.5)
+        fs.seek(pos, os.SEEK_SET)
+        chunk = fs.read()
+        if header in chunk:
+            fs.seek(pos + chunk.index(header), os.SEEK_SET)
+            el, _next = doc.parseElement(fs)
+            offset = el.value
+            hasdata = True
+
+    except IndexError:
+        # Problem with the parsed chunk; shouldn't happen.
+        pass
+
+    finally:
+        fs.seek(oldpos, os.SEEK_SET)
+
+    dataset._userdataOffset = offset
+    dataset._filesize = filesize
+    return hasdata, offset, filesize
+
+
+#===============================================================================
+#
+#===============================================================================
+
+def readUserData(dataset: Dataset,
+                 refresh: bool = False) -> Union[Dict[str, Any], None]:
+    """ Read application-specific user data from the end of an IDE file.
+
+        :param dataset: The `Dataset` from which to read the user data.
+        :param refresh: If `True`, ignore any cached values and re-read
+            from the file.
+        :return: A dictionary of user data, or `None` if no user data
+            could be read from the file (e.g., none exists).
+    """
+    if not refresh and dataset._userdataOffset and dataset._filesize:
+        return dataset._userdata
+
+    doc = dataset.ebmldoc
+    fs = doc.stream
+    oldpos = fs.tell()
+
+    hasdata, offset, filesize = getUserDataPos(dataset, refresh=refresh)
+
+    if not hasdata:
+        logger.debug('No user data found')
+        dataset._userdata = None
+        return None
+
+    try:
+        fs.seek(offset, os.SEEK_SET)
+        data, _next = doc.parseElement(fs)
+        dump = data.dump()
+        dataset._userdata = dump
+        return dump
+
+    finally:
+        fs.seek(oldpos, os.SEEK_SET)
+
+
+#===============================================================================
+#
+#===============================================================================
+
+def writeUserData(dataset: Dataset,
+                  userdata: Optional[Dict[str, Any]],
+                  refresh: bool = False):
+    """ Write user data to the end of an IDE file.
+
+        :param dataset: The `Dataset` to which to write the user data.
+        :param userdata: A dictionary of user data, or `None` to remove
+            existing user data. Note that the file will not get smaller if
+            the user data is removed (or the new set of user data is smaller
+            than the existing user data); it is just overwritten with null
+            data (an EBML `Void` element).
+        :param refresh: If `True`, ignore any cached values and find the
+            position in the file to which to write.
+ """ + schema = dataset.ebmldoc.schema + fs = dataset.ebmldoc.stream + oldpos = fs.tell() + + try: + hasdata, offset, filesize = getUserDataPos(dataset, refresh=refresh) + + if userdata: + # User data consists of a `UserData` element, a `Void`, and `UserDataOffset` + dataBin = schema.encodes({'UserData': userdata or {}}) + offsetBin = schema['UserDataOffset'].encode(offset, length=8, lengthSize=8) + newsize = (len(offsetBin) + len(dataBin) + offset + MIN_VOID_SIZE) + voidBin = schema['Void'].encode(None, length=max(0, filesize - newsize), + lengthSize=8) + else: + # No new userdata, just write 'Void' over any existing userdata + # (or do nothing if there is no existing userdata) + dataset._userdata = userdata + if not hasdata: + return + newsize = filesize + dataBin = offsetBin = b'' + voidBin = schema['Void'].encode(None, length=max(0, filesize - MIN_VOID_SIZE)) + + userblob = dataBin + voidBin + offsetBin + + try: + writable = fs.writable() + except AttributeError: + # In case file-like stream doesn't implement `writable()` + # (e.g., older `ebmlite.threaded_file.ThreadAwareFile`) + mode = getattr(fs, 'mode', '') + writable = '+' in mode or 'w' in mode + + if not writable: + # File/stream is read-only; attempt to create a new file stream. + if not getattr(fs, 'name', None): + raise IOError(errno.EACCES, + f'Could not write user data; ' + f'Dataset stream not writable and has no filename') + + with open(fs.name, 'br+') as newfs: + logger.debug(f'(userdata) Dataset stream read only (mode {fs.mode!r}), ' + 'using new stream') + newfs.seek(offset, os.SEEK_SET) + newfs.write(userblob) + + else: + fs.seek(offset, os.SEEK_SET) + fs.write(userblob) + + dataset._userdata = userdata + logger.debug(f'(userdata) Wrote {len(userblob)} bytes to {dataset} ' + f'(file was {filesize}, now {newsize})') + + finally: + fs.seek(oldpos, os.SEEK_SET) diff --git a/testing/file_streams.py b/testing/file_streams.py index 6741eff..85e5eba 100644 --- a/testing/file_streams.py +++ b/testing/file_streams.py @@ -2,7 +2,9 @@ FILES = [('./testing/SSX70065.IDE', 'rb'), ('./testing/SSX66115.IDE', 'rb'), - ('./test.ide', 'rb')] + ('./test.ide', 'rb'), + ('./testing/SSX_Data.IDE', 'rb'), + ('./testing/with_userdata.IDE', 'rb')] FILE_DICT = {} for fName, mode in FILES: diff --git a/testing/test_userdata.py b/testing/test_userdata.py new file mode 100644 index 0000000..a1874d9 --- /dev/null +++ b/testing/test_userdata.py @@ -0,0 +1,133 @@ +""" +Test reading/writing user data to/from IDE files and streams (files and +file-like). 
+""" + +import pytest # type: ignore + +from io import BytesIO +import os.path +import shutil + +from idelib import importer +from idelib import userdata + +from testing import file_streams + + +# ============================================================================== +# +# ============================================================================== + +USERDATA = { + 'TimebaseOffset': 12345, + 'WindowLayout': bytearray(b'bogus binary blob'), + 'TimeBaseUTC': [1712769739] +} + +SMALLER_USERDATA = { + 'TimebaseOffset': 54321, +} + +LARGER_USERDATA = { + 'TimebaseOffset': 56789, + 'WindowLayout': bytearray(b'bogus binary blob'), + 'AnnotationList': { + 'Annotation': [{'AnnotationID': 42, 'AnnotationStartTime': 101},], + }, + 'TimeBaseUTC': [35096400] +} + +FILE_WITHOUT_USERDATA = './testing/SSX_Data.IDE' +FILE_WITH_USERDATA = './testing/with_userdata.IDE' + + +# ============================================================================== +# +# ============================================================================== + +def test_read_userdata(): + """ Test reading user data. + """ + doc = importer.openFile(file_streams.makeStreamLike(FILE_WITH_USERDATA)) + data = userdata.readUserData(doc) + assert data == USERDATA + + +def test_read_userdata_no_userdata(): + """ Test reading user data from a file without user data. + """ + doc = importer.openFile(file_streams.makeStreamLike(FILE_WITHOUT_USERDATA)) + data = userdata.readUserData(doc) + assert data is None + + +def test_write_userdata(tmp_path): + """ Test writing (and re-reading) user data to a file without existing + user data. + """ + sourceFile = FILE_WITHOUT_USERDATA + filename = tmp_path / os.path.basename(sourceFile) + + shutil.copyfile(sourceFile, filename) + + with importer.importFile(filename) as doc: + userdata.writeUserData(doc, USERDATA) + + with importer.importFile(filename) as doc: + data = userdata.readUserData(doc) + assert data == USERDATA + + +def test_write_userdata_BytesIO(): + """ Test writing (and re-reading) user data from a non-file stream + without existing user data. + """ + sourceFile = FILE_WITHOUT_USERDATA + + with open(sourceFile, 'rb') as f: + stream = BytesIO(f.read()) + + with importer.openFile(stream) as doc: + userdata.writeUserData(doc, USERDATA) + + data = userdata.readUserData(doc) + assert data == USERDATA + + +def test_larger_userdata(tmp_path): + """ Test overwriting an existing set of user data with a larger one. + """ + sourceFile = FILE_WITH_USERDATA + filename = tmp_path / os.path.basename(sourceFile) + shutil.copyfile(sourceFile, filename) + + originalSize = os.path.getsize(filename) + + with importer.importFile(filename) as doc: + userdata.writeUserData(doc, LARGER_USERDATA) + + with importer.importFile(filename) as doc: + data = userdata.readUserData(doc) + assert data == LARGER_USERDATA + + assert originalSize < os.path.getsize(filename) + + +def test_smaller_userdata(tmp_path): + """ Test overwriting an existing set of user data with a smaller one. 
+ """ + sourceFile = FILE_WITH_USERDATA + filename = tmp_path / os.path.basename(sourceFile) + shutil.copyfile(sourceFile, filename) + + originalSize = os.path.getsize(filename) + + with importer.importFile(filename) as doc: + userdata.writeUserData(doc, SMALLER_USERDATA) + + with importer.importFile(filename) as doc: + data = userdata.readUserData(doc) + assert data == SMALLER_USERDATA + + assert originalSize == os.path.getsize(filename) diff --git a/testing/with_userdata.IDE b/testing/with_userdata.IDE new file mode 100644 index 0000000..36cd0ab Binary files /dev/null and b/testing/with_userdata.IDE differ