Feat/collect parqet statistics (#306)
Add io.dask.dataframe.collect_dataset_metadata to collect sampled parquet metadata
NeroCorleone authored Jul 15, 2020
1 parent 2cf87df commit 396dabd
Showing 6 changed files with 649 additions and 3 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
@@ -12,6 +12,11 @@ Improvements
to allow for transition for future breaking release.


New functionality
^^^^^^^^^^^^^^^^^

* Add :meth:`~kartothek.io_components.metapartition.MetaPartition.get_parquet_metadata` and :func:`~kartothek.io.dask.dataframe.collect_dataset_metadata`, enabling users to collect information about the Parquet metadata of a dataset.

Bug fixes
^^^^^^^^^

106 changes: 105 additions & 1 deletion kartothek/io/dask/dataframe.py
@@ -1,15 +1,24 @@
import random
from typing import Callable, Optional

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
from simplekv import KeyValueStore

from kartothek.core._compat import ARROW_LARGER_EQ_0141
from kartothek.core.common_metadata import empty_dataframe_from_schema
from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.factory import DatasetFactory, _ensure_factory
from kartothek.core.naming import DEFAULT_METADATA_VERSION
from kartothek.io_components.metapartition import (
    _METADATA_SCHEMA,
    SINGLE_TABLE,
    MetaPartition,
    parse_input_to_metapartition,
)
from kartothek.io_components.read import dispatch_metapartitions_from_factory
from kartothek.io_components.update import update_dataset_from_partitions
from kartothek.io_components.utils import (
    _ensure_compatible_indices,
@@ -18,6 +27,7 @@
    normalize_args,
    validate_partition_keys,
)
from kartothek.serialization import PredicatesType

from ._update import update_dask_partitions_one_to_one, update_dask_partitions_shuffle
from ._utils import _maybe_get_categoricals_from_index
@@ -301,3 +311,97 @@ def update_dataset_from_ddf(
        metadata=metadata,
        metadata_merger=metadata_merger,
    )


def collect_dataset_metadata(
    store: Optional[Callable[[], KeyValueStore]] = None,
    dataset_uuid: Optional[str] = None,
    table_name: str = SINGLE_TABLE,
    predicates: Optional[PredicatesType] = None,
    frac: float = 1.0,
    factory: Optional[DatasetFactory] = None,
) -> dd.DataFrame:
    """
    Collect parquet metadata of the dataset. The `frac` parameter can be used to select a subset of the data.

    .. warning::
        If the size of the partitions is not evenly distributed, e.g. some partitions might be larger than others,
        the metadata returned is not a good approximation for the whole dataset metadata.

    .. warning::
        Using the `frac` parameter is not encouraged for a small number of total partitions.

    Parameters
    ----------
    store
        A factory function providing a KeyValueStore
    dataset_uuid
        The dataset's unique identifier
    table_name
        Name of the kartothek table for which to retrieve the statistics
    predicates
        Kartothek predicates to apply filters on the data for which to gather statistics.

        .. warning::
            Filtering will only be applied for predicates on indices.
            The evaluation of the predicates will therefore only return an approximate result.

    frac
        Fraction of the total number of partitions to use for gathering statistics. `frac == 1.0` will use all partitions.
    factory
        A DatasetFactory holding the store and UUID to the source dataset.

    Returns
    -------
    A dask.DataFrame containing the following information about dataset statistics:

    * `partition_label`: File name of the parquet file, unique to each physical partition.
    * `row_group_id`: Index of the row groups within one parquet file.
    * `row_group_compressed_size`: Byte size of the data within one row group.
    * `row_group_uncompressed_size`: Byte size (uncompressed) of the data within one row group.
    * `number_rows_total`: Total number of rows in one parquet file.
    * `number_row_groups`: Number of row groups in one parquet file.
    * `serialized_size`: Serialized size of the parquet file.
    * `number_rows_per_row_group`: Number of rows per row group.

    Raises
    ------
    ValueError
        If no metadata could be retrieved, raise an error.
    """
    if not ARROW_LARGER_EQ_0141:
        raise RuntimeError("This function requires `pyarrow>=0.14.1`.")
    if not 0.0 < frac <= 1.0:
        raise ValueError(
            f"Invalid value for parameter `frac`: {frac}."
            "Please make sure to provide a value larger than 0.0 and smaller than or equal to 1.0 ."
        )
    dataset_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
        load_dataset_metadata=False,
    )

    mps = list(
        dispatch_metapartitions_from_factory(dataset_factory, predicates=predicates)
    )
    if mps:
        random.shuffle(mps)
        # ensure that even with sampling at least one metapartition is returned
        cutoff_index = max(1, int(len(mps) * frac))
        mps = mps[:cutoff_index]
        ddf = dd.from_delayed(
            [
                dask.delayed(MetaPartition.get_parquet_metadata)(
                    mp, store=dataset_factory.store_factory, table_name=table_name,
                )
                for mp in mps
            ]
        )
    else:
        df = pd.DataFrame(columns=_METADATA_SCHEMA.keys())
        df = df.astype(_METADATA_SCHEMA)
        ddf = dd.from_pandas(df, npartitions=1)

    return ddf
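
For context, here is a minimal usage sketch of the new function (not part of the commit): the store URL, dataset UUID, and the indexed column used in the predicate are hypothetical, and `table_name` is left at kartothek's default single-table name.

from functools import partial

from storefact import get_store_from_url

from kartothek.io.dask.dataframe import collect_dataset_metadata

# Hypothetical store factory and dataset UUID, for illustration only.
store_factory = partial(get_store_from_url, "hfs:///tmp/kartothek_data")

# Sample roughly half of the partitions; the predicate only prunes on indexed columns.
metadata_ddf = collect_dataset_metadata(
    store=store_factory,
    dataset_uuid="my_dataset",
    predicates=[[("country", "==", "DE")]],
    frac=0.5,
)
metadata_df = metadata_ddf.compute()  # one row per row group of each sampled parquet file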
1 change: 1 addition & 0 deletions kartothek/io_components/merge.py
@@ -21,6 +21,7 @@ def align_datasets(left_dataset_uuid, right_dataset_uuid, store, match_how="exac
    Yields
    ------
    list
    """
    store = _instantiate_store(store)
    left_dataset = DatasetMetadata.load_from_store(uuid=left_dataset_uuid, store=store)
94 changes: 92 additions & 2 deletions kartothek/io_components/metapartition.py
@@ -7,14 +7,15 @@
import os
import time
import warnings
from collections import Iterable, Iterator, defaultdict, namedtuple
from collections import Iterable, defaultdict, namedtuple
from copy import copy
from functools import wraps
from typing import Any, Dict, Optional, cast
from typing import Any, Callable, Dict, Iterator, Optional, cast

import numpy as np
import pandas as pd
import pyarrow as pa
from simplekv import KeyValueStore

from kartothek.core import naming
from kartothek.core._compat import ARROW_LARGER_EQ_0150
@@ -51,6 +52,17 @@
_Literal = namedtuple("_Literal", ["column", "op", "value"])
_SplitPredicate = namedtuple("_SplitPredicate", ["key_part", "content_part"])

_METADATA_SCHEMA = {
    "partition_label": np.dtype("O"),
    "row_group_id": np.dtype(int),
    "row_group_compressed_size": np.dtype(int),
    "row_group_uncompressed_size": np.dtype(int),
    "number_rows_total": np.dtype(int),
    "number_row_groups": np.dtype(int),
    "serialized_size": np.dtype(int),
    "number_rows_per_row_group": np.dtype(int),
}


def _predicates_to_named(predicates):
if predicates is None:
@@ -1549,6 +1561,84 @@ def delete_from_store(self, dataset_uuid, store):
            store.delete(file_key)
        return self.copy(files={}, data={}, metadata={})

    def get_parquet_metadata(
        self, store: Callable[[], KeyValueStore], table_name: str
    ) -> pd.DataFrame:
        """
        Retrieve the parquet metadata for the MetaPartition.
        Especially relevant for calculating dataset statistics.

        Parameters
        ----------
        store
            A factory function providing a KeyValueStore
        table_name
            Name of the kartothek table for which the statistics should be retrieved

        Returns
        -------
        pd.DataFrame
            A DataFrame with relevant parquet metadata
        """
        if not isinstance(table_name, str):
            raise TypeError("Expecting a string for parameter `table_name`.")

        if callable(store):
            store = store()

        data = {}
        if table_name in self.files:
            with store.open(self.files[table_name]) as fd:  # type: ignore
                pq_metadata = pa.parquet.ParquetFile(fd).metadata
            try:
                metadata_dict = pq_metadata.to_dict()
            except AttributeError:  # No data in file
                metadata_dict = None
            data = {
                "partition_label": self.label,
                "serialized_size": pq_metadata.serialized_size,
                "number_rows_total": pq_metadata.num_rows,
                "number_row_groups": pq_metadata.num_row_groups,
                "row_group_id": [0],
                "number_rows_per_row_group": [0],
                "row_group_compressed_size": [0],
                "row_group_uncompressed_size": [0],
            }

            if metadata_dict:
                # Note: could just parse this entire dict into a pandas dataframe, w/o the below processing
                data = {
                    "partition_label": self.label,
                    "serialized_size": metadata_dict["serialized_size"],
                    "number_rows_total": metadata_dict["num_rows"],
                    "number_row_groups": metadata_dict["num_row_groups"],
                    "row_group_id": [],
                    "number_rows_per_row_group": [],
                    "row_group_compressed_size": [],
                    "row_group_uncompressed_size": [],
                }

                for row_group_id, row_group_metadata in enumerate(
                    metadata_dict["row_groups"]
                ):
                    data["row_group_id"].append(row_group_id)
                    data["number_rows_per_row_group"].append(
                        row_group_metadata["num_rows"]
                    )
                    data["row_group_compressed_size"].append(
                        row_group_metadata["total_byte_size"]
                    )
                    data["row_group_uncompressed_size"].append(
                        sum(
                            col["total_uncompressed_size"]
                            for col in row_group_metadata["columns"]
                        )
                    )

        df = pd.DataFrame(data=data, columns=_METADATA_SCHEMA.keys())
        df = df.astype(_METADATA_SCHEMA)
        return df


def _unique_label(label_list):
label = os.path.commonprefix(label_list)
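
For a single dataset, the new MetaPartition.get_parquet_metadata method can also be driven directly without dask. Below is a minimal sketch under the same hypothetical assumptions as the previous example (dataset UUID "my_dataset", the store_factory defined above, and the default table name "table"):

import pandas as pd

from kartothek.core.factory import DatasetFactory
from kartothek.io_components.read import dispatch_metapartitions_from_factory

# Hypothetical dataset; reuses the store_factory from the earlier sketch.
factory = DatasetFactory(dataset_uuid="my_dataset", store_factory=store_factory)

# One pandas DataFrame per metapartition, with one row per row group.
frames = [
    mp.get_parquet_metadata(store=factory.store_factory, table_name="table")
    for mp in dispatch_metapartitions_from_factory(factory)
]
stats = pd.concat(frames, ignore_index=True)
print(stats[["partition_label", "row_group_id", "number_rows_per_row_group"]])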