diff --git a/CHANGES.rst b/CHANGES.rst index 0d643c79..75dff837 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,6 +6,7 @@ Version 3.19.1 (2021-02-XX) =========================== * Allow ``pyarrow==3`` as a dependency. +* Fix an issue with the cube index validation introduced in v3.19.0 (#413). Version 3.19.0 (2021-02-12) =========================== diff --git a/kartothek/io/dask/common_cube.py b/kartothek/io/dask/common_cube.py index 9d180b35..2fc6bfb7 100644 --- a/kartothek/io/dask/common_cube.py +++ b/kartothek/io/dask/common_cube.py @@ -66,17 +66,16 @@ def ensure_valid_cube_indices( `index_columns` and `suppress_index_on` fields adjusted to reflect the existing datasets. """ - required_indices = set(cube.index_columns) - suppress_index_on = set(cube.suppress_index_on) + dataset_indices = [] for ds in existing_datasets.values(): for internal_table in ds.table_meta: dataset_columns = set(ds.table_meta[internal_table].names) - table_indices = required_indices & dataset_columns + table_indices = cube.index_columns & dataset_columns compatible_indices = _ensure_compatible_indices(ds, table_indices) if compatible_indices: - dataset_indices = set(compatible_indices) - suppress_index_on -= dataset_indices - required_indices |= dataset_indices + dataset_indices.append(set(compatible_indices)) + required_indices = cube.index_columns.union(*dataset_indices) + suppress_index_on = cube.suppress_index_on.difference(*dataset_indices) # Need to remove dimension columns since they *are* technically indices but # the cube interface class declares them as not indexed just to add them # later on, assuming it is not blacklisted diff --git a/tests/io/dask/test_common_cube.py b/tests/io/dask/test_common_cube.py new file mode 100644 index 00000000..118a2180 --- /dev/null +++ b/tests/io/dask/test_common_cube.py @@ -0,0 +1,178 @@ +import pytest + +from kartothek.core.cube.cube import Cube +from kartothek.core.dataset import DatasetMetadata +from kartothek.io.dask.common_cube import ensure_valid_cube_indices + + +class FakeSeedTableMetadata: + names = ["d1", "d2", "p", "i1", "i2"] + + +class FakeExtraTableMetadata: + names = ["d1", "p", "i1"] + + +def test_cube_with_valid_indices_is_not_modified_by_validation(): + """ + Test that a cube with valid indices is not modified by `ensure_valid_cube_indices` + """ + source_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "source", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeSeedTableMetadata()}, + "partition_keys": ["p"], + "indices": { + "d1": {"1": ["part_1"]}, + "d2": {"1": ["part_1"]}, + "i1": {"1": ["part_1"]}, + }, + } + ) + extra_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "extra", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeExtraTableMetadata()}, + "partition_keys": ["p"], + "indices": {"i1": {"1": ["part_1"]}}, + } + ) + cube = Cube( + dimension_columns=["d1", "d2"], + partition_columns=["p"], + uuid_prefix="cube", + seed_dataset="source", + index_columns=["i1"], + ) + + validated_cube = ensure_valid_cube_indices( + {"source": source_metadata, "extra": extra_metadata}, cube + ) + + assert validated_cube == cube + + +def test_existing_indices_are_added_when_missing_in_cube(): + """ + Test that indices already existing in the dataset are added to the validated cube + """ + source_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "source", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeSeedTableMetadata()}, + "partition_keys": ["p"], + "indices": { + "d1": {"1": ["part_1"]}, + "d2": {"1": ["part_1"]}, + "i1": {"1": ["part_1"]}, + "i2": {"1": ["part_1"]}, + }, + } + ) + extra_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "extra", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeExtraTableMetadata()}, + "partition_keys": ["p"], + "indices": {"i1": {"1": ["part_1"]}}, + } + ) + cube = Cube( + dimension_columns=["d1", "d2"], + partition_columns=["p"], + uuid_prefix="cube", + seed_dataset="source", + index_columns=["i1"], + ) + + validated_cube = ensure_valid_cube_indices( + {"source": source_metadata, "extra": extra_metadata}, cube + ) + + assert validated_cube.index_columns == {"i1", "i2"} + + +def test_raises_when_cube_defines_index_not_in_dataset(): + """ + Test that a `ValueError` is raised when the cube defines an index that is not part of a dataset + """ + source_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "source", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeSeedTableMetadata()}, + "partition_keys": ["p"], + "indices": { + "d1": {"1": ["part_1"]}, + "d2": {"1": ["part_1"]}, + "i1": {"1": ["part_1"]}, + }, + } + ) + extra_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "extra", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeExtraTableMetadata()}, + "partition_keys": ["p"], + "indices": {"i1": {"1": ["part_1"]}}, + } + ) + cube = Cube( + dimension_columns=["d1", "d2"], + partition_columns=["p"], + uuid_prefix="cube", + seed_dataset="source", + index_columns=["i2"], + ) + + with pytest.raises(ValueError): + ensure_valid_cube_indices( + {"source": source_metadata, "extra": extra_metadata}, cube + ) + + +def test_no_indices_are_suppressed_when_they_already_exist(): + """ + Test that no indicies marked as suppressed in the cube are actually suppressed when + they are already present in the dataset + """ + source_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "source", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeSeedTableMetadata()}, + "partition_keys": ["p"], + "indices": { + "d1": {"1": ["part_1"]}, + "d2": {"1": ["part_1"]}, + "i1": {"1": ["part_1"]}, + }, + } + ) + extra_metadata = DatasetMetadata.from_dict( + { + "dataset_uuid": "extra", + "dataset_metadata_version": 4, + "table_meta": {"table": FakeExtraTableMetadata()}, + "partition_keys": ["p"], + "indices": {"i1": {"1": ["part_1"]}}, + } + ) + cube = Cube( + dimension_columns=["d1", "d2"], + partition_columns=["p"], + uuid_prefix="cube", + seed_dataset="source", + suppress_index_on=["d1", "d2"], + ) + + validated_cube = ensure_valid_cube_indices( + {"source": source_metadata, "extra": extra_metadata}, cube + ) + + assert validated_cube.suppress_index_on == frozenset()