Implement update_cube_from_bag (#370)
Co-authored-by: Jochen Ott <[email protected]>
jochen-ott-by authored Nov 18, 2020
1 parent 3870303 commit f65bba9
Showing 4 changed files with 258 additions and 8 deletions.
46 changes: 45 additions & 1 deletion kartothek/io/dask/bag_cube.py
@@ -27,6 +27,7 @@

__all__ = (
"append_to_cube_from_bag",
"update_cube_from_bag",
"build_cube_from_bag",
"cleanup_cube_bag",
"collect_stats_bag",
@@ -378,7 +379,7 @@ def append_to_cube_from_bag(data, cube, store, ktk_cube_dataset_ids, metadata=No
.. hint::
To have better control over the overwrite "mask" (i.e. which partitions are overwritten), you should use
:meth:`remove_partitions` beforehand.
:meth:`remove_partitions` beforehand or use :meth:`update_cube_from_bag` instead.
Parameters
----------
@@ -409,6 +410,49 @@ def append_to_cube_from_bag(data, cube, store, ktk_cube_dataset_ids, metadata=No
)


def update_cube_from_bag(
data, cube, store, remove_conditions, ktk_cube_dataset_ids, metadata=None
):
"""
Remove partitions and append data to existing cube.
For details on ``data`` and ``metadata``, see :meth:`build_cube`.
Only datasets in `ktk_cube_dataset_ids` will be affected.
Parameters
----------
data: dask.Bag
Bag containing dataframes
cube: kartothek.core.cube.cube.Cube
Cube specification.
store: Callable[[], simplekv.KeyValueStore]
Store to which the data should be written.
remove_conditions
Conditions that select the partitions to remove. Must be a condition that only uses
partition columns.
ktk_cube_dataset_ids: Optional[Iterable[str]]
Datasets that will be written, must be specified in advance.
metadata: Optional[Dict[str, Dict[str, Any]]]
Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of
metadata keys is not possible.
Returns
-------
metadata_dict: dask.bag.Bag
A dask bag object containing the compute graph to update the cube, returning the dict of dataset metadata
objects. The bag has a single partition with a single element.
"""
return append_to_cube_from_bag_internal(
data=data,
cube=cube,
store=store,
remove_conditions=remove_conditions,
ktk_cube_dataset_ids=ktk_cube_dataset_ids,
metadata=metadata,
)


def _delete(keys, store):
if callable(store):
store = store()
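A minimal usage sketch of the new entry point follows. The store URL (assuming a local storefact store), the example dataframe, and the assumption that a cube with seed dataset "source" was already built (e.g. via build_cube_from_bag) are illustrative, not part of this commit:

import dask.bag as db
import pandas as pd
from functools import partial

from storefact import get_store_from_url

from kartothek.core.cube.conditions import C
from kartothek.core.cube.cube import Cube
from kartothek.io.dask.bag_cube import update_cube_from_bag

# illustrative store factory and cube spec; adjust to your environment
store_factory = partial(get_store_from_url, "hfs:///tmp/kartothek_cube_store")
cube = Cube(
    dimension_columns=["x"],
    partition_columns=["p"],
    uuid_prefix="cube",
    seed_dataset="source",
)

# new rows for partition p=0; the existing contents of p=0 are selected for
# removal via remove_conditions, so this acts as a partial overwrite of that partition
df_new = pd.DataFrame({"x": [0, 1], "p": [0, 0], "v1": [100, 101]})
bag = db.from_sequence([{"source": df_new}], partition_size=1)

graph = update_cube_from_bag(
    data=bag,
    cube=cube,
    store=store_factory,
    remove_conditions=C("p") == 0,  # may only reference partition columns
    ktk_cube_dataset_ids={"source"},
)
(metadata_dict,) = graph.compute()  # single-element bag holding the dict of dataset metadata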
45 changes: 38 additions & 7 deletions kartothek/io/dask/common_cube.py
@@ -16,6 +16,9 @@
from kartothek.io_components.cube.append import check_existing_datasets
from kartothek.io_components.cube.common import check_blocksize, check_store_factory
from kartothek.io_components.cube.query import load_group, plan_query, quick_concat
from kartothek.io_components.cube.remove import (
prepare_metapartitions_for_removal_action,
)
from kartothek.io_components.cube.write import (
apply_postwrite_checks,
check_datasets_prebuild,
@@ -293,7 +296,9 @@ def query_cube_bag_internal(
return empty, b


def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, metadata):
def append_to_cube_from_bag_internal(
data, cube, store, ktk_cube_dataset_ids, metadata, remove_conditions=None
):
"""
Append data to existing cube.
@@ -304,10 +309,6 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the
old data is treated as "removed".
.. hint::
To have better control over the overwrite "mask" (i.e. which partitions are overwritten), you should use
:meth:`remove_partitions` beforehand.
Parameters
----------
@@ -322,6 +323,8 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
metadata: Dict[str, Dict[str, Any]]
Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of
metadata keys is not possible.
remove_conditions
Conditions that select which partitions to remove.
Returns
-------
@@ -344,9 +347,20 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
existing_datasets=existing_datasets, ktk_cube_dataset_ids=ktk_cube_dataset_ids
)

if remove_conditions is not None:
remove_metapartitions = prepare_metapartitions_for_removal_action(
cube, store, remove_conditions, ktk_cube_dataset_ids, existing_datasets
)
delete_scopes = {
k: delete_scope for k, (_, _, delete_scope) in remove_metapartitions.items()
}
else:
delete_scopes = {}

data = (
data.map(multiplex_user_input, cube=cube)
.map(_check_dataset_ids, ktk_cube_dataset_ids=ktk_cube_dataset_ids)
.map(_fill_dataset_ids, ktk_cube_dataset_ids=ktk_cube_dataset_ids)
.map(
_multiplex_prepare_data_for_ktk,
cube=cube,
@@ -368,6 +382,7 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
},
update=True,
existing_datasets=existing_datasets,
delete_scopes=delete_scopes,
)

data = data.map(
@@ -421,6 +436,13 @@ def _check_dataset_ids(dct, ktk_cube_dataset_ids):
return dct


def _fill_dataset_ids(dct, ktk_cube_dataset_ids):
# make sure dct contains an entry for each of the ktk_cube_dataset_ids, filling in None
# if necessary
dct.update({ktk_id: None for ktk_id in ktk_cube_dataset_ids if ktk_id not in dct})
return dct


def _store_bag_as_dataset_parallel(
bag,
store,
@@ -430,10 +452,14 @@ def _store_bag_as_dataset_parallel(
existing_datasets,
overwrite=False,
update=False,
delete_scopes=None,
):
"""
Vendored, simplified and modified version of kartothek's ``store_bag_as_dataset``, which cannot be easily used to
store datasets in parallel (e.g. from a dict).
`delete_scopes` is a dictionary mapping the kartothek dataset id to the `delete_scope` of that dataset
(see `update_dataset_from_partitions` for the definition of the single-dataset `delete_scope`).
"""
if (not update) and (not overwrite):
for ktk_cube_dataset_id in ktk_cube_dataset_ids:
@@ -455,6 +481,7 @@
metadata=metadata,
store=store,
update=update,
delete_scopes=delete_scopes or {},
)

return mps.reduction(
@@ -478,7 +505,7 @@ def _multiplex_prepare_data_for_ktk(data, cube, existing_payload, partition_on):


def _multiplex_store_dataset_from_partitions_flat(
mpss, cube, metadata, update, store, existing_datasets
mpss, cube, metadata, update, store, existing_datasets, delete_scopes
):
dct = defaultdict(list)
for sublist in mpss:
@@ -489,19 +516,23 @@ def _multiplex_store_dataset_from_partitions_flat(
result = {}
for k, v in dct.items():
if update:
print(
f"_multiplex_store_dataset_from_partitions_flat: {k} delete scope: {delete_scopes.get(k, [])}"
)
ds_factory = metadata_factory_from_dataset(
existing_datasets[k], with_schema=True, store=store
)
result[k] = update_dataset_from_partitions(
v,
dataset_uuid=cube.ktk_dataset_uuid(k),
delete_scope=[],
delete_scope=delete_scopes.get(k, []),
ds_factory=ds_factory,
metadata=metadata[k],
metadata_merger=None,
store_factory=store,
)
else:
print(f"_multiplex_store_dataset_from_partitions_flat: {k} no update")
result[k] = store_dataset_from_partitions(
v,
dataset_metadata=metadata[k],
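For orientation: the delete_scopes mapping introduced above associates each kartothek cube dataset id with a plain kartothek delete_scope, i.e. a list of dicts of partition-column values selecting the physical partitions to drop. The actual entries are produced by prepare_metapartitions_for_removal_action; the values below only illustrate the expected shape:

# e.g. for remove_conditions = C("p").isin([0, 1]) on a cube partitioned by "p"
delete_scopes = {
    "source": [{"p": 0}, {"p": 1}],
    "enrich": [{"p": 0}, {"p": 1}],
}
# each per-dataset list is forwarded unchanged as delete_scope to
# update_dataset_from_partitions in _multiplex_store_dataset_from_partitions_flat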
160 changes: 160 additions & 0 deletions kartothek/io/testing/update_cube.py
@@ -0,0 +1,160 @@
from typing import Tuple

import numpy as np
import pandas as pd
import pytest

from kartothek.core.cube.conditions import C
from kartothek.core.cube.cube import Cube
from kartothek.core.dataset import DatasetMetadata
from kartothek.io.eager import read_table
from kartothek.io.eager_cube import build_cube, extend_cube, query_cube


def _write_cube(function_store) -> Tuple[pd.DataFrame, Cube]:
"""
Write a cube with dimension column "x" and partition column "p".
Returns the 'source' dataframe and the cube specification.
"""
df_source = pd.DataFrame(
{
"i1": [10, 11, 12, 13],
"p": [0, 0, 1, 1],
"v1": [10, 11, 12, 13],
"x": [0, 1, 2, 3],
}
)
cube = Cube(
dimension_columns=["x"],
partition_columns=["p"],
uuid_prefix="cube",
seed_dataset="source",
index_columns=["i1", "i2", "i3"],
)
build_cube(
data={"source": df_source},
cube=cube,
store=function_store,
metadata={"source": {"meta_at_create": "data"}},
)
return df_source, cube


def _extend_cube(cube, function_store) -> pd.DataFrame:
# extend the existing cube by a dataset 'ex' that adds a column "a" for a subset of the "x" values
df = pd.DataFrame({"a": [1000, 1001], "p": [0, 1], "x": [0, 2]})
extend_cube({"ex": df}, cube, function_store)
return df


@pytest.mark.parametrize(
"remove_partitions,new_partitions",
[
# only append:
([], [4, 5]),
# do nothing:
([], []),
# partial overwrite with new data for p=0
([0], [0, 1, 4]),
# explicitly remove p=0 without overwriting it
([0], [1, 4]),
# overwrite all:
([0, 1], [0, 1]),
],
)
def test_update_partitions(driver, function_store, remove_partitions, new_partitions):
df_source, cube = _write_cube(function_store)

df_source_new = pd.DataFrame(
{
"i1": range(200, 200 + len(new_partitions)),
"p": np.array(new_partitions, np.int64),
"v1": range(300, 300 + len(new_partitions)),
"x": range(100, 100 + len(new_partitions)),
}
)

# what should remain of the old data:
df_source_of_old = df_source.loc[~df_source["p"].isin(set(remove_partitions))]
df_source_expected_after = pd.concat(
[df_source_of_old, df_source_new], sort=False, ignore_index=True
)

remove_conditions = C("p").isin(remove_partitions)

result = driver(
data={"source": df_source_new},
remove_conditions=remove_conditions,
cube=cube,
store=function_store,
ktk_cube_dataset_ids={"source"},
metadata={"source": {"some_new_meta": 42}},
)

assert set(result.keys()) == {"source"}

dm_source_after = DatasetMetadata.load_from_store(
cube.ktk_dataset_uuid("source"), function_store(), load_all_indices=True
)

assert "some_new_meta" in dm_source_after.metadata
assert "meta_at_create" in dm_source_after.metadata

# check values for "p" are as expected:
expected_p_source = (set(df_source["p"].unique()) - set(remove_partitions)) | set(
new_partitions
)
assert set(dm_source_after.indices["p"].index_dct) == expected_p_source

df_read = query_cube(cube, function_store)[0]

assert set(df_read.columns) == set(df_source_expected_after.columns)

for df in (df_read, df_source_expected_after):
df.sort_values("x", inplace=True)
df.reset_index(drop=True, inplace=True)

pd.testing.assert_frame_equal(df_read, df_source_expected_after)


@pytest.mark.parametrize(
"ktk_cube_dataset_ids", [{"source", "ex"}, {"source"}, {"ex"}, set()]
)
def test_update_respects_ktk_cube_dataset_ids(
driver, function_store, ktk_cube_dataset_ids
):
df_source, cube = _write_cube(function_store)
df_ex = _extend_cube(cube, function_store)

remove_conditions = C("p") == 0

# This implicitly also tests that `data={}` behaves as expected and still deletes partitions
# as requested via ktk_cube_dataset_ids and remove_conditions
result = driver(
data={},
remove_conditions=remove_conditions,
cube=cube,
store=function_store,
ktk_cube_dataset_ids=ktk_cube_dataset_ids,
)
assert set(result) == ktk_cube_dataset_ids
df_read = query_cube(cube, function_store)[0]

# expected result: df_source left joined with df_ex; choosing the subset of p!=0 from each
# that is in `ktk_cube_dataset_ids`:
if "source" in ktk_cube_dataset_ids:
df_source = df_source.loc[df_source["p"] != 0]
if "ex" in ktk_cube_dataset_ids:
df_ex = df_ex.loc[df_ex["p"] != 0]
df_expected = df_source.merge(df_ex[["x", "a"]], how="left", on="x")
df_expected = df_expected[sorted(df_expected.columns)]
pd.testing.assert_frame_equal(df_read, df_expected)

# test "ex" separately, because the test above based on the *left* merge does not tell us much about
# "ex" in case the partitions were removed from "source"
df_ex_read = read_table(cube.ktk_dataset_uuid("ex"), function_store)
if "ex" in ktk_cube_dataset_ids:
assert set(df_ex_read["p"]) == {1}
else:
assert set(df_ex_read["p"]) == {0, 1}
15 changes: 15 additions & 0 deletions tests/io/cube/test_update.py
@@ -0,0 +1,15 @@
import pytest
from tests.io.cube.utils import wrap_bag_write

from kartothek.io.dask.bag_cube import update_cube_from_bag
from kartothek.io.testing.update_cube import * # noqa


@pytest.fixture
def driver(driver_name):
if driver_name == "dask_bag_bs1":
return wrap_bag_write(update_cube_from_bag, blocksize=1)
elif driver_name == "dask_bag_bs3":
return wrap_bag_write(update_cube_from_bag, blocksize=3)
else:
pytest.skip()
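The driver fixture relies on wrap_bag_write from tests/io/cube/utils, which is not part of this diff. As a rough, hypothetical sketch of what such a wrapper has to do (an assumption for illustration, not the actual helper): it adapts the dict-based driver signature used by kartothek.io.testing.update_cube to the bag-based API and executes it eagerly.

import dask.bag as db


def wrap_bag_write(function, blocksize):
    # hypothetical sketch; the real helper lives in tests/io/cube/utils and may differ
    def _driver(data, **kwargs):
        # wrap the {dataset_id: dataframe} dict into a bag and run the write eagerly
        bag = db.from_sequence([data], partition_size=blocksize)
        (result,) = function(data=bag, **kwargs).compute()
        return result

    return _driver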
