Commit

Merge branch 'master' of github.com:JDASoftwareGroup/kartothek
fjetter committed Jun 3, 2020
2 parents c629f74 + dabc669 commit 2bdf984
Showing 6 changed files with 84 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
@@ -6,6 +6,9 @@ Version 3.9.0 (UNRELEASED)
 ==========================
 
+* Significant performance improvements for shuffle operations in :func:`~kartothek.io.dask.dataframe.update_dataset_from_ddf`
+* Allow calling :func:`~kartothek.io.dask.dataframe.update_dataset_from_ddf`
+  without `partition_on` when `shuffle=True`
 
 
 Version 3.8.2 (2020-04-09)
 ==========================
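The second changelog entry above is the user-facing side of the relaxed check in `update_dataset_from_ddf` further down in this diff. A minimal usage sketch of the now-permitted call, assuming a throwaway local filesystem store created via `storefact`; the store URL, column names, and dataset UUID are illustrative only:

```python
# Sketch only: shuffle without partition_on, as now permitted.
from functools import partial
from tempfile import mkdtemp

import dask.dataframe as dd
import numpy as np
import pandas as pd
from storefact import get_store_from_url

from kartothek.io.dask.dataframe import update_dataset_from_ddf

# A throwaway filesystem store; any simplekv-compatible store factory works.
store_factory = partial(get_store_from_url, f"hfs://{mkdtemp()}")

df = pd.DataFrame({"x": np.arange(10), "payload": np.random.random(10)})
ddf = dd.from_pandas(df, npartitions=5)

update_dataset_from_ddf(
    ddf,
    store_factory,
    dataset_uuid="example_uuid",
    table="table",
    shuffle=True,   # previously this also required partition_on
    num_buckets=2,
    bucket_by="x",
).compute()
```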
15 changes: 9 additions & 6 deletions kartothek/io/dask/_update.py
@@ -180,7 +180,7 @@ def update_dask_partitions_shuffle(
     store_factory: StoreFactoryType,
     df_serializer: DataFrameSerializer,
     dataset_uuid: str,
-    num_buckets: Optional[int],
+    num_buckets: int,
     sort_partitions_by: Optional[str],
     bucket_by: List[str],
 ) -> da.Array:
@@ -236,11 +236,14 @@ def update_dask_partitions_shuffle(
         return ddf
 
     group_cols = partition_on.copy()
-    if num_buckets is not None:
-        meta = ddf._meta
-        meta[_KTK_HASH_BUCKET] = np.uint64(0)
-        ddf = ddf.map_partitions(_hash_bucket, bucket_by, num_buckets, meta=meta)
-        group_cols.append(_KTK_HASH_BUCKET)
+
+    if num_buckets is None:
+        raise ValueError("``num_buckets`` must not be None when shuffling data.")
+
+    meta = ddf._meta
+    meta[_KTK_HASH_BUCKET] = np.uint64(0)
+    ddf = ddf.map_partitions(_hash_bucket, bucket_by, num_buckets, meta=meta)
+    group_cols.append(_KTK_HASH_BUCKET)
 
     packed_meta = ddf._meta[group_cols]
     packed_meta[_PAYLOAD_COL] = b""
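The reworked block above makes the hash-bucketing step unconditional and turns a missing `num_buckets` into an explicit error. For orientation, the sketch below shows the general idea of bucketing rows by a hash of the `bucket_by` columns; the helper name `_hash_bucket` is taken from the diff, but the concrete hashing scheme shown here (`hash_pandas_object` modulo `num_buckets`) is an assumption for illustration, not necessarily kartothek's implementation.

```python
# Illustrative hash bucketing (assumed scheme, not kartothek's exact code).
import pandas as pd
from pandas.util import hash_pandas_object

_KTK_HASH_BUCKET = "_KTK_HASH_BUCKET"


def hash_bucket(df: pd.DataFrame, subset, num_buckets: int) -> pd.DataFrame:
    # Hash the bucket_by columns row-wise and fold the result into num_buckets groups.
    buckets = hash_pandas_object(df[subset], index=False) % num_buckets
    return df.assign(**{_KTK_HASH_BUCKET: buckets.astype("uint64")})


df = pd.DataFrame({"key": [1, 2, 3, 1, 2, 3], "payload": range(6)})
bucketed = hash_bucket(df, ["key"], num_buckets=2)
# Equal keys always get the same bucket id, so grouping by
# partition_on + [_KTK_HASH_BUCKET] keeps identical keys together while
# capping the number of groups per partition_on value at num_buckets.
```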
19 changes: 12 additions & 7 deletions kartothek/io/dask/dataframe.py
@@ -39,6 +39,7 @@ def read_dataset_as_ddf(
     predicates=None,
     factory=None,
     dask_index_on=None,
+    dispatch_by=None,
 ):
     """
     Retrieve a single table from a dataset as partition-individual :class:`~dask.dataframe.DataFrame` instance.
@@ -50,14 +51,22 @@
     Parameters
     ----------
     dask_index_on: str
-        Reconstruct (and set) a dask index on the provided index column.
+        Reconstruct (and set) a dask index on the provided index column. Cannot be used
+        in conjunction with `dispatch_by`.
+
         For details on performance, see also `dispatch_by`
     """
     if dask_index_on is not None and not isinstance(dask_index_on, str):
         raise TypeError(
             f"The parameter `dask_index_on` must be a string but got {type(dask_index_on)}"
         )
 
+    if dask_index_on is not None and dispatch_by is not None and len(dispatch_by) > 0:
+        raise ValueError(
+            "`read_dataset_as_ddf` got parameters `dask_index_on` and `dispatch_by`. "
+            "Note that `dispatch_by` can only be used if `dask_index_on` is None."
+        )
+
     ds_factory = _ensure_factory(
         dataset_uuid=dataset_uuid,
         store=store,
@@ -84,7 +93,7 @@
         label_filter=label_filter,
         dates_as_object=dates_as_object,
         predicates=predicates,
-        dispatch_by=dask_index_on,
+        dispatch_by=dask_index_on if dask_index_on else dispatch_by,
     )
     if dask_index_on:
         divisions = ds_factory.indices[dask_index_on].observed_values()
@@ -239,10 +248,6 @@ def update_dataset_from_ddf(
         ds_factory=factory,
     )
 
-    if shuffle and not partition_on:
-        raise ValueError(
-            "If ``shuffle`` is requested, at least one ``partition_on`` column needs to be provided."
-        )
     if ds_factory is not None:
         check_single_table_dataset(ds_factory, table)
 
@@ -260,7 +265,7 @@
     else:
         secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
 
-    if shuffle and partition_on:
+    if shuffle:
         mps = update_dask_partitions_shuffle(
             ddf=ddf,
             table=table,
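With the new `dispatch_by` parameter, `read_dataset_as_ddf` offers two mutually exclusive ways to shape the resulting dask collection, as enforced by the ValueError added above. A hedged usage sketch, assuming a dataset `example_uuid` that was written earlier with an index on column `x`; the store location and names are illustrative:

```python
# Sketch: the two options are mutually exclusive.
from functools import partial

from storefact import get_store_from_url

from kartothek.io.dask.dataframe import read_dataset_as_ddf

store_factory = partial(get_store_from_url, "hfs:///path/to/dataset")  # assumed location

# Reconstruct a dask index from the dataset's index on "x" ...
ddf_indexed = read_dataset_as_ddf(
    dataset_uuid="example_uuid",
    store=store_factory,
    table="table",
    dask_index_on="x",
)

# ... or group physical partitions into dask partitions by "x".
ddf_dispatched = read_dataset_as_ddf(
    dataset_uuid="example_uuid",
    store=store_factory,
    table="table",
    dispatch_by=["x"],
)

# Passing dask_index_on and dispatch_by together raises the ValueError added above.
```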
11 changes: 11 additions & 0 deletions reference-data/arrow-compat/batch_generate_references.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+# Note: this assumes you have kartothek installed in your current environment and you are using conda
+
+PYARROW_VERSIONS="0.14.1 0.15.0 0.16.0"
+
+for pyarrow_version in $PYARROW_VERSIONS; do
+    echo $pyarrow_version
+    conda install -y pyarrow==$pyarrow_version
+    ./generate_reference.py || { echo "Failed for version $pyarrow_version"; exit 1; }
+done
47 changes: 47 additions & 0 deletions tests/io/dask/dataframe/test_update.py
@@ -86,6 +86,53 @@ def _return_none():
     return None
 
 
+@pytest.mark.parametrize("bucket_by", [None, "range"])
+def test_update_shuffle_no_partition_on(store_factory, bucket_by):
+    df = pd.DataFrame(
+        {
+            "range": np.arange(10),
+            "range_duplicated": np.repeat(np.arange(2), 5),
+            "random": np.random.randint(0, 100, 10),
+        }
+    )
+    ddf = dd.from_pandas(df, npartitions=10)
+
+    with pytest.raises(
+        ValueError, match="``num_buckets`` must not be None when shuffling data."
+    ):
+        update_dataset_from_ddf(
+            ddf,
+            store_factory,
+            dataset_uuid="output_dataset_uuid",
+            table="table",
+            shuffle=True,
+            num_buckets=None,
+            bucket_by=bucket_by,
+        ).compute()
+
+    res_default = update_dataset_from_ddf(
+        ddf,
+        store_factory,
+        dataset_uuid="output_dataset_uuid_default",
+        table="table",
+        shuffle=True,
+        bucket_by=bucket_by,
+    ).compute()
+    assert len(res_default.partitions) == 1
+
+    res = update_dataset_from_ddf(
+        ddf,
+        store_factory,
+        dataset_uuid="output_dataset_uuid",
+        table="table",
+        shuffle=True,
+        num_buckets=2,
+        bucket_by=bucket_by,
+    ).compute()
+
+    assert len(res.partitions) == 2
+
+
 @pytest.mark.parametrize("unique_primaries", [1, 4])
 @pytest.mark.parametrize("unique_secondaries", [1, 3])
 @pytest.mark.parametrize("num_buckets", [1, 5])
6 changes: 2 additions & 4 deletions tests/serialization/test_arrow_compat.py
@@ -34,10 +34,8 @@ def test_arrow_compat(arrow_version, reference_store, mocker):
     Test if reading/writing across the supported arrow versions is actually
     compatible
 
-    Generate new reference files with::
-
-        import pyarrow as pa
-        ParquetSerializer().store(reference_store, pa.__version__, orig)
+    Generate new reference files by going to the `reference-data/arrow-compat` directory and
+    executing `generate_reference.py` or `batch_generate_reference.sh`.
     """
 
     uuid_hook = mocker.patch("kartothek.core.uuid._uuid_hook_object")
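The removed docstring lines above hint at what `generate_reference.py` does for each pyarrow version. A minimal sketch in that spirit follows; the DataFrame contents and the in-memory store are placeholders, and the committed `generate_reference.py` plus the batch script remain the authoritative way to produce reference files.

```python
# Sketch based on the removed docstring snippet; not the actual script.
import pandas as pd
import pyarrow as pa
from storefact import get_store_from_url

from kartothek.serialization import ParquetSerializer

# The real workflow writes into reference-data/arrow-compat; an in-memory
# store keeps this sketch self-contained.
reference_store = get_store_from_url("hmemory://")
orig = pd.DataFrame({"i": [1, 2, 3], "s": ["a", "b", "c"]})

# One reference file per installed pyarrow version, keyed by the version string.
ParquetSerializer().store(reference_store, pa.__version__, orig)
```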
