diff --git a/CHANGES.rst b/CHANGES.rst index 45b4a607..374cb220 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,14 @@ Changelog ========= +Version 3.5.1 (2019-10-25) +========================== +- Fix potential ``pyarrow.lib.ArrowNotImplementedError`` when trying to store or pickle empty + :class:`~kartothek.core.index.ExplicitSecondaryIndex` objects +- Fix pickling of :class:`~kartothek.core.index.ExplicitSecondaryIndex` unloaded in + :func:`~kartothek.io_components.read.dispatch_metapartitions_from_factory` + + Version 3.5.0 (2019-10-21) ========================== diff --git a/kartothek/core/index.py b/kartothek/core/index.py index 127d836f..bbe8fc81 100644 --- a/kartothek/core/index.py +++ b/kartothek/core/index.py @@ -2,7 +2,7 @@ import logging from copy import copy -from typing import Any, Dict, Iterable, List, Optional, Set, TypeVar, Union +from typing import Any, Dict, Iterable, List, Optional, Set, TypeVar, Union, cast import numpy as np import pandas as pd @@ -561,6 +561,8 @@ def __init__( ): if (index_dct is None) and not index_storage_key: raise ValueError("No valid index source specified") + if not index_storage_key and not index_dct and dtype is None: + raise ValueError("Trying to create non-typesafe index") self.index_storage_key = index_storage_key super(ExplicitSecondaryIndex, self).__init__( column=column, @@ -569,15 +571,24 @@ def __init__( normalize_dtype=normalize_dtype, ) - def copy(self, **kwargs) -> "IndexBase": + def copy(self, **kwargs) -> "ExplicitSecondaryIndex": if kwargs: - index_storage_key = None + index_storage_key = kwargs.pop("index_storage_key", None) else: index_storage_key = self.index_storage_key - return super(IndexBase, self).copy( - index_storage_key=index_storage_key, **kwargs + return cast( + ExplicitSecondaryIndex, + super(IndexBase, self).copy(index_storage_key=index_storage_key, **kwargs), ) + def unload(self) -> "IndexBase": + """ + Drop index data to save memory. 
+ """ + idx = self.copy(index_dct={}, index_storage_key=self.index_storage_key) + idx._index_dct_available = False + return idx + def __eq__(self, other) -> bool: if not isinstance(other, ExplicitSecondaryIndex): return False @@ -845,9 +856,13 @@ def _index_dct_to_table(index_dct: IndexDictType, column: str, dtype: pa.DataTyp # Additional note: pyarrow.array is supposed to infer type automatically. # But the inferred type is not enough to hold np.uint64. Until this is fixed in # upstream Arrow, we have to retain the following line - keys = np.array(list(keys)) + if not index_dct: + # the np.array dtype will be double which arrow cannot convert to the target type, so use an empty list instead + labeled_array = pa.array([], type=dtype) + else: + keys = np.array(list(keys)) + labeled_array = pa.array(keys, type=dtype) - labeled_array = pa.array(keys, type=dtype) partition_array = pa.array(list(index_dct.values())) return pa.Table.from_arrays( diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py index a99069cb..48819466 100644 --- a/kartothek/io_components/metapartition.py +++ b/kartothek/io_components/metapartition.py @@ -1221,6 +1221,9 @@ def build_indices(self, columns): are overwritten :return: self """ + if self.label is None: + return self + new_indices = {} for col in columns: possible_values = set() @@ -1678,6 +1681,7 @@ def parse_input_to_metapartition( 'Use the `secondary_indices` keyword argument of "write" and "update" functions instead.', DeprecationWarning, ) + indices = {k: v for k, v in indices.items() if v} _ensure_valid_indices( mp_indices=indices, secondary_indices=expected_secondary_indices, data=data ) diff --git a/kartothek/io_components/read.py b/kartothek/io_components/read.py index 50b4f9c3..74ea9ae2 100644 --- a/kartothek/io_components/read.py +++ b/kartothek/io_components/read.py @@ -65,7 +65,7 @@ def dispatch_metapartitions_from_factory( base_df = base_df[base_df.index.map(label_filter)] 
indices_to_dispatch = { - name: ix.copy(index_dct={}) + name: ix.unload() for name, ix in dataset_factory.indices.items() if isinstance(ix, ExplicitSecondaryIndex) } diff --git a/tests/core/test_index.py b/tests/core/test_index.py index 951f7e67..f013e2b9 100644 --- a/tests/core/test_index.py +++ b/tests/core/test_index.py @@ -144,16 +144,51 @@ def test_index_update_wrong_col(): ) -def test_index_empty(store): +@pytest.mark.parametrize( + "dtype", + [ + pa.binary(), + pa.bool_(), + pa.date32(), + pa.float32(), + pa.float64(), + pa.int64(), + pa.int8(), + pa.string(), + pa.timestamp("ns"), + ], +) +def test_index_empty(store, dtype): storage_key = "dataset_uuid/some_index.parquet" index1 = ExplicitSecondaryIndex( - column="col", index_dct={}, dtype=pa.int64(), index_storage_key=storage_key + column="col", index_dct={}, dtype=dtype, index_storage_key=storage_key ) key1 = index1.store(store, "dataset_uuid") index2 = ExplicitSecondaryIndex(column="col", index_storage_key=key1).load(store) assert index1 == index2 + index3 = pickle.loads(pickle.dumps(index1)) + assert index1 == index3 + + +def test_pickle_without_load(store): + storage_key = "dataset_uuid/some_index.parquet" + index1 = ExplicitSecondaryIndex( + column="col", index_dct={1: ["part_1"]}, index_storage_key=storage_key + ) + key1 = index1.store(store, "dataset_uuid") + + index2 = ExplicitSecondaryIndex(column="col", index_storage_key=key1) + assert index2 != index1 + + index3 = pickle.loads(pickle.dumps(index2)) + assert index3 == index2 + + index4 = index3.load(store) + assert index4 == index1 + assert index4 != index2 + def test_index_no_source(): with pytest.raises(ValueError) as e: @@ -735,13 +770,23 @@ def test_serialization_normalization(key): ) -def test_serialization_no_indices(store): - index = ExplicitSecondaryIndex(column="col", index_dct={1: ["part_1"]}) - storage_key = index.store(store=store, dataset_uuid="uuid") +@pytest.mark.parametrize("with_index_dct", [True, False]) +def 
test_unload(with_index_dct): + storage_key = "dataset_uuid/some_index.parquet" + index1 = ExplicitSecondaryIndex( + column="col", + index_dct={1: ["part_1"]} if with_index_dct else None, + index_storage_key=storage_key, + ) + + index2 = index1.unload() + assert not index2.loaded + assert index2.index_storage_key == storage_key - # Create index without `index_dct` - index = ExplicitSecondaryIndex(column="col", index_storage_key=storage_key) + index3 = pickle.loads(pickle.dumps(index2)) + assert index2 == index3 - index2 = pickle.loads(pickle.dumps(index)) - assert index == index2 +def test_fail_type_unsafe(): + with pytest.raises(ValueError, match="Trying to create non-typesafe index"): + ExplicitSecondaryIndex(column="col", index_dct={}) diff --git a/tests/io/dask/conftest.py b/tests/io/dask/conftest.py index cb79f899..5391b1af 100644 --- a/tests/io/dask/conftest.py +++ b/tests/io/dask/conftest.py @@ -5,7 +5,7 @@ import pytest from distributed import Client -import kartothek +import kartothek.core._time from kartothek.core.testing import cm_frozen_time _client = None