Skip to content

Commit

Permalink
Merge pull request #170 from JDASoftwareGroup/fix/empty_index_arrow_exception
Browse files Browse the repository at this point in the history

fix ArrowNotImplementedError for empty indices
  • Loading branch information
fjetter authored Oct 25, 2019
2 parents 4021840 + e3816c2 commit 13df5b1
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 18 deletions.
8 changes: 8 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
Changelog
=========

Version 3.5.1 (2019-10-25)
==========================
- Fix potential ``pyarrow.lib.ArrowNotImplementedError`` when trying to store or pickle empty
:class:`~kartothek.core.index.ExplicitSecondaryIndex` objects
- Fix pickling of :class:`~kartothek.core.index.ExplicitSecondaryIndex` unloaded in
:func:`~kartothek.io_components.read.dispatch_metapartitions_from_factory`


Version 3.5.0 (2019-10-21)
==========================

Expand Down
29 changes: 22 additions & 7 deletions kartothek/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from copy import copy
from typing import Any, Dict, Iterable, List, Optional, Set, TypeVar, Union
from typing import Any, Dict, Iterable, List, Optional, Set, TypeVar, Union, cast

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -561,6 +561,8 @@ def __init__(
):
if (index_dct is None) and not index_storage_key:
raise ValueError("No valid index source specified")
if not index_storage_key and not index_dct and dtype is None:
raise ValueError("Trying to create non-typesafe index")
self.index_storage_key = index_storage_key
super(ExplicitSecondaryIndex, self).__init__(
column=column,
Expand All @@ -569,15 +571,24 @@ def __init__(
normalize_dtype=normalize_dtype,
)

def copy(self, **kwargs) -> "IndexBase":
def copy(self, **kwargs) -> "ExplicitSecondaryIndex":
if kwargs:
index_storage_key = None
index_storage_key = kwargs.pop("index_storage_key", None)
else:
index_storage_key = self.index_storage_key
return super(IndexBase, self).copy(
index_storage_key=index_storage_key, **kwargs
return cast(
ExplicitSecondaryIndex,
super(IndexBase, self).copy(index_storage_key=index_storage_key, **kwargs),
)

def unload(self) -> "IndexBase":
    """
    Drop the materialized index data to save memory.

    The returned copy keeps only the ``index_storage_key`` reference,
    so the index can be re-loaded from storage later on.
    """
    unloaded = self.copy(index_dct={}, index_storage_key=self.index_storage_key)
    # Mark the data as not available so consumers know a load is required.
    unloaded._index_dct_available = False
    return unloaded

def __eq__(self, other) -> bool:
if not isinstance(other, ExplicitSecondaryIndex):
return False
Expand Down Expand Up @@ -845,9 +856,13 @@ def _index_dct_to_table(index_dct: IndexDictType, column: str, dtype: pa.DataTyp
# Additional note: pyarrow.array is supposed to infer type automatically.
# But the inferred type is not enough to hold np.uint64. Until this is fixed in
# upstream Arrow, we have to retain the following line
keys = np.array(list(keys))
if not index_dct:
# the np.array dtype will be double which arrow cannot convert to the target type, so use an empty list instead
labeled_array = pa.array([], type=dtype)
else:
keys = np.array(list(keys))
labeled_array = pa.array(keys, type=dtype)

labeled_array = pa.array(keys, type=dtype)
partition_array = pa.array(list(index_dct.values()))

return pa.Table.from_arrays(
Expand Down
4 changes: 4 additions & 0 deletions kartothek/io_components/metapartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,9 @@ def build_indices(self, columns):
are overwritten
:return: self
"""
if self.label is None:
return self

new_indices = {}
for col in columns:
possible_values = set()
Expand Down Expand Up @@ -1678,6 +1681,7 @@ def parse_input_to_metapartition(
'Use the `secondary_indices` keyword argument of "write" and "update" functions instead.',
DeprecationWarning,
)
indices = {k: v for k, v in indices.items() if v}
_ensure_valid_indices(
mp_indices=indices, secondary_indices=expected_secondary_indices, data=data
)
Expand Down
2 changes: 1 addition & 1 deletion kartothek/io_components/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def dispatch_metapartitions_from_factory(
base_df = base_df[base_df.index.map(label_filter)]

indices_to_dispatch = {
name: ix.copy(index_dct={})
name: ix.unload()
for name, ix in dataset_factory.indices.items()
if isinstance(ix, ExplicitSecondaryIndex)
}
Expand Down
63 changes: 54 additions & 9 deletions tests/core/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,51 @@ def test_index_update_wrong_col():
)


def test_index_empty(store):
@pytest.mark.parametrize(
"dtype",
[
pa.binary(),
pa.bool_(),
pa.date32(),
pa.float32(),
pa.float64(),
pa.int64(),
pa.int8(),
pa.string(),
pa.timestamp("ns"),
],
)
def test_index_empty(store, dtype):
storage_key = "dataset_uuid/some_index.parquet"
index1 = ExplicitSecondaryIndex(
column="col", index_dct={}, dtype=pa.int64(), index_storage_key=storage_key
column="col", index_dct={}, dtype=dtype, index_storage_key=storage_key
)
key1 = index1.store(store, "dataset_uuid")

index2 = ExplicitSecondaryIndex(column="col", index_storage_key=key1).load(store)
assert index1 == index2

index3 = pickle.loads(pickle.dumps(index1))
assert index1 == index3


def test_pickle_without_load(store):
    """An index that was never loaded must survive a pickle round-trip."""
    storage_key = "dataset_uuid/some_index.parquet"
    stored_index = ExplicitSecondaryIndex(
        column="col", index_dct={1: ["part_1"]}, index_storage_key=storage_key
    )
    key = stored_index.store(store, "dataset_uuid")

    # An unloaded index only references its storage key, so it does not
    # compare equal to the fully loaded one.
    unloaded = ExplicitSecondaryIndex(column="col", index_storage_key=key)
    assert unloaded != stored_index

    # Pickling must work even though the index data is not in memory.
    roundtripped = pickle.loads(pickle.dumps(unloaded))
    assert roundtripped == unloaded

    # Loading after the round-trip restores the original index data.
    loaded = roundtripped.load(store)
    assert loaded == stored_index
    assert loaded != unloaded


def test_index_no_source():
with pytest.raises(ValueError) as e:
Expand Down Expand Up @@ -735,13 +770,23 @@ def test_serialization_normalization(key):
)


def test_serialization_no_indices(store):
index = ExplicitSecondaryIndex(column="col", index_dct={1: ["part_1"]})
storage_key = index.store(store=store, dataset_uuid="uuid")
@pytest.mark.parametrize("with_index_dct", [True, False])
def test_unload(with_index_dct):
storage_key = "dataset_uuid/some_index.parquet"
index1 = ExplicitSecondaryIndex(
column="col",
index_dct={1: ["part_1"]} if with_index_dct else None,
index_storage_key=storage_key,
)

index2 = index1.unload()
assert not index2.loaded
assert index2.index_storage_key == storage_key

# Create index without `index_dct`
index = ExplicitSecondaryIndex(column="col", index_storage_key=storage_key)
index3 = pickle.loads(pickle.dumps(index2))
assert index2 == index3

index2 = pickle.loads(pickle.dumps(index))

assert index == index2
def test_fail_type_unsafe():
    """An empty index without an explicit dtype carries no type information
    at all and must therefore be rejected at construction time."""
    with pytest.raises(ValueError) as exc_info:
        ExplicitSecondaryIndex(column="col", index_dct={})
    assert "Trying to create non-typesafe index" in str(exc_info.value)
2 changes: 1 addition & 1 deletion tests/io/dask/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
from distributed import Client

import kartothek
import kartothek.core._time
from kartothek.core.testing import cm_frozen_time

_client = None
Expand Down

0 comments on commit 13df5b1

Please sign in to comment.