diff --git a/kartothek/io/dask/bag_cube.py b/kartothek/io/dask/bag_cube.py
index cecaca02..15dd177e 100644
--- a/kartothek/io/dask/bag_cube.py
+++ b/kartothek/io/dask/bag_cube.py
@@ -27,6 +27,7 @@
 
 __all__ = (
     "append_to_cube_from_bag",
+    "update_cube_from_bag",
     "build_cube_from_bag",
     "cleanup_cube_bag",
     "collect_stats_bag",
@@ -378,7 +379,7 @@ def append_to_cube_from_bag(data, cube, store, ktk_cube_dataset_ids, metadata=No
     .. hint::
 
         To have better control over the overwrite "mask" (i.e. which partitions are overwritten), you should use
-        :meth:`remove_partitions` beforehand.
+        :meth:`remove_partitions` beforehand or use :meth:`update_cube_from_bag` instead.
 
     Parameters
     ----------
@@ -409,6 +410,49 @@ def append_to_cube_from_bag(data, cube, store, ktk_cube_dataset_ids, metadata=No
     )
 
 
+def update_cube_from_bag(
+    data, cube, store, remove_conditions, ktk_cube_dataset_ids, metadata=None
+):
+    """
+    Remove partitions and append data to an existing cube.
+
+    For details on ``data`` and ``metadata``, see :meth:`build_cube`.
+
+    Only datasets in ``ktk_cube_dataset_ids`` will be affected.
+
+    Parameters
+    ----------
+    data: dask.Bag
+        Bag containing dataframes.
+    cube: kartothek.core.cube.cube.Cube
+        Cube specification.
+    store: Callable[[], simplekv.KeyValueStore]
+        Store to which the data should be written.
+    remove_conditions
+        Conditions that select the partitions to remove. Must be a condition that only uses
+        partition columns.
+    ktk_cube_dataset_ids: Optional[Iterable[str]]
+        Datasets that will be affected by the update; must be specified in advance.
+    metadata: Optional[Dict[str, Dict[str, Any]]]
+        Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of
+        metadata keys is not possible.
+
+    Returns
+    -------
+    metadata_dict: dask.bag.Bag
+        A dask bag object containing the compute graph to update the cube, returning the dict of dataset metadata
+        objects. The bag has a single partition with a single element.
+    """
+    return append_to_cube_from_bag_internal(
+        data=data,
+        cube=cube,
+        store=store,
+        remove_conditions=remove_conditions,
+        ktk_cube_dataset_ids=ktk_cube_dataset_ids,
+        metadata=metadata,
+    )
+
+
 def _delete(keys, store):
     if callable(store):
         store = store()
diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py
index 34614d7e..22563012 100644
--- a/kartothek/io/dask/common_cube.py
+++ b/kartothek/io/dask/common_cube.py
@@ -16,6 +16,9 @@
 from kartothek.io_components.cube.append import check_existing_datasets
 from kartothek.io_components.cube.common import check_blocksize, check_store_factory
 from kartothek.io_components.cube.query import load_group, plan_query, quick_concat
+from kartothek.io_components.cube.remove import (
+    prepare_metapartitions_for_removal_action,
+)
 from kartothek.io_components.cube.write import (
     apply_postwrite_checks,
     check_datasets_prebuild,
@@ -293,7 +296,9 @@ def query_cube_bag_internal(
     return empty, b
 
 
-def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, metadata):
+def append_to_cube_from_bag_internal(
+    data, cube, store, ktk_cube_dataset_ids, metadata, remove_conditions=None
+):
     """
     Append data to existing cube.
 
@@ -304,10 +309,6 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
     Physical partitions must be updated as a whole. If only single rows within a physical partition are
     updated, the old data is treated as "removed".
 
-    .. hint::
-
-        To have better control over the overwrite "mask" (i.e. which partitions are overwritten), you should use
-        :meth:`remove_partitions` beforehand.
 
     Parameters
     ----------
@@ -322,6 +323,8 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
     metadata: Dict[str, Dict[str, Any]]
         Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of
        metadata keys is not possible.
+    remove_conditions
+        Conditions that select which partitions to remove.
 
     Returns
     -------
@@ -344,9 +347,20 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
         existing_datasets=existing_datasets, ktk_cube_dataset_ids=ktk_cube_dataset_ids
     )
 
+    if remove_conditions is not None:
+        remove_metapartitions = prepare_metapartitions_for_removal_action(
+            cube, store, remove_conditions, ktk_cube_dataset_ids, existing_datasets
+        )
+        delete_scopes = {
+            k: delete_scope for k, (_, _, delete_scope) in remove_metapartitions.items()
+        }
+    else:
+        delete_scopes = {}
+
     data = (
         data.map(multiplex_user_input, cube=cube)
         .map(_check_dataset_ids, ktk_cube_dataset_ids=ktk_cube_dataset_ids)
+        .map(_fill_dataset_ids, ktk_cube_dataset_ids=ktk_cube_dataset_ids)
         .map(
             _multiplex_prepare_data_for_ktk,
             cube=cube,
@@ -368,6 +382,7 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
         },
         update=True,
         existing_datasets=existing_datasets,
+        delete_scopes=delete_scopes,
     )
 
     data = data.map(
@@ -421,6 +436,13 @@ def append_to_cube_from_bag_internal(data, cube, store, ktk_cube_dataset_ids, me
     return dct
 
 
+def _fill_dataset_ids(dct, ktk_cube_dataset_ids):
+    # make sure dct contains an entry for each of the ktk_cube_dataset_ids, filling in None
+    # if necessary
+    dct.update({ktk_id: None for ktk_id in ktk_cube_dataset_ids if ktk_id not in dct})
+    return dct
+
+
 def _store_bag_as_dataset_parallel(
     bag,
     store,
@@ -430,10 +452,14 @@
     existing_datasets,
     overwrite=False,
     update=False,
+    delete_scopes=None,
 ):
     """
     Vendored, simplified and modified version of kartotheks ``store_bag_as_dataset`` which cannot be easily used to
     store datasets in parallel (e.g. from a dict).
+
+    `delete_scopes` is a dictionary mapping the kartothek dataset id to the `delete_scope` of that dataset
+    (see `update_dataset_from_partitions` for the definition of the single-dataset `delete_scope`).
""" if (not update) and (not overwrite): for ktk_cube_dataset_id in ktk_cube_dataset_ids: @@ -455,6 +481,7 @@ def _store_bag_as_dataset_parallel( metadata=metadata, store=store, update=update, + delete_scopes=delete_scopes or {}, ) return mps.reduction( @@ -478,7 +505,7 @@ def _multiplex_prepare_data_for_ktk(data, cube, existing_payload, partition_on): def _multiplex_store_dataset_from_partitions_flat( - mpss, cube, metadata, update, store, existing_datasets + mpss, cube, metadata, update, store, existing_datasets, delete_scopes ): dct = defaultdict(list) for sublist in mpss: @@ -489,19 +516,23 @@ def _multiplex_store_dataset_from_partitions_flat( result = {} for k, v in dct.items(): if update: + print( + f"_multiplex_store_dataset_from_partitions_flat: {k} delete scope: {delete_scopes.get(k, [])}" + ) ds_factory = metadata_factory_from_dataset( existing_datasets[k], with_schema=True, store=store ) result[k] = update_dataset_from_partitions( v, dataset_uuid=cube.ktk_dataset_uuid(k), - delete_scope=[], + delete_scope=delete_scopes.get(k, []), ds_factory=ds_factory, metadata=metadata[k], metadata_merger=None, store_factory=store, ) else: + print(f"_multiplex_store_dataset_from_partitions_flat: {k} no update") result[k] = store_dataset_from_partitions( v, dataset_metadata=metadata[k], diff --git a/kartothek/io/testing/update_cube.py b/kartothek/io/testing/update_cube.py new file mode 100644 index 00000000..f16945a5 --- /dev/null +++ b/kartothek/io/testing/update_cube.py @@ -0,0 +1,160 @@ +from typing import Tuple + +import numpy as np +import pandas as pd +import pytest + +from kartothek.core.cube.conditions import C +from kartothek.core.cube.cube import Cube +from kartothek.core.dataset import DatasetMetadata +from kartothek.io.eager import read_table +from kartothek.io.eager_cube import build_cube, extend_cube, query_cube + + +def _write_cube(function_store) -> Tuple[pd.DataFrame, Cube]: + """ + Write a cube with dimension column "x" and partition column "p" + + returns the 'source' and 'enrich' dataframes and the cube specification. 
+ """ + df_source = pd.DataFrame( + { + "i1": [10, 11, 12, 13], + "p": [0, 0, 1, 1], + "v1": [10, 11, 12, 13], + "x": [0, 1, 2, 3], + } + ) + cube = Cube( + dimension_columns=["x"], + partition_columns=["p"], + uuid_prefix="cube", + seed_dataset="source", + index_columns=["i1", "i2", "i3"], + ) + build_cube( + data={"source": df_source}, + cube=cube, + store=function_store, + metadata={"source": {"meta_at_create": "data"}}, + ) + return df_source, cube + + +def _extend_cube(cube, function_store) -> pd.DataFrame: + # extend the existing cube by a dataset 'ex' with columns a = x + 1000 + df = pd.DataFrame({"a": [1000, 1001], "p": [0, 1], "x": [0, 2]}) + extend_cube({"ex": df}, cube, function_store) + return df + + +@pytest.mark.parametrize( + "remove_partitions,new_partitions", + [ + # only append: + ([], [4, 5]), + # do nothing: + ([], []), + # partial overwrite with new data for p=0 + ([0], [0, 1, 4]), + # explicitly remove p=0 without overwriting it + ([0], [1, 4]), + # overwrite all: + ([0, 1], [0, 1]), + ], +) +def test_update_partitions(driver, function_store, remove_partitions, new_partitions): + df_source, cube = _write_cube(function_store) + + df_source_new = pd.DataFrame( + { + "i1": range(200, 200 + len(new_partitions)), + "p": np.array(new_partitions, np.int64), + "v1": range(300, 300 + len(new_partitions)), + "x": range(100, 100 + len(new_partitions)), + } + ) + + # what should remain of the old data: + df_source_of_old = df_source.loc[~df_source["p"].isin(set(remove_partitions))] + df_source_expected_after = pd.concat( + [df_source_of_old, df_source_new], sort=False, ignore_index=True + ) + + remove_conditions = C("p").isin(remove_partitions) + + result = driver( + data={"source": df_source_new}, + remove_conditions=remove_conditions, + cube=cube, + store=function_store, + ktk_cube_dataset_ids={"source"}, + metadata={"source": {"some_new_meta": 42}}, + ) + + assert set(result.keys()) == {"source"} + + dm_source_after = DatasetMetadata.load_from_store( + cube.ktk_dataset_uuid("source"), function_store(), load_all_indices=True + ) + + assert "some_new_meta" in dm_source_after.metadata + assert "meta_at_create" in dm_source_after.metadata + + # check values for "p" are as expected: + expected_p_source = (set(df_source["p"].unique()) - set(remove_partitions)) | set( + new_partitions + ) + assert set(dm_source_after.indices["p"].index_dct) == expected_p_source + + df_read = query_cube(cube, function_store)[0] + + assert set(df_read.columns) == set(df_source_expected_after.columns) + + for df in (df_read, df_source_expected_after): + df.sort_values("x", inplace=True) + df.reset_index(drop=True, inplace=True) + + pd.testing.assert_frame_equal(df_read, df_source_expected_after) + + +@pytest.mark.parametrize( + "ktk_cube_dataset_ids", [{"source", "ex"}, {"source"}, {"ex"}, set()] +) +def test_update_respects_ktk_cube_dataset_ids( + driver, function_store, ktk_cube_dataset_ids +): + df_source, cube = _write_cube(function_store) + df_ex = _extend_cube(cube, function_store) + + remove_conditions = C("p") == 0 + + # This implicitly also tests that `data={}` behaves as expected and still deletes partitions + # as requested via ktk_cube_dataset_ids and remove_conditions + result = driver( + data={}, + remove_conditions=remove_conditions, + cube=cube, + store=function_store, + ktk_cube_dataset_ids=ktk_cube_dataset_ids, + ) + assert set(result) == ktk_cube_dataset_ids + df_read = query_cube(cube, function_store)[0] + + # expected result: df_source left joined with df_ex; choosing the subset of 
+    # that is in `ktk_cube_dataset_ids`:
+    if "source" in ktk_cube_dataset_ids:
+        df_source = df_source.loc[df_source["p"] != 0]
+    if "ex" in ktk_cube_dataset_ids:
+        df_ex = df_ex.loc[df_ex["p"] != 0]
+    df_expected = df_source.merge(df_ex[["x", "a"]], how="left", on="x")
+    df_expected = df_expected[sorted(df_expected.columns)]
+    pd.testing.assert_frame_equal(df_read, df_expected)
+
+    # test "ex" separately, because the test above based on the *left* merge does not tell us much about
+    # "ex" in case the partitions were removed from "source"
+    df_ex_read = read_table(cube.ktk_dataset_uuid("ex"), function_store)
+    if "ex" in ktk_cube_dataset_ids:
+        assert set(df_ex_read["p"]) == {1}
+    else:
+        assert set(df_ex_read["p"]) == {0, 1}
diff --git a/tests/io/cube/test_update.py b/tests/io/cube/test_update.py
new file mode 100644
index 00000000..10bcb790
--- /dev/null
+++ b/tests/io/cube/test_update.py
@@ -0,0 +1,15 @@
+import pytest
+from tests.io.cube.utils import wrap_bag_write
+
+from kartothek.io.dask.bag_cube import update_cube_from_bag
+from kartothek.io.testing.update_cube import *  # noqa
+
+
+@pytest.fixture
+def driver(driver_name):
+    if driver_name == "dask_bag_bs1":
+        return wrap_bag_write(update_cube_from_bag, blocksize=1)
+    elif driver_name == "dask_bag_bs3":
+        return wrap_bag_write(update_cube_from_bag, blocksize=3)
+    else:
+        pytest.skip()
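For reviewers who want to exercise the new entry point end to end, a minimal usage sketch follows. It is not part of the patch: the in-memory store factory via storefact and the example dataframes are illustrative assumptions; only the `update_cube_from_bag` call itself reflects the API added above.

from functools import partial

import dask.bag as db
import pandas as pd
from storefact import get_store_from_url

from kartothek.core.cube.conditions import C
from kartothek.core.cube.cube import Cube
from kartothek.io.dask.bag_cube import update_cube_from_bag
from kartothek.io.eager_cube import build_cube

# illustrative in-memory store factory (assumed); any callable returning a simplekv store works
store_factory = partial(get_store_from_url, "hmemory://")

cube = Cube(
    dimension_columns=["x"],
    partition_columns=["p"],
    uuid_prefix="cube",
    seed_dataset="source",
)

# seed the cube with two partitions, p=0 and p=1
build_cube(
    data={"source": pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [10, 11, 12, 13]})},
    cube=cube,
    store=store_factory,
)

# replace partition p=1: the removal and the appended rows go through a single dataset update
df_new = pd.DataFrame({"x": [2, 3], "p": [1, 1], "v1": [20, 21]})
bag = db.from_sequence([{"source": df_new}], npartitions=1)

update_bag = update_cube_from_bag(
    data=bag,
    cube=cube,
    store=store_factory,
    remove_conditions=C("p") == 1,  # may only reference partition columns
    ktk_cube_dataset_ids={"source"},
)
# the returned bag has a single partition with a single element: the dict of dataset metadata
(metadata_dict,) = update_bag.compute()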