From f6043ecaeb9bf2d12bd8d12160d6c50572f5ccfb Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 25 Oct 2019 10:27:19 +0200 Subject: [PATCH 1/4] fix ArrowNotImplementedError for empty indices --- CHANGES.rst | 6 ++++++ kartothek/core/index.py | 8 ++++++-- tests/core/test_index.py | 18 ++++++++++++++++-- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 45b4a607..db8c730a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,12 @@ Changelog ========= +Version 3.5.1 (2019-10-25) +========================== +- Fix potential ``pyarrow.lib.ArrowNotImplementedError`` when trying to store or pickle empty + :class:`~kartothek.core.index.ExplicitSecondaryIndex` objects + + Version 3.5.0 (2019-10-21) ========================== diff --git a/kartothek/core/index.py b/kartothek/core/index.py index 127d836f..854d81c3 100644 --- a/kartothek/core/index.py +++ b/kartothek/core/index.py @@ -845,9 +845,13 @@ def _index_dct_to_table(index_dct: IndexDictType, column: str, dtype: pa.DataTyp # Additional note: pyarrow.array is supposed to infer type automatically. # But the inferred type is not enough to hold np.uint64. 
Until this is fixed in # upstream Arrow, we have to retain the following line - keys = np.array(list(keys)) + if not index_dct: + # the np.array dtype will be double which arrow cannot convert to the target type, so use an empty list instead + labeled_array = pa.array([], type=dtype) + else: + keys = np.array(list(keys)) + labeled_array = pa.array(keys, type=dtype) - labeled_array = pa.array(keys, type=dtype) partition_array = pa.array(list(index_dct.values())) return pa.Table.from_arrays( diff --git a/tests/core/test_index.py b/tests/core/test_index.py index 951f7e67..ec3cb440 100644 --- a/tests/core/test_index.py +++ b/tests/core/test_index.py @@ -144,10 +144,24 @@ def test_index_update_wrong_col(): ) -def test_index_empty(store): +@pytest.mark.parametrize( + "dtype", + [ + pa.binary(), + pa.bool_(), + pa.date32(), + pa.float32(), + pa.float64(), + pa.int64(), + pa.int8(), + pa.string(), + pa.timestamp("ns"), + ], +) +def test_index_empty(store, dtype): storage_key = "dataset_uuid/some_index.parquet" index1 = ExplicitSecondaryIndex( - column="col", index_dct={}, dtype=pa.int64(), index_storage_key=storage_key + column="col", index_dct={}, dtype=dtype, index_storage_key=storage_key ) key1 = index1.store(store, "dataset_uuid") From b6ad518b101a7a8a82412f3673782ea760299944 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 25 Oct 2019 11:22:16 +0200 Subject: [PATCH 2/4] fix pickling of unloaded indices --- CHANGES.rst | 2 ++ kartothek/core/index.py | 18 +++++++++++---- kartothek/io_components/read.py | 2 +- tests/core/test_index.py | 40 ++++++++++++++++++++++++++------- 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index db8c730a..374cb220 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,6 +6,8 @@ Version 3.5.1 (2019-10-25) ========================== - Fix potential ``pyarrow.lib.ArrowNotImplementedError`` when trying to store or pickle empty :class:`~kartothek.core.index.ExplicitSecondaryIndex` objects +- Fix 
pickling of :class:`~kartothek.core.index.ExplicitSecondaryIndex` unloaded in + :func:`~kartothek.io_components.read.dispatch_metapartitions_from_factory` Version 3.5.0 (2019-10-21) diff --git a/kartothek/core/index.py b/kartothek/core/index.py index 854d81c3..79eaa4f7 100644 --- a/kartothek/core/index.py +++ b/kartothek/core/index.py @@ -2,7 +2,7 @@ import logging from copy import copy -from typing import Any, Dict, Iterable, List, Optional, Set, TypeVar, Union +from typing import Any, Dict, Iterable, List, Optional, Set, TypeVar, Union, cast import numpy as np import pandas as pd @@ -569,15 +569,25 @@ def __init__( normalize_dtype=normalize_dtype, ) - def copy(self, **kwargs) -> "IndexBase": + def copy(self, **kwargs) -> "ExplicitSecondaryIndex": if kwargs: index_storage_key = None else: index_storage_key = self.index_storage_key - return super(IndexBase, self).copy( - index_storage_key=index_storage_key, **kwargs + return cast( + ExplicitSecondaryIndex, + super(IndexBase, self).copy(index_storage_key=index_storage_key, **kwargs), ) + def unload(self) -> "IndexBase": + """ + Drop index data to save memory. 
+ """ + idx = self.copy(index_dct={}) + idx._index_dct_available = False + idx.index_storage_key = self.index_storage_key + return idx + def __eq__(self, other) -> bool: if not isinstance(other, ExplicitSecondaryIndex): return False diff --git a/kartothek/io_components/read.py b/kartothek/io_components/read.py index 50b4f9c3..74ea9ae2 100644 --- a/kartothek/io_components/read.py +++ b/kartothek/io_components/read.py @@ -65,7 +65,7 @@ def dispatch_metapartitions_from_factory( base_df = base_df[base_df.index.map(label_filter)] indices_to_dispatch = { - name: ix.copy(index_dct={}) + name: ix.unload() for name, ix in dataset_factory.indices.items() if isinstance(ix, ExplicitSecondaryIndex) } diff --git a/tests/core/test_index.py b/tests/core/test_index.py index ec3cb440..a7af0d99 100644 --- a/tests/core/test_index.py +++ b/tests/core/test_index.py @@ -168,6 +168,27 @@ def test_index_empty(store, dtype): index2 = ExplicitSecondaryIndex(column="col", index_storage_key=key1).load(store) assert index1 == index2 + index3 = pickle.loads(pickle.dumps(index1)) + assert index1 == index3 + + +def test_pickle_without_load(store): + storage_key = "dataset_uuid/some_index.parquet" + index1 = ExplicitSecondaryIndex( + column="col", index_dct={1: ["part_1"]}, index_storage_key=storage_key + ) + key1 = index1.store(store, "dataset_uuid") + + index2 = ExplicitSecondaryIndex(column="col", index_storage_key=key1) + assert index2 != index1 + + index3 = pickle.loads(pickle.dumps(index2)) + assert index3 == index2 + + index4 = index3.load(store) + assert index4 == index1 + assert index4 != index2 + def test_index_no_source(): with pytest.raises(ValueError) as e: @@ -749,13 +770,16 @@ def test_serialization_normalization(key): ) -def test_serialization_no_indices(store): - index = ExplicitSecondaryIndex(column="col", index_dct={1: ["part_1"]}) - storage_key = index.store(store=store, dataset_uuid="uuid") - - # Create index without `index_dct` - index = ExplicitSecondaryIndex(column="col", 
index_storage_key=storage_key) +def test_unload(): + storage_key = "dataset_uuid/some_index.parquet" + index1 = ExplicitSecondaryIndex( + column="col", index_dct={1: ["part_1"]}, index_storage_key=storage_key + ) + assert index1.loaded - index2 = pickle.loads(pickle.dumps(index)) + index2 = index1.unload() + assert not index2.loaded + assert index2.index_storage_key == storage_key - assert index == index2 + index3 = pickle.loads(pickle.dumps(index2)) + assert index2 == index3 From 8532b47d6462091e8235bf49e5cd35c1698571a0 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 25 Oct 2019 11:42:09 +0200 Subject: [PATCH 3/4] fix running single distributed tests --- tests/io/dask/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/io/dask/conftest.py b/tests/io/dask/conftest.py index cb79f899..5391b1af 100644 --- a/tests/io/dask/conftest.py +++ b/tests/io/dask/conftest.py @@ -5,7 +5,7 @@ import pytest from distributed import Client -import kartothek +import kartothek.core._time from kartothek.core.testing import cm_frozen_time _client = None From e3816c224191b2e7f68f1d6906fbc633572625f8 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 25 Oct 2019 12:05:40 +0200 Subject: [PATCH 4/4] do not create untyped indices --- kartothek/core/index.py | 7 ++++--- kartothek/io_components/metapartition.py | 4 ++++ tests/core/test_index.py | 13 ++++++++++--- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/kartothek/core/index.py b/kartothek/core/index.py index 79eaa4f7..bbe8fc81 100644 --- a/kartothek/core/index.py +++ b/kartothek/core/index.py @@ -561,6 +561,8 @@ def __init__( ): if (index_dct is None) and not index_storage_key: raise ValueError("No valid index source specified") + if not index_storage_key and not index_dct and dtype is None: + raise ValueError("Trying to create non-typesafe index") self.index_storage_key = index_storage_key super(ExplicitSecondaryIndex, self).__init__( column=column, @@ -571,7 +573,7 @@ def 
__init__( def copy(self, **kwargs) -> "ExplicitSecondaryIndex": if kwargs: - index_storage_key = None + index_storage_key = kwargs.pop("index_storage_key", None) else: index_storage_key = self.index_storage_key return cast( @@ -583,9 +585,8 @@ def unload(self) -> "IndexBase": """ Drop index data to save memory. """ - idx = self.copy(index_dct={}) + idx = self.copy(index_dct={}, index_storage_key=self.index_storage_key) idx._index_dct_available = False - idx.index_storage_key = self.index_storage_key return idx def __eq__(self, other) -> bool: diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py index a99069cb..48819466 100644 --- a/kartothek/io_components/metapartition.py +++ b/kartothek/io_components/metapartition.py @@ -1221,6 +1221,9 @@ def build_indices(self, columns): are overwritten :return: self """ + if self.label is None: + return self + new_indices = {} for col in columns: possible_values = set() @@ -1678,6 +1681,7 @@ def parse_input_to_metapartition( 'Use the `secondary_indices` keyword argument of "write" and "update" functions instead.', DeprecationWarning, ) + indices = {k: v for k, v in indices.items() if v} _ensure_valid_indices( mp_indices=indices, secondary_indices=expected_secondary_indices, data=data ) diff --git a/tests/core/test_index.py b/tests/core/test_index.py index a7af0d99..f013e2b9 100644 --- a/tests/core/test_index.py +++ b/tests/core/test_index.py @@ -770,12 +770,14 @@ def test_serialization_normalization(key): ) -def test_unload(): +@pytest.mark.parametrize("with_index_dct", [True, False]) +def test_unload(with_index_dct): storage_key = "dataset_uuid/some_index.parquet" index1 = ExplicitSecondaryIndex( - column="col", index_dct={1: ["part_1"]}, index_storage_key=storage_key + column="col", + index_dct={1: ["part_1"]} if with_index_dct else None, + index_storage_key=storage_key, ) - assert index1.loaded index2 = index1.unload() assert not index2.loaded @@ -783,3 +785,8 @@ def test_unload(): 
index3 = pickle.loads(pickle.dumps(index2)) assert index2 == index3 + + +def test_fail_type_unsafe(): + with pytest.raises(ValueError, match="Trying to create non-typesafe index"): + ExplicitSecondaryIndex(column="col", index_dct={})