Skip to content

Commit

Permalink
Merge branch 'master' into type-eager
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Dec 11, 2019
2 parents a622d17 + 5ae03b0 commit 440e748
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 23 deletions.
12 changes: 10 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@
Changelog
=========

Version 3.6.1 (2019-12-XX)
==========================
Version Unreleased
==================

Improvements
^^^^^^^^^^^^

- Add more explicit typing to :mod:`kartothek.io.eager`.

Version 3.6.1 (2019-12-11)
==========================

Bug fixes
^^^^^^^^^
* Fix a regression introduced in 3.5.0 where predicates which allow multiple
  values for a field would generate duplicates.

Version 3.6.0 (2019-12-03)
==========================

Expand Down
4 changes: 3 additions & 1 deletion kartothek/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ def eval_operator(self, op: str, value: ValueType) -> Set[str]:
if index_arr is None:
index_arr = np.array(list(index_dct.keys()))

index = filter_array_like(index_arr, op, value, strict_date_types=True)
index = filter_array_like(
index_arr, op, value, strict_date_types=True, column_name=self.column
)
allowed_values = index_arr[index]
# Need to determine allowed values to include predicates like `in`
for value in allowed_values:
Expand Down
4 changes: 2 additions & 2 deletions kartothek/io_components/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def dispatch_metapartitions_from_factory(
logical_conjunction = list(
zip(dispatch_by, ["=="] * len(dispatch_by), group_name)
)
for label in group.index:
for label in group.index.unique():
mps.append(
MetaPartition.from_partition(
partition=dataset_factory.partitions[label],
Expand All @@ -98,7 +98,7 @@ def dispatch_metapartitions_from_factory(
)
yield mps
else:
for part_label in base_df.index:
for part_label in base_df.index.unique():
part = dataset_factory.partitions[part_label]

yield MetaPartition.from_partition(
Expand Down
30 changes: 25 additions & 5 deletions kartothek/serialization/_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,15 @@ def filter_df_from_predicates(
for conjunction in predicates:
inner_indexer = np.ones(len(df), dtype=bool)
for column, op, value in conjunction:
column_name = ensure_unicode_string_type(column)
filter_array_like(
df[ensure_unicode_string_type(column)].values,
df[column_name].values,
op,
value,
inner_indexer,
inner_indexer,
strict_date_types=strict_date_types,
column_name=column_name,
)
indexer = inner_indexer | indexer
return df[indexer]
Expand Down Expand Up @@ -326,7 +328,9 @@ def _handle_timelike_values(array_value_type, value, value_dtype, strict_date_ty
return value, value_dtype


def _ensure_type_stability(array_like, value, strict_date_types, require_ordered):
def _ensure_type_stability(
array_like, value, strict_date_types, require_ordered, column_name=None
):
"""
Ensure that the provided value and the provided array will have compatible
types, such that comparisons are unambiguous.
Expand All @@ -351,6 +355,9 @@ def _ensure_type_stability(array_like, value, strict_date_types, require_ordered
Indicate if the operator to be evaluated will require a notion of
ordering. In the case of pd.Categorical we will then assume a
lexicographical ordering and cast the pd.CategoricalDtype accordingly
column_name: str, optional
Name of the column where `array_like` originates from, used for nicer
error messages.
"""

value_dtype = pd.Series(value).dtype
Expand All @@ -375,8 +382,12 @@ def _ensure_type_stability(array_like, value, strict_date_types, require_ordered
type_comp = (value_dtype.kind, array_value_type.kind)

if len(set(type_comp)) > 1 and type_comp not in compatible_types:
if column_name is None:
column_name = "<unknown>"
raise TypeError(
f"Unexpected type encountered. Expected {array_value_type.kind} but got {value_dtype.kind}."
f"Unexpected type for predicate: Column {column_name!r} has pandas "
f"type '{array_value_type}', but predicate value {value!r} has "
f"pandas type '{value_dtype}' (Python type '{type(value)}')."
)
if "M" in type_comp:
value, value_dtype = _handle_timelike_values(
Expand All @@ -386,7 +397,13 @@ def _ensure_type_stability(array_like, value, strict_date_types, require_ordered


def filter_array_like(
array_like, op, value, mask=None, out=None, strict_date_types=False
array_like,
op,
value,
mask=None,
out=None,
strict_date_types=False,
column_name=None,
):
"""
Filter an array-like object using operations defined in the predicates
Expand All @@ -407,6 +424,9 @@ def filter_array_like(
array is returned.
strict_date_types: bool
If False (default), cast all datelike values to datetime64 for comparison.
column_name: str, optional
Name of the column where `array_like` originates from, used for nicer
error messages.
"""
if mask is None:
mask = np.ones(len(array_like), dtype=bool)
Expand All @@ -422,7 +442,7 @@ def filter_array_like(

require_ordered = "<" in op or ">" in op
array_like, value = _ensure_type_stability(
array_like, value, strict_date_types, require_ordered
array_like, value, strict_date_types, require_ordered, column_name
)

with np.errstate(invalid="ignore"):
Expand Down
32 changes: 22 additions & 10 deletions kartothek/serialization/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,22 +258,28 @@ def _normalize_predicates(parquet_file, predicates, for_pushdown):
col, op, val = literal
col_idx = parquet_file.reader.column_name_idx(col)
pa_type = schema[col_idx].type
column_name = schema[col_idx].name

if pa.types.is_null(pa_type):
# early exit, the entire conjunction evaluates to False
new_conjunction = None
break

if op == "in":
values = [_normalize_value(l, pa_type) for l in literal[2]]
values = [
_normalize_value(l, pa_type, column_name=column_name)
for l in literal[2]
]
if for_pushdown and values:
normalized_value = [
_timelike_to_arrow_encoding(value, pa_type) for value in values
]
else:
normalized_value = values
else:
normalized_value = _normalize_value(literal[2], pa_type)
normalized_value = _normalize_value(
literal[2], pa_type, column_name=column_name
)
if for_pushdown:
normalized_value = _timelike_to_arrow_encoding(
normalized_value, pa_type
Expand Down Expand Up @@ -312,7 +318,7 @@ def _timelike_to_arrow_encoding(value, pa_type):
return value


def _normalize_value(value, pa_type):
def _normalize_value(value, pa_type, column_name=None):
if pa.types.is_dictionary(pa_type):
pa_type = pa_type.value_type

Expand Down Expand Up @@ -347,14 +353,20 @@ def _normalize_value(value, pa_type):
elif isinstance(value, bytes):
value = value.decode("utf-8")
return datetime.datetime.strptime(value, "%Y-%m-%d").date()
elif isinstance(value, datetime.date) and not isinstance(
value, datetime.datetime
):
return value
elif isinstance(value, datetime.date):
if isinstance(value, datetime.datetime):
raise TypeError(
f"Unexpected type for predicate: Column {column_name!r} is an "
f"Arrow date ({pa_type}), but predicate value has type {type(value)}. "
f"Use a Python 'datetime.date' object instead."
)
else:
return value
predicate_value_dtype = pd.Series(value).dtype
raise TypeError(
"Unexpected type for predicate. Expected `{} ({})` but got `{} ({})`".format(
pa_type, pa_type.to_pandas_dtype(), value, type(value)
)
f"Unexpected type for predicate: Column {column_name!r} has pandas type "
f"{pa_type.to_pandas_dtype()} (Arrow type {pa_type}), but predicate value "
f"{value!r} has pandas type '{predicate_value_dtype}' (Python type '{type(value)}')"
)


Expand Down
8 changes: 6 additions & 2 deletions tests/core/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,15 @@ def test_eval_operators_type_safety():
# gh66
ind = IndexBase(column="col", index_dct={1234: ["part"]}, dtype=pa.int64())
with pytest.raises(
TypeError, match="Unexpected type encountered. Expected i but got O."
TypeError,
match=r"Unexpected type for predicate: Column 'col' has pandas type 'int64', "
r"but predicate value '1234' has pandas type 'object' \(Python type '<class 'str'>'\).",
):
ind.eval_operator("==", "1234")
with pytest.raises(
TypeError, match="Unexpected type encountered. Expected i but got f."
TypeError,
match=r"Unexpected type for predicate: Column 'col' has pandas type 'int64', "
r"but predicate value 1234.0 has pandas type 'float64' \(Python type '<class 'float'>'\).",
):
ind.eval_operator("==", 1234.0)

Expand Down
34 changes: 34 additions & 0 deletions tests/io_components/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,37 @@ def test_dispatch_metapartitions_concat_regression(store):

mps = list(dispatch_metapartitions(dataset.uuid, store, dispatch_by=["p"]))
assert len(mps) == 1


def test_dispatch_metapartitions_dups_with_predicates(store):
    """Dispatching with an ``in`` predicate that matches every partition must
    yield exactly the same metapartitions as dispatching without predicates
    (i.e. no duplicates are produced by the index lookup)."""
    frame = pd.DataFrame({"p": [0, 1], "x": 0})
    dataset = store_dataframes_as_dataset(
        dfs=[frame],
        dataset_uuid="test",
        store=store,
        secondary_indices=["p"],
    )

    predicates = [[("p", "in", [0, 1])]]
    with_predicates = list(
        dispatch_metapartitions(dataset.uuid, store, predicates=predicates)
    )
    without_predicates = list(dispatch_metapartitions(dataset.uuid, store))

    # Both values of `p` live in the same partition, so the filtered dispatch
    # must be identical to the unfiltered one.
    assert without_predicates == with_predicates


def test_dispatch_metapartitions_dups_with_predicates_dispatch_by(store):
    """Same duplicate check as above, but grouped via ``dispatch_by``: an
    ``in`` predicate matching all rows must not duplicate metapartitions
    within the dispatch groups."""
    frame = pd.DataFrame({"p": [0, 1], "x": 0})
    dataset = store_dataframes_as_dataset(
        dfs=[frame],
        dataset_uuid="test",
        store=store,
        secondary_indices=["p", "x"],
    )

    predicates = [[("p", "in", [0, 1])]]
    with_predicates = list(
        dispatch_metapartitions(
            dataset.uuid, store, predicates=predicates, dispatch_by="x"
        )
    )
    without_predicates = list(
        dispatch_metapartitions(dataset.uuid, store, dispatch_by="x")
    )

    # The predicate selects every row, so grouping by "x" must produce the
    # exact same groups as the unfiltered dispatch.
    assert without_predicates == with_predicates
2 changes: 1 addition & 1 deletion tests/serialization/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_filter_array_like_categoricals(op, expected, cat_type):
@pytest.mark.parametrize("op", ["==", "!=", "<", "<=", ">", ">=", "in"])
def test_raise_on_type(value, filter_value, op):
array_like = pd.Series([value])
with pytest.raises(TypeError, match="Unexpected type encountered."):
with pytest.raises(TypeError, match="Unexpected type for predicate:"):
filter_array_like(array_like, op, filter_value, strict_date_types=True)


Expand Down

0 comments on commit 440e748

Please sign in to comment.