From f6c475ed8fb1e1f75467cc1c12947300529ca678 Mon Sep 17 00:00:00 2001 From: Hagen Wierstorf Date: Wed, 19 Jun 2024 17:18:23 +0200 Subject: [PATCH] Store tables as PARQUET files (#419) * Ensure correct boolean dtype in misc table index * Remove unneeded code * Use pyarrow to read CSV files * Start debugging * Continue debugging * Fix tests * Remove unneeded code * Improve code * Fix test for older pandas versions * Exclude benchmark folder from tests * Test other implementation * Remove support for Python 3.8 * Store tables as PARQUET * Cleanup code + Table.levels * Use dict for CSV dtype mappings * Rename helper function * Simplify code * Add helper function for CSV schema * Fix typo in docstring * Remove levels attribute * Merge stash * Remove levels from doctest output * Convert method to property * Add comment * Simplify code * Simplify code * Add test for md5sum of parquet file * Switch back to snappy compression * Fix linter * Store hash inside parquet file * Fix code coverage * Stay with CSV as default table format * Test pyarrow==15.0.2 * Test pyarrow==14.0.2 * Test pyarrow==13.0 * Test pyarrow==12.0 * Test pyarrow==11.0 * Test pyarrow==10.0 * Test pyarrow==10.0.1 * Require pyarrow>=10.0.1 * Test pandas<2.1.0 * Add explanations for requirements * Add test using minimum pip requirements * Fix alphabetical order of requirements * Enhance test matrix definition * Debug failing test * Test different hash method * Use different hashing approach * Require pandas>=2.2.0 and fix hashes * CI: re-enable all minimal requriements * Hashing algorithm to respect row order * Clean up tests * Fix minimum install of audiofile * Fix docstring of Table.load() * Fix docstring of Database.load() * Ensure correct order in time when storing tables * Simplify comment * Add docstring to _load_pickle() * Fix _save_parquet() docstring * Improve comment in _dataframe_hash() * Document arguments of test_table_update... * Relax test for table saving order * Update audformat/core/table.py Co-authored-by: ChristianGeng * Revert "Update audformat/core/table.py" This reverts commit 3f21e3c41ae42cf8c37d01175bc82a5ea0b5fbea. 
* Use numpy representation for hashing (#436) * Use numpy representation for hashing * Enable tests and require pandas>=1.4.1 * Use numpy<2.0 in minimum test * Skip doctests in minimum * Require pandas>=2.1.0 * Require numpy<=2.0.0 in minimum test * Remove print statements * Fix numpy<2.0.0 for minimum test * Remove max_rows argument * Simplify code * Use test class * CI: remove pyarrow from branch to start test --------- Co-authored-by: ChristianGeng --- .github/workflows/test.yml | 15 +- audformat/core/database.py | 6 +- audformat/core/define.py | 3 + audformat/core/table.py | 532 ++++++++++++++++++++++++++++++------- audformat/core/utils.py | 59 ++-- pyproject.toml | 7 +- tests/test_database.py | 29 +- tests/test_misc_table.py | 7 + tests/test_table.py | 380 ++++++++++++++++++++++++-- 9 files changed, 906 insertions(+), 132 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 021def1d..9473ffc4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,12 +15,13 @@ jobs: os: [ ubuntu-20.04, windows-latest, macOS-latest ] python-version: [ '3.10' ] include: - - os: ubuntu-latest - python-version: '3.8' - os: ubuntu-latest python-version: '3.9' - os: ubuntu-latest python-version: '3.11' + - os: ubuntu-latest + python-version: '3.9' + requirements: 'minimum' steps: - uses: actions/checkout@v4 @@ -50,6 +51,16 @@ jobs: pip install -r requirements.txt pip install -r tests/requirements.txt + - name: Downgrade to minimum dependencies + run: | + pip install "audeer==2.0.0" + pip install "audiofile==0.4.0" + pip install "numpy<2.0.0" + pip install "pandas==2.1.0" + pip install "pyarrow==10.0.1" + pip install "pyyaml==5.4.1" + if: matrix.requirements == 'minimum' + - name: Test with pytest run: | python -m pytest diff --git a/audformat/core/database.py b/audformat/core/database.py index 5eb72e68..2772f0a4 100644 --- a/audformat/core/database.py +++ b/audformat/core/database.py @@ -979,7 +979,7 @@ def save( r"""Save database to disk. Creates a header ``/.yaml`` - and for every table a file ``/..[csv,pkl]``. + and for every table a file ``/..[csv,parquet,pkl]``. Existing files will be overwritten. If ``update_other_formats`` is provided, @@ -1383,7 +1383,7 @@ def load( r"""Load database from disk. Expects a header ``/.yaml`` - and for every table a file ``/..[csv|pkl]`` + and for every table a file ``/..[csv|parquet|pkl]`` Media files should be located under ``root``. 
Args: @@ -1409,7 +1409,7 @@ def load( Raises: FileNotFoundError: if the database header file cannot be found under ``root`` - RuntimeError: if a CSV table file is newer + RuntimeError: if a CSV or PARQUET table file is newer than the corresponding PKL file """ diff --git a/audformat/core/define.py b/audformat/core/define.py index 37cffa4c..addd9f79 100644 --- a/audformat/core/define.py +++ b/audformat/core/define.py @@ -337,6 +337,9 @@ class TableStorageFormat(DefineBase): CSV = "csv" """File extension for tables stored in CSV format.""" + PARQUET = "parquet" + """File extension for tables stored in PARQUET format.""" + PICKLE = "pkl" """File extension for tables stored in PKL format.""" diff --git a/audformat/core/table.py b/audformat/core/table.py index 18a4b863..30924953 100644 --- a/audformat/core/table.py +++ b/audformat/core/table.py @@ -1,11 +1,15 @@ from __future__ import annotations # allow typing without string import copy +import hashlib import os import pickle import typing import pandas as pd +import pyarrow as pa +import pyarrow.csv as csv +import pyarrow.parquet as parquet import audeer @@ -14,8 +18,6 @@ from audformat.core.column import Column from audformat.core.common import HeaderBase from audformat.core.common import HeaderDict -from audformat.core.common import to_audformat_dtype -from audformat.core.common import to_pandas_dtype from audformat.core.errors import BadIdError from audformat.core.index import filewise_index from audformat.core.index import index_type @@ -442,10 +444,10 @@ def load( ): r"""Load table data from disk. - Tables can be stored as PKL and/or CSV files to disk. - If both files are present + Tables are stored as CSV, PARQUET and/or PKL files to disk. + If the PKL file exists, it will load the PKL file - as long as its modification date is newer, + as long as its modification date is the newest, otherwise it will raise an error and ask to delete one of the files. @@ -454,48 +456,64 @@ def load( Raises: RuntimeError: if table file(s) are missing - RuntimeError: if CSV file is newer than PKL file + RuntimeError: if CSV or PARQUET file is newer than PKL file """ path = audeer.path(path) - pkl_file = f"{path}.{define.TableStorageFormat.PICKLE}" csv_file = f"{path}.{define.TableStorageFormat.CSV}" + parquet_file = f"{path}.{define.TableStorageFormat.PARQUET}" + pkl_file = f"{path}.{define.TableStorageFormat.PICKLE}" - if not os.path.exists(pkl_file) and not os.path.exists(csv_file): + if ( + not os.path.exists(pkl_file) + and not os.path.exists(csv_file) + and not os.path.exists(parquet_file) + ): raise RuntimeError( - f"No file found for table with path '{path}.{{pkl|csv}}'" + f"No file found for table with path '{path}.{{csv|parquet|pkl}}'" ) - # Load from PKL if file exists and is newer then CSV file. - # If both are written by Database.save() this is the case + # Load from PKL if file exists + # and is newer than CSV or PARQUET file. + # If files are written by Database.save() + # this is always the case # as it stores first the PKL file pickled = False if os.path.exists(pkl_file): - if os.path.exists(csv_file) and os.path.getmtime( - csv_file - ) > os.path.getmtime(pkl_file): - raise RuntimeError( - f"The table CSV file '{csv_file}' is newer " - f"than the table PKL file '{pkl_file}'. " - "If you want to load from the CSV file, " - "please delete the PKL file. " - "If you want to load from the PKL file, " - "please delete the CSV file." 
- ) + for file in [parquet_file, csv_file]: + if os.path.exists(file) and os.path.getmtime(file) > os.path.getmtime( + pkl_file + ): + ext = audeer.file_extension(file).upper() + raise RuntimeError( + f"The table {ext} file '{file}' is newer " + f"than the table PKL file '{pkl_file}'. " + f"If you want to load from the {ext} file, " + "please delete the PKL file. " + "If you want to load from the PKL file, " + f"please delete the {ext} file." + ) pickled = True if pickled: try: self._load_pickled(pkl_file) except (AttributeError, ValueError, EOFError) as ex: - # if exception is raised (e.g. unsupported pickle protocol) - # try to load from CSV and save it again + # If exception is raised + # (e.g. unsupported pickle protocol) + # try to load from PARQUET or CSV + # and save it again # otherwise raise error - if os.path.exists(csv_file): + if os.path.exists(parquet_file): + self._load_parquet(parquet_file) + self._save_pickled(pkl_file) + elif os.path.exists(csv_file): self._load_csv(csv_file) self._save_pickled(pkl_file) else: raise ex + elif os.path.exists(parquet_file): + self._load_parquet(parquet_file) else: self._load_csv(csv_file) @@ -581,17 +599,34 @@ def save( path = audeer.path(path) define.TableStorageFormat._assert_has_attribute_value(storage_format) - pickle_file = path + f".{define.TableStorageFormat.PICKLE}" - csv_file = path + f".{define.TableStorageFormat.CSV}" - - # Make sure the CSV file is always written first - # as it is expected to be older by load() + csv_file = f"{path}.{define.TableStorageFormat.CSV}" + parquet_file = f"{path}.{define.TableStorageFormat.PARQUET}" + pickle_file = f"{path}.{define.TableStorageFormat.PICKLE}" + + # Ensure the following storage order: + # 1. PARQUET file + # 2. CSV file + # 3. PKL file + # The PKl is expected to be the oldest by load(), + # the order of PARQUET and CSV file + # is only a convention for now. if storage_format == define.TableStorageFormat.PICKLE: + if update_other_formats and os.path.exists(parquet_file): + self._save_parquet(parquet_file) if update_other_formats and os.path.exists(csv_file): self._save_csv(csv_file) self._save_pickled(pickle_file) + if storage_format == define.TableStorageFormat.PARQUET: + self._save_parquet(parquet_file) + if update_other_formats and os.path.exists(csv_file): + self._save_csv(csv_file) + if update_other_formats and os.path.exists(pickle_file): + self._save_pickled(pickle_file) + if storage_format == define.TableStorageFormat.CSV: + if update_other_formats and os.path.exists(parquet_file): + self._save_parquet(parquet_file) self._save_csv(csv_file) if update_other_formats and os.path.exists(pickle_file): self._save_pickled(pickle_file) @@ -800,73 +835,75 @@ def _get_by_index( # Returns `df, df_is_copy` raise NotImplementedError() + @property + def _levels_and_dtypes(self) -> typing.Dict[str, str]: + r"""Levels and dtypes of index columns. + + Returns: + dictionary with index levels (column names) + and associated audformat data type + + """ + # The returned dictionary is used + # to infer index column names and dtypes + # when reading CSV files. 
+ raise NotImplementedError() # pragma: no cover + def _load_csv(self, path: str): - schemes = self.db.schemes - converters = {} - dtypes = {} - - if hasattr(self, "type"): - # filewise or segmented table - dtypes[define.IndexField.FILE] = define.DataType.STRING - if self.type == define.IndexType.SEGMENTED: - dtypes[define.IndexField.START] = define.DataType.TIME - dtypes[define.IndexField.END] = define.DataType.TIME - else: - # misc table - dtypes = self.levels + r"""Load table from CSV file. - # index columns - levels = list(dtypes) - dtypes = {level: to_pandas_dtype(dtype) for level, dtype in dtypes.items()} + The loaded table is stored under ``self._df``. - # other columns - columns = list(self.columns) - for column_id, column in self.columns.items(): - if column.scheme_id is not None: - dtypes[column_id] = schemes[column.scheme_id].to_pandas_dtype() - else: - dtypes[column_id] = "object" - - # replace dtype with converter for dates or timestamps - dtypes_wo_converters = {} - for column_id, dtype in dtypes.items(): - if dtype == "datetime64[ns]": - converters[column_id] = lambda x: pd.to_datetime(x) - elif dtype == "timedelta64[ns]": - converters[column_id] = lambda x: pd.to_timedelta(x) - else: - dtypes_wo_converters[column_id] = dtype + Loading a CSV file with :func:`pandas.read_csv()` is slower + than the method applied here. + We first load the CSV file as a :class:`pyarrow.Table` + and convert it to a dataframe afterwards. + + Args: + path: path to table, including file extension - # read csv - df = pd.read_csv( + """ + levels = list(self._levels_and_dtypes.keys()) + columns = list(self.columns.keys()) + table = csv.read_csv( path, - usecols=levels + columns, - dtype=dtypes_wo_converters, - index_col=levels, - converters=converters, - float_precision="round_trip", + read_options=csv.ReadOptions( + column_names=levels + columns, + skip_rows=1, + ), + convert_options=csv.ConvertOptions( + column_types=self._pyarrow_csv_schema(), + strings_can_be_null=True, + ), ) + df = self._pyarrow_table_to_dataframe(table, from_csv=True) + + self._df = df + + def _load_parquet(self, path: str): + r"""Load table from PARQUET file. - # For an empty CSV file - # converters will not set the correct dtype - # and we need to correct it manually - if len(df) == 0: - # fix index - converter_dtypes = { - level: dtype - for level, dtype in dtypes.items() - if level in converters and level in levels - } - df.index = utils.set_index_dtypes(df.index, converter_dtypes) - # fix columns - for column_id in columns: - if column_id in converters: - dtype = dtypes[column_id] - df[column_id] = df[column_id].astype(dtype) + The loaded table is stored under ``self._df``. + + Args: + path: path to table, including file extension + + """ + # Read PARQUET file + table = parquet.read_table(path) + df = self._pyarrow_table_to_dataframe(table) self._df = df def _load_pickled(self, path: str): + r"""Load table from PKL file. + + The loaded table is stored under ``self._df``. + + Args: + path: path to table, including file extension + + """ # Older versions of audformat used xz compression # which produced smaller files, # but was slower. @@ -891,14 +928,226 @@ def _load_pickled(self, path: str): self._df = df + def _pyarrow_convert_dtypes( + self, + df: pd.DataFrame, + *, + convert_all: bool = False, + ) -> pd.DataFrame: + r"""Convert dtypes that are not handled by pyarrow. + + This adjusts dtypes in a dataframe, + that could not be set correctly + when converting to the dataframe + from pyarrow. 
+ + Args: + df: dataframe, + convert_all: if ``False``, + converts all columns with + ``"object"`` audformat dtype, + and all columns with a scheme with labels. + If ``"True"``, + it converts additionally all columns with + ``"bool"``, ``"int"``, and ``"time"`` audformat dtypes + + Returns: + dataframe with converted dtypes + + """ + # Collect columns with dtypes, + # that cannot directly be converted + # from pyarrow to pandas + bool_columns = [] + int_columns = [] + time_columns = [] + object_columns = [] + + # Collect columns + # with scheme labels + labeled_columns = [] + + # Collect columns, + # belonging to the table index + # (not the index of the provided dataframe) + index_columns = [] + + # --- Index --- + index_columns += list(self._levels_and_dtypes.keys()) + for level, dtype in self._levels_and_dtypes.items(): + if dtype == define.DataType.BOOL: + bool_columns.append(level) + elif dtype == define.DataType.INTEGER: + int_columns.append(level) + elif dtype == define.DataType.TIME: + time_columns.append(level) + elif dtype == define.DataType.OBJECT: + object_columns.append(level) + + # --- Columns --- + for column_id, column in self.columns.items(): + if column.scheme_id is not None: + scheme = self.db.schemes[column.scheme_id] + if scheme.labels is not None: + labeled_columns.append(column_id) + elif scheme.dtype == define.DataType.BOOL: + bool_columns.append(column_id) + elif scheme.dtype == define.DataType.INTEGER: + int_columns.append(column_id) + elif scheme.dtype == define.DataType.TIME: + time_columns.append(column_id) + elif scheme.dtype == define.DataType.OBJECT: + object_columns.append(column_id) + else: + # No scheme defaults to `object` dtype + object_columns.append(column_id) + + if convert_all: + for column in bool_columns: + df[column] = df[column].astype("boolean") + for column in int_columns: + df[column] = df[column].astype("Int64") + for column in time_columns: + df[column] = df[column].astype("timedelta64[ns]") + for column in object_columns: + df[column] = df[column].astype("object") + df[column] = df[column].replace(pd.NA, None) + for column in labeled_columns: + scheme = self.db.schemes[self.columns[column].scheme_id] + labels = scheme._labels_to_list() + if len(labels) > 0 and isinstance(labels[0], int): + # allow nullable + labels = pd.array(labels, dtype="int64") + dtype = pd.api.types.CategoricalDtype( + categories=labels, + ordered=False, + ) + df[column] = df[column].astype(dtype) + return df + + def _pyarrow_csv_schema(self) -> pa.Schema: + r"""Data type mapping for reading CSV file with pyarrow. + + This provides a schema, + defining pyarrow dtypes + for the columns of a CSV file. + + The dtypes are extracted from the audformat schemes, + and converted to the pyarrow dtypes. + + Returns: + pyarrow schema for reading a CSV file + + """ + # Mapping from audformat to pyarrow dtypes + to_pyarrow_dtype = { + define.DataType.BOOL: pa.bool_(), + define.DataType.DATE: pa.timestamp("ns"), + define.DataType.FLOAT: pa.float64(), + define.DataType.INTEGER: pa.int64(), + define.DataType.STRING: pa.string(), + # A better fitting type would be `pa.duration("ns")`, + # but this is not yet supported + # when reading CSV files + define.DataType.TIME: pa.string(), + } + + # Collect pyarrow dtypes + # of all columns, + # including index columns. 
+ # The dtypes are stored as a tuple + # ``(column, dtype)``, + # and are used to create + # the pyarrow.Schema + # used when reading the CSV file + pyarrow_dtypes = [] + # Index + for level, dtype in self._levels_and_dtypes.items(): + if dtype in to_pyarrow_dtype: + pyarrow_dtypes.append((level, to_pyarrow_dtype[dtype])) + # Columns + for column_id, column in self.columns.items(): + if column.scheme_id is not None: + dtype = self.db.schemes[column.scheme_id].dtype + if dtype in to_pyarrow_dtype: + pyarrow_dtypes.append((column_id, to_pyarrow_dtype[dtype])) + + return pa.schema(pyarrow_dtypes) + + def _pyarrow_table_to_dataframe( + self, + table: pa.Table, + *, + from_csv: bool = False, + ) -> pd.DataFrame: + r"""Convert pyarrow table to pandas dataframe. + + Args: + table: pyarrow table + from_csv: if ``True`` it assumes, + that ``table`` was created by reading a CSV file, + and it will convert all needed dtypes + + Returns: + dataframe + + """ + df = table.to_pandas( + deduplicate_objects=False, + types_mapper={ + pa.string(): pd.StringDtype(), + }.get, # we have to provide a callable, not a dict + ) + # Adjust dtypes and set index + df = self._pyarrow_convert_dtypes(df, convert_all=from_csv) + index_columns = list(self._levels_and_dtypes.keys()) + df = self._set_index(df, index_columns) + return df + def _save_csv(self, path: str): # Load table before opening CSV file # to avoid creating a CSV file # that is newer than the PKL file - df = self.df + df = self.df # loads table with open(path, "w") as fp: df.to_csv(fp, encoding="utf-8") + def _save_parquet(self, path: str): + r"""Save table as PARQUET file. + + A PARQUET file is written in a non-deterministic way, + and we cannot track changes by its MD5 sum. + To make changes trackable, + we store a hash in its metadata. + + The hash is calculated from the pyarrow schema + (to track column names and data types) + and the pandas dataframe + (to track values and order or rows), + from which the PARQUET file is generated. + + The hash of the PARQUET file can then be read by:: + + pyarrow.parquet.read_schema(path).metadata[b"hash"].decode() + + Args: + path: path, including file extension + + """ + table = pa.Table.from_pandas(self.df.reset_index(), preserve_index=False) + + # Create hash of table + table_hash = hashlib.md5() + table_hash.update(_schema_hash(table)) + table_hash.update(_dataframe_hash(self.df)) + + # Store in metadata of file, + # see https://stackoverflow.com/a/58978449 + metadata = {"hash": table_hash.hexdigest()} + table = table.replace_schema_metadata({**metadata, **table.schema.metadata}) + + parquet.write_table(table, path, compression="snappy") + def _save_pickled(self, path: str): self.df.to_pickle( path, @@ -939,6 +1188,31 @@ def _set_column(self, column_id: str, column: Column) -> Column: return column + def _set_index(self, df: pd.DataFrame, columns: typing.Sequence) -> pd.DataFrame: + r"""Set columns as index. + + Setting of index columns is performed inplace! + + Args: + df: dataframe + columns: columns to be set as index of dataframe + + Returns: + updated dataframe + + """ + # When assigning more than one column, + # a MultiIndex is assigned. + # Setting a MultiIndex does not always preserve pandas dtypes, + # so we need to set them manually. + # + if len(columns) > 1: + dtypes = {column: df[column].dtype for column in columns} + df.set_index(columns, inplace=True) + if len(columns) > 1: + df.index = utils.set_index_dtypes(df.index, dtypes) + return df + class MiscTable(Base): r"""Miscellaneous table. 
@@ -1084,8 +1358,7 @@ def __init__( f"{levels}, " f"but names must be non-empty and unique." ) - - dtypes = [to_audformat_dtype(dtype) for dtype in utils._dtypes(index)] + dtypes = utils._audformat_dtypes(index) self.levels = {level: dtype for level, dtype in zip(levels, dtypes)} super().__init__( @@ -1099,6 +1372,17 @@ def __init__( def _get_by_index(self, index: pd.Index) -> pd.DataFrame: return self.df.loc[index] + @property + def _levels_and_dtypes(self) -> typing.Dict[str, str]: + r"""Levels and dtypes of index columns. + + Returns: + dictionary with index levels (column names) + and associated audformat data type + + """ + return self.levels + class Table(Base): r"""Table conform to :ref:`table specifications `. @@ -1499,6 +1783,22 @@ def _get_by_index( return result + @property + def _levels_and_dtypes(self) -> typing.Dict[str, str]: + r"""Levels and dtypes of index columns. + + Returns: + dictionary with index levels (column names) + and associated audformat data type + + """ + levels_and_dtypes = {} + levels_and_dtypes[define.IndexField.FILE] = define.DataType.STRING + if self.type == define.IndexType.SEGMENTED: + levels_and_dtypes[define.IndexField.START] = define.DataType.TIME + levels_and_dtypes[define.IndexField.END] = define.DataType.TIME + return levels_and_dtypes + def _assert_table_index( table: Base, @@ -1544,6 +1844,40 @@ def _assert_table_index( ) +def _dataframe_hash(df: pd.DataFrame) -> bytes: + """Hash a dataframe. + + The hash value takes into account: + + * index of dataframe + * values of the dataframe + * order of dataframe rows + + It does not consider: + + * column names of dataframe + * dtypes of dataframe + + Args: + df: dataframe + + Returns: + MD5 hash in bytes + + """ + md5 = hashlib.md5() + for _, y in df.reset_index().items(): + # Convert every column to a numpy array, + # and hash its string representation + if y.dtype == "Int64": + # Enforce consistent conversion to numpy.array + # for integers across different pandas versions + # (since pandas 2.2.x, Int64 is converted to float if it contains ) + y = y.astype("float") + md5.update(bytes(str(y.to_numpy()), "utf-8")) + return md5.digest() + + def _maybe_convert_dtype_to_string( index: pd.Index, ) -> pd.Index: @@ -1566,3 +1900,23 @@ def _maybe_update_scheme( for scheme in table.db.schemes.values(): if table._id == scheme.labels: scheme.replace_labels(table._id) + + +def _schema_hash(table: pa.Table) -> bytes: + r"""Hash pyarrow table schema. 
+ + Args: + table: pyarrow table + + Returns: + MD5 hash in bytes + + """ + schema_str = table.schema.to_string( + # schema.metadata contains pandas related information, + # and the used pyarrow and pandas version, + # and needs to be excluded + show_field_metadata=False, + show_schema_metadata=False, + ) + return hashlib.md5(schema_str.encode()).digest() diff --git a/audformat/core/utils.py b/audformat/core/utils.py index 4e5d6015..fdb0b411 100644 --- a/audformat/core/utils.py +++ b/audformat/core/utils.py @@ -929,8 +929,7 @@ def is_index_alike( # check dtypes dtypes = set() for obj in objs: - ds = [to_audformat_dtype(dtype) for dtype in _dtypes(obj)] - dtypes.add(tuple(ds)) + dtypes.add(tuple(_audformat_dtypes(obj))) if len(dtypes) > 1: return False @@ -2017,7 +2016,7 @@ def _assert_index_alike( dtypes = [] for obj in objs: - ds = [to_audformat_dtype(dtype) for dtype in _dtypes(obj)] + ds = _audformat_dtypes(obj) dtypes.append(tuple(ds) if len(ds) > 1 else ds[0]) dtypes = list(dict.fromkeys(dtypes)) if len(dtypes) > 1: @@ -2026,12 +2025,18 @@ def _assert_index_alike( raise ValueError(msg) -def _dtypes(obj): - r"""List of dtypes of object.""" - if isinstance(obj, pd.MultiIndex): - return list(obj.dtypes) - else: - return [obj.dtype] +def _audformat_dtypes(index) -> typing.List[str]: + r"""List of audformat data types of index. + + Args: + index: index + + Returns: + audformat data types of index + + """ + dtypes = _pandas_dtypes(index) + return [to_audformat_dtype(dtype) for dtype in dtypes] def _is_same_dtype(d1, d2) -> bool: @@ -2051,12 +2056,20 @@ def _is_same_dtype(d1, d2) -> bool: return d1.name == d2.name -def _levels(obj): - r"""List of dtypes of object.""" - if isinstance(obj, pd.MultiIndex): - return list(obj.names) +def _levels(index) -> typing.List[str]: + r"""List of levels of index. + + Args: + index: index + + Returns: + index levels + + """ + if isinstance(index, pd.MultiIndex): + return list(index.names) else: - return [obj.name] + return [index.name] def _maybe_convert_filewise_index( @@ -2101,7 +2114,7 @@ def _maybe_convert_pandas_dtype( """ levels = _levels(index) - dtypes = _dtypes(index) + dtypes = _pandas_dtypes(index) # Ensure integers are stored as Int64 int_dtypes = { @@ -2152,3 +2165,19 @@ def _maybe_convert_single_level_multi_index( objs[idx].index = obj.index.get_level_values(0) return objs + + +def _pandas_dtypes(index) -> typing.List[typing.Any]: + r"""List of pandas dtypes of index. 
+ + Args: + index: index + + Returns: + pandas data types of index + + """ + if isinstance(index, pd.MultiIndex): + return list(index.dtypes) + else: + return [index.dtype] diff --git a/pyproject.toml b/pyproject.toml index 3d263b93..13c329eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,21 +23,21 @@ classifiers = [ 'Operating System :: OS Independent', 'Programming Language :: Python', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', 'Topic :: Scientific/Engineering', ] -requires-python = '>=3.8' +requires-python = '>=3.9' # pandas >=2.1.0 dependencies = [ 'audeer >=2.0.0', 'audiofile >=0.4.0', 'iso-639', 'iso3166', 'oyaml', + 'pandas >=2.1.0', # for pyarrow -> timedelta conversion + 'pyarrow >=10.0.1', # for pyarrow strings in pandas 'pyyaml >=5.4.1', - 'pandas >=1.4.1', ] # Get version dynamically from git # (needs setuptools_scm tools config below) @@ -78,6 +78,7 @@ addopts = ''' --cov-report term-missing --cov-report xml --ignore=docs/ + --ignore=benchmarks/ ''' diff --git a/tests/test_database.py b/tests/test_database.py index dee4e658..67dfa2cf 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -446,6 +446,12 @@ def test_map_files(num_workers): @pytest.mark.parametrize( "db, storage_format, load_data, num_workers", [ + ( + audformat.testing.create_db(minimal=True), + audformat.define.TableStorageFormat.PARQUET, + False, + 1, + ), ( audformat.testing.create_db(minimal=True), audformat.define.TableStorageFormat.CSV, @@ -458,6 +464,12 @@ def test_map_files(num_workers): False, 1, ), + ( + audformat.testing.create_db(), + audformat.define.TableStorageFormat.PARQUET, + False, + 4, + ), ( audformat.testing.create_db(), audformat.define.TableStorageFormat.CSV, @@ -479,6 +491,11 @@ def test_map_files(num_workers): ], ) def test_save_and_load(tmpdir, db, storage_format, load_data, num_workers): + all_formats = audformat.define.TableStorageFormat._attribute_values() + non_cache_formats = [ + ext for ext in all_formats if ext != audformat.define.TableStorageFormat.PICKLE + ] + assert db.root is None audformat.testing.create_attachment_files(db, tmpdir) db.save( @@ -490,7 +507,7 @@ def test_save_and_load(tmpdir, db, storage_format, load_data, num_workers): expected_formats = [storage_format] for table_id in db.tables: - for ext in audformat.define.TableStorageFormat._attribute_values(): + for ext in all_formats: table_file = os.path.join(tmpdir, f"db.{table_id}.{ext}") if ext in expected_formats: assert os.path.exists(table_file) @@ -498,7 +515,7 @@ def test_save_and_load(tmpdir, db, storage_format, load_data, num_workers): assert not os.path.exists(table_file) # Test update other formats - if storage_format == audformat.define.TableStorageFormat.CSV and db.tables: + if storage_format in non_cache_formats and db.tables: db2 = audformat.testing.create_db() assert db2.root is None db2.save( @@ -508,7 +525,7 @@ def test_save_and_load(tmpdir, db, storage_format, load_data, num_workers): ) assert db.root == tmpdir - # Load prefers PKL files over CSV files, + # Load prefers PKL files, # which means we are loading the second database here db_load = audformat.Database.load( tmpdir, @@ -621,14 +638,16 @@ def test_save_and_load(tmpdir, db, storage_format, load_data, num_workers): # Test missing table if db.tables: table_id = list(db.tables)[0] - for ext in audformat.define.TableStorageFormat._attribute_values(): + for ext 
in all_formats: table_file = os.path.join(tmpdir, f"db.{table_id}.{ext}") if os.path.exists(table_file): os.remove(table_file) # The replace part handles Windows paths table_path = table_file[:-4].replace("\\", "\\\\") - error_msg = r"No file found for table with path " rf"'{table_path}.{{pkl|csv}}'" + error_msg = ( + r"No file found for table with path " rf"'{table_path}.{{csv|parquet|pkl}}'" + ) with pytest.raises(RuntimeError, match=error_msg): db = audformat.Database.load( tmpdir, diff --git a/tests/test_misc_table.py b/tests/test_misc_table.py index 7d9bfa41..683ad4a6 100644 --- a/tests/test_misc_table.py +++ b/tests/test_misc_table.py @@ -511,6 +511,13 @@ def test_dtype_column( "index_object, index_values, index_dtype, " "expected_pandas_dtype, expected_audformat_dtype", [ + ( + pd.Index, + ["0"], + None, + "object", + audformat.define.DataType.OBJECT, + ), ( pd.Index, [], diff --git a/tests/test_table.py b/tests/test_table.py index 348e455c..2b5536de 100644 --- a/tests/test_table.py +++ b/tests/test_table.py @@ -1,8 +1,12 @@ import os +import random +import re +import time import typing import numpy as np import pandas as pd +import pyarrow.parquet as parquet import pytest import audeer @@ -1118,22 +1122,25 @@ def test_load(tmpdir): with pytest.raises(EOFError): table_loaded.load(path_no_ext) - # repeat with CSV file as fall back - table.save( - path_no_ext, - storage_format=audformat.define.TableStorageFormat.CSV, - ) - with open(path_pkl, "wb"): - pass - table_loaded = audformat.Table() - table_loaded.columns = table.columns - table_loaded._db = table._db - table_loaded.load(path_no_ext) - pd.testing.assert_frame_equal(table.df, table_loaded.df) + # repeat with CSV|PARQUET file as fall back + for ext in [ + audformat.define.TableStorageFormat.CSV, + audformat.define.TableStorageFormat.PARQUET, + ]: + table.save(path_no_ext, storage_format=ext) + with open(path_pkl, "wb"): + pass + table_loaded = audformat.Table() + table_loaded.columns = table.columns + table_loaded._db = table._db + table_loaded.load(path_no_ext) + pd.testing.assert_frame_equal(table.df, table_loaded.df) - # check if pickle file was recovered from CSV - df = pd.read_pickle(path_pkl) - pd.testing.assert_frame_equal(table.df, df) + # check if pickle file was recovered + df = pd.read_pickle(path_pkl) + pd.testing.assert_frame_equal(table.df, df) + + os.remove(f"{path_no_ext}.{ext}") def test_load_old_pickle(tmpdir): @@ -1204,6 +1211,169 @@ def test_map(table, map): pd.testing.assert_frame_equal(result, expected) +@pytest.mark.parametrize("storage_format", ["csv", "parquet"]) +class TestHash: + r"""Test if PARQUET file hash changes with table. + + We store a MD5 sum associated with the dataframe, + that was used to create the file, + in the metadata of the PARQUET file. + Those MD5 sum is supposed to change, + if any of the table rows, (index) columns changes, + the data type of the entries changes, + or the name of a column changes. 
+ + Args: + tmpdir: tmpdir fixture + storage_format: storage format of table file + + """ + + def db(self, tmpdir, storage_format): + r"""Create minimal database with scheme and table.""" + self.db_root = audeer.path(tmpdir, "db") + self.storage_format = storage_format + self.table_file = audeer.path(self.db_root, f"db.table.{storage_format}") + db = audformat.Database("mydb") + db.schemes["int"] = audformat.Scheme("int") + index = audformat.segmented_index(["f1", "f2"], [0, 1], [1, 2]) + db["table"] = audformat.Table(index) + db["table"]["column"] = audformat.Column(scheme_id="int") + db["table"]["column"].set([0, 1]) + db.save(self.db_root, storage_format=self.storage_format) + return db + + def md5(self) -> str: + r"""Get MD5 sum for table file.""" + if self.storage_format == "csv": + return audeer.md5(self.table_file) + elif self.storage_format == "parquet": + return parquet.read_schema(self.table_file).metadata[b"hash"].decode() + + def test_change_index(self, tmpdir, storage_format): + r"""Change table index.""" + db = self.db(tmpdir, storage_format) + md5 = self.md5() + index = audformat.segmented_index(["f1", "f1"], [0, 1], [1, 2]) + db["table"] = audformat.Table(index) + db["table"]["column"] = audformat.Column(scheme_id="int") + db["table"]["column"].set([0, 1]) + db.save(self.db_root, storage_format=self.storage_format) + assert self.md5() != md5 + + def test_change_column_name(self, tmpdir, storage_format): + r"""Change table column name.""" + db = self.db(tmpdir, storage_format) + md5 = self.md5() + index = audformat.segmented_index(["f1", "f2"], [0, 1], [1, 2]) + db["table"] = audformat.Table(index) + db["table"]["col"] = audformat.Column(scheme_id="int") + db["table"]["col"].set([0, 1]) + db.save(self.db_root, storage_format=self.storage_format) + assert self.md5() != md5 + + def test_change_column_order(self, tmpdir, storage_format): + r"""Change order of table columns.""" + db = self.db(tmpdir, storage_format) + index = audformat.segmented_index(["f1", "f2"], [0, 1], [1, 2]) + db["table"] = audformat.Table(index) + db["table"]["col1"] = audformat.Column(scheme_id="int") + db["table"]["col1"].set([0, 1]) + db["table"]["col2"] = audformat.Column(scheme_id="int") + db["table"]["col2"].set([0, 1]) + db.save(self.db_root, storage_format=self.storage_format) + md5 = self.md5() + db["table"] = audformat.Table(index) + db["table"]["col2"] = audformat.Column(scheme_id="int") + db["table"]["col2"].set([0, 1]) + db["table"]["col1"] = audformat.Column(scheme_id="int") + db["table"]["col1"].set([0, 1]) + db.save(self.db_root, storage_format=self.storage_format) + assert self.md5() != md5 + + def test_change_row_order(self, tmpdir, storage_format): + r"""Change order of table rows.""" + db = self.db(tmpdir, storage_format) + md5 = self.md5() + index = audformat.segmented_index(["f2", "f1"], [1, 0], [2, 1]) + db["table"] = audformat.Table(index) + db["table"]["column"] = audformat.Column(scheme_id="int") + db["table"]["column"].set([1, 0]) + db.save(self.db_root, storage_format=storage_format) + assert self.md5() != md5 + + def test_change_values(self, tmpdir, storage_format): + r"""Change table values.""" + db = self.db(tmpdir, storage_format) + md5 = self.md5() + index = audformat.segmented_index(["f1", "f2"], [0, 1], [1, 2]) + db["table"] = audformat.Table(index) + db["table"]["column"] = audformat.Column(scheme_id="int") + db["table"]["column"].set([1, 0]) + db.save(self.db_root, storage_format=self.storage_format) + assert self.md5() != md5 + + def test_copy_table(self, tmpdir, 
storage_format): + r"""Replace table with identical copy.""" + db = self.db(tmpdir, storage_format) + md5 = self.md5() + table = db["table"].copy() + db["table"] = table + db.save(self.db_root, storage_format=self.storage_format) + assert self.md5() == md5 + + +@pytest.mark.parametrize( + "table_id, expected_hash", + [ + ( + "files", + "a66a22ee4158e0e5100f1d797155ad81", + ), + ( + "segments", + "f69eb4a5d19da71e5da00a9b13beb3db", + ), + ( + "misc", + "331f79758b195cb9b7d0e8889e830eb2", + ), + ], +) +def test_parquet_hash_reproducibility(tmpdir, table_id, expected_hash): + r"""Test reproducibility of binary PARQUET files. + + When storing the same dataframe + to different PARQUET files, + the files will slightly vary + and have different MD5 sums. + + To provide a reproducible hash, + in order to judge if a table has changed, + we calculate the hash of the table + and store it in the metadata + of the schema + of a the table. + + """ + random.seed(1) # ensure the same random table values are created + db = audformat.testing.create_db() + + # Write to PARQUET file and check if correct hash is stored + path_wo_ext = audeer.path(tmpdir, table_id) + path = f"{path_wo_ext}.parquet" + db[table_id].save(path_wo_ext, storage_format="parquet") + metadata = parquet.read_schema(path).metadata + assert metadata[b"hash"].decode() == expected_hash + + # Load table from PARQUET file, and overwrite it + db[table_id].load(path_wo_ext) + os.remove(path) + db[table_id].save(path_wo_ext, storage_format="parquet") + metadata = parquet.read_schema(path).metadata + assert metadata[b"hash"].decode() == expected_hash + + @pytest.mark.parametrize( "files", [ @@ -1403,6 +1573,100 @@ def test_pick_index(table, index, expected): pd.testing.assert_index_equal(table.index, expected) +@pytest.mark.parametrize( + "storage_format", + [ + pytest.param( + "csv", + marks=pytest.mark.skip(reason="CSV does not support numpy arrays"), + ), + "parquet", + "pkl", + ], +) +def test_save_and_load(tmpdir, storage_format): + r"""Test saving and loading of a table. + + Ensures the table dataframe representation + is identical after saving and loading a table. + + Args: + tmpdir: tmpdir fixture + storage_format: storage format + the table should be written to disk. 
+ This will also be used as file extension + + """ + db = audformat.testing.create_db() + + # Extend database with more table/scheme combinations + db.schemes["int-labels"] = audformat.Scheme( + dtype=audformat.define.DataType.INTEGER, + labels=[0, 1], + ) + db.schemes["object"] = audformat.Scheme(audformat.define.DataType.OBJECT) + index = pd.MultiIndex.from_arrays( + [[0, 1], ["a", "b"]], + names=["idx1", "idx2"], + ) + index = audformat.utils.set_index_dtypes( + index, + { + "idx1": audformat.define.DataType.INTEGER, + "idx2": audformat.define.DataType.OBJECT, + }, + ) + db["multi-misc"] = audformat.MiscTable(index) + db["multi-misc"]["int"] = audformat.Column(scheme_id="int-labels") + db["multi-misc"]["int"].set([0, pd.NA]) + db["multi-misc"]["bool"] = audformat.Column(scheme_id="bool") + db["multi-misc"]["bool"].set([True, pd.NA]) + db["multi-misc"]["arrays"] = audformat.Column(scheme_id="object") + db["multi-misc"]["arrays"].set([np.array([0, 1]), np.array([2, 3])]) + db["multi-misc"]["lists"] = audformat.Column(scheme_id="object") + db["multi-misc"]["lists"].set([[0, 1], [2, 3]]) + db["multi-misc"]["no-scheme"] = audformat.Column() + db["multi-misc"]["no-scheme"].set([0, 1]) + + for table_id in list(db): + expected_df = db[table_id].get() + path_wo_ext = audeer.path(tmpdir, table_id) + path = f"{path_wo_ext}.{storage_format}" + db[table_id].save(path_wo_ext, storage_format=storage_format) + assert os.path.exists(path) + db[table_id].load(path_wo_ext) + pd.testing.assert_frame_equal(db[table_id].df, expected_df) + + +@pytest.mark.parametrize( + "storage_format, expected_error, expected_error_msg", + [ + ( + "non-existing", + audformat.errors.BadValueError, + re.escape( + "Bad value 'non-existing', expected one of ['csv', 'parquet', 'pkl']" + ), + ), + ], +) +def test_save_errors(tmpdir, storage_format, expected_error, expected_error_msg): + r"""Test errors when saving a table. + + Args: + tmpdir: tmpdir fixture + storage_format: storage format of table + expected_error: expected error, e.g. ``ValueError`` + expected_error_msg: expected text of error message + + """ + db = audformat.testing.create_db() + table_id = list(db)[0] + path_wo_ext = audeer.path(tmpdir, table_id) + with pytest.raises(expected_error, match=expected_error_msg): + db[table_id].save(path_wo_ext, storage_format=storage_format) + + @pytest.mark.parametrize( "num_files,num_segments_per_file,values", [ @@ -1875,3 +2139,89 @@ def test_update(table, overwrite, others): for column_id, column in other.columns.items(): assert column.scheme == table[column_id].scheme assert column.rater == table[column_id].rater + + +@pytest.mark.parametrize("update_other_formats", [True, False]) +@pytest.mark.parametrize( + "storage_format, existing_formats", + [ + ("csv", []), + ("csv", ["pkl"]), + ("csv", ["parquet", "pkl"]), + ("pkl", ["parquet"]), + ("pkl", ["csv"]), + ("pkl", ["parquet", "csv"]), + ("parquet", ["pkl"]), + ("parquet", ["csv"]), + ("parquet", ["pkl", "csv"]), + ], +) +def test_update_other_formats( + tmpdir, + storage_format, + existing_formats, + update_other_formats, +): + r"""Test updating of other table formats. + + When a table is stored with `audformat.Table.save()` + as CSV, PARQUET, or PKL file, + a user might select + that all other existing file representations of the table + are updated as well. + E.g. if a PKL file of the same table exists, + and a user saves to a CSV file + with the argument `update_other_formats=True`, + it should write the table to the CSV and PKL file.
+ + Args: + tmpdir: tmpdir fixture + storage_format: storage format of table + existing_formats: formats the table should be stored in + before saving to ``storage_format`` + update_other_formats: if tables specified in ``existing_formats`` + should be updated when saving ``storage_format`` + + """ + db = audformat.testing.create_db() + + table_id = "files" + table_file = audeer.path(tmpdir, "table") + + # Create existing table files and pause for a short time + old_mtime = {} + for ext in existing_formats: + db[table_id].save( + table_file, + storage_format=ext, + update_other_formats=False, + ) + old_mtime[ext] = os.path.getmtime(f"{table_file}.{ext}") + time.sleep(0.05) + + # Store table to requested format + db[table_id].save( + table_file, + storage_format=storage_format, + update_other_formats=update_other_formats, + ) + + # Collect mtimes of existing table files + mtime = {} + formats = existing_formats + [storage_format] + for ext in formats: + mtime[ext] = os.path.getmtime(f"{table_file}.{ext}") + + # Ensure mtimes are correct + if update_other_formats: + if "pkl" in formats and "csv" in formats: + assert mtime["pkl"] >= mtime["csv"] + if "pkl" in formats and "parquet" in formats: + assert mtime["pkl"] >= mtime["parquet"] + if "csv" in formats and "parquet" in formats: + assert mtime["csv"] >= mtime["parquet"] + else: + for ext in existing_formats: + assert mtime[ext] == old_mtime[ext] + assert mtime[storage_format] > old_mtime[ext]
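
A minimal usage sketch of the behavior this patch adds. It only relies on calls that appear in the diff above (``Database.save()`` with ``storage_format="parquet"``, and reading the stored hash via ``pyarrow.parquet.read_schema()``); the database content and the target folder are made up for illustration.

    import audeer
    import audformat
    import pyarrow.parquet as parquet

    # Minimal database with one segmented table,
    # mirroring the fixture used in TestHash above
    db = audformat.Database("mydb")
    db.schemes["int"] = audformat.Scheme("int")
    index = audformat.segmented_index(["f1", "f2"], [0, 1], [1, 2])
    db["table"] = audformat.Table(index)
    db["table"]["column"] = audformat.Column(scheme_id="int")
    db["table"]["column"].set([0, 1])

    # Store all tables as PARQUET files;
    # existing CSV/PKL copies would be refreshed as well
    db_root = audeer.mkdir("./db")  # hypothetical target folder
    db.save(db_root, storage_format="parquet", update_other_formats=True)

    # The PARQUET binary is not written deterministically,
    # so the reproducible table hash is stored in the schema metadata
    # and can be read without loading the table
    table_file = audeer.path(db_root, "db.table.parquet")
    stored_hash = parquet.read_schema(table_file).metadata[b"hash"].decode()
    print(stored_hash)

Keeping the hash in the file metadata, instead of hashing the PARQUET file itself, is what lets tools track table changes even though snappy-compressed PARQUET files are not byte-reproducible.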