Commit

Merge branch 'bugfix/gh248' of github.com:fjetter/kartothek into bugfix/gh248
fjetter committed Mar 19, 2020
2 parents 04147fd + c9cc1fa commit c29bf98
Showing 5 changed files with 62 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -13,6 +13,8 @@ Improvements
Bug fixes
^^^^^^^^^
* GH248 Fix an issue causing a ValueError to be raised when using `dask_index_on` on non-integer columns
* GH255 Fix an issue causing the Python interpreter to shut down when reading an
  empty file (see also https://issues.apache.org/jira/browse/ARROW-8142)

Version 3.8.0 (2020-03-12)
==========================
3 changes: 3 additions & 0 deletions docs/guide/partitioning.rst
@@ -136,6 +136,7 @@ structure would be different if the columns are in a different order.
.. note:: Every partition must have data for every table. An empty dataframe in this
   context is also considered data.

.. _partitioning_dask:

Force partitioning by shuffling using Dask
------------------------------------------
@@ -169,6 +170,8 @@ number of physical input partitions.
).compute()
sorted(dm.partitions.keys())
.. _shuffling:

Shuffling
*********
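
As a rough sketch of what shuffle-based repartitioning can look like (the store factory, dataset name, and column names below are placeholders, not taken from the docs; the keyword arguments follow the ``update_dataset_from_ddf`` docstring changes further down in this commit):

.. code:: python

    import dask
    from kartothek.io.dask.dataframe import update_dataset_from_ddf

    # Placeholders: ``store_factory`` is a callable returning the KV store,
    # ``ddf`` is the dask.dataframe.DataFrame to be written.
    with dask.config.set(shuffle="tasks"):
        graph = update_dataset_from_ddf(
            ddf,
            dataset_uuid="my_dataset",
            store=store_factory,
            table="table",
            shuffle=True,                  # force the shuffle described here
            partition_on=["payload_col"],  # columns to partition the dataset by
            num_buckets=4,                 # physical files per partition_on value
        )
    graph.compute()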

19 changes: 18 additions & 1 deletion kartothek/io/dask/dataframe.py
@@ -170,13 +170,30 @@ def update_dataset_from_ddf(
    .. note:: This can only be used for datasets with a single table!

    See also :ref:`partitioning_dask`.

    Parameters
    ----------
    ddf: Union[dask.dataframe.DataFrame, None]
        The dask.DataFrame to be used to calculate the new partitions from. If this
        parameter is `None`, the update pipeline will only delete partitions without
        creating new ones.
    shuffle: bool
        If True and partition_on is requested, shuffle the data to reduce number of output partitions
        If `True` and `partition_on` is requested, shuffle the data to reduce the number of
        output partitions.

        See also :ref:`shuffling`.

        .. warning::

            Dask uses a heuristic to determine how data is shuffled and there are two options:
            `partd` for local disk shuffling and `tasks` for distributed shuffling using a task
            graph. If there is no :class:`distributed.Client` in the context and the option is
            not set explicitly, dask will choose `partd`, which may cause data loss when the
            graph is executed on a distributed cluster. We therefore recommend specifying the
            dask shuffle method explicitly, e.g. by using a context manager.

            .. code::

                with dask.config.set(shuffle='tasks'):
                    graph = update_dataset_from_ddf(...)
                graph.compute()

    repartition_ratio: Optional[Union[int, float]]
        If provided, repartition the dataframe before calculation starts to
        ``ceil(ddf.npartitions / repartition_ratio)``
    num_buckets: int
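
To make the ``repartition_ratio`` behaviour concrete, here is a minimal standalone sketch of the documented formula (not kartothek code; the numbers and names are illustrative):

.. code:: python

    import math

    import dask.dataframe as dd
    import pandas as pd

    # Reduce 10 input partitions with a ratio of 4 to ceil(10 / 4) == 3 partitions,
    # mirroring the documented ``ceil(ddf.npartitions / repartition_ratio)`` step.
    ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=10)
    repartition_ratio = 4
    ddf = ddf.repartition(npartitions=math.ceil(ddf.npartitions / repartition_ratio))
    assert ddf.npartitions == 3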
34 changes: 25 additions & 9 deletions kartothek/serialization/_parquet.py
@@ -61,16 +61,32 @@ def _reset_dictionary_columns(table, exclude=None):
"""
if exclude is None:
exclude = []

if ARROW_LARGER_EQ_0150:
schema = table.schema
for i in range(len(schema)):
field = schema[i]
if pa.types.is_dictionary(field.type):
new_field = pa.field(
field.name, field.type.value_type, field.nullable, field.metadata
)
schema = schema.remove(i).insert(i, new_field)
table = table.cast(schema)
# https://issues.apache.org/jira/browse/ARROW-8142
if len(table) == 0:
df = table.to_pandas(date_as_object=True)
new_types = {
col: df[col].cat.categories.dtype
for col in df.select_dtypes("category")
}
if new_types:
df = df.astype(new_types)
table = pa.Table.from_pandas(df)
else:
schema = table.schema
for i in range(len(schema)):
field = schema[i]
if pa.types.is_dictionary(field.type):
new_field = pa.field(
field.name,
field.type.value_type,
field.nullable,
field.metadata,
)
schema = schema.remove(i).insert(i, new_field)

table = table.cast(schema)
else:
for i in range(table.num_columns):
col = table[i]
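
For orientation, a standalone sketch of the two code paths above (not kartothek's helper itself; it assumes a recent pyarrow and pandas):

.. code:: python

    import pandas as pd
    import pyarrow as pa

    # A table with a dictionary-encoded (categorical) column.
    df = pd.DataFrame({"col": pd.Categorical(["a", "b", "a"])})
    table = pa.Table.from_pandas(df)
    assert pa.types.is_dictionary(table.schema.field("col").type)

    # Non-empty path: replace each dictionary field by its value type and cast.
    schema = table.schema
    for i in range(len(schema)):
        field = schema[i]
        if pa.types.is_dictionary(field.type):
            new_field = pa.field(
                field.name, field.type.value_type, field.nullable, field.metadata
            )
            schema = schema.remove(i).insert(i, new_field)
    decoded = table.cast(schema)
    assert decoded.schema.field("col").type == pa.string()

    # Empty path: the patch avoids the cast (see ARROW-8142) and round-trips
    # through pandas, converting categoricals to their categories' dtype first.
    empty = table.slice(0, 0)
    edf = empty.to_pandas(date_as_object=True)
    new_types = {c: edf[c].cat.categories.dtype for c in edf.select_dtypes("category")}
    if new_types:
        edf = edf.astype(new_types)
    empty_decoded = pa.Table.from_pandas(edf)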
14 changes: 14 additions & 0 deletions tests/serialization/test_parquet.py
@@ -443,3 +443,17 @@ def test_read_categorical(store):

    df = serialiser.restore_dataframe(store, key, categories=["col"])
    assert df.dtypes["col"] == pd.CategoricalDtype(["a"], ordered=False)


def test_read_categorical_empty(store):
    df = pd.DataFrame({"col": ["a"]}).astype({"col": "category"}).iloc[:0]
    serialiser = ParquetSerializer()
    key = serialiser.store(store, "prefix", df)

    df = serialiser.restore_dataframe(store, key)
    assert df.dtypes["col"] == "O"

    df = serialiser.restore_dataframe(store, key, categories=["col"])
    assert df.dtypes["col"] == pd.CategoricalDtype([], ordered=False)
