From ffbee150a95b1856e093d400931fe6b1ae091ad7 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 10 Dec 2019 11:45:32 +0100 Subject: [PATCH 1/3] Do not produce duplicate jobs when using predicates --- CHANGES.rst | 8 ++++++++ kartothek/io_components/read.py | 4 ++-- tests/io_components/test_read.py | 34 ++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index add5dfb5..cd65f7b8 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,14 @@ Changelog ========= +Version 3.6.1 (2019-12-10) +========================== + +Bug fixes +^^^^^^^^^ +* Fix a regression introduced in 3.5.0 where predicates which allow multiple + values for a field would generate duplicates + Version 3.6.0 (2019-12-03) ========================== diff --git a/kartothek/io_components/read.py b/kartothek/io_components/read.py index 74ea9ae2..7eee0482 100644 --- a/kartothek/io_components/read.py +++ b/kartothek/io_components/read.py @@ -84,7 +84,7 @@ def dispatch_metapartitions_from_factory( logical_conjunction = list( zip(dispatch_by, ["=="] * len(dispatch_by), group_name) ) - for label in group.index: + for label in group.index.unique(): mps.append( MetaPartition.from_partition( partition=dataset_factory.partitions[label], @@ -98,7 +98,7 @@ def dispatch_metapartitions_from_factory( ) yield mps else: - for part_label in base_df.index: + for part_label in base_df.index.unique(): part = dataset_factory.partitions[part_label] yield MetaPartition.from_partition( diff --git a/tests/io_components/test_read.py b/tests/io_components/test_read.py index 948d030c..35957595 100644 --- a/tests/io_components/test_read.py +++ b/tests/io_components/test_read.py @@ -136,3 +136,37 @@ def test_dispatch_metapartitions_concat_regression(store): mps = list(dispatch_metapartitions(dataset.uuid, store, dispatch_by=["p"])) assert len(mps) == 1 + + +def test_dispatch_metapartitions_dups_with_predicates(store): + dataset = store_dataframes_as_dataset( + dfs=[pd.DataFrame({"p": [0, 1], "x": 0})], + dataset_uuid="test", + store=store, + secondary_indices=["p"], + ) + + wout_preds = list(dispatch_metapartitions(dataset.uuid, store)) + w_preds = list( + dispatch_metapartitions(dataset.uuid, store, predicates=[[("p", "in", [0, 1])]]) + ) + + assert wout_preds == w_preds + + +def test_dispatch_metapartitions_dups_with_predicates_dispatch_by(store): + dataset = store_dataframes_as_dataset( + dfs=[pd.DataFrame({"p": [0, 1], "x": 0})], + dataset_uuid="test", + store=store, + secondary_indices=["p", "x"], + ) + + wout_preds = list(dispatch_metapartitions(dataset.uuid, store, dispatch_by="x")) + w_preds = list( + dispatch_metapartitions( + dataset.uuid, store, predicates=[[("p", "in", [0, 1])]], dispatch_by="x" + ) + ) + + assert wout_preds == w_preds From 249ee3cc6b048aa017990e2babe03fc0773e40c0 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Mon, 9 Dec 2019 14:16:01 +0100 Subject: [PATCH 2/3] Improve predicate TypeError messages --- kartothek/core/index.py | 4 +++- kartothek/serialization/_generic.py | 30 ++++++++++++++++++++++----- kartothek/serialization/_parquet.py | 32 ++++++++++++++++++++--------- tests/core/test_index.py | 8 ++++++-- tests/serialization/test_filter.py | 2 +- 5 files changed, 57 insertions(+), 19 deletions(-) diff --git a/kartothek/core/index.py b/kartothek/core/index.py index bbe8fc81..904c713d 100644 --- a/kartothek/core/index.py +++ b/kartothek/core/index.py @@ -231,7 +231,9 @@ def eval_operator(self, op: str, value: ValueType) -> Set[str]: if index_arr is None: index_arr = np.array(list(index_dct.keys())) - index = filter_array_like(index_arr, op, value, strict_date_types=True) + index = filter_array_like( + index_arr, op, value, strict_date_types=True, column_name=self.column + ) allowed_values = index_arr[index] # Need to determine allowed values to include predicates like `in` for value in allowed_values: diff --git a/kartothek/serialization/_generic.py b/kartothek/serialization/_generic.py index 66aa8193..fec8a170 100644 --- a/kartothek/serialization/_generic.py +++ b/kartothek/serialization/_generic.py @@ -278,13 +278,15 @@ def filter_df_from_predicates( for conjunction in predicates: inner_indexer = np.ones(len(df), dtype=bool) for column, op, value in conjunction: + column_name = ensure_unicode_string_type(column) filter_array_like( - df[ensure_unicode_string_type(column)].values, + df[column_name].values, op, value, inner_indexer, inner_indexer, strict_date_types=strict_date_types, + column_name=column_name, ) indexer = inner_indexer | indexer return df[indexer] @@ -326,7 +328,9 @@ def _handle_timelike_values(array_value_type, value, value_dtype, strict_date_ty return value, value_dtype -def _ensure_type_stability(array_like, value, strict_date_types, require_ordered): +def _ensure_type_stability( + array_like, value, strict_date_types, require_ordered, column_name=None +): """ Ensure that the provided value and the provided array will have compatible types, such that comparisons are unambiguous. @@ -351,6 +355,9 @@ def _ensure_type_stability(array_like, value, strict_date_types, require_ordered Indicate if the operator to be evaluated will require a notion of ordering. In the case of pd.Categorical we will then assume a lexicographical ordering and cast the pd.CategoricalDtype accordingly + column_name: str, optional + Name of the column where `array_like` originates from, used for nicer + error messages. """ value_dtype = pd.Series(value).dtype @@ -375,8 +382,12 @@ def _ensure_type_stability(array_like, value, strict_date_types, require_ordered type_comp = (value_dtype.kind, array_value_type.kind) if len(set(type_comp)) > 1 and type_comp not in compatible_types: + if column_name is None: + column_name = "" raise TypeError( - f"Unexpected type encountered. Expected {array_value_type.kind} but got {value_dtype.kind}." + f"Unexpected type for predicate: Column {column_name!r} has pandas " + f"type '{array_value_type}', but predicate value {value!r} has " + f"pandas type '{value_dtype}' (Python type '{type(value)}')." ) if "M" in type_comp: value, value_dtype = _handle_timelike_values( @@ -386,7 +397,13 @@ def _ensure_type_stability(array_like, value, strict_date_types, require_ordered def filter_array_like( - array_like, op, value, mask=None, out=None, strict_date_types=False + array_like, + op, + value, + mask=None, + out=None, + strict_date_types=False, + column_name=None, ): """ Filter an array-like object using operations defined in the predicates @@ -407,6 +424,9 @@ def filter_array_like( array is returned. strict_date_types: bool If False (default), cast all datelike values to datetime64 for comparison. + column_name: str, optional + Name of the column where `array_like` originates from, used for nicer + error messages. """ if mask is None: mask = np.ones(len(array_like), dtype=bool) @@ -422,7 +442,7 @@ def filter_array_like( require_ordered = "<" in op or ">" in op array_like, value = _ensure_type_stability( - array_like, value, strict_date_types, require_ordered + array_like, value, strict_date_types, require_ordered, column_name ) with np.errstate(invalid="ignore"): diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 083e9fd0..980639bd 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -258,6 +258,7 @@ def _normalize_predicates(parquet_file, predicates, for_pushdown): col, op, val = literal col_idx = parquet_file.reader.column_name_idx(col) pa_type = schema[col_idx].type + column_name = schema[col_idx].name if pa.types.is_null(pa_type): # early exit, the entire conjunction evaluates to False @@ -265,7 +266,10 @@ def _normalize_predicates(parquet_file, predicates, for_pushdown): break if op == "in": - values = [_normalize_value(l, pa_type) for l in literal[2]] + values = [ + _normalize_value(l, pa_type, column_name=column_name) + for l in literal[2] + ] if for_pushdown and values: normalized_value = [ _timelike_to_arrow_encoding(value, pa_type) for value in values @@ -273,7 +277,9 @@ def _normalize_predicates(parquet_file, predicates, for_pushdown): else: normalized_value = values else: - normalized_value = _normalize_value(literal[2], pa_type) + normalized_value = _normalize_value( + literal[2], pa_type, column_name=column_name + ) if for_pushdown: normalized_value = _timelike_to_arrow_encoding( normalized_value, pa_type @@ -312,7 +318,7 @@ def _timelike_to_arrow_encoding(value, pa_type): return value -def _normalize_value(value, pa_type): +def _normalize_value(value, pa_type, column_name=None): if pa.types.is_dictionary(pa_type): pa_type = pa_type.value_type @@ -347,14 +353,20 @@ def _normalize_value(value, pa_type): elif isinstance(value, bytes): value = value.decode("utf-8") return datetime.datetime.strptime(value, "%Y-%m-%d").date() - elif isinstance(value, datetime.date) and not isinstance( - value, datetime.datetime - ): - return value + elif isinstance(value, datetime.date): + if isinstance(value, datetime.datetime): + raise TypeError( + f"Unexpected type for predicate: Column {column_name!r} is an " + f"Arrow date ({pa_type}), but predicate value has type {type(value)}. " + f"Use a Python 'datetime.date' object instead." + ) + else: + return value + predicate_value_dtype = pd.Series(value).dtype raise TypeError( - "Unexpected type for predicate. Expected `{} ({})` but got `{} ({})`".format( - pa_type, pa_type.to_pandas_dtype(), value, type(value) - ) + f"Unexpected type for predicate: Column {column_name!r} has pandas type " + f"{pa_type.to_pandas_dtype()} (Arrow type {pa_type}), but predicate value " + f"{value!r} has pandas type '{predicate_value_dtype}' (Python type '{type(value)}')" ) diff --git a/tests/core/test_index.py b/tests/core/test_index.py index f013e2b9..75f48df8 100644 --- a/tests/core/test_index.py +++ b/tests/core/test_index.py @@ -608,11 +608,15 @@ def test_eval_operators_type_safety(): # gh66 ind = IndexBase(column="col", index_dct={1234: ["part"]}, dtype=pa.int64()) with pytest.raises( - TypeError, match="Unexpected type encountered. Expected i but got O." + TypeError, + match=r"Unexpected type for predicate: Column 'col' has pandas type 'int64', " + r"but predicate value '1234' has pandas type 'object' \(Python type ''\).", ): ind.eval_operator("==", "1234") with pytest.raises( - TypeError, match="Unexpected type encountered. Expected i but got f." + TypeError, + match=r"Unexpected type for predicate: Column 'col' has pandas type 'int64', " + r"but predicate value 1234.0 has pandas type 'float64' \(Python type ''\).", ): ind.eval_operator("==", 1234.0) diff --git a/tests/serialization/test_filter.py b/tests/serialization/test_filter.py index 558f4c02..2fab022f 100644 --- a/tests/serialization/test_filter.py +++ b/tests/serialization/test_filter.py @@ -93,7 +93,7 @@ def test_filter_array_like_categoricals(op, expected, cat_type): @pytest.mark.parametrize("op", ["==", "!=", "<", "<=", ">", ">=", "in"]) def test_raise_on_type(value, filter_value, op): array_like = pd.Series([value]) - with pytest.raises(TypeError, match="Unexpected type encountered."): + with pytest.raises(TypeError, match="Unexpected type for predicate:"): filter_array_like(array_like, op, filter_value, strict_date_types=True) From 5ae03b0816e03bb91e9470ec3d0f304e6399481e Mon Sep 17 00:00:00 2001 From: florian-jetter-jdas <57362409+florian-jetter-jdas@users.noreply.github.com> Date: Wed, 11 Dec 2019 13:05:22 +0100 Subject: [PATCH 3/3] Fix release date for 3.6.1 --- CHANGES.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index cd65f7b8..ca62f27f 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,7 +2,7 @@ Changelog ========= -Version 3.6.1 (2019-12-10) +Version 3.6.1 (2019-12-11) ========================== Bug fixes