diff --git a/CHANGES.rst b/CHANGES.rst
index d1bff4ff..b2e4f9b3 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -13,6 +13,8 @@ Improvements
 Bug fixes
 ^^^^^^^^^
 * GH248 Fix an issue causing a ValueError to be raised when using `dask_index_on` on non-integer columns
+* GH255 Fix an issue causing the Python interpreter to shut down when reading an
+  empty file (see also https://issues.apache.org/jira/browse/ARROW-8142)
 
 Version 3.8.0 (2020-03-12)
 ==========================
diff --git a/docs/guide/partitioning.rst b/docs/guide/partitioning.rst
index 4819f655..9b9967e3 100644
--- a/docs/guide/partitioning.rst
+++ b/docs/guide/partitioning.rst
@@ -136,6 +136,7 @@ structure would be different if the columns are in a different order.
 .. note:: Every partition must have data for every table. An empty dataframe in
           this context is also considered as data.
 
+.. _partitioning_dask:
 
 Force partitioning by shuffling using Dask
 ------------------------------------------
@@ -169,6 +170,8 @@ number of physical input partitions.
     ).compute()
 
     sorted(dm.partitions.keys())
 
+.. _shuffling:
+
 Shuffling
 *********
diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py
index 21711d39..0608ecd3 100644
--- a/kartothek/io/dask/dataframe.py
+++ b/kartothek/io/dask/dataframe.py
@@ -170,13 +170,30 @@ def update_dataset_from_ddf(
     .. note:: This can only be used for datasets with a single table!
 
+    See also :ref:`partitioning_dask`.
+
     Parameters
     ----------
     ddf: Union[dask.dataframe.DataFrame, None]
         The dask.Dataframe to be used to calculate the new partitions from. If this parameter is `None`, the update pipeline will only delete partitions without creating new ones.
     shuffle: bool
-        If True and partition_on is requested, shuffle the data to reduce number of output partitions
+        If `True` and `partition_on` is requested, shuffle the data to reduce
+        the number of output partitions.
+
+        See also :ref:`shuffling`.
+
+        .. warning::
+
+            Dask uses a heuristic to determine how data is shuffled and there
+            are two options, `partd` for local disk shuffling and `tasks` for
+            distributed shuffling using a task graph. If there is no
+            :class:`distributed.Client` in the context and the option is not
+            set explicitly, dask will choose `partd`, which may cause data
+            loss when the graph is executed on a distributed cluster.
+
+            Therefore, we recommend specifying the dask shuffle method
+            explicitly, e.g. by using a context manager.
+
+            .. code::
+
+                with dask.config.set(shuffle='tasks'):
+                    graph = update_dataset_from_ddf(...)
+                graph.compute()
+
     repartition_ratio: Optional[Union[int, float]]
         If provided, repartition the dataframe before calculation starts to ``ceil(ddf.npartitions / repartition_ratio)``
     num_buckets: int
diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py
index c38e83d1..568f375f 100644
--- a/kartothek/serialization/_parquet.py
+++ b/kartothek/serialization/_parquet.py
@@ -61,16 +61,32 @@ def _reset_dictionary_columns(table, exclude=None):
     """
     if exclude is None:
         exclude = []
+
     if ARROW_LARGER_EQ_0150:
-        schema = table.schema
-        for i in range(len(schema)):
-            field = schema[i]
-            if pa.types.is_dictionary(field.type):
-                new_field = pa.field(
-                    field.name, field.type.value_type, field.nullable, field.metadata
-                )
-                schema = schema.remove(i).insert(i, new_field)
-        table = table.cast(schema)
+        # https://issues.apache.org/jira/browse/ARROW-8142
+        if len(table) == 0:
+            df = table.to_pandas(date_as_object=True)
+            new_types = {
+                col: df[col].cat.categories.dtype
+                for col in df.select_dtypes("category")
+            }
+            if new_types:
+                df = df.astype(new_types)
+            table = pa.Table.from_pandas(df)
+        else:
+            schema = table.schema
+            for i in range(len(schema)):
+                field = schema[i]
+                if pa.types.is_dictionary(field.type):
+                    new_field = pa.field(
+                        field.name,
+                        field.type.value_type,
+                        field.nullable,
+                        field.metadata,
+                    )
+                    schema = schema.remove(i).insert(i, new_field)
+
+            table = table.cast(schema)
     else:
         for i in range(table.num_columns):
             col = table[i]
diff --git a/tests/serialization/test_parquet.py b/tests/serialization/test_parquet.py
index c9b78a67..da0c78ba 100644
--- a/tests/serialization/test_parquet.py
+++ b/tests/serialization/test_parquet.py
@@ -443,3 +443,17 @@ def test_read_categorical(store):
 
     df = serialiser.restore_dataframe(store, key, categories=["col"])
     assert df.dtypes["col"] == pd.CategoricalDtype(["a"], ordered=False)
+
+
+def test_read_categorical_empty(store):
+
+    df = pd.DataFrame({"col": ["a"]}).astype({"col": "category"}).iloc[:0]
+    serialiser = ParquetSerializer()
+    key = serialiser.store(store, "prefix", df)
+
+    df = serialiser.restore_dataframe(store, key)
+    assert df.dtypes["col"] == "O"
+
+    df = serialiser.restore_dataframe(store, key, categories=["col"])
+
+    assert df.dtypes["col"] == pd.CategoricalDtype([], ordered=False)
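
For reviewers who want to reproduce the ARROW-8142 issue outside the serializer: a minimal standalone sketch of the zero-length round trip the new `_reset_dictionary_columns` branch performs. It assumes only `pandas` and `pyarrow >= 0.15`; the variable names are illustrative.

.. code::

    import pandas as pd
    import pyarrow as pa

    # An empty dataframe with a categorical column, mirroring
    # test_read_categorical_empty above.
    df = pd.DataFrame({"col": ["a"]}).astype({"col": "category"}).iloc[:0]
    table = pa.Table.from_pandas(df)

    # Casting dictionary columns on a zero-length table can take down the
    # interpreter (ARROW-8142), so round-trip through pandas instead and
    # restore each categorical column to its categories' dtype.
    df = table.to_pandas(date_as_object=True)
    new_types = {
        col: df[col].cat.categories.dtype
        for col in df.select_dtypes("category")
    }
    if new_types:
        df = df.astype(new_types)
    table = pa.Table.from_pandas(df)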
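As a usage note for the new `shuffle` warning, here is a sketch of pinning the shuffle method around graph construction. The store URL, dataset UUID, and keyword arguments beyond `shuffle` and `partition_on` are assumptions for illustration, not a prescribed call.

.. code::

    import dask
    import dask.dataframe as dd
    import pandas as pd
    import storefact

    from kartothek.io.dask.dataframe import update_dataset_from_ddf

    ddf = dd.from_pandas(
        pd.DataFrame({"col": ["a", "b"], "x": [0, 1]}), npartitions=2
    )
    # Hypothetical local store factory; any simplekv-style factory works.
    store_factory = lambda: storefact.get_store_from_url(
        "hfs:///tmp/kartothek_example"
    )

    # Build the update graph with the shuffle method pinned explicitly so
    # dask cannot silently fall back to 'partd' on a distributed cluster.
    with dask.config.set(shuffle="tasks"):
        graph = update_dataset_from_ddf(
            ddf,
            store=store_factory,
            dataset_uuid="example_uuid",
            table="table",
            shuffle=True,
            partition_on=["col"],
        )
    graph.compute()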