diff --git a/CHANGES.rst b/CHANGES.rst
index 9cb19a04..3b916e07 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -12,6 +12,11 @@ Improvements
   to allow for transition for future breaking release.
 
+New functionality
+^^^^^^^^^^^^^^^^^
+
+* Add :meth:`~kartothek.io_components.metapartition.MetaPartition.get_parquet_metadata` and :func:`~kartothek.io.dask.dataframe.collect_dataset_metadata`, enabling users to collect information about the Parquet metadata of a dataset
+
 Bug fixes
 ^^^^^^^^^
 
diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py
index 5d98c24d..99ac86fb 100644
--- a/kartothek/io/dask/dataframe.py
+++ b/kartothek/io/dask/dataframe.py
@@ -1,15 +1,24 @@
+import random
+from typing import Callable, Optional
+
 import dask
 import dask.dataframe as dd
 import numpy as np
+import pandas as pd
+from simplekv import KeyValueStore
 
+from kartothek.core._compat import ARROW_LARGER_EQ_0141
 from kartothek.core.common_metadata import empty_dataframe_from_schema
 from kartothek.core.docs import default_docs
-from kartothek.core.factory import _ensure_factory
+from kartothek.core.factory import DatasetFactory, _ensure_factory
 from kartothek.core.naming import DEFAULT_METADATA_VERSION
 from kartothek.io_components.metapartition import (
+    _METADATA_SCHEMA,
     SINGLE_TABLE,
+    MetaPartition,
     parse_input_to_metapartition,
 )
+from kartothek.io_components.read import dispatch_metapartitions_from_factory
 from kartothek.io_components.update import update_dataset_from_partitions
 from kartothek.io_components.utils import (
     _ensure_compatible_indices,
@@ -18,6 +27,7 @@
     normalize_args,
     validate_partition_keys,
 )
+from kartothek.serialization import PredicatesType
 
 from ._update import update_dask_partitions_one_to_one, update_dask_partitions_shuffle
 from ._utils import _maybe_get_categoricals_from_index
@@ -301,3 +311,97 @@ def update_dataset_from_ddf(
         metadata=metadata,
         metadata_merger=metadata_merger,
     )
+
+
+def collect_dataset_metadata(
+    store: Optional[Callable[[], KeyValueStore]] = None,
+    dataset_uuid: Optional[str] = None,
+    table_name: str = SINGLE_TABLE,
+    predicates: Optional[PredicatesType] = None,
+    frac: float = 1.0,
+    factory: Optional[DatasetFactory] = None,
+) -> dd.DataFrame:
+    """
+    Collect parquet metadata of the dataset. The `frac` parameter can be used to select a subset of the data.
+
+    .. warning::
+        If the partition sizes are not evenly distributed, e.g. some partitions are much larger than others,
+        the metadata returned is not a good approximation of the metadata of the whole dataset.
+    .. warning::
+        Using the `frac` parameter is not encouraged for a small number of total partitions.
+
+
+    Parameters
+    ----------
+    store
+        A factory function providing a KeyValueStore
+    dataset_uuid
+        The dataset's unique identifier
+    table_name
+        Name of the kartothek table for which to retrieve the statistics
+    predicates
+        Kartothek predicates to apply filters on the data for which to gather statistics
+
+        .. warning::
+            Filtering will only be applied for predicates on indices.
+            The evaluation of the predicates will therefore only return an approximate result.
+
+    frac
+        Fraction of the total number of partitions to use for gathering statistics. `frac == 1.0` will use all partitions.
+    factory
+        A DatasetFactory holding the store and UUID to the source dataset.
+
+    Returns
+    -------
+    A dask.DataFrame containing the following information about dataset statistics:
+        * `partition_label`: File name of the parquet file, unique to each physical partition.
+        * `row_group_id`: Index of the row groups within one parquet file.
+        * `row_group_compressed_size`: Byte size of the data within one row group.
+        * `row_group_uncompressed_size`: Byte size (uncompressed) of the data within one row group.
+        * `number_rows_total`: Total number of rows in one parquet file.
+        * `number_row_groups`: Number of row groups in one parquet file.
+        * `serialized_size`: Serialized size of the parquet file.
+        * `number_rows_per_row_group`: Number of rows per row group.
+
+    Raises
+    ------
+    ValueError
+        If no metadata could be retrieved.
+
+    """
+    if not ARROW_LARGER_EQ_0141:
+        raise RuntimeError("This function requires `pyarrow>=0.14.1`.")
+    if not 0.0 < frac <= 1.0:
+        raise ValueError(
+            f"Invalid value for parameter `frac`: {frac}. "
+            "Please make sure to provide a value larger than 0.0 and smaller than or equal to 1.0."
+        )
+    dataset_factory = _ensure_factory(
+        dataset_uuid=dataset_uuid,
+        store=store,
+        factory=factory,
+        load_dataset_metadata=False,
+    )
+
+    mps = list(
+        dispatch_metapartitions_from_factory(dataset_factory, predicates=predicates)
+    )
+    if mps:
+        random.shuffle(mps)
+        # ensure that even with sampling at least one metapartition is returned
+        cutoff_index = max(1, int(len(mps) * frac))
+        mps = mps[:cutoff_index]
+        ddf = dd.from_delayed(
+            [
+                dask.delayed(MetaPartition.get_parquet_metadata)(
+                    mp, store=dataset_factory.store_factory, table_name=table_name,
+                )
+                for mp in mps
+            ]
+        )
+    else:
+        df = pd.DataFrame(columns=_METADATA_SCHEMA.keys())
+        df = df.astype(_METADATA_SCHEMA)
+        ddf = dd.from_pandas(df, npartitions=1)
+
+    return ddf
diff --git a/kartothek/io_components/merge.py b/kartothek/io_components/merge.py
index c1752fd2..06757429 100644
--- a/kartothek/io_components/merge.py
+++ b/kartothek/io_components/merge.py
@@ -21,6 +21,7 @@ def align_datasets(left_dataset_uuid, right_dataset_uuid, store, match_how="exac
     Yields
     ------
     list
+
     """
     store = _instantiate_store(store)
     left_dataset = DatasetMetadata.load_from_store(uuid=left_dataset_uuid, store=store)
diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py
index 63535526..1af91a58 100644
--- a/kartothek/io_components/metapartition.py
+++ b/kartothek/io_components/metapartition.py
@@ -7,14 +7,15 @@
 import os
 import time
 import warnings
-from collections import Iterable, Iterator, defaultdict, namedtuple
+from collections import Iterable, defaultdict, namedtuple
 from copy import copy
 from functools import wraps
-from typing import Any, Dict, Optional, cast
+from typing import Any, Callable, Dict, Iterator, Optional, cast
 
 import numpy as np
 import pandas as pd
 import pyarrow as pa
+from simplekv import KeyValueStore
 
 from kartothek.core import naming
 from kartothek.core._compat import ARROW_LARGER_EQ_0150
@@ -51,6 +52,17 @@
 _Literal = namedtuple("_Literal", ["column", "op", "value"])
 _SplitPredicate = namedtuple("_SplitPredicate", ["key_part", "content_part"])
 
+_METADATA_SCHEMA = {
+    "partition_label": np.dtype("O"),
+    "row_group_id": np.dtype(int),
+    "row_group_compressed_size": np.dtype(int),
+    "row_group_uncompressed_size": np.dtype(int),
+    "number_rows_total": np.dtype(int),
+    "number_row_groups": np.dtype(int),
+    "serialized_size": np.dtype(int),
+    "number_rows_per_row_group": np.dtype(int),
+}
+
 
 def _predicates_to_named(predicates):
     if predicates is None:
@@ -1549,6 +1561,84 @@ def delete_from_store(self, dataset_uuid, store):
             store.delete(file_key)
         return self.copy(files={}, data={}, metadata={})
 
+    def get_parquet_metadata(
+        self, store: Callable[[], KeyValueStore], table_name: str
+    ) -> pd.DataFrame:
+        """
+        Retrieve the parquet metadata for the MetaPartition.
+        Especially relevant for calculating dataset statistics.
+
+        Parameters
+        ----------
+        store
+            A factory function providing a KeyValueStore
+        table_name
+            Name of the kartothek table for which the statistics should be retrieved
+
+        Returns
+        -------
+        pd.DataFrame
+            A DataFrame with relevant parquet metadata
+        """
+        if not isinstance(table_name, str):
+            raise TypeError("Expecting a string for parameter `table_name`.")
+
+        if callable(store):
+            store = store()
+
+        data = {}
+        if table_name in self.files:
+            with store.open(self.files[table_name]) as fd:  # type: ignore
+                pq_metadata = pa.parquet.ParquetFile(fd).metadata
+            try:
+                metadata_dict = pq_metadata.to_dict()
+            except AttributeError:  # No data in file
+                metadata_dict = None
+                data = {
+                    "partition_label": self.label,
+                    "serialized_size": pq_metadata.serialized_size,
+                    "number_rows_total": pq_metadata.num_rows,
+                    "number_row_groups": pq_metadata.num_row_groups,
+                    "row_group_id": [0],
+                    "number_rows_per_row_group": [0],
+                    "row_group_compressed_size": [0],
+                    "row_group_uncompressed_size": [0],
+                }
+
+            if metadata_dict:
+                # Note: could just parse this entire dict into a pandas dataframe, w/o the below processing
+                data = {
+                    "partition_label": self.label,
+                    "serialized_size": metadata_dict["serialized_size"],
+                    "number_rows_total": metadata_dict["num_rows"],
+                    "number_row_groups": metadata_dict["num_row_groups"],
+                    "row_group_id": [],
+                    "number_rows_per_row_group": [],
+                    "row_group_compressed_size": [],
+                    "row_group_uncompressed_size": [],
+                }
+
+                for row_group_id, row_group_metadata in enumerate(
+                    metadata_dict["row_groups"]
+                ):
+                    data["row_group_id"].append(row_group_id)
+                    data["number_rows_per_row_group"].append(
+                        row_group_metadata["num_rows"]
+                    )
+                    data["row_group_compressed_size"].append(
+                        row_group_metadata["total_byte_size"]
+                    )
+                    data["row_group_uncompressed_size"].append(
+                        sum(
+                            col["total_uncompressed_size"]
+                            for col in row_group_metadata["columns"]
+                        )
+                    )
+
+        df = pd.DataFrame(data=data, columns=_METADATA_SCHEMA.keys())
+        df = df.astype(_METADATA_SCHEMA)
+        return df
+
 
 def _unique_label(label_list):
     label = os.path.commonprefix(label_list)
diff --git a/tests/io/dask/dataframe/test_stats.py b/tests/io/dask/dataframe/test_stats.py
new file mode 100644
index 00000000..e2625f93
--- /dev/null
+++ b/tests/io/dask/dataframe/test_stats.py
@@ -0,0 +1,348 @@
+import pandas as pd
+import pytest
+
+from kartothek.core._compat import ARROW_LARGER_EQ_0141
+from kartothek.io.dask.dataframe import collect_dataset_metadata
+from kartothek.io.eager import (
+    store_dataframes_as_dataset,
+    update_dataset_from_dataframes,
+)
+from kartothek.io_components.metapartition import _METADATA_SCHEMA, MetaPartition
+from kartothek.io_components.write import store_dataset_from_partitions
+from kartothek.serialization import ParquetSerializer
+
+if not ARROW_LARGER_EQ_0141:
+    pytest.skip("requires arrow >= 0.14.1", allow_module_level=True)
+
+
+def test_collect_dataset_metadata(store_session_factory, dataset):
+    df_stats = collect_dataset_metadata(
+        store=store_session_factory,
+        dataset_uuid="dataset_uuid",
+        table_name="table",
+        predicates=None,
+        frac=1,
+    ).compute()
+
+    actual = df_stats.drop(
+        columns=[
+            "row_group_compressed_size",
+            "row_group_uncompressed_size",
+            "serialized_size",
+        ],
+        axis=1,
+    )
+    actual.sort_values(by=["partition_label", "row_group_id"], inplace=True)
+
+    expected = pd.DataFrame(
+        data={
+            "partition_label": ["cluster_1", "cluster_2"],
+            "row_group_id": [0, 0],
+            "number_rows_total": [1, 1],
+            "number_row_groups": [1, 1],
+            "number_rows_per_row_group": [1, 1],
+        },
+        index=[0, 0],
+    )
+    pd.testing.assert_frame_equal(actual, expected)
+
+
+def test_collect_dataset_metadata_predicates(store_session_factory, dataset):
+    predicates = [[("P", "==", 1)]]
+
+    df_stats = collect_dataset_metadata(
+        store=store_session_factory,
+        dataset_uuid="dataset_uuid",
+        table_name="table",
+        predicates=predicates,
+        frac=1,
+    ).compute()
+
+    actual = df_stats.drop(
+        columns=[
+            "row_group_compressed_size",
+            "row_group_uncompressed_size",
+            "serialized_size",
+        ],
+        axis=1,
+    )
+    actual.sort_values(by=["partition_label", "row_group_id"], inplace=True)
+
+    # Predicates are only evaluated on index level and have therefore no effect on this dataset
+    expected = pd.DataFrame(
+        data={
+            "partition_label": ["cluster_1", "cluster_2"],
+            "row_group_id": [0, 0],
+            "number_rows_total": [1, 1],
+            "number_row_groups": [1, 1],
+            "number_rows_per_row_group": [1, 1],
+        },
+        index=[0, 0],
+    )
+    pd.testing.assert_frame_equal(actual, expected)
+
+
+def test_collect_dataset_metadata_predicates_on_index(store_factory):
+    df = pd.DataFrame(
+        data={"P": range(10), "L": ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"]}
+    )
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid="dataset_uuid", partition_on=["L"], dfs=[df],
+    )
+    predicates = [[("L", "==", "b")]]
+
+    df_stats = collect_dataset_metadata(
+        store=store_factory,
+        dataset_uuid="dataset_uuid",
+        table_name="table",
+        predicates=predicates,
+        frac=1,
+    ).compute()
+
+    assert "L=b" in df_stats["partition_label"].values[0]
+
+    df_stats.sort_values(by=["partition_label", "row_group_id"], inplace=True)
+    actual = df_stats.drop(
+        columns=[
+            "partition_label",
+            "row_group_compressed_size",
+            "row_group_uncompressed_size",
+            "serialized_size",
+        ],
+        axis=1,
+    )
+
+    expected = pd.DataFrame(
+        data={
+            "row_group_id": [0],
+            "number_rows_total": [5],
+            "number_row_groups": [1],
+            "number_rows_per_row_group": [5],
+        },
+        index=[0],
+    )
+    pd.testing.assert_frame_equal(actual, expected)
+
+
+def test_collect_dataset_metadata_predicates_row_group_size(store_factory):
+    ps = ParquetSerializer(chunk_size=2)
+    df = pd.DataFrame(
+        data={"P": range(10), "L": ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"]}
+    )
+    store_dataframes_as_dataset(
+        store=store_factory,
+        dataset_uuid="dataset_uuid",
+        partition_on=["L"],
+        dfs=[df],
+        df_serializer=ps,
+    )
+
+    predicates = [[("L", "==", "a")]]
+
+    df_stats = collect_dataset_metadata(
+        store=store_factory,
+        dataset_uuid="dataset_uuid",
+        table_name="table",
+        predicates=predicates,
+        frac=1,
+    ).compute()
+
+    for part_label in df_stats["partition_label"]:
+        assert "L=a" in part_label
+    df_stats.sort_values(by=["partition_label", "row_group_id"], inplace=True)
+
+    actual = df_stats.drop(
+        columns=[
+            "partition_label",
+            "row_group_compressed_size",
+            "row_group_uncompressed_size",
+            "serialized_size",
+        ],
+        axis=1,
+    )
+
+    expected = pd.DataFrame(
+        data={
+            "row_group_id": [0, 1, 2],
+            "number_rows_total": [5, 5, 5],
+            "number_row_groups": [3, 3, 3],
+            "number_rows_per_row_group": [2, 2, 1],
+        },
+        index=[0, 1, 2],
+    )
+    pd.testing.assert_frame_equal(actual, expected)
+
+
+def test_collect_dataset_metadata_frac_smoke(store_session_factory, dataset):
+    df_stats = collect_dataset_metadata(
+        store=store_session_factory,
+        dataset_uuid="dataset_uuid",
+        table_name="table",
+        frac=0.8,
+    ).compute()
+    columns = {
+        "partition_label",
+        "row_group_id",
+        "row_group_compressed_size",
+        "row_group_uncompressed_size",
+        "number_rows_total",
+        "number_row_groups",
+        "serialized_size",
+        "number_rows_per_row_group",
+    }
+
+    assert set(df_stats.columns) == columns
+
+
+def test_collect_dataset_metadata_empty_dataset_mp(store_factory):
+    mp = MetaPartition(label="cluster_1")
+    store_dataset_from_partitions(
+        partition_list=[mp], store=store_factory, dataset_uuid="dataset_uuid"
+    )
+
+    df_stats = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", table_name="table"
+    ).compute()
+
+    expected = pd.DataFrame(columns=_METADATA_SCHEMA.keys())
+    expected = expected.astype(_METADATA_SCHEMA)
+    pd.testing.assert_frame_equal(expected, df_stats, check_index_type=False)
+
+
+def test_collect_dataset_metadata_empty_dataset(store_factory):
+    df = pd.DataFrame(columns=["A", "b"], index=pd.RangeIndex(start=0, stop=0))
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid="dataset_uuid", dfs=[df], partition_on=["A"]
+    )
+    df_stats = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", table_name="table",
+    ).compute()
+    expected = pd.DataFrame(columns=_METADATA_SCHEMA.keys())
+    expected = expected.astype(_METADATA_SCHEMA)
+    pd.testing.assert_frame_equal(expected, df_stats)
+
+
+def test_collect_dataset_metadata_concat(store_factory):
+    """Smoke-test concatenation of empty and non-empty dataset metadata collections."""
+    df = pd.DataFrame(data={"A": [1, 1, 1, 1], "b": [1, 1, 2, 2]})
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid="dataset_uuid", dfs=[df], partition_on=["A"]
+    )
+    df_stats1 = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", table_name="table",
+    ).compute()
+
+    # Remove all partitions of the dataset
+    update_dataset_from_dataframes(
+        [], store=store_factory, dataset_uuid="dataset_uuid", delete_scope=[{"A": 1}]
+    )
+
+    df_stats2 = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", table_name="table",
+    ).compute()
+    pd.concat([df_stats1, df_stats2])
+
+
+def test_collect_dataset_metadata_delete_dataset(store_factory):
+    df = pd.DataFrame(data={"A": [1, 1, 1, 1], "b": [1, 1, 2, 2]})
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid="dataset_uuid", dfs=[df], partition_on=["A"]
+    )
+    # Remove all partitions of the dataset
+    update_dataset_from_dataframes(
+        [], store=store_factory, dataset_uuid="dataset_uuid", delete_scope=[{"A": 1}]
+    )
+
+    df_stats = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", table_name="table",
+    ).compute()
+    expected = pd.DataFrame(columns=_METADATA_SCHEMA.keys())
+    expected = expected.astype(_METADATA_SCHEMA)
+    pd.testing.assert_frame_equal(expected, df_stats)
+
+
+def test_collect_dataset_metadata_fraction_precision(store_factory):
+    df = pd.DataFrame(data={"A": range(100), "B": range(100)})
+
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid="dataset_uuid", dfs=[df], partition_on=["A"],
+    )  # Creates 100 partitions
+
+    df_stats = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", frac=0.2
+    ).compute()
+    assert len(df_stats) == 20
+
+
+def test_collect_dataset_metadata_at_least_one_partition(store_factory):
+    """
+    Make sure we return at least one partition, even if none would be returned by rounding frac * n_partitions.
+    """
+    df = pd.DataFrame(data={"A": range(100), "B": range(100)})
+
+    store_dataframes_as_dataset(
+        store=store_factory, dataset_uuid="dataset_uuid", dfs=[df], partition_on=["A"],
+    )  # Creates 100 partitions
+
+    df_stats = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", frac=0.005
+    ).compute()
+    assert len(df_stats) == 1
+
+
+def test_collect_dataset_metadata_table_without_partition(store_factory):
+    """
+    df2 doesn't have files for all partitions (specifically `A==2`).
+    Make sure that we still collect the right metadata.
+    """
+    df1 = pd.DataFrame(data={"A": [1, 1, 2, 2], "b": [1, 1, 2, 2]})
+    df2 = pd.DataFrame(data={"A": [1, 1], "b": [1, 1]})
+
+    store_dataframes_as_dataset(
+        store=store_factory,
+        dataset_uuid="dataset_uuid",
+        dfs=[{"table1": df1, "table2": df2}],
+        partition_on=["A"],
+    )
+
+    df_stats = collect_dataset_metadata(
+        store=store_factory, dataset_uuid="dataset_uuid", table_name="table2",
+    ).compute()
+    actual = df_stats.drop(
+        columns=[
+            "partition_label",
+            "row_group_compressed_size",
+            "row_group_uncompressed_size",
+            "serialized_size",
+        ],
+        axis=1,
+    )
+    expected = pd.DataFrame(
+        data={
+            "row_group_id": [0],
+            "number_rows_total": [2],
+            "number_row_groups": [1],
+            "number_rows_per_row_group": [2],
+        }
+    )
+    pd.testing.assert_frame_equal(actual, expected)
+    assert len(df_stats) == 1
+    assert df_stats.iloc[0]["partition_label"].startswith("A=1/")
+
+
+def test_collect_dataset_metadata_invalid_frac(store_session_factory, dataset):
+    with pytest.raises(ValueError, match="Invalid value for parameter `frac`"):
+        collect_dataset_metadata(
+            store=store_session_factory,
+            dataset_uuid="dataset_uuid",
+            table_name="table",
+            frac=1.1,
+        )
+
+    with pytest.raises(ValueError, match="Invalid value for parameter `frac`"):
+        collect_dataset_metadata(
+            store=store_session_factory,
+            dataset_uuid="dataset_uuid",
+            table_name="table",
+            frac=0.0,
+        )
diff --git a/tests/io_components/test_metapartition.py b/tests/io_components/test_metapartition.py
index c914fc0b..d2ef399b 100644
--- a/tests/io_components/test_metapartition.py
+++ b/tests/io_components/test_metapartition.py
@@ -10,6 +10,7 @@
 import pandas.testing as pdt
 import pytest
 
+from kartothek.core._compat import ARROW_LARGER_EQ_0141
 from kartothek.core.common_metadata import make_meta, store_schema_metadata
 from kartothek.core.index import ExplicitSecondaryIndex
 from kartothek.core.naming import DEFAULT_METADATA_VERSION
@@ -1645,3 +1646,100 @@ def test_parse_input_schema_formats():
     for obj in formats_obj:
         mp = parse_input_to_metapartition(obj=obj, metadata_version=4)
         assert mp.data == {"table1": df}
+
+
+@pytest.mark.skipif(not ARROW_LARGER_EQ_0141, reason="requires arrow >= 0.14.1")
+def test_get_parquet_metadata(store):
+    df = pd.DataFrame({"P": np.arange(0, 10), "L": np.arange(0, 10)})
+    mp = MetaPartition(label="test_label", data={"core": df},)
+    meta_partition = mp.store_dataframes(store=store, dataset_uuid="dataset_uuid",)
+
+    actual = meta_partition.get_parquet_metadata(store=store, table_name="core")
+    actual.drop(labels="serialized_size", axis=1, inplace=True)
+    actual.drop(labels="row_group_compressed_size", axis=1, inplace=True)
+    actual.drop(labels="row_group_uncompressed_size", axis=1, inplace=True)
+
+    expected = pd.DataFrame(
+        {
+            "partition_label": ["test_label"],
+            "row_group_id": 0,
+            "number_rows_total": 10,
+            "number_row_groups": 1,
+            "number_rows_per_row_group": 10,
+        }
+    )
+    pd.testing.assert_frame_equal(actual, expected)
+
+
+@pytest.mark.skipif(not ARROW_LARGER_EQ_0141, reason="requires arrow >= 0.14.1")
+def test_get_parquet_metadata_empty_df(store):
+    df = pd.DataFrame()
+    mp = MetaPartition(label="test_label", data={"core": df},)
+    meta_partition = mp.store_dataframes(store=store, dataset_uuid="dataset_uuid",)
+
+    actual = meta_partition.get_parquet_metadata(store=store, table_name="core")
+    actual.drop(
+        columns=[
+            "serialized_size",
+            "row_group_compressed_size",
+            "row_group_uncompressed_size",
+        ],
+        axis=1,
+        inplace=True,
+    )
+
+    expected = pd.DataFrame(
+        {
+            "partition_label": ["test_label"],
+            "row_group_id": 0,
+            "number_rows_total": 0,
+            "number_row_groups": 1,
+            "number_rows_per_row_group": 0,
+        }
+    )
+
+    pd.testing.assert_frame_equal(actual, expected)
+
+
+@pytest.mark.skipif(not ARROW_LARGER_EQ_0141, reason="requires arrow >= 0.14.1")
+def test_get_parquet_metadata_row_group_size(store):
+    df = pd.DataFrame({"P": np.arange(0, 10), "L": np.arange(0, 10)})
+    mp = MetaPartition(label="test_label", data={"core": df},)
+    ps = ParquetSerializer(chunk_size=5)
+
+    meta_partition = mp.store_dataframes(
+        store=store, dataset_uuid="dataset_uuid", df_serializer=ps
+    )
+    actual = meta_partition.get_parquet_metadata(store=store, table_name="core")
+    actual.drop(
+        columns=[
+            "serialized_size",
+            "row_group_compressed_size",
+            "row_group_uncompressed_size",
+        ],
+        axis=1,
+        inplace=True,
+    )
+
+    expected = pd.DataFrame(
+        {
+            "partition_label": ["test_label", "test_label"],
+            "row_group_id": [0, 1],
+            "number_rows_total": [10, 10],
+            "number_row_groups": [2, 2],
+            "number_rows_per_row_group": [5, 5],
+        }
+    )
+    pd.testing.assert_frame_equal(actual, expected)
+
+
+@pytest.mark.skipif(not ARROW_LARGER_EQ_0141, reason="requires arrow >= 0.14.1")
+def test_get_parquet_metadata_table_name_not_str(store):
+    df = pd.DataFrame({"P": np.arange(0, 10), "L": np.arange(0, 10)})
+    mp = MetaPartition(label="test_label", data={"core": df, "another_table": df},)
+    meta_partition = mp.store_dataframes(store=store, dataset_uuid="dataset_uuid",)
+
+    with pytest.raises(TypeError):
+        meta_partition.get_parquet_metadata(
+            store=store, table_name=["core", "another_table"]
+        )
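
Usage sketch for the new `collect_dataset_metadata` function added in kartothek/io/dask/dataframe.py above. This is not part of the patch: the storefact-based store factory, the temporary directory, and the dataset UUID are illustrative assumptions; the function name, its parameters, and the returned columns come from the diff, and pyarrow>=0.14.1 is required as enforced by the new code.

from functools import partial
from tempfile import mkdtemp

import pandas as pd
import storefact

from kartothek.io.dask.dataframe import collect_dataset_metadata
from kartothek.io.eager import store_dataframes_as_dataset

# Assumption: a local filesystem store via storefact; any simplekv-compatible
# store factory works the same way.
dataset_dir = mkdtemp()
store_factory = partial(storefact.get_store_from_url, f"hfs://{dataset_dir}")

# Write a small dataset partitioned by column "L" (default table name is "table").
df = pd.DataFrame({"P": range(10), "L": ["a"] * 5 + ["b"] * 5})
store_dataframes_as_dataset(
    store=store_factory, dataset_uuid="example_uuid", dfs=[df], partition_on=["L"]
)

# Collect Parquet metadata for all partitions (frac=1.0); the result is a dask
# DataFrame with one row per row group, so .compute() materializes it,
# just like the new tests do.
df_stats = collect_dataset_metadata(
    store=store_factory,
    dataset_uuid="example_uuid",
    table_name="table",
    frac=1.0,
).compute()

print(df_stats[["partition_label", "row_group_id", "number_rows_total"]])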
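
A similar minimal sketch for the lower-level `MetaPartition.get_parquet_metadata` method, mirroring how the new unit tests in tests/io_components/test_metapartition.py drive it. The in-memory `DictStore` from simplekv and the labels used here are assumptions for illustration only.

import pandas as pd
from simplekv.memory import DictStore

from kartothek.io_components.metapartition import MetaPartition

# Assumption: a simple in-memory simplekv store; any KeyValueStore (or a
# factory returning one) can be passed, since the method accepts both.
store = DictStore()

df = pd.DataFrame({"P": range(10), "L": range(10)})
mp = MetaPartition(label="example_label", data={"core": df})
stored_mp = mp.store_dataframes(store=store, dataset_uuid="example_uuid")

# One row per row group of the partition's Parquet file for table "core".
stats = stored_mp.get_parquet_metadata(store=store, table_name="core")
print(stats[["partition_label", "number_row_groups", "number_rows_per_row_group"]])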